with one click
libatapp-module-connector
// Use when: writing libatapp modules, connectors, endpoint management, connection handles, message routing, suspend_stop logic, or multi-node tests.
// Use when: writing libatapp modules, connectors, endpoint management, connection handles, message routing, suspend_stop logic, or multi-node tests.
Use when: auditing or optimizing AI agent prompts, bridge files, skills, SKILL.md metadata, and cross-tool compatibility.
Use when: configuring or building libatapp with CMake, changing shared/static builds, or adjusting build type options.
Use when: editing libatapp enable_expression config fields, environment-variable expansion syntax, YAML/INI/env loading, or atapp_conf.proto config metadata.
Use when: working on libatapp etcd integration, service discovery sets, topology management, keepalive actors, watchers, node selection, or etcd_module.
Use when: running or writing libatapp unit tests, filtering private test cases, using multi-node/debug-time patterns, testing etcd, or fixing Windows DLL/PATH startup issues.
| name | libatapp-module-connector |
| description | Use when: writing libatapp modules, connectors, endpoint management, connection handles, message routing, suspend_stop logic, or multi-node tests. |
Modules inherit atframework::atapp::module_impl and are registered with app.add_module(ptr).
Lifecycle call order:
on_bind() ā Module added to app (get_app() available after)
ā
setup(conf) ā Config loaded, before init (can modify conf)
ā
init() ā Pure virtual; must implement (return 0 = success)
ā
ready() ā All modules initialized; app fully ready
ā
āāāā tick loop āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā tick() ā Called every timer.tick_interval ā
ā Return > 0 = busy (prevents sleep) ā
ā prereload() ā Before config reload (if triggered) ā
ā reload() ā After config reloaded ā
ā setup_log() ā Log config changed ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā
stop() ā App stopping; return non-zero = async stop
ā
timeout() ā Called if stop exceeds deadline
ā
cleanup() ā Final resource release
ā
on_unbind() ā Module removed from app
#include <atframe/atapp_module_impl.h>
class my_module : public atframework::atapp::module_impl {
public:
int init() override {
// Access app config: get_app()->get_origin_configure()
// Access bus node: get_app()->get_bus_node()
return 0; // 0 = success
}
int reload() override {
// Re-read configuration values
return 0;
}
int stop() override {
if (!all_done()) {
// Async stop: will be called again
suspend_stop(std::chrono::seconds(30), [this]() {
return all_done();
});
return 1; // non-zero = still stopping
}
return 0; // 0 = stop complete
}
int timeout() override {
// Force-stop: deadline exceeded
force_close_everything();
return 0;
}
const char* name() const override { return "my_module"; }
int tick() override {
// Periodic work
int busy = process_pending();
return busy; // > 0 = busy
}
};
Delays module shutdown until a condition is met or timeout expires:
suspend_stop(std::chrono::seconds(30), []() -> bool {
return condition_met; // return true = done, resume stop
});
timeout() is called// Inside a module:
get_app() // ā app& (available after on_bind)
get_app()->get_id() // ā app_id_t (bus ID)
get_app()->get_app_name() // ā const std::string&
get_app()->get_origin_configure() // ā const atapp::protocol::atapp_configure&
get_app()->get_bus_node() // ā atbus::node*
get_app()->is_running() // ā bool
get_app()->is_closing() // ā bool
get_app()->get_last_tick_time() // ā time_point
atapp_connector_impl (base)
āāā atapp_connector_atbus (libatbus message bus)
āāā atapp_connector_loopback (self-message delivery)
class atapp_connector_impl {
public:
// Address classification (pure virtual)
virtual uint32_t get_address_type(const channel_address_t &addr) const noexcept = 0;
// Lifecycle hooks
virtual int32_t on_start_listen(const channel_address_t &addr);
virtual int32_t on_start_connect(
const etcd_discovery_node &discovery,
atapp_endpoint &endpoint,
const channel_address_t &addr,
const atapp_connection_handle::ptr_t &handle);
virtual int32_t on_close_connection(atapp_connection_handle &handle);
// Message I/O
virtual int32_t on_send_forward_request(
atapp_connection_handle *handle,
int32_t type,
uint64_t *msg_sequence,
gsl::span<const unsigned char> data,
const atapp::protocol::atapp_metadata *metadata);
virtual void on_receive_forward_response(...);
// Discovery events
virtual void on_discovery_event(etcd_discovery_action_t, const etcd_discovery_node::ptr_t &);
};
address_type_t:
kNone = 0x0000 // Unknown / not handled
kDuplex = 0x0001 // Full duplex (e.g., TCP)
kSimplex = 0x0002 // One-way only
kLocalHost = 0x0004 // Same machine (e.g., shm://, unix://)
kLocalProcess = 0x0008 // Same process (e.g., mem://)
Address scheme to type mapping (atbus connector):
| Scheme | Address Type |
|---|---|
mem:// | kDuplex | kLocalProcess |
shm:// | kDuplex | kLocalHost |
unix:// | kDuplex | kLocalHost |
ipv4://, ipv6://, dns:// | kDuplex |
atcp:// | kDuplex |
class my_connector : public atframework::atapp::atapp_connector_impl {
public:
uint32_t get_address_type(const channel_address_t &addr) const noexcept override {
if (addr.scheme == "myproto") {
return address_type_t::kDuplex;
}
return address_type_t::kNone; // Don't handle this scheme
}
int32_t on_start_connect(
const etcd_discovery_node &discovery,
atapp_endpoint &endpoint,
const channel_address_t &addr,
const atapp_connection_handle::ptr_t &handle) override {
// Establish connection to remote node
// When ready: handle->set_ready()
return 0;
}
int32_t on_send_forward_request(
atapp_connection_handle *handle,
int32_t type,
uint64_t *msg_sequence,
gsl::span<const unsigned char> data,
const atapp::protocol::atapp_metadata *metadata) override {
// Send data via custom transport
return 0;
}
};
The atapp_connector_atbus performs topology-aware routing in try_connect_to():
Target resolution:
1. Is target self? ā loopback connector
2. Is target a known peer? ā direct atbus connection
3. Same upstream parent? ā direct connection attempt
4. Target is upstream? ā connect directly to upstream
5. Target is downstream? ā route down the tree
6. Otherwise: ā forward to upstream proxy
reconnect_tick_interval (base interval), reconnect_max_try_times (max retries)jiffies_timer_watcher_t (can cancel with remove_custom_timer)The atbus connector maintains a handle map keyed by bus_id:
// Internal: bus_id ā connection_handle mapping
// Used for fast lookup when routing messages
// Handles are added on connection, removed on disconnect
atapp_connection_handle wraps an individual connection:
struct flags_t {
kReady = 0, // Connection is ready for I/O
kClosing = 1, // Connection is shutting down
};
// Key methods:
handle->check_flag(flags_t::kReady) // Is connection ready?
handle->set_flag(flags_t::kReady, true)
handle->get_private_data<MyType>() // Typed private data
handle->set_private_data(ptr, destructor_fn)
handle->on_destroy = callback // Cleanup callback
atapp_endpoint represents a remote node with message queuing:
Discovery event (node discovered)
ā
ā¼
Create endpoint ā bind discovery_node
ā
ā¼
Connector attempts connection ā creates connection_handle
ā
ā¼
Connection ready (handle.kReady = true)
āāā Retry pending messages ā send via connector
āāā New messages sent directly
ā
ā¼
Connection lost
āāā New messages queued as pending
āāā Connector reconnects (if applicable)
ā
ā¼
GC timeout ā endpoint removed (if no activity)
When no ready connection exists, messages are queued:
struct pending_message_t {
raw_time_t expired_timepoint; // When message expires
int32_t type; // Message type
uint64_t message_sequence; // Unique sequence ID
std::vector<unsigned char> data;
std::unique_ptr<atapp::protocol::atapp_metadata> metadata;
};
push_forward_message() ā queue a messageretry_pending_messages() ā send queued messages when connection becomes readyget_pending_message_count() / get_pending_message_size() ā inspect queueexpired_timepoint; expired messages are discarded// From atapp_direct_connect_test.cpp
struct direct_three_node_apps {
app node1; // 0x201
app node2; // 0x202
app upstream; // 0x203
void full_setup_and_connect() {
// 1. Init upstream first
// 2. Init node1 and node2
// 3. Inject discovery nodes
// 4. Pump until all connected
}
};
// From atapp_upstream_forward_test.cpp
struct three_node_apps {
app node1; // 0x101 (allow_direct_connection: false)
app upstream; // 0x102
app node3; // 0x103
// node1 and node3 communicate via upstream proxy
};
// Pump event loop until condition or timeout
void pump_until(std::function<bool()> condition, std::chrono::milliseconds timeout) {
auto deadline = std::chrono::steady_clock::now() + timeout;
while (!condition() && std::chrono::steady_clock::now() < deadline) {
for (auto &a : apps) {
a.run_noblock();
}
CASE_THREAD_SLEEP_MS(8);
}
}
// Debug-only: virtual time control for timer tests
app::set_sys_now(future_time_point); // Jump clock forward
app.tick(); // Process timers (no I/O)
Shared uv_default_loop(): When multiple apps use init(nullptr, ...), they share the default libuv loop. run_noblock() fires timers for ALL apps. Use tick() in time-advance sections.
set_sys_now() is static: Affects ALL app instances. Set large bus timeouts (e.g., first_idle_timeout: 3600) in test configs to prevent atbus idle disconnect.
Jiffies timer + set_sys_now(): Only one timer callback fires per tick() call when time is jumped. Use multiple set_sys_now() + tick() rounds for multi-timer scenarios.
Discovery injection: Mock discovery by injecting nodes directly:
auto node = std::make_shared<etcd_discovery_node>();
node->copy_from(discovery_info);
etcd_module->get_global_discovery().add_node(node);
Module init order: init() is called in registration order. If module B depends on module A, register A first.
Pending message overflow: If connection never becomes ready and messages have long expiry, memory grows. Monitor get_pending_message_size().
Connector address overlap: If two connectors handle the same scheme, the first registered one wins. Check get_address_type() returns kNone for schemes you don't handle.
stop() return value: Return 0 = stop complete. Return non-zero = will be called again. Forgetting to eventually return 0 causes the app to hang until timeout.
Timer lazy init: add_custom_timer() before the first tick() may fail because the jiffies timer controller isn't initialized yet. Call is safe after first tick() or after process_custom_timers() runs.