kafka-messaging-architect
// Design Kafka architectures with exactly-once semantics, Kafka Streams, ksqlDB, Schema Registry (Avro/Protobuf), performance tuning, and KRaft.
| name | Kafka Messaging Architect |
| slug | messaging-kafka-architect |
| description | Design Kafka architectures with exactly-once semantics, Kafka Streams, ksqlDB, Schema Registry (Avro/Protobuf), performance tuning, and KRaft. |
| capabilities | ["Kafka topology design (topics, partitions, replication factor, KRaft vs ZooKeeper)","Producer patterns (idempotent, transactional, batching, compression, exactly-once)","Consumer patterns (consumer groups, offset management, manual commit, exactly-once)","Kafka Streams and ksqlDB (stateful processing, windowing, joins, aggregations)","Schema Registry integration (Avro, Protobuf, JSON Schema evolution)","Performance tuning (batch.size, linger.ms, buffer.memory, fetch.min.bytes, compression)","High availability and disaster recovery (replication, MirrorMaker 2, multi-DC)","Security (SASL, TLS, ACLs, encryption at rest)","Monitoring and observability (JMX metrics, lag monitoring, consumer group health)","Event-driven patterns (event sourcing, CQRS, saga, outbox pattern)"] |
| inputs | {"use_case":{"type":"string","description":"Use case (event streaming, log aggregation, messaging, CDC, stream processing)","required":true},"throughput":{"type":"object","description":"Expected throughput (messages/sec, MB/sec, peak vs average)","required":true},"data_schema":{"type":"object","description":"Message schema (Avro, Protobuf, JSON) and evolution requirements","required":false},"durability_requirements":{"type":"string","description":"Durability level (at-most-once, at-least-once, exactly-once)","required":false},"deployment_env":{"type":"string","description":"Deployment environment (on-prem, cloud, managed service)","required":false}} |
| outputs | {"kafka_topology":{"type":"object","description":"Topic design, partitions, replication factor, retention policy"},"producer_config":{"type":"object","description":"Producer settings (idempotence, compression, batching, acks)"},"consumer_config":{"type":"object","description":"Consumer settings (group.id, auto.offset.reset, isolation.level)"},"schema_registry_config":{"type":"object","description":"Schema Registry setup with Avro/Protobuf schemas"},"performance_tuning":{"type":"object","description":"Tuned settings for throughput and latency targets"},"monitoring_setup":{"type":"object","description":"JMX metrics, lag alerts, consumer group health checks"}} |
| keywords | ["kafka","messaging","event-streaming","kafka-streams","ksqldb","schema-registry","avro","protobuf","exactly-once","consumer-groups","kraft","performance-tuning","event-driven"] |
| version | 1.0.0 |
| owner | cognitive-toolworks |
| license | Apache-2.0 |
| security | {"secrets":"Use SASL/SCRAM or mTLS for authentication; store credentials in secrets manager","compliance":"Encryption in transit (TLS), encryption at rest, audit logging for sensitive data"} |
| links | [{"title":"Apache Kafka 3.9 Release Notes","url":"https://www.confluent.io/blog/introducing-apache-kafka-3-9/","accessed":"2025-10-26"},{"title":"Kafka Exactly-Once Semantics","url":"https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/","accessed":"2025-10-26"},{"title":"Confluent Schema Registry","url":"https://docs.confluent.io/platform/current/schema-registry/index.html","accessed":"2025-10-26"},{"title":"Kafka Performance Tuning","url":"https://docs.confluent.io/cloud/current/client-apps/optimizing/throughput.html","accessed":"2025-10-26"},{"title":"Kafka Streams Documentation","url":"https://kafka.apache.org/documentation/streams/","accessed":"2025-10-26"}] |
Purpose: Design Apache Kafka 3.9 architectures for event streaming, messaging, and stream processing. Covers producer/consumer patterns (idempotent, transactional, exactly-once semantics), Kafka Streams/ksqlDB for stateful processing, Schema Registry (Avro, Protobuf) for schema evolution, performance tuning (batch.size, linger.ms, compression), and KRaft deployment (ZooKeeper mode has been deprecated since 3.5, and 3.9 is the final release line that supports it).
When to Use:
Complements:
- integration-messagequeue-designer: Covers RabbitMQ, SQS, Service Bus; this focuses on Kafka-specific patterns.
- data-pipeline-designer: Uses Kafka as data transport; this designs Kafka topology and tuning.
- observability-stack-configurator: Monitors Kafka with Prometheus JMX exporter, Grafana dashboards.
Mandatory Inputs:
- use_case: At least one use case (event streaming, messaging, CDC, stream processing).
- throughput: Expected throughput (messages/sec or MB/sec).
Validation Steps:
Goal: Design basic Kafka topology with topic, partition count, replication factor, and producer/consumer configs for standard messaging use case.
Steps:
Topic name: orders (single topic for simplicity).
Naming convention: <domain>.<entity>.<event-type> (e.g., ecommerce.orders.created).
# Producer settings
acks=all # Wait for all in-sync replicas (strongest durability)
retries=2147483647 # Integer.MAX_VALUE; retry indefinitely on transient errors
enable.idempotence=true # Prevent duplicates (requires acks=all)
compression.type=lz4 # Low CPU overhead; snappy, gzip, and zstd are also available
# Consumer settings
group.id=order-processor # Consumer group for parallel processing
enable.auto.commit=false # Manual commit for at-least-once guarantees
auto.offset.reset=earliest # Start from beginning if no offset stored
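The settings above can also be assembled in code. The following is a minimal sketch that only builds `java.util.Properties` objects; the class name `BasicKafkaConfig` and the `bootstrapServers` parameter are assumptions for illustration and do not come from the original text.

```java
import java.util.Properties;

/** Sketch: the basic producer and consumer settings as java.util.Properties. */
final class BasicKafkaConfig {

    /** Producer settings from the list above; bootstrapServers is an assumed parameter. */
    static Properties producerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("acks", "all");                                  // wait for all in-sync replicas
        p.setProperty("retries", Integer.toString(Integer.MAX_VALUE)); // retry transient errors
        p.setProperty("enable.idempotence", "true");                   // no duplicates on retry
        p.setProperty("compression.type", "lz4");
        return p;
    }

    /** Consumer settings from the list above. */
    static Properties consumerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", "order-processor");
        p.setProperty("enable.auto.commit", "false");   // commit manually after processing
        p.setProperty("auto.offset.reset", "earliest"); // start from the beginning if no offset is stored
        return p;
    }
}
```

Key and value (de)serializers still have to be added before these properties are passed to a Kafka client.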
Token Budget: ≤2k tokens (single topic, basic config).
Goal: Design Kafka architecture with idempotent producers, transactional writes, exactly-once consumers, and Schema Registry for Avro/Protobuf.
Steps:
enable.idempotence=true (requires acks=all, retries > 0, max.in.flight.requests.per.connection ≤ 5).
Properties props = new Properties();
// bootstrap.servers and key/value serializers are omitted for brevity
props.put("transactional.id", "order-producer-1"); // Unique per producer instance
props.put("enable.idempotence", true);
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", order.getId(), order));
    producer.send(new ProducerRecord<>("inventory", order.getProductId(), inventoryUpdate));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close(); // Fatal: this producer instance cannot continue and must not abort
} catch (KafkaException e) {
    producer.abortTransaction(); // Recoverable: abort, then retry the batch
}
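The preconditions for an idempotent producer listed above can be checked before a producer is created. This is a sketch under stated assumptions: `IdempotenceCheck` is a hypothetical helper, and the fallback values for unset keys assume the Kafka 3.x client defaults (acks=all, retries=2147483647, max.in.flight.requests.per.connection=5).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Sketch: checks the idempotent-producer preconditions on a Properties object. */
final class IdempotenceCheck {

    /** Returns a list of violated preconditions; empty means the config is consistent. */
    static List<String> violations(Properties p) {
        List<String> out = new ArrayList<>();
        if (!"true".equals(p.getProperty("enable.idempotence"))) {
            return out; // idempotence not requested, nothing to check
        }
        // Fallbacks below assume the Kafka 3.x client defaults.
        String acks = p.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            out.add("acks must be all");
        }
        long retries = Long.parseLong(p.getProperty("retries", "2147483647"));
        if (retries <= 0) {
            out.add("retries must be > 0");
        }
        int inFlight = Integer.parseInt(
                p.getProperty("max.in.flight.requests.per.connection", "5"));
        if (inFlight > 5) {
            out.add("max.in.flight.requests.per.connection must be <= 5");
        }
        return out;
    }
}
```

Running this check at startup surfaces a conflicting setting as a readable message rather than as a client configuration error.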
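Properties props = new Properties();
props.put("group.id", "order-processor");
props.put("isolation.level", "read_committed"); // Only read committed transactions
props.put("enable.auto.commit", false); // Offsets are committed through the producer transaction
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
// Read-process-write pattern (consume → process → produce)
KafkaProducer<String, OrderResult> producer = new KafkaProducer<>(producerProps); // producerProps must set transactional.id
producer.initTransactions();
while (true) {
    ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue; // Nothing to process; skip the empty transaction
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, Order> record : records) {
        OrderResult result = processOrder(record.value());
        producer.send(new ProducerRecord<>("order-results", result.getId(), result));
        // The committed offset is the offset of the next record to read
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
    }
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction(); // Atomically commit processed results + consumer offsets
}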
CREATE TABLE processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMP
);
// Consumer logic, inside the poll loop (db is a placeholder data-access helper):
if (db.exists("SELECT 1 FROM processed_messages WHERE message_id = ?", record.key())) {
continue; // Skip duplicate
}
processMessage(record.value());
db.insert("INSERT INTO processed_messages (message_id, processed_at) VALUES (?, NOW())", record.key());
consumer.commitSync(); // Commit offset after DB insert
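The deduplication idea above can be tried without a database. The sketch below replaces the `processed_messages` table with an in-memory set, which is an assumption made purely for illustration; `DedupProcessor` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch: idempotent message handling keyed by message ID (in-memory stand-in for the table). */
final class DedupProcessor {
    private final Set<String> processedIds = new HashSet<>(); // stands in for processed_messages
    private final List<String> sideEffects = new ArrayList<>();

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String messageId, String payload) {
        // Set.add returns false when the ID is already present, so the check
        // and the "insert" happen in one step.
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery, skip
        }
        sideEffects.add(payload); // the business action runs once per message ID
        return true;
    }

    List<String> sideEffects() {
        return sideEffects;
    }
}
```

With a real database, the equivalent of the single `add` step would be to insert the ID and treat a primary-key violation as a duplicate, ideally in the same database transaction as the business write, so a crash between the two cannot leave them out of sync.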
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");
Schema schema = new Schema.Parser().parse(new File("order.avsc"));
GenericRecord order = new GenericData.Record(schema);
order.put("orderId", "123");
order.put("amount", 99.99);
producer.send(new ProducerRecord<>("orders", order));
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://schema-registry:8081");
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord order = record.value();
String orderId = order.get("orderId").toString();
Double amount = (Double) order.get("amount");
}
batch.size=16384 # Default 16KB; increase to 32KB-64KB for higher throughput
linger.ms=10 # Wait 10ms to batch messages (trade latency for throughput)
compression.type=lz4 # LZ4 fastest; snappy good balance; gzip highest compression
buffer.memory=33554432 # 32MB buffer; increase if many partitions
max.in.flight.requests.per.connection=5 # Max for idempotent producer
fetch.min.bytes=1 # Minimum data to fetch (increase to 10KB-50KB for higher throughput)
fetch.max.wait.ms=500 # Max wait for fetch.min.bytes (trade latency for throughput)
max.partition.fetch.bytes=1048576 # 1MB per partition per fetch
max.poll.records=500 # Records per poll() call
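To see how batch.size and linger.ms interact, recall that the producer sends a partition's batch when it is full or when linger.ms expires, whichever comes first. The sketch below estimates the resulting batch size with integer arithmetic; `BatchEstimator` is a hypothetical helper, and it deliberately ignores compression and per-record overhead.

```java
/** Sketch: rough estimate of producer batch size per partition. */
final class BatchEstimator {

    /**
     * Bytes in a batch when it is sent: whatever accumulates during linger.ms,
     * capped at batch.size and never less than one record.
     * recordsPerSec is the rate to a single partition.
     */
    static long expectedBatchBytes(long recordsPerSec, long avgRecordBytes,
                                   long batchSizeBytes, long lingerMs) {
        long accumulated = recordsPerSec * avgRecordBytes * lingerMs / 1000;
        long atLeastOneRecord = Math.max(avgRecordBytes, accumulated);
        return Math.min(batchSizeBytes, atLeastOneRecord);
    }

    /** Batches sent per second for one partition at the given batch size (rounded up). */
    static long batchesPerSec(long recordsPerSec, long avgRecordBytes, long batchBytes) {
        long bytesPerSec = recordsPerSec * avgRecordBytes;
        return (bytesPerSec + batchBytes - 1) / batchBytes;
    }
}
```

For example, at 1,000 records/sec of 200 bytes each, linger.ms=10 yields batches of about 2,000 bytes, so raising batch.size beyond 16 KB changes nothing for that partition; at 100,000 records/sec the 16 KB limit is reached before linger.ms expires, and a larger batch.size would reduce the number of requests.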
Token Budget: ≤6k tokens (exactly-once patterns, Schema Registry, performance tuning).
Goal: Design stream processing application with Kafka Streams or ksqlDB, multi-DC replication, monitoring, and event-driven patterns (event sourcing, CQRS, saga).
Steps:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users"); // Lookup table for the join below
// Stateless: filter, map
KStream<String, Order> largeOrders = orders.filter((key, order) -> order.getAmount() > 100);
// Stateful: aggregation with windowing
KTable<Windowed<String>, Long> orderCounts = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) // TimeWindows.of() is deprecated since 3.0
    .count();
// Stream-table join (both sides must share the same key and be co-partitioned)
KStream<String, OrderResult> enrichedOrders = orders.join(
    users, // KTable<String, User>
    (order, user) -> new OrderResult(order, user),
    Joined.with(Serdes.String(), orderSerde, userSerde)
);
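The windowed count above can be easier to reason about in plain Java. The sketch below mimics a 5-minute tumbling window by aligning each timestamp to the start of its window (windows are aligned to the epoch, start inclusive, end exclusive); `TumblingWindowCount` and the `key@windowStart` naming are illustrative assumptions, not the Kafka Streams API.

```java
import java.util.Map;
import java.util.TreeMap;

/** Sketch: counting records per key and tumbling window without Kafka Streams. */
final class TumblingWindowCount {

    /** Start of the epoch-aligned window that contains the timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Counts records per "key@windowStart"; keys[i] pairs with timestampsMs[i]. */
    static Map<String, Long> count(String[] keys, long[] timestampsMs, long windowSizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String windowedKey = keys[i] + "@" + windowStart(timestampsMs[i], windowSizeMs);
            counts.merge(windowedKey, 1L, Long::sum);
        }
        return counts;
    }
}
```

A record at 299,999 ms falls into the window starting at 0, while one at 300,000 ms opens the next window, which is why the two are counted separately.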
-- Create stream from topic
CREATE STREAM orders (orderId VARCHAR, userId VARCHAR, amount DOUBLE)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');
-- Continuous query: filter and transform
CREATE STREAM large_orders AS
SELECT orderId, userId, amount
FROM orders
WHERE amount > 100
EMIT CHANGES;
-- Aggregation with windowing
CREATE TABLE order_counts AS
SELECT userId, COUNT(*) as order_count
FROM orders
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY userId
EMIT CHANGES;
-- Stream-table join
CREATE STREAM enriched_orders AS
SELECT o.orderId, o.amount, u.userName, u.email
FROM orders o
LEFT JOIN users u ON o.userId = u.userId
EMIT CHANGES;
-- Alternative to the explicit column list above: ksqlDB retrieves the value schema from Schema Registry
CREATE STREAM orders WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');
-- No need to define columns; inferred from Avro schema
account-events (append-only, no deletion).
BEGIN TRANSACTION;
INSERT INTO orders (order_id, amount) VALUES ('123', 99.99);
INSERT INTO outbox (event_type, payload) VALUES ('OrderCreated', '{"orderId":"123","amount":99.99}');
COMMIT;
-- A Debezium CDC connector captures the outbox insert from the database log and publishes it to Kafka; outbox rows are purged by the application or a cleanup job
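For the append-only account-events topic mentioned above, current state is derived by replaying events in order. The sketch below shows that replay step in plain Java; the event types `DEPOSITED` and `WITHDRAWN` and the class `AccountReplay` are hypothetical and do not come from the original text.

```java
import java.util.List;

/** Sketch: rebuilding an account balance by replaying an append-only event log. */
final class AccountReplay {

    enum Type { DEPOSITED, WITHDRAWN } // hypothetical event types

    static final class Event {
        final Type type;
        final long amountCents;

        Event(Type type, long amountCents) {
            this.type = type;
            this.amountCents = amountCents;
        }
    }

    /** Folds the events, oldest first, into the current balance in cents. */
    static long replay(List<Event> events) {
        long balance = 0;
        for (Event e : events) {
            switch (e.type) {
                case DEPOSITED:
                    balance += e.amountCents;
                    break;
                case WITHDRAWN:
                    balance -= e.amountCents;
                    break;
            }
        }
        return balance;
    }
}
```

Because events are never updated or deleted, the same replay always produces the same balance, which is what makes the log usable as the source of truth.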
clusters = primary, secondary
primary.bootstrap.servers = primary-kafka:9092
secondary.bootstrap.servers = secondary-kafka:9092
primary->secondary.enabled = true
primary->secondary.topics = orders.*, users.*
replication.factor = 3
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group order-processor
# Check LAG column; lag > 1000 = consumer falling behind
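The LAG column is the difference between a partition's log end offset and the group's committed offset. A minimal sketch of that calculation and the 1000-message threshold follows; `LagCheck` is a hypothetical helper, and treating a partition with no committed offset as offset 0 is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: per-partition consumer lag and a simple threshold check. */
final class LagCheck {

    /** lag = log end offset - committed offset, per partition (never negative). */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> logEndOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            // Assumption: a partition without a committed offset counts from 0.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    /** True if any single partition exceeds the threshold. */
    static boolean fallingBehind(Map<Integer, Long> lag, long threshold) {
        return lag.values().stream().anyMatch(v -> v > threshold);
    }
}
```

Checking each partition separately catches a single stuck partition that a group-wide total could hide.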
process.roles=broker,controller # Combined mode (or separate broker/controller nodes)
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
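The controller.quorum.voters value is a comma-separated list of `id@host:port` entries, and the controller quorum needs a majority of voters to stay available. The sketch below parses that format and computes the majority; `QuorumVoters` is a hypothetical helper written for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: parsing controller.quorum.voters and computing quorum size. */
final class QuorumVoters {

    /** Parses "1@kafka1:9093,2@kafka2:9093" into node ID -> host:port. */
    static Map<Integer, String> parse(String voters) {
        Map<Integer, String> out = new LinkedHashMap<>();
        for (String entry : voters.split(",")) {
            String[] parts = entry.trim().split("@", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + entry);
            }
            out.put(Integer.parseInt(parts[0]), parts[1]);
        }
        return out;
    }

    /** Number of voters that must be reachable for the quorum to make progress. */
    static int majority(int voterCount) {
        return voterCount / 2 + 1;
    }

    /** Number of controller failures the quorum can tolerate. */
    static int tolerableFailures(int voterCount) {
        return voterCount - majority(voterCount);
    }
}
```

With the three voters configured above, the majority is two, so the cluster tolerates the loss of one controller.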
Token Budget: ≤12k tokens (Streams, ksqlDB, event patterns, monitoring, KRaft).
Ambiguity Resolution:
durability_requirements not specified:
Stop Conditions:
Thresholds:
Required Fields:
{
kafka_topology: {
topics: Array<{
name: string; // orders, users, payments
partitions: number; // 10, 30, 100
replication_factor: number; // 3 (production), 1 (dev)
retention_ms: number; // 604800000 (7 days)
cleanup_policy: "delete" | "compact" | "compact,delete";
min_insync_replicas: number; // replication_factor - 1
}>;
kafka_version: string; // 3.9.1
deployment_mode: "kraft" | "zookeeper";
};
producer_config: {
acks: "all" | "1" | "0";
retries: number;
enable_idempotence: boolean;
transactional_id?: string; // For exactly-once multi-partition
compression_type: "none" | "gzip" | "snappy" | "lz4" | "zstd";
batch_size: number; // bytes
linger_ms: number; // milliseconds
buffer_memory: number; // bytes
max_in_flight_requests: number; // ≤5 for idempotent
};
consumer_config: {
group_id: string;
enable_auto_commit: boolean;
auto_offset_reset: "earliest" | "latest" | "none";
isolation_level: "read_uncommitted" | "read_committed";
fetch_min_bytes: number;
fetch_max_wait_ms: number;
max_poll_records: number;
};
schema_registry_config?: {
url: string;
schemas: Array<{
subject: string; // orders-value, users-value
format: "avro" | "protobuf" | "json";
schema: string; // Avro JSON or Protobuf definition
evolution: "backward" | "forward" | "full" | "none";
}>;
};
performance_tuning: {
producer_throughput_mbps: number;
consumer_throughput_mbps: number;
latency_p99_ms: number;
recommendations: Array<string>;
};
monitoring_setup: {
jmx_metrics: Array<string>; // kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
lag_alert_threshold: number; // 1000
dashboards: Array<{
name: string; // Kafka Overview, Consumer Lag
metrics: string[];
}>;
};
}
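The relationship between replication_factor and min_insync_replicas in the schema above can be validated before a topic is created. This is a sketch only; `TopicSpecCheck` is a hypothetical helper, and the failure count assumes producers use acks=all.

```java
/** Sketch: sanity checks for the topic fields in kafka_topology. */
final class TopicSpecCheck {

    /** The replication_factor - 1 rule from the schema, with a floor of 1 for dev setups. */
    static int recommendedMinIsr(int replicationFactor) {
        return Math.max(1, replicationFactor - 1);
    }

    /** Replicas that may be lost while acks=all writes still succeed. */
    static int tolerableReplicaLoss(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    /** Throws IllegalArgumentException if the combination cannot work. */
    static void validate(int partitions, int replicationFactor, int minInsyncReplicas) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication_factor must be >= 1");
        }
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                    "min_insync_replicas must be between 1 and replication_factor");
        }
    }
}
```

For the production values in the schema (replication factor 3, min in-sync replicas 2), one replica can be lost without rejecting writes.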
Optional Fields:
- kafka_streams_app: Kafka Streams application code (Java/Scala).
- ksqldb_queries: ksqlDB CREATE STREAM/TABLE statements.
- event_driven_patterns: Event sourcing, CQRS, saga implementations.
- mirrormaker2_config: Multi-DC replication setup.
- security_config: SASL, TLS, ACLs configuration.
Format: YAML or JSON for configs, Java/SQL for application code.
Input:
use_case: "event streaming + messaging"
throughput: {messages_per_sec: 10000, peak_mbps: 50}
durability_requirements: "exactly-once"
data_schema: {format: "avro"}
Output (T2 Summary):
Kafka Topology:
Topics:
- orders: 30 partitions, replication=3, retention=7d, min.insync.replicas=2
- inventory: 30 partitions, replication=3, retention=7d
Kafka Version: 3.9.1 (KRaft mode, no ZooKeeper)
Producer Config (Exactly-Once):
enable.idempotence=true, transactional.id=order-producer-1, acks=all
compression.type=lz4, batch.size=32KB, linger.ms=10ms
Consumer Config (Exactly-Once):
group.id=order-processor, isolation.level=read_committed
enable.auto.commit=false (manual commit in transaction)
Schema Registry:
Format: Avro, Subject: orders-value
Evolution: Backward compatible (add fields with defaults)
Performance:
Producer: 50 MB/sec, Latency p99: 15ms
Consumer: 50 MB/sec, Lag: <100 messages
Pattern: Read-process-write (consume orders → update inventory → produce confirmation)
Link to Full Example: See skills/messaging-kafka-architect/examples/ecommerce-exactly-once.txt
Token Budget Compliance:
Validation Checklist:
Safety & Auditability:
Determinism:
Official Documentation:
Performance Tuning:
Stream Processing:
Complementary Skills:
- integration-messagequeue-designer: Covers RabbitMQ, SQS, Service Bus patterns.
- data-pipeline-designer: Uses Kafka for data transport in ETL/ELT pipelines.
- observability-stack-configurator: Monitors Kafka with Prometheus JMX exporter.