Implementing Real-Time Data Processing in Python

Strategies for writing Python code that handles real-time data streams efficiently, focusing on low-latency processing.


Rule Content

# Title: Implementing Real-Time Data Processing in Python
# Description: Strategies for writing Python code that handles real-time data streams efficiently, focusing on low-latency processing.
# Category: Python Cursor Rules

## 1. Utilize Generators for Memory Efficiency
- **Rule**: Use generator functions to process large data streams without loading the entire dataset into memory.
- **Example**:
  ```python
  def read_large_file(file_path):
      with open(file_path, 'r') as file:
          for line in file:
              yield line.strip()
  ```
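- **Usage**: The generator can feed a processing loop directly, pulling one record at a time; a minimal sketch, assuming a hypothetical `events.log` file and a placeholder `handle_record` step:
  ```python
  def handle_record(record):
      # Placeholder processing step; replace with real logic.
      print(record)

  # Records are consumed lazily, so memory use stays constant
  # regardless of file size.
  for record in read_large_file("events.log"):
      handle_record(record)
  ```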

## 2. Implement Micro-Batching
- **Rule**: Process data in small batches to balance real-time processing needs with system performance.
- **Example**:
  ```python
  def process_data_in_batches(data_stream, batch_size=100):
      batch = []
      for data in data_stream:
          batch.append(data)
          if len(batch) == batch_size:
              process_batch(batch)
              batch = []
      if batch:
          process_batch(batch)
  ```
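- **Usage**: A minimal way to exercise this, with a placeholder `process_batch` that only reports batch sizes:
  ```python
  def process_batch(batch):
      # Placeholder: a real implementation might write to a database
      # or publish to a message queue.
      print(f"processed batch of {len(batch)} records")

  # Any iterable works as the stream; range() stands in for one here.
  process_data_in_batches(range(250), batch_size=100)
  # -> two full batches of 100, then a final partial batch of 50
  ```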

## 3. Optimize Data Serialization
- **Rule**: Choose efficient serialization formats like Avro or Protobuf to minimize latency in data transmission.
- **Example**:
  ```python
  import avro.schema
  from avro.io import DatumWriter, BinaryEncoder
  from io import BytesIO

  # Parse the Avro schema once at startup, not per message.
  with open("schema.avsc", "r") as schema_file:
      schema = avro.schema.parse(schema_file.read())

  def serialize_data(data):
      writer = DatumWriter(schema)
      bytes_writer = BytesIO()
      encoder = BinaryEncoder(bytes_writer)
      writer.write(data, encoder)
      return bytes_writer.getvalue()
  ```
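- **Counterpart**: The read path mirrors the write path; a sketch of the matching deserializer, reusing `schema` from above:
  ```python
  from avro.io import DatumReader, BinaryDecoder

  def deserialize_data(raw_bytes):
      reader = DatumReader(schema)
      decoder = BinaryDecoder(BytesIO(raw_bytes))
      return reader.read(decoder)
  ```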

## 4. Leverage Parallel Processing
- **Rule**: Use Python's `concurrent.futures` or `multiprocessing` modules to parallelize data processing tasks.
- **Example**:
  ```python
  from concurrent.futures import ProcessPoolExecutor

  def process_data_chunk(chunk):
      # Placeholder processing logic; replace with real work.
      return sum(chunk)

  if __name__ == "__main__":  # guard required when spawning worker processes
      data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
      with ProcessPoolExecutor() as executor:
          results = list(executor.map(process_data_chunk, data_chunks))
  ```
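- **Note**: For I/O-bound stages (network reads, API calls) a thread pool usually suffices and avoids the pickling overhead of process pools; an illustrative sketch with a stubbed I/O call:
  ```python
  from concurrent.futures import ThreadPoolExecutor

  def fetch_record(record_id):
      # Stand-in for an I/O-bound call such as an HTTP request.
      return record_id

  with ThreadPoolExecutor(max_workers=8) as executor:
      records = list(executor.map(fetch_record, range(100)))
  ```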

## 5. Implement Fault Tolerance
- **Rule**: Design data processing operations to be idempotent and use checkpointing to recover from failures.
- **Example**:
  ```python
  def process_message(message):
      try:
          handle(message)  # placeholder for the actual processing step
      except Exception as e:
          log_error(e)     # log_error is a placeholder logging helper
          # Retry with backoff, or route the message to a dead-letter queue
  ```
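- **Checkpointing**: The rule also calls for checkpointing; a minimal file-based sketch (path and format are purely illustrative) that lets a restart resume from the last processed offset instead of reprocessing the stream:
  ```python
  import json

  CHECKPOINT_PATH = "checkpoint.json"  # illustrative location

  def save_checkpoint(offset):
      with open(CHECKPOINT_PATH, "w") as f:
          json.dump({"offset": offset}, f)

  def load_checkpoint():
      try:
          with open(CHECKPOINT_PATH) as f:
              return json.load(f)["offset"]
      except FileNotFoundError:
          return 0  # no checkpoint yet: start from the beginning
  ```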

## 6. Monitor System Performance
- **Rule**: Regularly monitor system metrics to identify and address performance bottlenecks.
- **Example**:
  ```python
  import psutil

  def monitor_system():
      cpu_usage = psutil.cpu_percent()
      memory_usage = psutil.virtual_memory().percent
      # Log or alert based on these metrics
      return cpu_usage, memory_usage
  ```
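- **Usage**: In a long-running service these metrics are typically polled on an interval; an illustrative loop with a made-up 90% threshold:
  ```python
  import time

  def monitor_loop(interval_seconds=5, threshold=90.0):
      while True:
          cpu_usage, memory_usage = monitor_system()
          if cpu_usage > threshold or memory_usage > threshold:
              print(f"ALERT: cpu={cpu_usage}%, mem={memory_usage}%")
          time.sleep(interval_seconds)
  ```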

## 7. Use Efficient Data Storage Formats
- **Rule**: Store data in columnar formats like Parquet to enhance read and write performance.
- **Example**:
  ```python
  import pandas as pd

  # Sample data purely for illustration.
  data = {"sensor_id": [1, 2, 3], "reading": [0.5, 0.7, 0.6]}
  df = pd.DataFrame(data)
  df.to_parquet('data.parquet', engine='pyarrow')
  ```
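- **Usage**: Reads benefit from column pruning; a sketch that loads only the column it needs:
  ```python
  # Columnar layout means unneeded columns are never read from disk.
  readings = pd.read_parquet('data.parquet', columns=['reading'])
  ```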

## 8. Ensure Data Lineage Documentation
- **Rule**: Maintain detailed records of data flow and transformations for troubleshooting and compliance.
- **Example**:
  ```json
  {
    "data_lineage": {
      "source": "sensor_stream",
      "transformation": "aggregation",
      "destination": "real_time_dashboard",
      "timestamp": "2025-06-03T19:41:58Z"
    }
  }
  ```
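- **Usage**: One lightweight way to produce such records is to emit them from each processing stage; an illustrative helper whose field names mirror the example above:
  ```python
  import json
  from datetime import datetime, timezone

  def lineage_record(source, transformation, destination):
      return json.dumps({
          "data_lineage": {
              "source": source,
              "transformation": transformation,
              "destination": destination,
              "timestamp": datetime.now(timezone.utc).isoformat(),
          }
      })
  ```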