Lesson 5.3: Real-Time Ingestion from ML Systems
Streaming Writes Pattern
CREATE TABLE model_events (
    id BIGSERIAL,
    model_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    data JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

CREATE TABLE model_events_2024_01_01_00 PARTITION OF model_events
    FOR VALUES FROM ('2024-01-01 00:00') TO ('2024-01-01 01:00');

INSERT INTO model_events (model_id, event_type, data)
VALUES ('gpt-4', 'prediction', '{"latency_ms": 150}');
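Hourly partitions must exist before rows arrive, so ingestion pipelines typically create them ahead of time on a schedule. A minimal sketch of generating that DDL (the `partition_ddl` helper and naming scheme are illustrative, mirroring the example above):

```python
from datetime import datetime, timedelta, timezone

def partition_ddl(start: datetime) -> str:
    """Build DDL for the hourly partition covering [start, start + 1 hour).

    The model_events_YYYY_MM_DD_HH naming scheme mirrors the example above.
    """
    end = start + timedelta(hours=1)
    name = start.strftime("model_events_%Y_%m_%d_%H")
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF model_events\n"
        f"    FOR VALUES FROM ('{start:%Y-%m-%d %H:%M}') TO ('{end:%Y-%m-%d %H:%M}');"
    )

# A cron job or scheduler would run this for the upcoming hour and execute it.
ddl = partition_ddl(datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc))
```

Using `IF NOT EXISTS` makes the job safe to re-run if the scheduler fires twice.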
Buffer in Application
import time

class EventBuffer:
    """Accumulates events in memory and writes them to Postgres in batches."""

    def __init__(self, db, max_size=1000, max_wait=1.0):
        self.db = db                  # DB-API connection/cursor with executemany()
        self.buffer = []
        self.max_size = max_size      # flush after this many events...
        self.max_wait = max_wait      # ...or after this many seconds
        self.last_flush = time.time()

    def add_event(self, event):
        # event is a (model_id, event_type, data) tuple
        self.buffer.append(event)
        # Note: the time-based check only fires when the next event arrives;
        # a background timer is needed to cover idle periods.
        if len(self.buffer) >= self.max_size or (time.time() - self.last_flush) >= self.max_wait:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        self.db.executemany(
            "INSERT INTO model_events (model_id, event_type, data) VALUES (%s, %s, %s)",
            self.buffer,
        )
        self.buffer = []
        self.last_flush = time.time()
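The time-based flush above only fires when the next event arrives, so a quiet stream can leave events stranded in memory. One common fix is a background thread that flushes on a timer; a sketch under that assumption (`TimedBuffer` is a hypothetical name, and `FakeDB` stands in for a real connection):

```python
import threading

class TimedBuffer:
    """Event buffer with a background thread that also flushes on a timer."""

    def __init__(self, db, max_size=1000, max_wait=1.0):
        self.db = db
        self.max_size = max_size
        self.max_wait = max_wait
        self.buffer = []
        self.lock = threading.Lock()
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def add_event(self, event):
        with self.lock:
            self.buffer.append(event)
            if len(self.buffer) >= self.max_size:
                self._flush_locked()

    def _run(self):
        # Periodic flush so events don't sit in memory during idle periods.
        while not self._stop.wait(self.max_wait):
            with self.lock:
                self._flush_locked()

    def _flush_locked(self):
        # Caller must hold self.lock.
        if not self.buffer:
            return
        self.db.executemany(
            "INSERT INTO model_events (model_id, event_type, data) VALUES (%s, %s, %s)",
            self.buffer,
        )
        self.buffer = []

    def close(self):
        self._stop.set()
        self._worker.join()
        with self.lock:
            self._flush_locked()   # drain whatever is left

class FakeDB:
    """Stand-in that records batches instead of talking to Postgres."""
    def __init__(self):
        self.batches = []
    def executemany(self, sql, rows):
        self.batches.append(list(rows))

db = FakeDB()
buf = TimedBuffer(db, max_size=2, max_wait=0.05)
buf.add_event(("gpt-4", "prediction", '{"latency_ms": 150}'))
buf.add_event(("gpt-4", "prediction", '{"latency_ms": 160}'))  # hits max_size
buf.close()
```

The lock matters: the flush thread and the inference thread both touch the buffer, so every read-modify-write on it must be serialized.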
Managing Write Throughput
Connection pooling: route writes through a pooler such as PgBouncer so that many short-lived logging clients share a small, fixed set of server connections instead of each opening its own.
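In production this is usually PgBouncer or a driver-level pool (e.g. psycopg2's), but the mechanic is simple enough to sketch: a fixed set of connections is opened once and handed out on demand, so "acquiring" is reuse rather than a new connection handshake. `FakeConn` below stands in for a real connection factory:

```python
import queue

class ConnectionPool:
    """Minimal fixed-size pool: connections are opened once and reused."""

    def __init__(self, factory, size=5):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(factory())   # open all connections up front

    def acquire(self, timeout=1.0):
        # Blocks (up to timeout) when every connection is checked out,
        # which naturally back-pressures bursty writers.
        return self._idle.get(timeout=timeout)

    def release(self, conn):
        self._idle.put(conn)

class FakeConn:
    """Stand-in for e.g. psycopg2.connect(...); counts how many were opened."""
    opened = 0
    def __init__(self):
        FakeConn.opened += 1

pool = ConnectionPool(FakeConn, size=2)
c1 = pool.acquire()
c2 = pool.acquire()
pool.release(c1)
c3 = pool.acquire()   # reuses c1 instead of opening a third connection
```

The blocking `acquire` is a deliberate choice: when the pool is exhausted, writers wait instead of piling new connections onto the database.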
Async writes (don't block inference):

import asyncio

async def log_prediction(prediction):
    # The insert runs in a background task, off the request path.
    await db.execute_async("INSERT INTO predictions ...", prediction)

async def handle_request(input):
    result = model.predict(input)
    # Fire-and-forget: schedule the write and return immediately,
    # rather than awaiting it inline.
    asyncio.create_task(log_prediction(result))
    return result
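The fire-and-forget pattern can be exercised end-to-end with stand-ins (`FakeDB` and the doubling "model" below are hypothetical): the handler returns before any insert lands, and the background tasks catch up once the loop is free.

```python
import asyncio

class FakeDB:
    """Stand-in async client; records inserts instead of writing to Postgres."""
    def __init__(self):
        self.rows = []
    async def execute_async(self, sql, row):
        await asyncio.sleep(0.01)   # simulate network round-trip
        self.rows.append(row)

db = FakeDB()

async def log_prediction(prediction):
    await db.execute_async("INSERT INTO predictions ...", prediction)

async def handle_request(x):
    result = x * 2                          # stand-in for model.predict(x)
    asyncio.create_task(log_prediction(result))
    return result                           # returns before the insert completes

async def main():
    results = [await handle_request(x) for x in range(3)]
    logged_before = len(db.rows)            # 0: inserts are still in flight
    await asyncio.sleep(0.05)               # yield so background tasks finish
    return results, logged_before, len(db.rows)

results, before, after = asyncio.run(main())
```

One caveat worth noting: `create_task` keeps only a weak reference to the task, so a long-lived service should hold the tasks (e.g. in a set) to keep them from being garbage-collected mid-write.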
Key Takeaways
- Partition time-series tables by hour or day for ML event logs
- Buffer events in application memory before batch inserting
- Use connection pooling (PgBouncer) to handle high connection counts
- Async writes prevent logging from blocking ML inference
- Batch writes reduce database round-trips
- Real-time ingestion requires balancing latency vs throughput