Implementing Real-Time Data Processing in Python
Strategies for writing Python code that handles real-time data streams efficiently, focusing on low-latency processing.
# Title: Implementing Real-Time Data Processing in Python
# Description: Strategies for writing Python code that handles real-time data streams efficiently, focusing on low-latency processing.
# Category: Python Cursor Rules

## 1. Utilize Generators for Memory Efficiency

- **Rule**: Use generator functions to process large data streams without loading the entire dataset into memory.
- **Example**:
```python
def read_large_file(file_path):
    # Yield one line at a time so the whole file never sits in memory.
    with open(file_path, 'r') as file:
        for line in file:
            yield line.strip()
```

## 2. Implement Micro-Batching

- **Rule**: Process data in small batches to balance real-time processing needs with system performance.
- **Example**:
```python
def process_data_in_batches(data_stream, batch_size=100):
    batch = []
    for data in data_stream:
        batch.append(data)
        if len(batch) == batch_size:
            process_batch(batch)
            batch = []
    if batch:  # flush any remaining partial batch
        process_batch(batch)
```

## 3. Optimize Data Serialization

- **Rule**: Choose efficient serialization formats like Avro or Protobuf to minimize latency in data transmission.
- **Example**:
```python
import avro.schema
from avro.io import DatumWriter, BinaryEncoder
from io import BytesIO

schema = avro.schema.parse(open("schema.avsc", "r").read())

def serialize_data(data):
    writer = DatumWriter(schema)
    bytes_writer = BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(data, encoder)
    return bytes_writer.getvalue()
```

## 4. Leverage Parallel Processing

- **Rule**: Use Python's `concurrent.futures` or `multiprocessing` modules to parallelize data processing tasks.
- **Example**:
```python
from concurrent.futures import ProcessPoolExecutor

def process_data_chunk(chunk):
    result = ...  # processing logic here
    return result

with ProcessPoolExecutor() as executor:
    results = list(executor.map(process_data_chunk, data_chunks))
```

## 5. Implement Fault Tolerance

- **Rule**: Design data processing operations to be idempotent and use checkpointing to recover from failures.
- **Example**:
```python
def process_message(message):
    try:
        ...  # processing logic here
    except Exception as e:
        log_error(e)
        # Retry logic or move to dead-letter queue
```

## 6. Monitor System Performance

- **Rule**: Regularly monitor system metrics to identify and address performance bottlenecks.
- **Example**:
```python
import psutil

def monitor_system():
    cpu_usage = psutil.cpu_percent()
    memory_usage = psutil.virtual_memory().percent
    # Log or alert based on metrics
```

## 7. Use Efficient Data Storage Formats

- **Rule**: Store data in columnar formats like Parquet to enhance read and write performance.
- **Example**:
```python
import pandas as pd

df = pd.DataFrame(data)
df.to_parquet('data.parquet', engine='pyarrow')
```

## 8. Ensure Data Lineage Documentation

- **Rule**: Maintain detailed records of data flow and transformations for troubleshooting and compliance.
- **Example**:
```json
{
  "data_lineage": {
    "source": "sensor_stream",
    "transformation": "aggregation",
    "destination": "real_time_dashboard",
    "timestamp": "2025-06-03T19:41:58Z"
  }
}
```
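Rule 5 calls for checkpointing as well as idempotent processing, but the example above only shows error handling. A minimal file-based checkpointing sketch might look like the following; the `checkpoint.txt` path, the offset-indexed message list, and the `process_stream` helper are all illustrative assumptions, not part of the original rule:

```python
import os

CHECKPOINT_FILE = "checkpoint.txt"  # illustrative path

def load_checkpoint():
    # Resume from the last committed offset, or start at 0 on first run.
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return int(f.read().strip() or 0)
    return 0

def save_checkpoint(offset):
    # Write to a temp file then rename, so a crash mid-write
    # never leaves a corrupt checkpoint behind.
    tmp = CHECKPOINT_FILE + ".tmp"
    with open(tmp, "w") as f:
        f.write(str(offset))
    os.replace(tmp, CHECKPOINT_FILE)

def process_stream(messages, process, batch_size=100):
    start = load_checkpoint()
    for offset in range(start, len(messages)):
        # process() must be idempotent: messages after the last
        # saved checkpoint may be replayed on restart.
        process(messages[offset])
        if (offset + 1) % batch_size == 0:
            save_checkpoint(offset + 1)
    save_checkpoint(len(messages))
```

Checkpointing once per batch (rather than per message) keeps the write overhead low, at the cost of replaying up to `batch_size - 1` messages after a crash, which is why the processing step must tolerate duplicates.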