بنقرة واحدة
rabbitmq-messaging
RabbitMQ Streams and AMQP messaging patterns for the crypto-scout ecosystem
التثبيت باستخدام Codex أو Claude انسخ هذا Prompt والصقه في Codex أو Claude أو مساعد آخر ليراجع صفحة Skill ويثبّتها لك.
القائمة
RabbitMQ Streams and AMQP messaging patterns for the crypto-scout ecosystem
التثبيت باستخدام Codex أو Claude انسخ هذا Prompt والصقه في Codex أو Claude أو مساعد آخر ليراجع صفحة Skill ويثبّتها لك.
استنادا إلى تصنيف SOC المهني
Java 25 development patterns for the crypto-scout ecosystem including microservices, ActiveJ, and async I/O
Maven build configuration for the crypto-scout multi-module Java 25 project
Podman Compose orchestration for crypto-scout services including RabbitMQ, TimescaleDB, and microservices
High-level architecture and design patterns for the crypto-scout ecosystem
TimescaleDB data modeling and operations for crypto market time-series data
| name | rabbitmq-messaging |
| description | RabbitMQ Streams and AMQP messaging patterns for the crypto-scout ecosystem |
| license | MIT |
| compatibility | opencode |
| metadata | {"messaging":"rabbitmq","protocols":"streams,amqp","version":"4.1.4"} |
Provide guidance for RabbitMQ Streams and AMQP messaging patterns used in the crypto-scout ecosystem for real-time data flow.
┌─────────────────────────────────────────────────────────────┐
│ crypto-scout-exchange │
│ (direct type) │
└──────────────┬──────────────────────────────┬───────────────┘
│ │
┌────────┴────────┐ ┌────────┴────────┐
│ bybit-stream │ │crypto-scout-stream
│ (Stream) │ │ (Stream) │
└────────┬────────┘ └────────┬────────┘
│ │
│ Payload<Map<String,Object>> │
│ │
┌────────┴────────┐ ┌────────┴────────┐
│ crypto-scout │ │ crypto-scout │
│ -client │ │ -collector │
│ (Publisher) │ │ (Consumer) │
└─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ crypto-scout │
│ -analyst │
│ (Consumer) │
└─────────────────┘
| Queue | Purpose | Arguments |
|---|---|---|
collector-queue | Command/control messages | lazy mode, TTL 6h, max 2500 |
chatbot-queue | Chatbot notifications | lazy mode, TTL 6h, max 2500 |
dlx-queue | Dead letter handling | lazy mode, TTL 7d |
| Stream | Purpose | Retention |
|---|---|---|
bybit-stream | Bybit market data | 1 day, 2GB max |
crypto-scout-stream | CMC/parser data | 1 day, 2GB max |
public final class AmqpPublisher extends AbstractReactive implements ReactiveService {
private volatile Environment environment;
private volatile Producer bybitStream;
private volatile Producer cryptoScoutStream;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
environment = Environment.builder()
.host("crypto-scout-mq")
.port(5552)
.username("crypto_scout_mq")
.password("password")
.build();
bybitStream = environment.producerBuilder()
.name("bybit-stream")
.stream("bybit-stream")
.build();
cryptoScoutStream = environment.producerBuilder()
.name("crypto-scout-stream")
.stream("crypto-scout-stream")
.build();
});
}
public Promise<Void> publish(final Payload<Map<String, Object>> payload) {
final var producer = getProducer(payload.getProvider());
final var settablePromise = new SettablePromise<Void>();
final var message = producer.messageBuilder()
.addData(JsonUtils.object2Bytes(payload))
.build();
producer.send(message, status -> {
if (status.isConfirmed()) {
settablePromise.set(null);
} else {
settablePromise.setException(
new IllegalStateException("Publish not confirmed: " + status)
);
}
});
return settablePromise;
}
private Producer getProducer(final Provider provider) {
return switch (provider) {
case BYBIT -> bybitStream;
case CMC -> cryptoScoutStream;
};
}
}
public final class StreamService extends AbstractReactive implements ReactiveService {
private volatile Environment environment;
private volatile Consumer bybitConsumer;
private volatile Consumer cryptoScoutConsumer;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
environment = AmqpConfig.getEnvironment();
bybitConsumer = environment.consumerBuilder()
.stream("bybit-stream")
.noTrackingStrategy()
.subscriptionListener(this::updateOffset)
.messageHandler(this::handleBybitMessage)
.build();
cryptoScoutConsumer = environment.consumerBuilder()
.stream("crypto-scout-stream")
.noTrackingStrategy()
.subscriptionListener(this::updateOffset)
.messageHandler(this::handleCryptoScoutMessage)
.build();
});
}
private void updateOffset(final SubscriptionContext context) {
final var streamName = context.stream();
final var savedOffset = offsetRepository.getOffset(streamName);
if (savedOffset.isPresent()) {
context.offsetSpecification(
OffsetSpecification.offset(savedOffset.getAsLong() + 1)
);
} else {
context.offsetSpecification(OffsetSpecification.first());
}
}
private void handleBybitMessage(final Context context, final Message message) {
final var payload = JsonUtils.bytes2Object(
message.getBodyAsBinary(),
Payload.class
);
bybitStreamService.save(payload, context.offset());
}
private void handleCryptoScoutMessage(final Context context, final Message message) {
final var payload = JsonUtils.bytes2Object(
message.getBodyAsBinary(),
Payload.class
);
cryptoScoutService.save(payload, context.offset());
}
}
public final class AnalystTransformer extends AbstractStreamTransformer<StreamPayload, StreamPayload> {
private final AnalystEngine engine;
@Override
protected StreamDataAcceptor<StreamPayload> onResumed(final StreamDataAcceptor<StreamPayload> output) {
return in -> {
try {
final var payload = in.payload();
final var result = engine.analyze(payload);
output.accept(new StreamPayload(in.stream(), in.offset(), result));
} catch (final Exception ex) {
LOGGER.error("Analysis failed", ex);
output.accept(new StreamPayload(in.stream(), in.offset(), null));
}
};
}
}
public final class AmqpConsumer extends AbstractReactive implements ReactiveService {
private volatile Connection connection;
private volatile Channel channel;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
final var factory = new ConnectionFactory();
factory.setHost("crypto-scout-mq");
factory.setPort(5672);
factory.setUsername("crypto_scout_mq");
factory.setPassword("password");
connection = factory.newConnection();
channel = connection.createChannel();
channel.basicQos(1); // Fair dispatch
final var consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
try {
final var message = JsonUtils.bytes2Object(body, Map.class);
messageHandler.accept(message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
channel.basicConsume("collector-queue", false, consumer);
});
}
@Override
public Promise<Void> stop() {
return Promise.ofBlocking(executor, () -> {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
});
}
}
public final class AmqpPublisher extends AbstractReactive implements ReactiveService {
private volatile Connection connection;
private volatile Channel channel;
public Promise<Void> publish(final Map<String, Object> message,
final String routingKey) {
return Promise.ofBlocking(executor, () -> {
final var bytes = JsonUtils.object2Bytes(message);
final var props = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2) // Persistent
.build();
channel.basicPublish(
"crypto-scout-exchange",
routingKey,
props,
bytes
);
});
}
}
public class Payload<T> {
private final Provider provider; // BYBIT, CMC
private final Source source; // PMST, PML, API, etc.
private final Event event; // TICKERS, KLINE, TRADE, etc.
private final long timestamp;
private final String symbol; // BTCUSDT, ETHUSDT
private final T data;
}
// Example JSON
{
"provider": "BYBIT",
"source": "PMST",
"event": "TICKERS",
"timestamp": 1704067200000,
"symbol": "BTCUSDT",
"data": {
"lastPrice": "42000.50",
"highPrice24h": "43500.00",
"lowPrice24h": "41000.00"
}
}
public enum Provider {
BYBIT, // Bybit exchange data
CMC // CoinMarketCap data
}
public enum Source {
PMST, // Bybit public spot tickers
PML, // Bybit public linear tickers
API, // REST API data
// ... etc
}
| Variable | Default | Description |
|---|---|---|
AMQP_RABBITMQ_HOST | localhost | RabbitMQ host |
AMQP_RABBITMQ_PORT | 5672 | AMQP port |
AMQP_STREAM_PORT | 5552 | Streams port |
AMQP_RABBITMQ_USERNAME | crypto_scout_mq | Username |
AMQP_RABBITMQ_PASSWORD | - | Password |
AMQP_BYBIT_STREAM | bybit-stream | Bybit stream name |
AMQP_CRYPTO_SCOUT_STREAM | crypto-scout-stream | CMC stream name |
final static class AmqpConfig {
static final String AMQP_RABBITMQ_HOST = System.getProperty("amqp.rabbitmq.host", "localhost");
static final int AMQP_RABBITMQ_PORT = Integer.parseInt(System.getProperty("amqp.rabbitmq.port", "5672"));
static final int AMQP_STREAM_PORT = Integer.parseInt(System.getProperty("amqp.stream.port", "5552"));
static final String AMQP_RABBITMQ_USERNAME = System.getProperty("amqp.rabbitmq.username", "crypto_scout_mq");
static final String AMQP_RABBITMQ_PASSWORD = System.getProperty("amqp.rabbitmq.password", "");
}
{
"vhosts": [
{
"name": "/",
"policies": [
{
"pattern": ".*-stream$",
"definition": {
"max-length-bytes": 2000000000,
"max-age": "1D",
"stream-max-segment-size-bytes": 100000000
}
}
]
}
]
}
private Promise<Void> startWithRetry() {
return Promise.ofBlocking(executor, () -> {
int attempts = 0;
while (attempts < MAX_RETRIES) {
try {
connect();
return;
} catch (Exception e) {
attempts++;
if (attempts >= MAX_RETRIES) {
throw new IllegalStateException("Failed to connect after retries", e);
}
Thread.sleep(RETRY_DELAY_MS * attempts);
}
}
});
}
producer.send(message, status -> {
reactor.scheduleAfter(Duration.ofSeconds(30), () -> {
if (!settablePromise.isComplete()) {
settablePromise.setException(
new IllegalStateException("Publish confirmation timeout")
);
}
});
if (status.isConfirmed()) {
settablePromise.set(null);
} else {
settablePromise.setException(
new IllegalStateException("Publish not confirmed: " + status)
);
}
});
private void handleMessage(final Context context, final Message message) {
try {
final var payload = parsePayload(message);
service.save(payload, context.offset());
} catch (final Exception e) {
LOGGER.error("Failed to process message at offset {}", context.offset(), e);
// Don't ack - will retry on restart
// Or send to DLX based on error type
}
}
public boolean isReady() {
return environment != null &&
bybitStream != null &&
cryptoScoutStream != null;
}
// HTTP endpoint
curl http://localhost:8081/health
# Returns: ok (200) or not-ready (503)
// Check stream info
podman exec crypto-scout-mq rabbitmq-streams stream_info bybit-stream
# Management UI
curl -u crypto_scout_mq:password http://localhost:15672/api/queues
final var publisher = StreamTestPublisher.create("bybit-stream");
publisher.start().get();
publisher.publish(payload);
final var consumer = StreamTestConsumer.create("bybit-stream", handler);
consumer.start().get();
final var amqpPublisher = AmqpTestPublisher.create();
amqpPublisher.publish(message, "collector-queue");
final var amqpConsumer = AmqpTestConsumer.create("collector-queue", handler);
Use this skill when: