// Design event-driven architectures using Kafka, RabbitMQ, event sourcing, CQRS, and saga patterns. Activates when users need help with event-driven design, message queues, event sourcing, or asynchronous communication patterns.
| name | event-driven-architect |
| description | Design event-driven architectures using Kafka, RabbitMQ, event sourcing, CQRS, and saga patterns. Activates when users need help with event-driven design, message queues, event sourcing, or asynchronous communication patterns. |
| allowed-tools | ["Read","Write","Edit","Bash","Grep","Glob"] |
Design robust event-driven architectures for scalable, loosely coupled microservices.
Event sourcing: store every change as an event and reconstruct state by replaying those events.
CQRS: separate the read and write models for better scalability.
Saga pattern: manage distributed transactions across microservices.
Stream processing: process continuous streams of events in real time.
# Kafka topic configuration (Strimzi KafkaTopic custom resource)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: order-events
  labels:
    # The Strimzi Topic Operator uses this label to bind the topic to a Kafka
    # cluster; replace "my-cluster" with the name of your Kafka resource.
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 3
  config:
    retention.ms: 604800000 # 7 days
    compression.type: snappy
    max.message.bytes: 1048576
---
# Event schema (Avro)
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    {
      "name": "items",
      "type": { "type": "array", "items": "OrderItem" }
    },
    { "name": "totalAmount", "type": "double" },
    { "name": "timestamp", "type": "long" }
  ]
}
// Event store
/**
 * Event store for order aggregates.
 *
 * Writes go to `this.eventStore` and are then forwarded to `this.eventBus`;
 * reads return an aggregate's stored events ordered by ascending version so
 * that state can be rebuilt by replaying them.
 */
class OrderEventStore {
  /**
   * Persists a single domain event and then publishes it on the event bus.
   *
   * The stored record carries the aggregate id, the event's constructor name
   * as its type, the JSON-serialized event, the current time and the event's
   * version.
   *
   * NOTE(review): appending and publishing are two separate awaited calls; if
   * the publish rejects, the record has already been appended. Confirm that
   * callers and consumers tolerate that.
   *
   * @param event - The domain event to persist and publish.
   */
  async appendEvent(event: DomainEvent): Promise<void> {
    const storedRecord = {
      aggregateId: event.aggregateId,
      eventType: event.constructor.name,
      eventData: JSON.stringify(event),
      timestamp: new Date(),
      version: event.version,
    };
    await this.eventStore.append(storedRecord);

    // Publish to the event bus only after the append has resolved.
    await this.eventBus.publish(event);
  }

  /**
   * Loads every stored event of one aggregate, ordered by ascending version.
   *
   * @param aggregateId - Identifier of the aggregate whose events are wanted.
   * @returns The stored records, each passed through `this.deserialize`.
   */
  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const storedRecords = await this.eventStore.find({
      aggregateId,
      orderBy: { version: 'asc' },
    });
    return storedRecords.map((storedRecord) => this.deserialize(storedRecord));
  }

  /**
   * Rebuilds an order by applying the given events, in array order, to a
   * fresh `Order`.
   *
   * @param events - Events to replay.
   * @returns The order after all events have been applied.
   */
  reconstructAggregate(events: DomainEvent[]): Order {
    return events.reduce((aggregate, domainEvent) => {
      aggregate.apply(domainEvent);
      return aggregate;
    }, new Order());
  }
}
// Domain events
/**
 * Domain event recording that an order was created.
 *
 * `aggregateId` and `version` are exposed because
 * `OrderEventStore.appendEvent` reads both from every event it persists.
 */
class OrderCreatedEvent {
  /**
   * @param orderId - Identifier of the created order.
   * @param customerId - Identifier of the customer who placed the order.
   * @param items - Line items of the order.
   * @param totalAmount - Total amount of the order.
   * @param version - Optional version of this event within the order's event
   *   stream. When omitted it stays `undefined` and is left out of the JSON
   *   form of the event.
   */
  constructor(
    public orderId: string,
    public customerId: string,
    public items: OrderItem[],
    public totalAmount: number,
    public version?: number
  ) {}

  /**
   * Identifier of the aggregate this event belongs to; always the order id.
   * Defined as a getter so it is not duplicated in the serialized payload.
   */
  get aggregateId(): string {
    return this.orderId;
  }
}
/**
 * Domain event recording that an order was confirmed.
 *
 * `aggregateId` and `version` are exposed because
 * `OrderEventStore.appendEvent` reads both from every event it persists.
 */
class OrderConfirmedEvent {
  /**
   * @param orderId - Identifier of the confirmed order.
   * @param version - Optional version of this event within the order's event
   *   stream. When omitted it stays `undefined` and is left out of the JSON
   *   form of the event.
   */
  constructor(public orderId: string, public version?: number) {}

  /**
   * Identifier of the aggregate this event belongs to; always the order id.
   * Defined as a getter so it is not duplicated in the serialized payload.
   */
  get aggregateId(): string {
    return this.orderId;
  }
}
// Aggregate
/**
 * Order aggregate whose state is changed only by applying domain events.
 */
class Order {
  /** Order identifier, taken from the creation event. */
  private id: string;
  /** Current status; set to 'pending' on creation and 'confirmed' on confirmation. */
  private status: OrderStatus;
  /** Line items, taken from the creation event (same array reference). */
  private items: OrderItem[];

  /**
   * Folds one domain event into the aggregate's state.
   *
   * An `OrderCreatedEvent` sets the id, the items and the status 'pending';
   * an `OrderConfirmedEvent` sets the status 'confirmed'. Any other event
   * leaves the state untouched.
   *
   * @param event - The event to apply.
   */
  apply(event: DomainEvent) {
    if (event instanceof OrderCreatedEvent) {
      const { orderId, items } = event;
      this.id = orderId;
      this.status = 'pending';
      this.items = items;
      return;
    }

    if (event instanceof OrderConfirmedEvent) {
      this.status = 'confirmed';
    }
  }
}
// Command side (write model)
/**
 * Command asking the write side to create a new order.
 */
class CreateOrderCommand {
  /** Identifier of the customer placing the order. */
  public customerId: string;
  /** Line items to include in the order. */
  public items: OrderItem[];

  /**
   * @param customerId - Identifier of the customer placing the order.
   * @param items - Line items to include in the order.
   */
  constructor(customerId: string, items: OrderItem[]) {
    this.customerId = customerId;
    this.items = items;
  }
}
/**
 * Write-side handler that turns a `CreateOrderCommand` into a stored
 * `OrderCreatedEvent`.
 */
class OrderCommandHandler {
  /**
   * Validates the command, creates an `OrderCreatedEvent` under a newly
   * generated order id and appends it to the event store.
   *
   * Anything thrown by `validateCommand` propagates before an id is generated
   * or an event is stored.
   *
   * @param command - The order-creation request.
   * @returns The id generated for the new order.
   */
  async handle(command: CreateOrderCommand): Promise<string> {
    this.validateCommand(command);

    const newOrderId = uuid();
    const { customerId, items } = command;
    const orderCreated = new OrderCreatedEvent(
      newOrderId,
      customerId,
      items,
      this.calculateTotal(items)
    );

    // The id is handed back only after the event store has accepted the event.
    await this.eventStore.appendEvent(orderCreated);
    return newOrderId;
  }
}
// Query side (read model)
/**
 * Read-side facade over the order read repository.
 */
class OrderQueryModel {
  /**
   * Looks up a single order in the read repository (the optimized read
   * database, e.g. MongoDB).
   *
   * @param orderId - Identifier of the order to fetch.
   * @returns Whatever `orderReadRepo.findById` resolves to for that id.
   */
  async getOrderById(orderId: string): Promise<OrderDTO> {
    const order = await this.orderReadRepo.findById(orderId);
    return order;
  }

  /**
   * Lists the orders stored for one customer.
   *
   * @param customerId - Identifier of the customer whose orders are wanted.
   * @returns Whatever `orderReadRepo.find` resolves to for that customer filter.
   */
  async getOrdersByCustomer(customerId: string): Promise<OrderDTO[]> {
    const customerFilter = { customerId };
    const orders = await this.orderReadRepo.find(customerFilter);
    return orders;
  }
}
// Projection (updates read model from events)
/**
 * Projection that keeps the order read model in step with order events.
 */
class OrderProjection {
  /**
   * Creates the read-model row for a newly created order with status
   * 'pending'.
   *
   * NOTE(review): `createdAt` is the wall-clock time at which this handler
   * runs, not a time carried by the event, so re-running the projection would
   * produce a different value. Confirm that this is intended.
   *
   * @param event - The creation event to project.
   */
  @EventHandler(OrderCreatedEvent)
  async onOrderCreated(event: OrderCreatedEvent) {
    const { orderId, customerId, items, totalAmount } = event;
    await this.orderReadRepo.create({
      id: orderId,
      customerId,
      items,
      totalAmount,
      status: 'pending',
      createdAt: new Date(),
    });
  }

  /**
   * Marks the read-model row of the confirmed order as 'confirmed'.
   *
   * @param event - The confirmation event to project.
   */
  @EventHandler(OrderConfirmedEvent)
  async onOrderConfirmed(event: OrderConfirmedEvent) {
    const { orderId } = event;
    await this.orderReadRepo.update(orderId, { status: 'confirmed' });
  }
}
// Saga orchestrator for order creation
/**
 * Saga orchestrator for order creation: reserve inventory, charge payment,
 * confirm the order, recording each completed step in the saga store.
 */
class OrderCreationSaga {
  /**
   * Runs the saga for one order.
   *
   * If any step or any saga-store call rejects, the recorded steps are
   * compensated and a `SagaFailedError` carrying the original failure is
   * thrown. A failure during compensation never replaces that error; it is
   * attached to it as `compensationError` instead.
   *
   * @param orderId - Identifier of the order the saga is run for.
   * @throws SagaFailedError when any step fails.
   */
  async execute(orderId: string) {
    try {
      // Step 1: Reserve inventory
      await this.inventoryService.reserve(orderId);
      await this.sagaStore.recordStep(orderId, 'inventory_reserved');

      // Step 2: Process payment
      await this.paymentService.charge(orderId);
      await this.sagaStore.recordStep(orderId, 'payment_processed');

      // Step 3: Confirm order
      await this.orderService.confirm(orderId);
      await this.sagaStore.recordStep(orderId, 'order_confirmed');

      // Success
      await this.sagaStore.complete(orderId);
    } catch (error: unknown) {
      // Compensate (rollback). Keep the original failure as the reported
      // cause even when the rollback itself runs into trouble.
      let compensationFailed = false;
      let compensationError: unknown;
      try {
        await this.compensate(orderId);
      } catch (caught: unknown) {
        compensationFailed = true;
        compensationError = caught;
      }

      const failure = new SagaFailedError(orderId, error);
      if (compensationFailed) {
        Object.assign(failure, { compensationError });
      }
      throw failure;
    }
  }

  /**
   * Undoes the recorded steps in reverse order and cancels the order.
   *
   * Every compensation is attempted even when an earlier one rejects, so a
   * failed refund does not leave inventory reserved or the order uncancelled.
   * A step is compensated only if it appears in the saga store's completed
   * steps; a step whose service call succeeded but whose `recordStep` call
   * failed is therefore not undone here.
   *
   * @param orderId - Identifier of the order being rolled back.
   * @throws Error with an `errors` array when one or more compensations failed.
   */
  private async compensate(orderId: string) {
    const steps = await this.sagaStore.getCompletedSteps(orderId);
    const failures: unknown[] = [];

    // Runs one compensation and collects its failure instead of aborting.
    const attempt = async (action: () => unknown): Promise<void> => {
      try {
        await action();
      } catch (caught: unknown) {
        failures.push(caught);
      }
    };

    // Rollback in reverse order
    if (steps.includes('payment_processed')) {
      await attempt(() => this.paymentService.refund(orderId));
    }
    if (steps.includes('inventory_reserved')) {
      await attempt(() => this.inventoryService.release(orderId));
    }
    await attempt(() => this.orderService.cancel(orderId));

    if (failures.length > 0) {
      const summary = new Error(
        `Compensation for order ${orderId} failed in ${failures.length} step(s)`
      );
      Object.assign(summary, { errors: failures });
      throw summary;
    }
  }
}
- microservices-orchestrator - Service design
- distributed-tracing-setup - Event tracing
- chaos-engineering-setup - Resilience testing