new-broker
| name | new-broker |
| description | Complete step-by-step guide for implementing a new protocol Broker in RobustMQ. Use when the user asks to add a new broker, implement a new protocol, or scaffold a new broker crate. |
Complete steps for implementing a brand-new protocol Broker in RobustMQ.
Each Broker's responsibility is: protocol parsing + orchestration of shared infrastructure components. It does NOT implement storage, routing, or cluster communication itself.
Protocol packet → broker handler → shared components (storage-adapter / broker-core / node-call / rate-limit)
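The division of labor above can be sketched as follows. This is a minimal illustration, not RobustMQ code: `MessageStore`, `InMemoryStore`, `DemoPacket`, and `handle` are hypothetical names, and the trait only stands in for a shared component such as storage-adapter. The handler interprets the packet and delegates persistence; it never stores anything itself.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a shared storage component
/// (NOT the real storage-adapter API).
pub trait MessageStore {
    /// Appends a payload to a topic log and returns its offset.
    fn append(&mut self, topic: &str, payload: &[u8]) -> u64;
}

/// Hypothetical in-memory implementation used only for this sketch.
#[derive(Default)]
pub struct InMemoryStore {
    topics: HashMap<String, Vec<Vec<u8>>>,
}

impl MessageStore for InMemoryStore {
    fn append(&mut self, topic: &str, payload: &[u8]) -> u64 {
        let log = self.topics.entry(topic.to_string()).or_default();
        log.push(payload.to_vec());
        (log.len() - 1) as u64
    }
}

/// Hypothetical protocol packet type for the new broker.
#[derive(Debug, Clone, PartialEq)]
pub enum DemoPacket {
    Pub { subject: String, payload: Vec<u8> },
    Ok { offset: u64 },
    Err(String),
}

/// Broker-side handler: parses intent from the packet and orchestrates
/// the shared component. Storage logic lives behind the trait.
pub fn handle(store: &mut dyn MessageStore, packet: &DemoPacket) -> DemoPacket {
    match packet {
        DemoPacket::Pub { subject, payload } => DemoPacket::Ok {
            offset: store.append(subject, payload),
        },
        other => DemoPacket::Err(format!("unexpected packet: {other:?}")),
    }
}
```

Passing the store as a trait object keeps the handler testable without any real storage backend.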
`src/protocol/src/<proto>/`
- `packet.rs`: define all protocol packet data structures (refer to NATS `ServerInfo` / `ClientConnect` / `NatsPacket`)
- `codec.rs`: implement `tokio_util::codec::Decoder` + `Encoder<Packet>`; requires `#[derive(Clone)]`
- `mod.rs`: `pub mod codec; pub mod packet;`
- Add `pub mod <proto>;` to `src/protocol/src/lib.rs`

Codec notes:
- `#[derive(Clone)]` is required on the codec struct and all internal enums
- Use `Box<>` to avoid `clippy::large_enum_variant`

`src/protocol/src/robust.rs`
Add one variant in each location:
| Location | What to add |
|---|---|
| `RobustMQProtocol` enum | `PROTO` variant; `to_u8()` assigns a unique `u8`; `to_str()` returns the name; `from_u8()` adds a branch; add `is_proto()` method |
| `XxxWrapperExtend` struct | Create `pub struct ProtoWrapperExtend {}` |
| `RobustMQWrapperExtend` enum | `PROTO(ProtoWrapperExtend)` variant; add branch in `to_mqtt_protocol()` |
| `RobustMQPacket` enum | `PROTO(ProtoPacket)` variant; add `get_proto_packet()` method |
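The first table row can be sketched as below. This is a hypothetical mirror of the pattern, not the actual `RobustMQProtocol` definition: the enum name `DemoProtocol`, the variant set, the numeric values, and the return types are all assumptions chosen for illustration. The point is that one new variant touches four methods, and the `u8` value must not collide with an existing one.

```rust
/// Hypothetical mirror of the protocol enum; variants and u8 values
/// are illustrative, not the real RobustMQ assignments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DemoProtocol {
    Mqtt,
    Kafka,
    Proto, // the newly added protocol
}

impl DemoProtocol {
    /// Each variant gets a unique u8.
    pub fn to_u8(self) -> u8 {
        match self {
            DemoProtocol::Mqtt => 1,
            DemoProtocol::Kafka => 2,
            DemoProtocol::Proto => 3,
        }
    }

    /// Human-readable protocol name.
    pub fn to_str(self) -> &'static str {
        match self {
            DemoProtocol::Mqtt => "mqtt",
            DemoProtocol::Kafka => "kafka",
            DemoProtocol::Proto => "proto",
        }
    }

    /// Inverse of `to_u8`; unknown values yield `None` in this sketch.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            1 => Some(DemoProtocol::Mqtt),
            2 => Some(DemoProtocol::Kafka),
            3 => Some(DemoProtocol::Proto),
            _ => None,
        }
    }

    /// Convenience predicate for the new protocol.
    pub fn is_proto(self) -> bool {
        matches!(self, DemoProtocol::Proto)
    }
}
```

Because both `to_u8` and `to_str` use exhaustive matches, forgetting a branch for the new variant is a compile error; only `from_u8` can silently miss it, so a round-trip test is worth adding.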
`src/protocol/src/codec.rs`
- Add `PROTO(ProtoPacket)` variant to `RobustMQCodecWrapper` enum; add branch in `Display` impl
- Add `PROTO(ProtoCodec)` variant to `RobustMQCodecEnum` enum
- Add `RobustMQProtocol::PROTO` to `PROTOCOL_PROBE_ORDER` constant array
- Add `proto_codec: ProtoCodec` field to `RobustMQCodec` struct; initialize in `new()`
- Add `Some(RobustMQProtocol::PROTO)` arm in `decode_data()` match on `self.protocol`
- Add `RobustMQCodecWrapper::PROTO(pkt)` arm in `encode_data()` match

The following files have exhaustive matches; add a `PROTO` branch in each (search for `RobustMQPacket::StorageEngine` to locate them):
- `src/common/network-server/src/common/handler.rs`: two places, `write_response()` and `write_websocket_response()`
- `src/common/network-server/src/common/write.rs`: two places, `write_tcp_frame()` and `write_quic_frame()`
- `src/common/network-server/src/common/tcp_acceptor.rs`
- `src/common/network-server/src/common/tls_acceptor.rs`
- `src/common/network-server/src/quic/acceptor.rs`
- `src/common/network-server/src/websocket/server.rs`

`src/<proto>-broker/`

    src/<proto>-broker/
    ├── Cargo.toml
    └── src/
        ├── lib.rs            pub mod broker/handler/<proto>/server
        ├── broker.rs         XxxBrokerServerParams + XxxBrokerServer (DEFAULT_PORT)
        ├── server/mod.rs     XxxServer, TcpServer + handler_process
        ├── handler/
        │   ├── mod.rs
        │   └── command.rs    XxxHandlerCommand impl Command, match dispatches to <proto>/ functions
        └── <proto>/
            ├── mod.rs
            ├── connect.rs    process_connect()
            ├── publish.rs    process_pub()
            ├── subscribe.rs  process_sub() / process_unsub()
            └── ping.rs       process_ping() / process_pong() (if the protocol has heartbeats)

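`<proto>/` directory design principles:
- Each `process_*` function returns `Option<ProtoPacket>`; the `?` in the `command.rs` pattern relies on this, so a `None` result produces no response

`command.rs` pattern: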

    let resp_packet = match &packet {
        ProtoPacket::Connect(req) => connect::process_connect(req),
        ProtoPacket::Pub { subject, payload, .. } => publish::process_pub(subject, payload),
        // ...
    }?;
    Some(ResponsePackage::new(connection_id, RobustMQPacket::PROTO(resp_packet)))

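A self-contained version of this dispatch pattern is sketched below, assuming the process functions return `Option<...>` as the `?` in the snippet implies. Everything here is hypothetical: `DemoPacket`, `DemoResponse`, `apply`, and the function bodies are stand-ins, not the real `ProtoPacket`, `ResponsePackage`, or `Command` trait.

```rust
/// Hypothetical packet type standing in for ProtoPacket.
#[derive(Debug, Clone, PartialEq)]
pub enum DemoPacket {
    Connect { client: String },
    Pub { subject: String, payload: Vec<u8> },
    Ping,
    Pong,
    Info(String),
}

/// Hypothetical stand-in for ResponsePackage.
#[derive(Debug, Clone, PartialEq)]
pub struct DemoResponse {
    pub connection_id: u64,
    pub packet: DemoPacket,
}

// Each process function returns Option: Some(reply) or None for "no reply".
fn process_connect(client: &str) -> Option<DemoPacket> {
    Some(DemoPacket::Info(format!("hello {client}")))
}

fn process_pub(_subject: &str, _payload: &[u8]) -> Option<DemoPacket> {
    None // this sketch sends no acknowledgement for publishes
}

fn process_ping() -> Option<DemoPacket> {
    Some(DemoPacket::Pong)
}

/// Dispatches one inbound packet; `?` returns early with None
/// when the selected process function produced no reply.
pub fn apply(connection_id: u64, packet: &DemoPacket) -> Option<DemoResponse> {
    let resp_packet = match packet {
        DemoPacket::Connect { client } => process_connect(client),
        DemoPacket::Pub { subject, payload } => process_pub(subject, payload),
        DemoPacket::Ping => process_ping(),
        // Server-to-client packets are not expected inbound in this sketch.
        DemoPacket::Pong | DemoPacket::Info(_) => None,
    }?;
    Some(DemoResponse { connection_id, packet: resp_packet })
}
```

Keeping the match free of business logic means each protocol command can be unit-tested through its own function, while the dispatcher stays a thin routing table.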
- `src/common/config/src/config.rs`: create `pub struct ProtoRuntime { pub network: Network }` with a `Default` impl; add the field to `BrokerConfig`; initialize it in `BrokerConfig::default()`
- `src/common/config/src/default.rs`: add `pub fn default_proto_runtime() -> ProtoRuntime`; add `ProtoRuntime` to the `use` imports
- `Cargo.toml` (workspace root): add `"src/<proto>-broker"` to `[workspace] members`; add `<proto>-broker = { path = "src/<proto>-broker" }` to `[workspace.dependencies]`
- `src/broker-server/Cargo.toml`: add `<proto>-broker.workspace = true`

`src/broker-server/src/lib.rs`
- Add `use <proto>_broker::broker::{XxxBrokerServer, XxxBrokerServerParams};`
- Add `proto_params: XxxBrokerServerParams` field to `BrokerServer` struct
- Initialize `proto_params` in `new()` (refer to `kafka_params` / `amqp_params`)
- Call `self.start_proto_broker(app_stop.clone());` in `start()` Phase 7
- Add `start_proto_broker()` method (refer to `start_kafka_broker`)

Build the workspace to verify:

    cargo build --workspace

Successful compilation confirms the scaffold is complete. Fill in business logic in the <proto>/ directory functions afterward.
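The configuration wiring from the `config.rs` and `default.rs` steps might look roughly like this. It is a sketch under stated assumptions: the fields of `Network` (`handler_threads`, `queue_size`), their default values, and the `proto_runtime` field name are invented for illustration, and the real `BrokerConfig` has many more fields.

```rust
/// Hypothetical network settings; field names and defaults are
/// assumptions, not the real RobustMQ `Network` struct.
#[derive(Debug, Clone, PartialEq)]
pub struct Network {
    pub handler_threads: usize,
    pub queue_size: usize,
}

impl Default for Network {
    fn default() -> Self {
        Network { handler_threads: 4, queue_size: 1000 }
    }
}

/// Runtime section for the new protocol (config.rs step).
#[derive(Debug, Clone, PartialEq)]
pub struct ProtoRuntime {
    pub network: Network,
}

impl Default for ProtoRuntime {
    fn default() -> Self {
        ProtoRuntime { network: Network::default() }
    }
}

/// Default provider (default.rs step).
pub fn default_proto_runtime() -> ProtoRuntime {
    ProtoRuntime::default()
}

/// Trimmed-down stand-in for BrokerConfig with only the new field.
#[derive(Debug, Clone, PartialEq)]
pub struct BrokerConfig {
    pub proto_runtime: ProtoRuntime, // hypothetical field name
}

impl Default for BrokerConfig {
    fn default() -> Self {
        BrokerConfig { proto_runtime: default_proto_runtime() }
    }
}
```

Routing `BrokerConfig::default()` through `default_proto_runtime()` keeps a single source for the default values, so a config file that omits the section and a programmatic default agree.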
| Problem | Cause | Fix |
|---|---|---|
| `Clone` compile error | Internal enum in codec struct missing `#[derive(Clone)]` | Add `Clone` to all enums inside the codec |
| `large_enum_variant` clippy error | Enum variant size difference exceeds threshold | Wrap large variants in `Box<>` |
| `non-exhaustive patterns` | `match` in network-server missing new branch | Search `StorageEngine` to locate all matches, add branch to each |
| `unresolved module metadata_struct` | Missing `Cargo.toml` dependency | Add `metadata-struct.workspace = true` |
| PR template CLA link broken | Relative path doesn't render correctly on GitHub | Change to https://github.com/robustmq/robustmq/blob/main/CLA.md |
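The first two rows of the table can be illustrated with a small sketch. The types below (`BigFrame`, `InlineState`, `BoxedState`, `DemoCodec`) are hypothetical and only demonstrate the two fixes: deriving `Clone` on every enum held by the codec struct, and boxing a large variant so the enum itself stays small.

```rust
use std::mem::size_of;

/// Hypothetical large payload carried by one decoder state.
#[derive(Debug, Clone)]
pub struct BigFrame {
    pub body: [u8; 1024],
}

/// Without Box: every value of the enum is as large as its biggest
/// variant, which is the situation clippy::large_enum_variant flags.
#[derive(Debug, Clone)]
pub enum InlineState {
    Idle,
    Reading(BigFrame),
}

/// With Box: the large variant is a pointer, so the enum stays small.
#[derive(Debug, Clone)]
pub enum BoxedState {
    Idle,
    Reading(Box<BigFrame>),
}

/// The codec can derive Clone only because its internal enum does too.
#[derive(Debug, Clone)]
pub struct DemoCodec {
    pub state: BoxedState,
}

/// Returns (inline size, boxed size) in bytes for comparison.
pub fn state_sizes() -> (usize, usize) {
    (size_of::<InlineState>(), size_of::<BoxedState>())
}
```

Removing `Clone` from `BoxedState` makes the derive on `DemoCodec` fail to compile, which reproduces the first row of the table.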