Implementing Real-Time Data Processing in Python
Strategies for writing Python code that handles real-time data streams efficiently, focusing on low-latency processing.
Rule Content
# Title: Implementing Real-Time Data Processing in Python
# Description: Strategies for writing Python code that handles real-time data streams efficiently, focusing on low-latency processing.
# Category: Python Cursor Rules
## 1. Utilize Generators for Memory Efficiency
- **Rule**: Use generator functions to process large data streams without loading the entire dataset into memory.
- **Example**:
```python
def read_large_file(file_path):
    with open(file_path, 'r') as file:
        for line in file:
            yield line.strip()
```
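Generators also compose into pipelines: each stage pulls one item at a time, so memory use stays constant regardless of stream length. A minimal sketch (the `parse_ints` and `running_total` stages are illustrative, not part of the rule above):

```python
def parse_ints(lines):
    # Convert each raw line to an int, skipping blank lines lazily
    for line in lines:
        if line.strip():
            yield int(line)

def running_total(numbers):
    # Emit a cumulative sum without materializing the stream
    total = 0
    for n in numbers:
        total += n
        yield total

# Stages are chained; nothing is read until iteration starts
stream = iter(["1", "2", "", "3"])
totals = list(running_total(parse_ints(stream)))  # [1, 3, 6]
```

Because each stage is lazy, the same pipeline works unchanged whether the source is a four-line list or a multi-gigabyte file.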
## 2. Implement Micro-Batching
- **Rule**: Process data in small batches to balance real-time processing needs with system performance.
- **Example**:
```python
def process_data_in_batches(data_stream, batch_size=100):
    batch = []
    for data in data_stream:
        batch.append(data)
        if len(batch) == batch_size:
            process_batch(batch)
            batch = []
    if batch:
        process_batch(batch)
```
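A count-only trigger can hold a partial batch indefinitely when the stream goes quiet, which defeats the low-latency goal. A common refinement (a sketch; the `max_wait` parameter is illustrative) flushes on whichever comes first, batch size or elapsed time:

```python
import time

def batch_with_timeout(data_stream, process_batch, batch_size=100, max_wait=0.5):
    # Flush when the batch is full OR max_wait seconds have elapsed,
    # whichever comes first, so items never wait indefinitely.
    batch = []
    deadline = time.monotonic() + max_wait
    for data in data_stream:
        batch.append(data)
        if len(batch) >= batch_size or time.monotonic() >= deadline:
            process_batch(batch)
            batch = []
            deadline = time.monotonic() + max_wait
    if batch:
        # Flush any remaining partial batch at end of stream
        process_batch(batch)
```

Tuning `batch_size` up lowers per-item overhead; tuning `max_wait` down bounds worst-case latency.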
## 3. Optimize Data Serialization
- **Rule**: Choose efficient serialization formats like Avro or Protobuf to minimize latency in data transmission.
- **Example**:
```python
import avro.schema
import avro.io
from io import BytesIO

with open("schema.avsc", "r") as schema_file:
    schema = avro.schema.parse(schema_file.read())

def serialize_data(data):
    writer = avro.io.DatumWriter(schema)
    bytes_writer = BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write(data, encoder)
    return bytes_writer.getvalue()
```
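When a full schema system is more than a prototype needs, the standard library's `struct` module can pack fixed-layout records into compact binary with no third-party dependency. A sketch (the sensor-record layout here is illustrative, not a standard format):

```python
import struct

# Illustrative fixed layout: little-endian u32 sensor id + f64 value
RECORD = struct.Struct("<Id")

def pack_reading(sensor_id, value):
    # Fixed 12-byte frame per reading
    return RECORD.pack(sensor_id, value)

def unpack_reading(payload):
    # Returns (sensor_id, value)
    return RECORD.unpack(payload)

buf = pack_reading(7, 21.5)
```

This trades Avro's schema evolution for zero dependencies and minimal per-record overhead, which can be acceptable for internal, tightly versioned streams.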
## 4. Leverage Parallel Processing
- **Rule**: Use Python's `concurrent.futures` or `multiprocessing` modules to parallelize data processing tasks.
- **Example**:
```python
from concurrent.futures import ProcessPoolExecutor

def process_data_chunk(chunk):
    # Processing logic here
    return result

with ProcessPoolExecutor() as executor:
    results = list(executor.map(process_data_chunk, data_chunks))
```
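A runnable variant of the same pattern, assuming an I/O-bound or lightweight workload where threads avoid process start-up and pickling costs (`ThreadPoolExecutor` exposes the same `map` API as `ProcessPoolExecutor`; the summing workload is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

def process_data_chunk(chunk):
    # Illustrative work: reduce each chunk to a single value
    return sum(chunk)

data_chunks = [[1, 2], [3, 4], [5, 6]]

# executor.map preserves input order even when chunks finish out of order
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_data_chunk, data_chunks))
# results == [3, 7, 11]
```

For CPU-bound work, switch back to `ProcessPoolExecutor` to bypass the GIL; the call sites stay identical.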
## 5. Implement Fault Tolerance
- **Rule**: Design data processing operations to be idempotent and use checkpointing to recover from failures.
- **Example**:
```python
def process_message(message):
    try:
        ...  # Processing logic
    except Exception as e:
        log_error(e)
        # Retry logic or move to dead-letter queue
```
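The rule also calls for checkpointing, which the snippet above does not show. A minimal file-based sketch (the checkpoint path and offset scheme are illustrative): durably record the last processed offset, and on restart skip everything at or before it. Combined with idempotent processing, replayed messages are then harmless.

```python
import os

def load_checkpoint(path):
    # Resume point: -1 means nothing has been processed yet
    if os.path.exists(path):
        with open(path) as f:
            return int(f.read())
    return -1

def save_checkpoint(path, offset):
    # Write-then-rename so a crash never leaves a torn checkpoint
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write(str(offset))
    os.replace(tmp, path)

def process_stream(messages, checkpoint_path, handle):
    start = load_checkpoint(checkpoint_path)
    for offset, message in enumerate(messages):
        if offset <= start:
            continue  # already processed before the last restart
        handle(message)
        save_checkpoint(checkpoint_path, offset)
```

Checkpointing after every message maximizes safety at the cost of a write per item; real systems often checkpoint per batch instead.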
## 6. Monitor System Performance
- **Rule**: Regularly monitor system metrics to identify and address performance bottlenecks.
- **Example**:
```python
import psutil

def monitor_system():
    cpu_usage = psutil.cpu_percent()
    memory_usage = psutil.virtual_memory().percent
    # Log or alert based on metrics
```
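Where installing `psutil` is not an option, the standard library exposes a coarser signal on Unix via `os.getloadavg`. A sketch (the 0.9 per-core threshold is an illustrative alert level, not a recommendation):

```python
import os

def load_per_core():
    # 1-minute load average normalized by core count
    one_min, _, _ = os.getloadavg()
    return one_min / (os.cpu_count() or 1)

def should_alert(threshold=0.9):
    # Illustrative threshold: sustained load near one runnable
    # task per core suggests the pipeline is saturating the CPU
    return load_per_core() > threshold
```

Load average lags real activity by design, so treat it as a saturation indicator rather than an instantaneous CPU gauge.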
## 7. Use Efficient Data Storage Formats
- **Rule**: Store data in columnar formats like Parquet to enhance read and write performance.
- **Example**:
```python
import pandas as pd

df = pd.DataFrame(data)
df.to_parquet('data.parquet', engine='pyarrow')
```
## 8. Ensure Data Lineage Documentation
- **Rule**: Maintain detailed records of data flow and transformations for troubleshooting and compliance.
- **Example**:
```json
{
  "data_lineage": {
    "source": "sensor_stream",
    "transformation": "aggregation",
    "destination": "real_time_dashboard",
    "timestamp": "2025-06-03T19:41:58Z"
  }
}
```
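A record like the one above can be emitted programmatically at each transformation step. A sketch using only the standard library (field names mirror the example; how records are persisted is left open):

```python
import json
from datetime import datetime, timezone

def lineage_record(source, transformation, destination):
    # UTC timestamp in ISO 8601 with a Z suffix, as in the example
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "data_lineage": {
            "source": source,
            "transformation": transformation,
            "destination": destination,
            "timestamp": ts,
        }
    }

record = lineage_record("sensor_stream", "aggregation", "real_time_dashboard")
print(json.dumps(record))
```

Emitting one such record per stage, keyed by a shared stream or message ID, is usually enough to reconstruct a message's full path through the pipeline.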