com um clique
batch-processing
// Collect-then-batch pattern for database operations achieving 30-40% throughput improvement. Includes graceful fallback to sequential processing when batch operations fail.
Two-phase commit matchmaking that verifies both player connections before creating a match. Handles disconnections gracefully with automatic re-queue of healthy players.
Implement robust background job processing with dead letter queues, retries, and state machines. Use when building async workflows, scheduled tasks, or any work that shouldn't block the request/response cycle.
Manage data flow when producers outpace consumers. Bounded buffers, adaptive flushing, and graceful degradation prevent OOM crashes and data loss.
Implement multi-layer caching with Redis, in-memory, and HTTP caching. Covers cache invalidation, stampede prevention, and cache-aside patterns.
Exactly-once processing semantics with distributed coordination for file-based data pipelines. Atomic file claiming, status tracking, and automatic retry with in-memory fallback.
Cloud storage integration with signed URLs, visibility control, multi-tenant path conventions, and presigned uploads for direct client uploads.
| name | batch-processing |
| description | Collect-then-batch pattern for database operations achieving 30-40% throughput improvement. Includes graceful fallback to sequential processing when batch operations fail. |
| license | MIT |
| compatibility | TypeScript/JavaScript, Python |
| metadata | {"category":"data-access","time":"4h","source":"drift-masterguide"} |
30-40% throughput improvement by batching database operations with graceful fallback.
Sequential processing is slow because each item requires multiple DB round trips. The solution is to collect all data first, then execute batch operations:
Sequential (slow):
Item 1 → DB → DB → DB
Item 2 → DB → DB → DB
Item 3 → DB → DB → DB
Batched (fast):
Item 1 → collect
Item 2 → collect
Item 3 → collect
All items → BATCH INSERT
Key insight: keep the mapping step sequential (fuzzy matching needs context), but batch the writes (they are independent operations).
import logging
import time
from decimal import Decimal
from typing import Dict, List
class BatchProcessor:
    """
    Batch-optimized processor with a sequential fallback.

    Members used here but not defined in this class (a subclass or the
    surrounding application must provide them):

    * ``self.client`` -- query client used as
      ``client.table(name).insert(...)/.select(...)/.update(...)...execute()``
      (a Supabase-style query builder).
    * ``self.find_or_create_mapping(item)`` -- returns a dict with ``'item_id'``.
    * ``self.process_single(item, user_id)`` -- persists a single item; only
      used on the fallback path.
    """

    _logger = logging.getLogger(__name__)

    def process_batch(self, items: List[Dict], user_id: str) -> Dict:
        """
        Map items one by one, then persist them with batched writes.

        Step 1 maps each item sequentially (mapping can depend on context);
        a failure is recorded with its 1-based line number and the loop
        continues. Step 2 inserts all collected transactions with a single
        insert; if that insert raises, every input item is reprocessed one at
        a time instead. Step 3 applies the per-item quantity deltas that were
        aggregated in step 1.

        Args:
            items: Line items; each must provide ``'quantity'`` and
                ``'unit_price'`` plus whatever the mapping step needs.
            user_id: Owner recorded on every created transaction.

        Returns:
            Dict with ``status`` (``"success"`` or ``"partial_success"``),
            ``items_processed``, ``items_failed``, ``failed_items`` (list of
            ``{"line": <1-based index>, "error": <message>}`` or ``None``) and
            ``processing_time_seconds`` (rounded to 2 decimals).
        """
        start_time = time.perf_counter()

        transactions_to_create = []
        # Decimal keeps the per-item sums exact; converted to float only on write.
        inventory_updates: Dict[str, Decimal] = {}
        failed_items = []
        items_processed = 0

        # Step 1: process mappings sequentially (context-dependent).
        for idx, item in enumerate(items, 1):
            try:
                item_id = self.find_or_create_mapping(item)['item_id']
                quantity = item['quantity']
                transaction = {
                    "user_id": user_id,
                    "item_id": item_id,
                    "quantity": float(quantity),
                    "unit_cost": float(item['unit_price']),
                }
                # str() first so a float quantity is not expanded to its exact
                # binary value (Decimal(0.1) != Decimal("0.1")).
                new_total = inventory_updates.get(item_id, Decimal('0')) + Decimal(str(quantity))
            except Exception as e:
                failed_items.append({"line": idx, "error": str(e)})
                continue
            # Commit to both collectors only after every conversion succeeded,
            # so an item reported as failed never leaves a stray transaction.
            transactions_to_create.append(transaction)
            inventory_updates[item_id] = new_total
            items_processed += 1

        # Step 2: one batched insert for all transactions.
        if transactions_to_create:
            try:
                self.client.table("transactions").insert(
                    transactions_to_create
                ).execute()
            except Exception:
                # CRITICAL: fall back to per-item processing of the whole input.
                self._logger.warning(
                    "Batch insert of %d transactions failed",
                    len(transactions_to_create),
                    exc_info=True,
                )
                result = self._fallback_to_sequential(items, user_id)
                # Report time for the whole call, including the failed batch attempt.
                result["processing_time_seconds"] = round(time.perf_counter() - start_time, 2)
                return result

        # Step 3: batch update inventory using the aggregated deltas.
        # NOTE(review): if this raises, the step-2 insert has already been
        # executed and nothing here rolls it back.
        if inventory_updates:
            self._batch_update_inventory(inventory_updates)

        return self._build_result(items_processed, failed_items, start_time)

    def _batch_update_inventory(self, updates: Dict[str, Decimal]) -> None:
        """
        Add aggregated quantity deltas to existing inventory rows.

        Reads all affected rows with a single ``in_`` query, then issues one
        ``update`` per row (the original note cites a Supabase limitation for
        not doing this in bulk). Ids with no inventory row are logged and skipped.

        NOTE(review): this is read-modify-write without locking; concurrent
        writers to the same rows could overwrite each other's changes.
        """
        current = (
            self.client.table("inventory")
            .select("id, quantity")
            .in_("id", list(updates))
            .execute()
        )

        found = set()
        for row in current.data:
            item_id = row['id']
            found.add(item_id)
            new_qty = Decimal(str(row['quantity'])) + updates[item_id]
            self.client.table("inventory").update({
                "quantity": float(new_qty)
            }).eq("id", item_id).execute()

        missing = set(updates) - found
        if missing:
            self._logger.warning(
                "No inventory row for item ids %s; quantity deltas not applied",
                sorted(missing, key=str),
            )

    def _fallback_to_sequential(self, items: List[Dict], user_id: str) -> Dict:
        """
        Process ``items`` one at a time via ``self.process_single``.

        Used when the batched insert fails. Every input item is retried,
        including any whose mapping failed in the batch pass. An exception from
        one item is recorded and the remaining items are still processed.

        Returns:
            A result dict with the same keys as ``process_batch`` returns.
        """
        self._logger.warning("Falling back to sequential processing")
        start_time = time.perf_counter()
        failed_items = []
        items_processed = 0
        for idx, item in enumerate(items, 1):
            try:
                self.process_single(item, user_id)
            except Exception as e:
                failed_items.append({"line": idx, "error": str(e)})
            else:
                items_processed += 1
        return self._build_result(items_processed, failed_items, start_time)

    @staticmethod
    def _build_result(items_processed: int, failed_items: List[Dict], start_time: float) -> Dict:
        """Assemble the result dict shared by the batch and fallback paths."""
        return {
            "status": "partial_success" if failed_items else "success",
            "items_processed": items_processed,
            "items_failed": len(failed_items),
            "failed_items": failed_items or None,
            "processing_time_seconds": round(time.perf_counter() - start_time, 2),
        }
/**
 * Summary returned by `BatchProcessor.processBatch`.
 */
interface BatchResult {
  /** Overall outcome of the batch. */
  status: 'success' | 'partial_success' | 'failed';
  /** Count of items that were mapped and collected successfully. */
  itemsProcessed: number;
  /** Count of items that failed. */
  itemsFailed: number;
  /** Per-item failures with 1-based line numbers; omitted when nothing failed. */
  failedItems?: Array<{ line: number; error: string }>;
  /** Elapsed wall-clock time in milliseconds (difference of two `Date.now()` calls). */
  processingTimeMs: number;
}
class BatchProcessor {
  /**
   * Map items sequentially, then persist them with batched writes.
   *
   * Step 1 maps each item in order (mapping can depend on context); a failure
   * is recorded with its 1-based line number and the loop continues.
   * Step 2 inserts every collected transaction with one `insertMany`; if that
   * throws, the whole input is handed to `fallbackToSequential` instead.
   * Step 3 applies the per-item quantity totals aggregated in step 1.
   *
   * @param items  Items to process; each provides `quantity` and `unitPrice`.
   * @param userId Owner recorded on every created transaction.
   * @returns A `BatchResult` with success/failure counts and elapsed time.
   */
  async processBatch(items: Item[], userId: string): Promise<BatchResult> {
    const startTime = Date.now();
    const transactionsToCreate: Transaction[] = [];
    const inventoryUpdates = new Map<string, number>();
    const failedItems: { line: number; error: string }[] = [];
    let itemsProcessed = 0;

    // Step 1: process mappings sequentially.
    for (let idx = 0; idx < items.length; idx++) {
      const item = items[idx];
      try {
        const mapping = await this.findOrCreateMapping(item);
        transactionsToCreate.push({
          userId,
          itemId: mapping.itemId,
          quantity: item.quantity,
          unitCost: item.unitPrice,
        });
        // Aggregate so each inventory row is touched once in step 3.
        inventoryUpdates.set(
          mapping.itemId,
          (inventoryUpdates.get(mapping.itemId) ?? 0) + item.quantity,
        );
        itemsProcessed++;
      } catch (error) {
        // The catch variable may be `unknown` (useUnknownInCatchVariables) and
        // a non-Error value may be thrown, so do not assume `.message` exists.
        const message = error instanceof Error ? error.message : String(error);
        failedItems.push({ line: idx + 1, error: message });
      }
    }

    // Step 2: batch insert; on failure reprocess every item one at a time.
    if (transactionsToCreate.length > 0) {
      try {
        await this.db.transactions.insertMany(transactionsToCreate);
      } catch (error) {
        console.warn('Batch insert failed; falling back to sequential processing', error);
        return this.fallbackToSequential(items, userId);
      }
    }

    // Step 3: batch update inventory; skipped when nothing was mapped.
    if (inventoryUpdates.size > 0) {
      await this.batchUpdateInventory(inventoryUpdates);
    }

    return {
      status: failedItems.length > 0 ? 'partial_success' : 'success',
      itemsProcessed,
      itemsFailed: failedItems.length,
      failedItems: failedItems.length > 0 ? failedItems : undefined,
      processingTimeMs: Date.now() - startTime,
    };
  }
}
# Example usage: run one invoice's line items through the processor and
# warn when some of them could not be processed.
# NOTE(review): `invoice_data`, `user_id` and `logger` are not defined in this
# snippet and are assumed to come from the surrounding application; the
# processor is also expected to have a configured `client` before use.
processor = BatchProcessor()
line_items = invoice_data['line_items']
result = processor.process_batch(items=line_items, user_id=user_id)
if result['status'] == 'partial_success':
    logger.warning(f"Some items failed: {result['failed_items']}")