| name | composable-rust-architecture |
| description | Expert knowledge for building event-driven systems with Composable Rust framework. Use when implementing reducers, designing state machines, working with effects, creating environment traits for dependency injection, building stores, or answering questions about core architectural patterns and the unidirectional data flow model. |
Expert knowledge for building event-driven systems using the Composable Rust framework - core architectural patterns, reducer design, effect composition, and the unidirectional data flow model.
Automatically apply when:
Every Composable Rust application is built on these five fundamental types:
(State, Action, Environment) → (State, Effects)

These compose together to create a complete system.
Actions flow through the system in a self-sustaining cycle:
External Input → Action
↓
Reducer: (State, Action, Env) → (New State, Effects)
↓
Store executes Effects
↓
Effects produce new Actions:
- Effect::Future returns 0 or 1 action
- Effect::Stream yields 0..N actions over time
↓
Loop back to Reducer
Key Insight: Everything is an Action. Commands are Actions. Events are Actions. External events are Actions. This creates a unified data flow where the reducer is the single source of state transitions.
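The loop above can be sketched with the standard library alone. This is a simplified, synchronous illustration and not the framework's Store: `CounterAction`, `CounterState`, and `run` are hypothetical names, and an effect is modeled as a plain closure that may return one follow-up action (loosely mirroring the 0-or-1 action contract of Effect::Future).

```rust
use std::collections::VecDeque;

// Assumption: a stand-in for the framework's async Effect type.
// Here an effect is a closure that may produce one follow-up action.
type Effect<A> = Box<dyn FnOnce() -> Option<A>>;

#[derive(Debug, Clone, PartialEq)]
enum CounterAction {
    Increment,        // command arriving from outside
    Incremented(u32), // event fed back by an effect
}

#[derive(Debug, Default)]
struct CounterState {
    count: u32,
    log: Vec<String>,
}

// Reducer: mutates state and returns descriptions of follow-up work.
fn reduce(state: &mut CounterState, action: CounterAction) -> Vec<Effect<CounterAction>> {
    match action {
        CounterAction::Increment => {
            state.count += 1;
            let value = state.count;
            vec![Box::new(move || Some(CounterAction::Incremented(value))) as Effect<CounterAction>]
        }
        CounterAction::Incremented(value) => {
            state.log.push(format!("incremented to {value}"));
            Vec::new()
        }
    }
}

// Store loop: run the reducer, execute the effects it returned,
// and feed any resulting actions back into the reducer.
fn run(state: &mut CounterState, initial: CounterAction) {
    let mut queue = VecDeque::from([initial]);
    while let Some(action) = queue.pop_front() {
        for effect in reduce(state, action) {
            if let Some(next) = effect() {
                queue.push_back(next);
            }
        }
    }
}
```

The loop ends once no effect produces a further action, which is why a command and the event it triggers can share one action type.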
pub trait Reducer: Send + Sync {
    type State: Clone + Send + Sync;
    type Action: Send + Sync;
    type Environment: Send + Sync;

    fn reduce(
        &self,
        state: &mut Self::State, // Mutable for performance
        action: Self::Action,
        env: &Self::Environment,
    ) -> SmallVec<[Effect<Self::Action>; 4]>;
}
The reducer mutates state in place and returns Effect values.

Actions represent ALL inputs to the system. Structure them by intent:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderAction {
    // Commands (external requests)
    PlaceOrder { customer_id: String, items: Vec<Item> },
    CancelOrder { order_id: String, reason: String },

    // Events (things that happened)
    OrderPlaced { order_id: String, timestamp: DateTime<Utc> },
    OrderCancelled { order_id: String, reason: String },

    // Cross-aggregate events (from other aggregates)
    PaymentCompleted { order_id: String, payment_id: String },
    InventoryReserved { order_id: String, items: Vec<Item> },

    // System events
    TimerExpired { timer_id: String },
    RetryFailed { attempt: u32, error: String },
}
Pattern: Use descriptive names that express intent. Group related actions in the same enum. Past tense for events, imperative for commands.
State is owned, cloneable data that represents the current snapshot:
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OrderState {
    pub order_id: Option<String>,
    pub customer_id: Option<String>,
    pub items: Vec<Item>,
    pub status: OrderStatus,
    pub created_at: Option<DateTime<Utc>>,
    pub version: i64, // For optimistic concurrency
}
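// Default is required because OrderState derives Default;
// PartialEq lets tests compare statuses with assert_eq!.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum OrderStatus {
    #[default]
    Draft,
    Placed,
    PaymentPending,
    Confirmed,
    Cancelled,
}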
Pattern: Use Option for fields that may not be set. Include version for event sourcing. Use enums for status/state machine states.
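One way to make the status enum behave as a state machine is to put the allowed transitions in a single function. The sketch below is only an illustration: the transition table is an assumption (the real rules depend on the domain), and `can_transition_to` and `transition` are hypothetical helpers, not framework APIs.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OrderStatus {
    Draft,
    Placed,
    PaymentPending,
    Confirmed,
    Cancelled,
}

impl OrderStatus {
    // Assumed transition table; adjust it to the actual business rules.
    fn can_transition_to(self, next: OrderStatus) -> bool {
        use OrderStatus::*;
        matches!(
            (self, next),
            (Draft, Placed)
                | (Placed, PaymentPending)
                | (Placed | PaymentPending, Confirmed)
                | (Draft | Placed | PaymentPending, Cancelled)
        )
    }
}

// Applies a transition only when the table allows it, so a replayed or
// out-of-order action leaves the status untouched.
fn transition(current: &mut OrderStatus, next: OrderStatus) -> bool {
    if current.can_transition_to(next) {
        *current = next;
        true
    } else {
        false
    }
}
```

A reducer arm could call `transition` before touching any other field and return no effects when it reports `false`.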
pub struct OrderReducer;

impl Reducer for OrderReducer {
    type State = OrderState;
    type Action = OrderAction;
    type Environment = OrderEnvironment;

    fn reduce(
        &self,
        state: &mut Self::State,
        action: Self::Action,
        env: &Self::Environment,
    ) -> SmallVec<[Effect<Self::Action>; 4]> {
        match action {
            // Command: Validate, update state, return effects
            OrderAction::PlaceOrder { customer_id, items } => {
                // 1. Validation
                if items.is_empty() {
                    return smallvec![Effect::None];
                }

                // 2. State update
                let order_id = format!("order-{}", env.clock.now().timestamp());
                state.order_id = Some(order_id.clone());
                state.customer_id = Some(customer_id.clone());
                state.items = items.clone();
                state.status = OrderStatus::Placed;
                state.created_at = Some(env.clock.now());

                // 3. Return effects (describe what should happen)
                smallvec![
                    Effect::Database(DatabaseEffect::Save(state.clone())),
                    Effect::PublishEvent(OrderEvent::Placed {
                        order_id,
                        customer_id,
                        items,
                    }),
                ]
            }

            // Event: Update state idempotently
            OrderAction::PaymentCompleted { order_id, payment_id } => {
                if state.order_id.as_ref() == Some(&order_id) {
                    state.status = OrderStatus::Confirmed;
                    smallvec![Effect::Database(DatabaseEffect::Save(state.clone()))]
                } else {
                    smallvec![Effect::None]
                }
            }

            // Other actions...
            _ => smallvec![Effect::None],
        }
    }
}
Pattern: Match on action type. Validate first. Update state. Return effects. Keep each arm focused.
pub enum Effect<Action> {
    None,
    Future(Pin<Box<dyn Future<Output = Option<Action>> + Send>>),
    Stream(Pin<Box<dyn Stream<Item = Action> + Send>>), // Phase 8
    Delay { duration: Duration, action: Box<Action> },
    Parallel(Vec<Effect<Action>>),
    Sequential(Vec<Effect<Action>>),
}
Effect Variants:
- None: No side effect needed
- Future: Async operation yielding 0 or 1 action
- Stream: Streaming operation yielding 0..N actions over time (Phase 8)
- Delay: Scheduled action after a duration
- Parallel: Execute multiple effects concurrently
- Sequential: Execute effects in order, waiting for each to complete

1. No side effect needed:
use smallvec::smallvec;
smallvec![Effect::None]
2. Async operation (database, HTTP, etc.):
use composable_rust_core::async_effect;
smallvec![async_effect! {
database.save(&data).await?;
Some(OrderAction::OrderSaved { order_id })
}]
3. Delayed action (timers, retries):
use composable_rust_core::delay;
smallvec![delay! {
    duration: Duration::from_secs(30),
    // TimerExpired carries a timer_id in the OrderAction enum defined earlier
    action: OrderAction::TimerExpired { timer_id }
}]
4. Streaming actions (LLM tokens, WebSocket messages, etc.):
use futures::stream;
// Stream multiple actions over time
smallvec![Effect::Stream(Box::pin(stream::iter(
items.into_iter().map(|item| OrderAction::ItemProcessed { item })
)))]
// Async stream with delays
smallvec![Effect::Stream(Box::pin(async_stream::stream! {
let mut response_stream = llm_client.messages_stream(request).await?;
while let Some(chunk) = response_stream.next().await {
yield AgentAction::StreamChunk {
content: chunk?.delta.text
};
}
yield AgentAction::StreamComplete;
}))]
Use cases: LLM token streaming, WebSocket message streams, database cursors, SSE, multi-agent progress tracking.
5. Multiple parallel effects:
// Effect::Parallel holds a Vec<Effect<Action>>, so the inner list uses vec!
smallvec![Effect::Parallel(vec![
    Effect::Database(SaveOrder),
    Effect::PublishEvent(event),
    Effect::Http(notify_customer),
])]
6. Sequential effects (order matters):
// Effect::Sequential holds a Vec<Effect<Action>>, so the inner list uses vec!
smallvec![Effect::Sequential(vec![
    Effect::Database(ReserveInventory),
    Effect::Database(ChargePayment),
    Effect::PublishEvent(OrderConfirmed),
])]
// Merge multiple effects into one
let effects = vec![effect1, effect2, effect3];
let merged = Effect::merge(effects); // Returns Effect::Parallel
// Chain effects sequentially
let chained = effect1.then(effect2); // Returns Effect::Sequential
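To make the composition helpers concrete, here is a minimal sketch of how `merge` and `then` could be written. It is an illustration under assumptions, not the framework's implementation: the `Effect` enum is reduced to a synchronous stand-in (`Emit` replaces Future/Delay), and `collect` is a hypothetical interpreter that walks the tree in order, so it does not model real concurrency for Parallel.

```rust
// Assumption: a simplified, synchronous stand-in for the framework's Effect.
#[derive(Debug, PartialEq)]
enum Effect<A> {
    None,
    Emit(A), // stands in for Future/Delay: produces exactly one action
    Parallel(Vec<Effect<A>>),
    Sequential(Vec<Effect<A>>),
}

impl<A> Effect<A> {
    // Groups independent effects; mirrors "Returns Effect::Parallel".
    fn merge(effects: Vec<Effect<A>>) -> Effect<A> {
        Effect::Parallel(effects)
    }

    // Chains two effects; mirrors "Returns Effect::Sequential".
    fn then(self, next: Effect<A>) -> Effect<A> {
        Effect::Sequential(vec![self, next])
    }
}

// Hypothetical interpreter: collects emitted actions depth-first.
// A real runtime would run Parallel children concurrently.
fn collect<A>(effect: Effect<A>, out: &mut Vec<A>) {
    match effect {
        Effect::None => {}
        Effect::Emit(action) => out.push(action),
        Effect::Parallel(children) | Effect::Sequential(children) => {
            for child in children {
                collect(child, out);
            }
        }
    }
}
```

Because both helpers only build data, a test can compare the returned tree directly without executing anything.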
#[derive(State)] - Version Tracking

Auto-generates version tracking methods for event-sourced state:
use composable_rust_macros::State;
use composable_rust_core::stream::Version;
#[derive(State, Clone, Debug)]
pub struct OrderState {
pub order_id: Option<String>,
pub items: Vec<Item>,
#[version] // Mark version field
pub version: Option<Version>,
}
// Auto-generated methods:
state.version(); // Get version
state.set_version(v); // Set version
Use when: Implementing event-sourced aggregates with optimistic concurrency.
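The version field matters because an append can be rejected when another writer got there first. The sketch below shows that check against an in-memory stream. Everything in it is an assumption for illustration: `Version` here is a local newtype, not `composable_rust_core::stream::Version`, and `InMemoryStream` and `AppendError` are hypothetical.

```rust
// Assumption: a local stand-in for the framework's Version type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Version(u64);

#[derive(Debug, PartialEq)]
enum AppendError {
    Conflict { expected: Option<Version>, actual: Option<Version> },
}

#[derive(Default)]
struct InMemoryStream {
    events: Vec<String>,
}

impl InMemoryStream {
    // None means the stream has no events yet.
    fn current_version(&self) -> Option<Version> {
        if self.events.is_empty() {
            None
        } else {
            Some(Version(self.events.len() as u64))
        }
    }

    // Appends only if the caller's expected version matches the stream.
    fn append(
        &mut self,
        expected: Option<Version>,
        new_events: Vec<String>,
    ) -> Result<Version, AppendError> {
        let actual = self.current_version();
        if actual != expected {
            return Err(AppendError::Conflict { expected, actual });
        }
        self.events.extend(new_events);
        Ok(Version(self.events.len() as u64))
    }
}
```

On a conflict the caller would typically reload the state, re-run the command against it, and retry with the fresh version.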
#[derive(Action)] - Command/Event Helpers

Auto-generates type-safe helpers for distinguishing commands vs events:
use composable_rust_macros::Action;
#[derive(Action, Clone, Debug, Serialize, Deserialize)]
pub enum OrderAction {
#[command]
PlaceOrder { customer_id: String, items: Vec<Item> },
#[event]
OrderPlaced { order_id: String, timestamp: DateTime<Utc> },
}
// Auto-generated methods:
action.is_command(); // true for PlaceOrder
action.is_event(); // true for OrderPlaced
action.event_type(); // "OrderPlaced.v1" (versioned)
Benefits: Type-safe CQRS, automatic event versioning, zero boilerplate.
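For intuition, the helpers can be written by hand. This is not the macro's actual expansion, only a sketch of equivalent behavior on a trimmed-down enum; in particular, returning `Option<&'static str>` from `event_type` (with `None` for commands) is an assumption, since the generated signature is not shown here.

```rust
#[derive(Debug, Clone)]
enum OrderAction {
    PlaceOrder { customer_id: String }, // would carry #[command]
    OrderPlaced { order_id: String },   // would carry #[event]
}

impl OrderAction {
    fn is_command(&self) -> bool {
        matches!(self, OrderAction::PlaceOrder { .. })
    }

    fn is_event(&self) -> bool {
        matches!(self, OrderAction::OrderPlaced { .. })
    }

    // Assumed shape: a versioned name for events, nothing for commands.
    fn event_type(&self) -> Option<&'static str> {
        match self {
            OrderAction::OrderPlaced { .. } => Some("OrderPlaced.v1"),
            OrderAction::PlaceOrder { .. } => None,
        }
    }
}
```

Writing this out for every variant is the boilerplate the derive is meant to remove.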
append_events! - Event Store Operations

Simplify event appending with declarative syntax:
use composable_rust_core::append_events;
// Before (18 lines):
Effect::EventStore(EventStoreOperation::AppendEvents {
event_store: Arc::clone(&env.event_store),
stream_id: StreamId::new("order-123"),
expected_version: Some(Version::new(5)),
events: vec![event],
on_success: Box::new(move |v| Some(Action::Success { v })),
on_error: Box::new(|e| Some(Action::Failed { e })),
})
// After (7 lines - 60% reduction):
append_events! {
store: env.event_store,
stream: "order-123",
expected_version: Some(Version::new(5)),
events: vec![event],
on_success: |v| Some(Action::Success { v }),
on_error: |e| Some(Action::Failed { e })
}
async_effect! - Async Operations

use composable_rust_core::async_effect;
async_effect! {
let response = http_client.get("https://api.example.com").await?;
Some(OrderAction::ResponseReceived { response })
}
delay! - Scheduled Actions

use composable_rust_core::delay;
delay! {
duration: Duration::from_secs(30),
action: OrderAction::TimeoutExpired
}
When to use: Production code where conciseness matters. These macros have zero runtime cost.
Define traits for all dependencies:
// Database trait
pub trait Database: Send + Sync {
async fn save(&self, data: &[u8]) -> Result<(), Error>;
async fn load(&self, id: &str) -> Result<Vec<u8>, Error>;
}
// Clock trait (for deterministic testing)
pub trait Clock: Send + Sync {
fn now(&self) -> DateTime<Utc>;
}
// HTTP client trait
pub trait HttpClient: Send + Sync {
async fn post(&self, url: &str, body: &[u8]) -> Result<Response, Error>;
}
Compose traits into an environment:
pub struct OrderEnvironment<D, C, H>
where
D: Database,
C: Clock,
H: HttpClient,
{
pub database: D,
pub clock: C,
pub http_client: H,
}
pub struct PostgresDatabase { pool: PgPool }
pub struct SystemClock;
pub struct ReqwestClient;
pub struct MockDatabase { /* ... */ }
pub struct FixedClock { time: DateTime<Utc> }
pub struct MockHttpClient { /* ... */ }
pub struct LoggingDatabase<D> { inner: D }
pub struct MetricsDatabase<D> { inner: D }
Pattern: Use static dispatch (generics), not dynamic dispatch (trait objects), for zero-cost abstractions.
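A small sketch of the static-dispatch pattern, using only the standard library: the same generic function works with a production clock and a fixed test clock, and the compiler monomorphizes each use. It uses `std::time::SystemTime` instead of chrono's `DateTime<Utc>` to stay dependency-free, and `Environment` and `new_order_id` are hypothetical names.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

trait Clock: Send + Sync {
    fn now(&self) -> SystemTime;
}

// Production implementation: reads the real clock.
struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}

// Test implementation: always returns the same instant.
struct FixedClock {
    time: SystemTime,
}

impl Clock for FixedClock {
    fn now(&self) -> SystemTime {
        self.time
    }
}

// Generic over the clock, so no trait object or vtable is involved.
struct Environment<C: Clock> {
    clock: C,
}

// Hypothetical helper: derives an order id from the injected clock.
fn new_order_id<C: Clock>(env: &Environment<C>) -> String {
    let secs = env
        .clock
        .now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("order-{secs}")
}
```

With `FixedClock` the generated id is fully deterministic, which is what makes reducer tests repeatable.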
// Create store
let environment = OrderEnvironment {
database: PostgresDatabase::new(pool),
clock: SystemClock,
http_client: ReqwestClient::new(),
};
let store = Store::new(
OrderState::default(),
OrderReducer,
environment,
);
// Send action
let action = OrderAction::PlaceOrder {
customer_id: "cust-123".to_string(),
items: vec![item1, item2],
};
store.send(action).await;
// Get current state
let state = store.state().await;
For actions that need to wait for a result:
// Send action and wait for specific response
let result = store
.send_and_wait_for(
OrderAction::PlaceOrder { ... },
|action| matches!(action, OrderAction::OrderPlaced { .. }),
Duration::from_secs(5),
)
.await?;
❌ WRONG - Executing side effects:
fn reduce(...) -> SmallVec<[Effect; 4]> {
env.database.save(state).await; // ❌ Side effect!
smallvec![Effect::None]
}
✅ CORRECT - Returning effect description:
fn reduce(...) -> SmallVec<[Effect; 4]> {
smallvec![Effect::Database(SaveState)] // ✅ Description!
}
Why: Reducers must be pure and fast. Side effects are executed by the Store runtime.
✅ ALLOWED - Mutating state for performance:
fn reduce(&self, state: &mut State, ...) -> SmallVec<[Effect; 4]> {
state.field = new_value; // ✅ OK!
state.items.push(item); // ✅ OK!
}
Why: Performance matters. Tests are still deterministic because reducers are pure (no I/O).
✅ CORRECT - Everything is an Action:
pub enum Action {
Command(CommandType),
Event(EventType),
ExternalEvent(ExternalEventType),
}
Why: Unified type simplifies the reducer signature and enables the feedback loop.
✅ CORRECT - Generic types:
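use std::marker::PhantomData;

// Illustrative fields only; every type parameter must appear in a field to compile.
struct Store<S, A, E, R>
where
    R: Reducer<State = S, Action = A, Environment = E>,
{
    state: S,
    reducer: R,
    environment: E,
    _action: PhantomData<A>, // A appears only in the bound above
}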
❌ AVOID (unless needed) - Trait objects:
struct Store {
reducer: Box<dyn Reducer>, // Runtime cost
}
Why: Static dispatch compiles to direct function calls. Zero runtime overhead.
fn reduce(...) {
println!("Logging"); // ❌ I/O in reducer
std::thread::sleep(Duration::from_secs(1)); // ❌ Blocking
}
// Effect execution should be simple dispatch
Effect::Database(op) => {
// ❌ Don't put business logic here
if should_retry && attempt < 3 {
// Complex retry logic in executor
}
}
Solution: Encode retry logic as actions/effects in the reducer.
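A sketch of what that can look like: the reducer decides whether and when to retry, and the executor only dispatches. The types are simplified stand-ins rather than framework types, and the names (`SaveFailed`, `Retry`, `MAX_ATTEMPTS`) and the backoff schedule are assumptions chosen for illustration.

```rust
const MAX_ATTEMPTS: u32 = 3;

#[derive(Debug, Clone, PartialEq)]
enum Action {
    Save,
    SaveSucceeded,
    SaveFailed { attempt: u32 }, // fed back by the executor on error
    Retry { attempt: u32 },      // fed back when the delay elapses
}

// Assumption: plain data describing work for the executor.
#[derive(Debug, PartialEq)]
enum Effect {
    SaveToDatabase { attempt: u32 },
    Delay { millis: u64, then: Action },
}

#[derive(Debug, Default)]
struct State {
    saved: bool,
    failed: bool,
}

fn reduce(state: &mut State, action: Action) -> Vec<Effect> {
    match action {
        Action::Save => vec![Effect::SaveToDatabase { attempt: 1 }],
        Action::SaveSucceeded => {
            state.saved = true;
            Vec::new()
        }
        // Retry decision lives here: exponential backoff of 100ms, 200ms, ...
        Action::SaveFailed { attempt } if attempt < MAX_ATTEMPTS => {
            let millis = 100 * 2u64.pow(attempt.saturating_sub(1));
            vec![Effect::Delay { millis, then: Action::Retry { attempt: attempt + 1 } }]
        }
        // Attempts exhausted: record the failure and stop.
        Action::SaveFailed { .. } => {
            state.failed = true;
            Vec::new()
        }
        Action::Retry { attempt } => vec![Effect::SaveToDatabase { attempt }],
    }
}
```

Since the policy is ordinary reducer logic, it can be unit-tested without a database or a timer.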
// ❌ Giant monolithic reducer
fn reduce(...) {
match (state.order_status, state.payment_status, state.shipping_status) {
// 100s of match arms
}
}
Solution: Use reducer composition (see saga patterns skill).
use composable_rust_core::async_effect;
// ❌ Not returning actions from effects
async_effect! {
database.save(&data).await?;
None // ❌ Missing feedback!
}
Solution: Return actions from futures to feed back into the system.
// Combine two reducers that operate on the same state
let combined = combine_reducers(reducer1, reducer2);
// Scope a reducer to a sub-state
let scoped = scope_reducer(
child_reducer,
|parent_state| &mut parent_state.child,
|child_action| ParentAction::Child(child_action),
);
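To show what scoping does, here is a hand-rolled sketch with plain functions. It is not the signature of `scope_reducer`: effects are simplified to a `Vec` of follow-up actions, the extra `to_child_action` extractor is an assumption (something has to decide which parent actions belong to the child), and all type names are hypothetical.

```rust
#[derive(Debug, PartialEq)]
enum CartAction {
    AddItem(String),
    ItemAdded(usize), // carries the new item count
}

#[derive(Debug, Default)]
struct CartState {
    items: Vec<String>,
}

// Child reducer: knows nothing about the parent.
fn cart_reducer(state: &mut CartState, action: CartAction) -> Vec<CartAction> {
    match action {
        CartAction::AddItem(name) => {
            state.items.push(name);
            vec![CartAction::ItemAdded(state.items.len())]
        }
        CartAction::ItemAdded(_) => Vec::new(),
    }
}

#[derive(Debug, PartialEq)]
enum AppAction {
    Cart(CartAction),
    Logout,
}

#[derive(Debug, Default)]
struct AppState {
    cart: CartState,
}

// Lifts a child reducer so it runs against a slice of the parent state
// and re-wraps the actions it emits as parent actions.
fn scope<PS, PA, CS, CA>(
    child: impl Fn(&mut CS, CA) -> Vec<CA>,
    to_child_state: impl Fn(&mut PS) -> &mut CS,
    to_child_action: impl Fn(PA) -> Option<CA>,
    to_parent_action: impl Fn(CA) -> PA,
) -> impl Fn(&mut PS, PA) -> Vec<PA> {
    move |state: &mut PS, action: PA| match to_child_action(action) {
        Some(child_action) => child(to_child_state(state), child_action)
            .into_iter()
            .map(&to_parent_action)
            .collect(),
        None => Vec::new(), // not a child action: nothing to do here
    }
}

fn app_reducer() -> impl Fn(&mut AppState, AppAction) -> Vec<AppAction> {
    scope(
        cart_reducer,
        |state: &mut AppState| &mut state.cart,
        |action: AppAction| match action {
            AppAction::Cart(inner) => Some(inner),
            _ => None,
        },
        AppAction::Cart,
    )
}
```

The child stays reusable because only the three small adapter closures mention the parent types.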
// Parallel execution
let parallel = Effect::Parallel(vec![
effect1,
effect2,
effect3,
]);
// Sequential execution
let sequential = Effect::Sequential(vec![
effect1, // Executes first
effect2, // Then this
effect3, // Finally this
]);
// Nested composition
let complex = Effect::Parallel(vec![
Effect::Sequential(vec![step1, step2]),
Effect::Sequential(vec![step3, step4]),
]);
#[test]
fn test_place_order() {
// Arrange
let env = OrderEnvironment {
database: MockDatabase::new(),
clock: FixedClock::new(test_time()),
http_client: MockHttpClient::new(),
};
let mut state = OrderState::default();
let action = OrderAction::PlaceOrder {
customer_id: "cust-123".to_string(),
items: vec![item],
};
// Act
let effects = OrderReducer.reduce(&mut state, action, &env);
// Assert
assert_eq!(state.status, OrderStatus::Placed);
assert_eq!(state.customer_id, Some("cust-123".to_string()));
assert_eq!(effects.len(), 2);
assert!(matches!(effects[0], Effect::Database(_)));
assert!(matches!(effects[1], Effect::PublishEvent(_)));
}
Key: Reducers test at memory speed. No I/O needed.
#[tokio::test]
async fn test_order_flow() {
let env = OrderEnvironment {
database: InMemoryDatabase::new(),
clock: SystemClock,
http_client: MockHttpClient::new(),
};
let store = Store::new(OrderState::default(), OrderReducer, env);
// Send action
store.send(OrderAction::PlaceOrder { ... }).await;
// Read the state after the action has been processed
let state = store.state().await;
assert_eq!(state.status, OrderStatus::Placed);
}
When designing a new feature:
| Concept | Purpose | Key Trait/Type |
|---|---|---|
| State | Current snapshot | Clone + Send + Sync |
| Action | All inputs | Send + Sync (often enum) |
| Reducer | State transitions | Reducer trait |
| Effect | Side effect descriptions | Effect<Action> enum |
| Environment | Dependencies | Custom struct with trait bounds |
| Store | Runtime coordination | Store<S, A, E, R> |
- specs/architecture.md for comprehensive design
- composable-rust-event-sourcing skill
- composable-rust-sagas skill
- composable-rust-web skill
- composable-rust-testing skill
- modern-rust-expert skill

Remember: The architecture is simple but powerful. State + Action + Reducer → (New State, Effects). The Store coordinates the feedback loop. Everything else builds on this foundation.