| name | rabbitmq-messaging |
| description | RabbitMQ Streams and AMQP messaging patterns for the crypto-scout ecosystem |
| license | MIT |
| compatibility | opencode |
| metadata | {"messaging":"rabbitmq","protocols":"streams,amqp","version":"4.1.4"} |
What I Do
Provide guidance for RabbitMQ Streams and AMQP messaging patterns used in the crypto-scout ecosystem for real-time data flow.
Messaging Architecture
Topology Overview
┌─────────────────────────────────────────────────────────────┐
│ crypto-scout-exchange │
│ (direct type) │
└──────────────┬──────────────────────────────┬───────────────┘
│ │
┌────────┴────────┐ ┌────────┴────────┐
│ bybit-stream │ │crypto-scout-stream
│ (Stream) │ │ (Stream) │
└────────┬────────┘ └────────┬────────┘
│ │
│ Payload<Map<String,Object>> │
│ │
┌────────┴────────┐ ┌────────┴────────┐
│ crypto-scout │ │ crypto-scout │
│ -client │ │ -collector │
│ (Publisher) │ │ (Consumer) │
└─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ crypto-scout │
│ -analyst │
│ (Consumer) │
└─────────────────┘
AMQP Queues
| Queue | Purpose | Arguments |
|---|
collector-queue | Command/control messages | lazy mode, TTL 6h, max 2500 |
chatbot-queue | Chatbot notifications | lazy mode, TTL 6h, max 2500 |
dlx-queue | Dead letter handling | lazy mode, TTL 7d |
Streams
| Stream | Purpose | Retention |
|---|
bybit-stream | Bybit market data | 1 day, 2GB max |
crypto-scout-stream | CMC/parser data | 1 day, 2GB max |
Streams Protocol (Port 5552)
Publisher Implementation (crypto-scout-client)
public final class AmqpPublisher extends AbstractReactive implements ReactiveService {
private volatile Environment environment;
private volatile Producer bybitStream;
private volatile Producer cryptoScoutStream;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
environment = Environment.builder()
.host("crypto-scout-mq")
.port(5552)
.username("crypto_scout_mq")
.password("password")
.build();
bybitStream = environment.producerBuilder()
.name("bybit-stream")
.stream("bybit-stream")
.build();
cryptoScoutStream = environment.producerBuilder()
.name("crypto-scout-stream")
.stream("crypto-scout-stream")
.build();
});
}
public Promise<Void> publish(final Payload<Map<String, Object>> payload) {
final var producer = getProducer(payload.getProvider());
final var settablePromise = new SettablePromise<Void>();
final var message = producer.messageBuilder()
.addData(JsonUtils.object2Bytes(payload))
.build();
producer.send(message, status -> {
if (status.isConfirmed()) {
settablePromise.set(null);
} else {
settablePromise.setException(
new IllegalStateException("Publish not confirmed: " + status)
);
}
});
return settablePromise;
}
private Producer getProducer(final Provider provider) {
return switch (provider) {
case BYBIT -> bybitStream;
case CMC -> cryptoScoutStream;
};
}
}
Consumer Implementation with Offset Management (crypto-scout-collector)
public final class StreamService extends AbstractReactive implements ReactiveService {
private volatile Environment environment;
private volatile Consumer bybitConsumer;
private volatile Consumer cryptoScoutConsumer;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
environment = AmqpConfig.getEnvironment();
bybitConsumer = environment.consumerBuilder()
.stream("bybit-stream")
.noTrackingStrategy()
.subscriptionListener(this::updateOffset)
.messageHandler(this::handleBybitMessage)
.build();
cryptoScoutConsumer = environment.consumerBuilder()
.stream("crypto-scout-stream")
.noTrackingStrategy()
.subscriptionListener(this::updateOffset)
.messageHandler(this::handleCryptoScoutMessage)
.build();
});
}
private void updateOffset(final SubscriptionContext context) {
final var streamName = context.stream();
final var savedOffset = offsetRepository.getOffset(streamName);
if (savedOffset.isPresent()) {
context.offsetSpecification(
OffsetSpecification.offset(savedOffset.getAsLong() + 1)
);
} else {
context.offsetSpecification(OffsetSpecification.first());
}
}
private void handleBybitMessage(final Context context, final Message message) {
final var payload = JsonUtils.bytes2Object(
message.getBodyAsBinary(),
Payload.class
);
bybitStreamService.save(payload, context.offset());
}
private void handleCryptoScoutMessage(final Context context, final Message message) {
final var payload = JsonUtils.bytes2Object(
message.getBodyAsBinary(),
Payload.class
);
cryptoScoutService.save(payload, context.offset());
}
}
Stream Transformer Pattern (crypto-scout-analyst)
public final class AnalystTransformer extends AbstractStreamTransformer<StreamPayload, StreamPayload> {
private final AnalystEngine engine;
@Override
protected StreamDataAcceptor<StreamPayload> onResumed(final StreamDataAcceptor<StreamPayload> output) {
return in -> {
try {
final var payload = in.payload();
final var result = engine.analyze(payload);
output.accept(new StreamPayload(in.stream(), in.offset(), result));
} catch (final Exception ex) {
LOGGER.error("Analysis failed", ex);
output.accept(new StreamPayload(in.stream(), in.offset(), null));
}
};
}
}
AMQP Protocol (Port 5672)
Consumer Implementation (crypto-scout-collector)
public final class AmqpConsumer extends AbstractReactive implements ReactiveService {
private volatile Connection connection;
private volatile Channel channel;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
final var factory = new ConnectionFactory();
factory.setHost("crypto-scout-mq");
factory.setPort(5672);
factory.setUsername("crypto_scout_mq");
factory.setPassword("password");
connection = factory.newConnection();
channel = connection.createChannel();
channel.basicQos(1);
final var consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
try {
final var message = JsonUtils.bytes2Object(body, Map.class);
messageHandler.accept(message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
channel.basicConsume("collector-queue", false, consumer);
});
}
@Override
public Promise<Void> stop() {
return Promise.ofBlocking(executor, () -> {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
});
}
}
Publisher Implementation (crypto-scout-collector)
public final class AmqpPublisher extends AbstractReactive implements ReactiveService {
private volatile Connection connection;
private volatile Channel channel;
public Promise<Void> publish(final Map<String, Object> message,
final String routingKey) {
return Promise.ofBlocking(executor, () -> {
final var bytes = JsonUtils.object2Bytes(message);
final var props = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.build();
channel.basicPublish(
"crypto-scout-exchange",
routingKey,
props,
bytes
);
});
}
}
Payload Structure
Standard Payload Format (jcryptolib)
public class Payload<T> {
private final Provider provider;
private final Source source;
private final Event event;
private final long timestamp;
private final String symbol;
private final T data;
}
{
"provider": "BYBIT",
"source": "PMST",
"event": "TICKERS",
"timestamp": 1704067200000,
"symbol": "BTCUSDT",
"data": {
"lastPrice": "42000.50",
"highPrice24h": "43500.00",
"lowPrice24h": "41000.00"
}
}
Provider Enum
public enum Provider {
BYBIT,
CMC
}
Source Enum
public enum Source {
PMST,
PML,
API,
}
Configuration
Environment Variables
| Variable | Default | Description |
|---|
AMQP_RABBITMQ_HOST | localhost | RabbitMQ host |
AMQP_RABBITMQ_PORT | 5672 | AMQP port |
AMQP_STREAM_PORT | 5552 | Streams port |
AMQP_RABBITMQ_USERNAME | crypto_scout_mq | Username |
AMQP_RABBITMQ_PASSWORD | - | Password |
AMQP_BYBIT_STREAM | bybit-stream | Bybit stream name |
AMQP_CRYPTO_SCOUT_STREAM | crypto-scout-stream | CMC stream name |
Java Configuration Pattern
final static class AmqpConfig {
static final String AMQP_RABBITMQ_HOST = System.getProperty("amqp.rabbitmq.host", "localhost");
static final int AMQP_RABBITMQ_PORT = Integer.parseInt(System.getProperty("amqp.rabbitmq.port", "5672"));
static final int AMQP_STREAM_PORT = Integer.parseInt(System.getProperty("amqp.stream.port", "5552"));
static final String AMQP_RABBITMQ_USERNAME = System.getProperty("amqp.rabbitmq.username", "crypto_scout_mq");
static final String AMQP_RABBITMQ_PASSWORD = System.getProperty("amqp.rabbitmq.password", "");
}
Stream Retention Policy (definitions.json)
{
"vhosts": [
{
"name": "/",
"policies": [
{
"pattern": ".*-stream$",
"definition": {
"max-length-bytes": 2000000000,
"max-age": "1D",
"stream-max-segment-size-bytes": 100000000
}
}
]
}
]
}
Error Handling
Connection Recovery
private Promise<Void> startWithRetry() {
return Promise.ofBlocking(executor, () -> {
int attempts = 0;
while (attempts < MAX_RETRIES) {
try {
connect();
return;
} catch (Exception e) {
attempts++;
if (attempts >= MAX_RETRIES) {
throw new IllegalStateException("Failed to connect after retries", e);
}
Thread.sleep(RETRY_DELAY_MS * attempts);
}
}
});
}
Publisher Confirm Timeout
producer.send(message, status -> {
reactor.scheduleAfter(Duration.ofSeconds(30), () -> {
if (!settablePromise.isComplete()) {
settablePromise.setException(
new IllegalStateException("Publish confirmation timeout")
);
}
});
if (status.isConfirmed()) {
settablePromise.set(null);
} else {
settablePromise.setException(
new IllegalStateException("Publish not confirmed: " + status)
);
}
});
Offset Management Error Handling
private void handleMessage(final Context context, final Message message) {
try {
final var payload = parsePayload(message);
service.save(payload, context.offset());
} catch (final Exception e) {
LOGGER.error("Failed to process message at offset {}", context.offset(), e);
}
}
Monitoring
Health Checks
public boolean isReady() {
return environment != null &&
bybitStream != null &&
cryptoScoutStream != null;
}
curl http:
# Returns: ok (200) or not-ready (503)
Stream Statistics
podman exec crypto-scout-mq rabbitmq-streams stream_info bybit-stream
# Management UI
curl -u crypto_scout_mq:password http:
Management UI
Test Utilities (crypto-scout-test)
Stream Test Publisher
final var publisher = StreamTestPublisher.create("bybit-stream");
publisher.start().get();
publisher.publish(payload);
Stream Test Consumer
final var consumer = StreamTestConsumer.create("bybit-stream", handler);
consumer.start().get();
AMQP Test Utilities
final var amqpPublisher = AmqpTestPublisher.create();
amqpPublisher.publish(message, "collector-queue");
final var amqpConsumer = AmqpTestConsumer.create("collector-queue", handler);
When to Use Me
Use this skill when:
- Implementing RabbitMQ Streams publishers or consumers
- Configuring AMQP queues and exchanges
- Managing stream offsets for exactly-once processing
- Handling connection failures and retries
- Designing message payload structures
- Setting up stream retention policies
- Monitoring messaging health
- Writing integration tests with messaging
- Implementing stream transformers