Lesson 5.2: Handling Massive Writes
Bulk Loading Strategies
Slow: row-by-row inserts, one round trip and one statement per row:

for row in data:
    db.execute("INSERT INTO embeddings VALUES (%s, %s, %s)", row)
Fast: the COPY command streams all rows in a single operation:

import csv
import io

# Serialize the rows to an in-memory CSV buffer, then stream it via COPY
csv_buffer = io.StringIO()
writer = csv.writer(csv_buffer)
for row in data:
    writer.writerow(row)
csv_buffer.seek(0)
cursor.copy_expert("COPY embeddings FROM STDIN WITH CSV", csv_buffer)
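One detail to watch with pgvector: the vector column must arrive in pgvector's text representation, a bracketed comma-separated list such as [0.1,0.2,...]. A minimal sketch of a row formatter, assuming each row is an (id, document_id, embedding) tuple with the embedding as a Python list (the column layout is an assumption, not taken from the schema above):

def format_row(row):
    id_, document_id, embedding = row
    # pgvector parses its text form "[0.1,0.2,...]"; CSV quoting handles the commas
    vector_literal = "[" + ",".join(str(x) for x in embedding) + "]"
    return (id_, document_id, vector_literal)

# Write formatted rows into the buffer instead of the raw tuples
writer.writerows(format_row(r) for r in data)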
Faster: drop indexes, bulk load, then rebuild:

BEGIN;
DROP INDEX embeddings_vector_idx;
DROP INDEX embeddings_document_id_idx;
-- Server-side COPY: the path is read by the database server, not the client
COPY embeddings FROM '/path/to/data.csv' WITH CSV;
CREATE INDEX embeddings_vector_idx ON embeddings USING hnsw(vector vector_cosine_ops);
CREATE INDEX embeddings_document_id_idx ON embeddings(document_id);
COMMIT;
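Rebuilding the HNSW index usually dominates the reload time. PostgreSQL's maintenance_work_mem and max_parallel_maintenance_workers settings control how much memory and parallelism an index build may use, so raising them for the session before the rebuild can shorten it noticeably. A minimal sketch, assuming a psycopg2-style cursor; the specific values are placeholders to tune for your hardware:

# Session-local settings for the index build (values are illustrative)
cursor.execute("SET maintenance_work_mem = '2GB'")
cursor.execute("SET max_parallel_maintenance_workers = 4")
cursor.execute(
    "CREATE INDEX embeddings_vector_idx "
    "ON embeddings USING hnsw(vector vector_cosine_ops)"
)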
Write Optimization Techniques
1. Batch commits:

Committing after every row forces a WAL flush per row:

for row in data:
    db.execute("INSERT ...")
    db.commit()

Instead, commit once per batch of rows (here, every 1,000):

db.execute("BEGIN")
for i, row in enumerate(data):
    db.execute("INSERT ...")
    if (i + 1) % 1000 == 0:
        db.commit()
        db.execute("BEGIN")
db.commit()
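When COPY is not an option, multi-row INSERT statements are a useful middle ground between the two versions above. A hedged sketch using psycopg2's execute_values helper, which folds many rows into one INSERT ... VALUES statement per page; the driver choice and the (id, document_id, vector) column list are assumptions:

from psycopg2.extras import execute_values

# One INSERT statement per page_size rows, a single commit at the end
execute_values(
    cursor,
    "INSERT INTO embeddings (id, document_id, vector) VALUES %s",
    data,
    page_size=1000,
)
conn.commit()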
2. UNLOGGED tables (temporary data):
CREATE UNLOGGED TABLE temp_import (
id BIGINT,
vector VECTOR(1536)
);
COPY temp_import FROM '/data.csv' WITH CSV;
INSERT INTO embeddings SELECT * FROM temp_import;
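Because unlogged tables skip WAL, their contents are truncated after a crash, which is acceptable for staging data you can regenerate. Once the rows have been copied into embeddings, drop the staging table; alternatively, if the staged table should itself become permanent, it can be converted in place with ALTER TABLE ... SET LOGGED. A minimal cleanup sketch, assuming a psycopg2-style connection and cursor:

# The staging table is no longer needed after the INSERT ... SELECT
cursor.execute("DROP TABLE temp_import")
conn.commit()

# Alternative: keep the staged table and make it crash-safe in place
# cursor.execute("ALTER TABLE temp_import SET LOGGED")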
3. Parallel writes (multiple connections):

from concurrent.futures import ThreadPoolExecutor

def insert_chunk(chunk):
    conn = get_db_connection()  # one connection per worker
    buffer = io.StringIO()
    csv.writer(buffer).writerows(chunk)
    buffer.seek(0)
    conn.cursor().copy_expert("COPY embeddings FROM STDIN WITH CSV", buffer)
    conn.commit()

with ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(insert_chunk, data_chunks)
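The data_chunks variable above is assumed to be an iterable of row lists; a small, hypothetical helper for producing it (the chunk size is a starting point to benchmark, not a recommendation):

def chunked(rows, size=10_000):
    # Yield successive fixed-size slices of the row list
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

data_chunks = list(chunked(data))

Throughput stops improving once the workers saturate the server's CPU or disk, so measure before raising max_workers further.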
Key Takeaways
- COPY can be orders of magnitude faster than row-by-row INSERT for bulk loads
- Drop indexes before bulk inserts, rebuild after
- Batch commits reduce WAL overhead
- UNLOGGED tables skip durability for temporary staging data
- Parallel writes use multiple connections for throughput
- ML embedding generation often requires bulk loading millions of vectors