원클릭으로
rabbitmq-messaging
RabbitMQ Streams and AMQP messaging patterns for the crypto-scout ecosystem
Codex 또는 Claude로 설치 이 Prompt를 복사해 Codex, Claude 또는 다른 어시스턴트에 붙여 넣으면 Skill 페이지를 검토하고 설치를 진행할 수 있습니다.
메뉴
RabbitMQ Streams and AMQP messaging patterns for the crypto-scout ecosystem
Codex 또는 Claude로 설치 이 Prompt를 복사해 Codex, Claude 또는 다른 어시스턴트에 붙여 넣으면 Skill 페이지를 검토하고 설치를 진행할 수 있습니다.
SOC 직업 분류 기준
Java 25 development patterns for the crypto-scout ecosystem including microservices, ActiveJ, and async I/O
Maven build configuration for the crypto-scout multi-module Java 25 project
Podman Compose orchestration for crypto-scout services including RabbitMQ, TimescaleDB, and microservices
High-level architecture and design patterns for the crypto-scout ecosystem
TimescaleDB data modeling and operations for crypto market time-series data
| name | rabbitmq-messaging |
| description | RabbitMQ Streams and AMQP messaging patterns for the crypto-scout ecosystem |
| license | MIT |
| compatibility | opencode |
| metadata | {"messaging":"rabbitmq","protocols":"streams,amqp","version":"4.1.4"} |
Provide guidance for RabbitMQ Streams and AMQP messaging patterns used in the crypto-scout ecosystem for real-time data flow.
┌─────────────────────────────────────────────────────────────┐
│ crypto-scout-exchange │
│ (direct type) │
└──────────────┬──────────────────────────────┬───────────────┘
│ │
┌────────┴────────┐ ┌────────┴────────┐
│ bybit-stream │ │crypto-scout-stream
│ (Stream) │ │ (Stream) │
└────────┬────────┘ └────────┬────────┘
│ │
│ Payload<Map<String,Object>> │
│ │
┌────────┴────────┐ ┌────────┴────────┐
│ crypto-scout │ │ crypto-scout │
│ -client │ │ -collector │
│ (Publisher) │ │ (Consumer) │
└─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ crypto-scout │
│ -analyst │
│ (Consumer) │
└─────────────────┘
| Queue | Purpose | Arguments |
|---|---|---|
collector-queue | Command/control messages | lazy mode, TTL 6h, max 2500 |
chatbot-queue | Chatbot notifications | lazy mode, TTL 6h, max 2500 |
dlx-queue | Dead letter handling | lazy mode, TTL 7d |
| Stream | Purpose | Retention |
|---|---|---|
bybit-stream | Bybit market data | 1 day, 2GB max |
crypto-scout-stream | CMC/parser data | 1 day, 2GB max |
public final class AmqpPublisher extends AbstractReactive implements ReactiveService {
private volatile Environment environment;
private volatile Producer bybitStream;
private volatile Producer cryptoScoutStream;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
environment = Environment.builder()
.host("crypto-scout-mq")
.port(5552)
.username("crypto_scout_mq")
.password("password")
.build();
bybitStream = environment.producerBuilder()
.name("bybit-stream")
.stream("bybit-stream")
.build();
cryptoScoutStream = environment.producerBuilder()
.name("crypto-scout-stream")
.stream("crypto-scout-stream")
.build();
});
}
public Promise<Void> publish(final Payload<Map<String, Object>> payload) {
final var producer = getProducer(payload.getProvider());
final var settablePromise = new SettablePromise<Void>();
final var message = producer.messageBuilder()
.addData(JsonUtils.object2Bytes(payload))
.build();
producer.send(message, status -> {
if (status.isConfirmed()) {
settablePromise.set(null);
} else {
settablePromise.setException(
new IllegalStateException("Publish not confirmed: " + status)
);
}
});
return settablePromise;
}
private Producer getProducer(final Provider provider) {
return switch (provider) {
case BYBIT -> bybitStream;
case CMC -> cryptoScoutStream;
};
}
}
public final class StreamService extends AbstractReactive implements ReactiveService {
private volatile Environment environment;
private volatile Consumer bybitConsumer;
private volatile Consumer cryptoScoutConsumer;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
environment = AmqpConfig.getEnvironment();
bybitConsumer = environment.consumerBuilder()
.stream("bybit-stream")
.noTrackingStrategy()
.subscriptionListener(this::updateOffset)
.messageHandler(this::handleBybitMessage)
.build();
cryptoScoutConsumer = environment.consumerBuilder()
.stream("crypto-scout-stream")
.noTrackingStrategy()
.subscriptionListener(this::updateOffset)
.messageHandler(this::handleCryptoScoutMessage)
.build();
});
}
private void updateOffset(final SubscriptionContext context) {
final var streamName = context.stream();
final var savedOffset = offsetRepository.getOffset(streamName);
if (savedOffset.isPresent()) {
context.offsetSpecification(
OffsetSpecification.offset(savedOffset.getAsLong() + 1)
);
} else {
context.offsetSpecification(OffsetSpecification.first());
}
}
private void handleBybitMessage(final Context context, final Message message) {
final var payload = JsonUtils.bytes2Object(
message.getBodyAsBinary(),
Payload.class
);
bybitStreamService.save(payload, context.offset());
}
private void handleCryptoScoutMessage(final Context context, final Message message) {
final var payload = JsonUtils.bytes2Object(
message.getBodyAsBinary(),
Payload.class
);
cryptoScoutService.save(payload, context.offset());
}
}
public final class AnalystTransformer extends AbstractStreamTransformer<StreamPayload, StreamPayload> {
private final AnalystEngine engine;
@Override
protected StreamDataAcceptor<StreamPayload> onResumed(final StreamDataAcceptor<StreamPayload> output) {
return in -> {
try {
final var payload = in.payload();
final var result = engine.analyze(payload);
output.accept(new StreamPayload(in.stream(), in.offset(), result));
} catch (final Exception ex) {
LOGGER.error("Analysis failed", ex);
output.accept(new StreamPayload(in.stream(), in.offset(), null));
}
};
}
}
public final class AmqpConsumer extends AbstractReactive implements ReactiveService {
private volatile Connection connection;
private volatile Channel channel;
@Override
public Promise<Void> start() {
return Promise.ofBlocking(executor, () -> {
final var factory = new ConnectionFactory();
factory.setHost("crypto-scout-mq");
factory.setPort(5672);
factory.setUsername("crypto_scout_mq");
factory.setPassword("password");
connection = factory.newConnection();
channel = connection.createChannel();
channel.basicQos(1); // Fair dispatch
final var consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
try {
final var message = JsonUtils.bytes2Object(body, Map.class);
messageHandler.accept(message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
channel.basicConsume("collector-queue", false, consumer);
});
}
@Override
public Promise<Void> stop() {
return Promise.ofBlocking(executor, () -> {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
});
}
}
public final class AmqpPublisher extends AbstractReactive implements ReactiveService {
private volatile Connection connection;
private volatile Channel channel;
public Promise<Void> publish(final Map<String, Object> message,
final String routingKey) {
return Promise.ofBlocking(executor, () -> {
final var bytes = JsonUtils.object2Bytes(message);
final var props = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2) // Persistent
.build();
channel.basicPublish(
"crypto-scout-exchange",
routingKey,
props,
bytes
);
});
}
}
public class Payload<T> {
private final Provider provider; // BYBIT, CMC
private final Source source; // PMST, PML, API, etc.
private final Event event; // TICKERS, KLINE, TRADE, etc.
private final long timestamp;
private final String symbol; // BTCUSDT, ETHUSDT
private final T data;
}
// Example JSON
{
"provider": "BYBIT",
"source": "PMST",
"event": "TICKERS",
"timestamp": 1704067200000,
"symbol": "BTCUSDT",
"data": {
"lastPrice": "42000.50",
"highPrice24h": "43500.00",
"lowPrice24h": "41000.00"
}
}
public enum Provider {
BYBIT, // Bybit exchange data
CMC // CoinMarketCap data
}
public enum Source {
PMST, // Bybit public spot tickers
PML, // Bybit public linear tickers
API, // REST API data
// ... etc
}
| Variable | Default | Description |
|---|---|---|
AMQP_RABBITMQ_HOST | localhost | RabbitMQ host |
AMQP_RABBITMQ_PORT | 5672 | AMQP port |
AMQP_STREAM_PORT | 5552 | Streams port |
AMQP_RABBITMQ_USERNAME | crypto_scout_mq | Username |
AMQP_RABBITMQ_PASSWORD | - | Password |
AMQP_BYBIT_STREAM | bybit-stream | Bybit stream name |
AMQP_CRYPTO_SCOUT_STREAM | crypto-scout-stream | CMC stream name |
final static class AmqpConfig {
static final String AMQP_RABBITMQ_HOST = System.getProperty("amqp.rabbitmq.host", "localhost");
static final int AMQP_RABBITMQ_PORT = Integer.parseInt(System.getProperty("amqp.rabbitmq.port", "5672"));
static final int AMQP_STREAM_PORT = Integer.parseInt(System.getProperty("amqp.stream.port", "5552"));
static final String AMQP_RABBITMQ_USERNAME = System.getProperty("amqp.rabbitmq.username", "crypto_scout_mq");
static final String AMQP_RABBITMQ_PASSWORD = System.getProperty("amqp.rabbitmq.password", "");
}
{
"vhosts": [
{
"name": "/",
"policies": [
{
"pattern": ".*-stream$",
"definition": {
"max-length-bytes": 2000000000,
"max-age": "1D",
"stream-max-segment-size-bytes": 100000000
}
}
]
}
]
}
private Promise<Void> startWithRetry() {
return Promise.ofBlocking(executor, () -> {
int attempts = 0;
while (attempts < MAX_RETRIES) {
try {
connect();
return;
} catch (Exception e) {
attempts++;
if (attempts >= MAX_RETRIES) {
throw new IllegalStateException("Failed to connect after retries", e);
}
Thread.sleep(RETRY_DELAY_MS * attempts);
}
}
});
}
producer.send(message, status -> {
reactor.scheduleAfter(Duration.ofSeconds(30), () -> {
if (!settablePromise.isComplete()) {
settablePromise.setException(
new IllegalStateException("Publish confirmation timeout")
);
}
});
if (status.isConfirmed()) {
settablePromise.set(null);
} else {
settablePromise.setException(
new IllegalStateException("Publish not confirmed: " + status)
);
}
});
private void handleMessage(final Context context, final Message message) {
try {
final var payload = parsePayload(message);
service.save(payload, context.offset());
} catch (final Exception e) {
LOGGER.error("Failed to process message at offset {}", context.offset(), e);
// Don't ack - will retry on restart
// Or send to DLX based on error type
}
}
public boolean isReady() {
return environment != null &&
bybitStream != null &&
cryptoScoutStream != null;
}
// HTTP endpoint
curl http://localhost:8081/health
# Returns: ok (200) or not-ready (503)
// Check stream info
podman exec crypto-scout-mq rabbitmq-streams stream_info bybit-stream
# Management UI
curl -u crypto_scout_mq:password http://localhost:15672/api/queues
final var publisher = StreamTestPublisher.create("bybit-stream");
publisher.start().get();
publisher.publish(payload);
final var consumer = StreamTestConsumer.create("bybit-stream", handler);
consumer.start().get();
final var amqpPublisher = AmqpTestPublisher.create();
amqpPublisher.publish(message, "collector-queue");
final var amqpConsumer = AmqpTestConsumer.create("collector-queue", handler);
Use this skill when: