walkeros-understanding-flow
| Field | Value |
|---|---|
| name | walkeros-understanding-flow |
| description | Use when learning walkerOS architecture, understanding data flow, or designing composable event pipelines. Covers Source→Collector→Destination pattern and separation of concerns. |
walkerOS follows a Source → Collector → Destination(s) architecture for composable, modular event processing.
Core principle: Separation of concerns. Each component has one job. Components are composable and replaceable.
```text
[Source.before] → Sources → [Source.next] → Collector → [Dest.before] → Destinations → [Dest.next]
```

| Stage | Role | Typical work |
|---|---|---|
| Source.before | Preprocessing (consent-exempt) | Decode, validate format, authenticate |
| Sources | Capture | - |
| Source.next | Pre-chain (post-consent) | Validation, enrichment, redaction |
| Collector | Processing | Event creation, enrichment, consent check |
| Dest.before | Post-chain | Validation, enrichment, routing |
| Destinations | Delivery | Push, send, store |
| Dest.next | Post-push | Audit logging, response parsing, webhooks |
A Flow combines these components.
See packages/core/src/types/flow.ts for the canonical interface.
```typescript
// Conceptual structure (see source for full type)
interface Flow {
  stores?: Record<string, Store>;
  sources?: Record<string, Source>;
  transformers?: Record<string, Transformer>;
  destinations?: Record<string, Destination>;
  collector?: Collector.InitConfig;
}
```
All components communicate via push functions:
| Component | Push Signature | Purpose |
|---|---|---|
| Source | push(input) → events | Capture external data |
| Collector | push(event) → void | Process and route |
| Destination | push(event, context) → void | Transform and deliver |
The elb() function is an alias for collector.push - used for component
wiring.
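The toy model below makes the push contract in the table concrete by implementing the three roles in a self-contained way. It is a sketch, not walkerOS code. `WalkerEvent`, `DestinationLike`, `createCollector`, and `createSource` are hypothetical names. The real collector also handles consent, transformer chains, and queuing.

```typescript
// Hypothetical, simplified types; the real ones come from the walkerOS packages.
interface WalkerEvent {
  name: string;
  data: Record<string, unknown>;
}

interface DestinationLike {
  push: (event: WalkerEvent, context: { id: string }) => void;
}

interface RawInput {
  action: string;
  payload: Record<string, unknown>;
}

// Collector role: push(event) → void, routing each event to every destination.
function createCollector(destinations: Record<string, DestinationLike>) {
  return {
    push(event: WalkerEvent): void {
      for (const [id, destination] of Object.entries(destinations)) {
        destination.push(event, { id });
      }
    },
  };
}

// Source role: push(input) → events, forwarding each event through elb.
function createSource(elb: (event: WalkerEvent) => void) {
  return {
    push(input: RawInput): WalkerEvent[] {
      const event: WalkerEvent = { name: input.action, data: input.payload };
      elb(event);
      return [event];
    },
  };
}

// Wiring: the source only sees elb, an alias for collector.push.
const delivered: string[] = [];
const collector = createCollector({
  log: {
    push: (event, { id }) => {
      delivered.push(`${id}:${event.name}`);
    },
  },
});
const elb = collector.push;
const source = createSource(elb);
const captured = source.push({ action: 'page view', payload: { title: 'Home' } });
```

The source only ever receives `elb`, so you can add or swap destinations without touching the source.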
See packages/collector/src/flow.ts for
the startFlow function.
```typescript
import { startFlow } from '@walkeros/collector';

const { collector, elb } = await startFlow({
  stores: {
    /* key-value storage, init first, destroy last */
  },
  sources: {
    /* ... */
  },
  transformers: {
    /* ... */
  },
  destinations: {
    /* ... */
  },
});
```
walkerOS uses a two-layer data model: the walkerOS event itself and the `ingest` object that travels alongside it. Any step can read and write arbitrary keys on `ingest`. The runtime manages `_meta`:
- `_meta.hops` - increments per step (safety valve at 256)
- `_meta.path` - ordered list of step IDs visited (`path[0]` = source ID)

```typescript
// In a transformer
push: async (event, context) => {
  context.ingest.botScore = 0.95; // Write freely
  context.ingest.geo = { country: 'DE' };
  console.log(context.ingest._meta.path); // ['express', 'validate', 'enrich']
  return { event };
},
```
Ingest is cloned per destination to prevent cross-contamination in parallel
processing. After a destination push, the response is available at
ingest._response.
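The sketch below shows the `_meta` bookkeeping and per-destination cloning described above. It assumes ingest holds only JSON-safe values. `Ingest`, `enterStep`, and `cloneForDestinations` are hypothetical helpers. The exact boundary of the 256-hop safety valve is also an assumption.

```typescript
// Hypothetical ingest shape: arbitrary keys plus runtime-managed _meta.
interface Ingest {
  [key: string]: unknown;
  _meta: { hops: number; path: string[] };
}

const MAX_HOPS = 256; // safety valve from the description above

// Called when a step starts: count the hop, record the step ID, stop runaway loops.
function enterStep(ingest: Ingest, stepId: string): void {
  ingest._meta.hops += 1;
  if (ingest._meta.hops > MAX_HOPS) {
    throw new Error(`Hop limit exceeded at step "${stepId}"`);
  }
  ingest._meta.path.push(stepId);
}

// One deep copy per destination, so parallel pushes cannot see each other's writes.
// JSON cloning assumes ingest only holds JSON-safe values.
function cloneForDestinations(ingest: Ingest, destinationIds: string[]): Map<string, Ingest> {
  const copies = new Map<string, Ingest>();
  for (const id of destinationIds) {
    copies.set(id, JSON.parse(JSON.stringify(ingest)) as Ingest);
  }
  return copies;
}

// Usage: three pre-collector steps, then fan-out to two destinations.
const ingest: Ingest = { _meta: { hops: 0, path: [] } };
for (const step of ['express', 'validate', 'enrich']) enterStep(ingest, step);
ingest['botScore'] = 0.95;
const copies = cloneForDestinations(ingest, ['gtag', 'api']);
copies.get('gtag')?._meta.path.push('gtag');
```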
Three pipeline components (Source / Transformer / Destination) plus Stores as
passive infrastructure. Stores are not a fourth pipeline stage - they're
key-value storage that other components consume via env. They have no push,
no next, no before; they sit alongside the pipeline rather than inside it.
- Stores are referenced as `$store.storeId` in `env` values (bundled mode) or passed directly as store instances in `env` (integrated mode).
- Available packages: `@walkeros/server-store-fs` (async, filesystem), `@walkeros/server-store-s3` (async, S3-compatible), `@walkeros/server-store-gcs`, and `@walkeros/server-store-sheets`. The collector ships a built-in in-memory cache tier; enable it on any store via `Flow.Store.cache` instead of declaring a separate memory store.

```json
{
  "stores": {
    "data": { "package": "@walkeros/server-store-fs" }
  },
  "transformers": {
    "fingerprint": {
      "package": "@walkeros/server-transformer-fingerprint",
      "env": { "store": "$store.data" }
    }
  }
}
```
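A runner could turn `$store.<id>` strings in `env` into store instances before handing `env` to a component, as sketched below. The `Store` interface here is a simplified, hypothetical `get`/`set` contract. `MemoryStore` and `resolveStoreRefs` are illustrative names. See walkeros-understanding-stores for the real interface.

```typescript
// Hypothetical minimal store contract (not the real walkerOS interface).
interface Store {
  get(key: string): Promise<unknown>;
  set(key: string, value: unknown): Promise<void>;
}

// In-memory stand-in for a real store package such as @walkeros/server-store-fs.
class MemoryStore implements Store {
  private data = new Map<string, unknown>();

  async get(key: string): Promise<unknown> {
    return this.data.get(key);
  }

  async set(key: string, value: unknown): Promise<void> {
    this.data.set(key, value);
  }
}

// Replaces every "$store.<id>" string value in env with the matching store instance;
// all other env values pass through unchanged.
function resolveStoreRefs(
  env: Record<string, unknown>,
  stores: Record<string, Store>,
): Record<string, unknown> {
  const resolved: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(env)) {
    if (typeof value === 'string' && value.startsWith('$store.')) {
      const id = value.slice('$store.'.length);
      const store = stores[id];
      if (!store) throw new Error(`Unknown store "${id}" referenced by env.${key}`);
      resolved[key] = store;
    } else {
      resolved[key] = value;
    }
  }
  return resolved;
}

// Usage mirroring the JSON above: the transformer's env.store becomes the "data" store.
const stores = { data: new MemoryStore() };
const env = resolveStoreRefs({ store: '$store.data', salt: 'fixed' }, stores);
```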
See walkeros-understanding-stores for the full store interface and lifecycle.
| Concern | Handled By | NOT Handled By |
|---|---|---|
| Event capture | Sources | Collector, Destinations |
| Event structure | Event model | Components |
| Consent checking | Collector | Sources, Destinations |
| Transformation | Mapping system | Raw push calls |
| Delivery | Destinations | Sources, Collector |
Every step (source, transformer, destination) supports a small set of inline primitives alongside its package wiring. `cache`, `mapping`, and `consent` are the long-established ones; the optional `validate` is the newest. `validate` declares validation intent (a format check, per-event JSON Schemas, or a single generic schema) inline on a step, just as `cache` declares caching intent. It is a declarative description: consumers (CLI tooling, MCP, custom runners) decide how to enforce it. See Website: Validate for the full shape.
Every step picks exactly one of three forms to point at its implementation:

- `package: "<npm-package>"` alone loads the package's default export. This is the common case for sources and destinations.
- `package: "<npm-package>"` plus `import: "<exportName>"` loads a specific named export from that package. Use it when a package ships multiple named exports or has no default export.
- `code: { push, type?, init? }` is an inline implementation with no package wiring. It is useful for one-off custom logic in TypeScript flows.

There is no string form of `code`. A bare step with no `package`, `import`, or `code` is a valid no-op for all four step kinds (handy as a chain-only or mapping-only transformer).
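A small classifier can tell the three forms and the bare no-op apart, as the hypothetical sketch below shows. `StepRef`, `StepForm`, and `classifyStep` are not walkerOS exports. The error cases for mixed forms and a lone `import` are assumptions.

```typescript
// Hypothetical step-reference shape covering the three documented forms.
interface StepRef {
  package?: string;
  import?: string;
  code?: { push: (...args: unknown[]) => unknown; type?: string; init?: unknown };
}

type StepForm =
  | { kind: 'default-export'; pkg: string }
  | { kind: 'named-export'; pkg: string; exportName: string }
  | { kind: 'inline' }
  | { kind: 'no-op' };

// Decides which form a step uses; no package, import, or code means a no-op.
function classifyStep(step: StepRef): StepForm {
  if (step.code !== undefined) {
    // Assumption: mixing code with package/import is rejected rather than merged.
    if (step.package !== undefined || step.import !== undefined) {
      throw new Error('Pick one form: code cannot be combined with package/import');
    }
    // JSON input can bypass the type, so guard against a (non-existent) string form.
    if (typeof step.code !== 'object') {
      throw new Error('code must be an object such as { push }, not a string');
    }
    return { kind: 'inline' };
  }
  if (step.package !== undefined) {
    return step.import !== undefined
      ? { kind: 'named-export', pkg: step.package, exportName: step.import }
      : { kind: 'default-export', pkg: step.package };
  }
  // Assumption: import without package has nothing to load from.
  if (step.import !== undefined) throw new Error('import requires package');
  return { kind: 'no-op' };
}
```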
Transformers run at two points in the pipeline, configured via `next` and `before`.

The pre-collector chain (`source.next`) runs after the source captures an event and before collector processing:
Bundled mode (flow.json):
```json
{
  "sources": {
    "browser": {
      "package": "@walkeros/web-source-browser",
      "next": "enrich"
    }
  },
  "transformers": {
    "enrich": {
      "package": "@walkeros/transformer-enricher",
      "next": "redact"
    },
    "redact": {
      "package": "@walkeros/transformer-redact"
    }
  }
}
```
Integrated mode (TypeScript):
```typescript
sources: {
  browser: {
    code: sourceBrowser,
    next: 'enrich'
  }
},
transformers: {
  enrich: {
    code: transformerEnrich,
    config: { next: 'redact' }
  },
  redact: {
    code: transformerRedact
  }
}
```
Note: In flow.json, next is at the reference level. The CLI bundler
automatically moves it into config.next for runtime - you don't need to handle
this yourself.
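The sketch below shows the reference-to-runtime move described in the note. It uses a simplified route type (`string | string[]`) and a hypothetical `moveNextIntoConfig` helper. If a reference carries both a top-level `next` and an existing `config.next`, which one wins is an assumption here.

```typescript
// Hypothetical, simplified route type (the real Route also allows RouteConfig objects).
type SimpleRoute = string | string[];

interface TransformerRef {
  package?: string;
  next?: SimpleRoute;
  config?: { next?: SimpleRoute; [key: string]: unknown };
}

// Moves a reference-level `next` into `config.next`, mirroring the bundler step
// described above. Assumption: the reference-level value overrides config.next.
function moveNextIntoConfig(ref: TransformerRef): TransformerRef {
  const { next, ...rest } = ref;
  if (next === undefined) return ref; // nothing to move
  return { ...rest, config: { ...ref.config, next } };
}

const runtimeRef = moveNextIntoConfig({
  package: '@walkeros/transformer-enricher',
  next: 'redact',
});
// runtimeRef → { package: '@walkeros/transformer-enricher', config: { next: 'redact' } }
```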
Each transformer can have its own before chain that runs before its push
function:
```json
{
  "transformers": {
    "enrich": {
      "before": "lookup",
      "package": "@walkeros/transformer-enricher"
    },
    "lookup": {
      "package": "@walkeros/transformer-lookup"
    }
  }
}
```
The post-collector chain (`destination.before`) runs after collector enrichment and before the destination receives the event:
Bundled mode (flow.json):
```json
{
  "destinations": {
    "gtag": {
      "package": "@walkeros/web-destination-gtag",
      "before": "redact"
    }
  },
  "transformers": {
    "redact": {
      "package": "@walkeros/transformer-redact"
    }
  }
}
```
Integrated mode (TypeScript):
```typescript
destinations: {
  gtag: {
    code: destinationGtag,
    before: 'redact'
  }
},
transformers: {
  redact: {
    code: transformerRedact
  }
}
```
A post-push chain, set via `destination.next`, runs after the destination push completes. The push response is available at `context.ingest._response`:
Bundled mode (flow.json):
```json
{
  "destinations": {
    "api": {
      "package": "@walkeros/server-destination-api",
      "next": "auditLog"
    }
  },
  "transformers": {
    "auditLog": {
      "package": "@walkeros/transformer-audit"
    }
  }
}
```
Integrated mode (TypeScript):
```typescript
destinations: {
  api: {
    code: destinationApi,
    next: 'auditLog'
  }
},
transformers: {
  auditLog: {
    code: transformerAudit
  }
}
```
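Below is a synchronous sketch of the post-push flow. The destination gets its own ingest copy, its return value lands on `_response`, and the `destination.next` steps run afterwards. Real pushes are async and receive a richer context. `pushWithPostChain` and the audit step are hypothetical.

```typescript
// Hypothetical shapes; real destination pushes are async with a richer context.
interface PostPushIngest {
  [key: string]: unknown;
  _response?: unknown;
}

type EventData = { name: string };
type DestinationPush = (event: EventData) => unknown;
type PostPushStep = (event: EventData, context: { ingest: PostPushIngest }) => void;

// Pushes to one destination with its own ingest copy, stores the push result at
// ingest._response, then runs the destination.next steps in order.
function pushWithPostChain(
  event: EventData,
  sharedIngest: PostPushIngest,
  push: DestinationPush,
  postChain: PostPushStep[],
): PostPushIngest {
  const ingest = JSON.parse(JSON.stringify(sharedIngest)) as PostPushIngest;
  ingest._response = push(event);
  for (const step of postChain) step(event, { ingest });
  return ingest;
}

// Usage: a hypothetical audit step reads the destination's response.
const auditLog: string[] = [];
const auditStep: PostPushStep = (event, { ingest }) => {
  auditLog.push(`${event.name} -> ${JSON.stringify(ingest._response)}`);
};
const shared: PostPushIngest = { requestId: 'r1' };
const result = pushWithPostChain({ name: 'order complete' }, shared, () => ({ status: 202 }), [auditStep]);
```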
Summary of chain fields:

- `source.before` → consent-exempt preprocessing chain
- `source.next` → starts the pre-collector chain
- `transformer.before` → pre-transform enrichment chain
- `transformer.next` (flow.json) or `transformer.config.next` (runtime) → links transformers
- `destination.before` → starts the post-collector chain per destination
- `destination.next` → post-push processing chain

When a single flow.json defines multiple flows, any flow can pull values from another flow's config block via `$flow.<name>(.<path>)?`. The most common case is wiring a web flow's API destination to a server flow's deployed URL, so the two stay in sync without duplication.
```json
{
  "version": 4,
  "flows": {
    "server": {
      "config": {
        "platform": "server",
        "url": "https://collect.example.com"
      },
      "sources": {
        "http": { "package": "@walkeros/server-source-express" }
      }
    },
    "web": {
      "config": { "platform": "web" },
      "destinations": {
        "api": {
          "package": "@walkeros/web-destination-api",
          "config": { "settings": { "url": "$flow.server.url" } }
        }
      }
    }
  }
}
```
`validate` warns on unresolved `$flow` references (lenient), while `bundle` errors out (strict), so production builds never ship with an empty cross-flow value.
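The sketch below resolves `$flow.<name>(.<path>)?` references under the two strictness modes. `resolveFlowRef` is hypothetical. Two behaviors are assumptions the text does not spell out: lenient mode returns the raw string, and a bare `$flow.<name>` resolves to the whole config block.

```typescript
// Hypothetical, simplified multi-flow shape: only each flow's config block matters here.
type Flows = Record<string, { config?: Record<string, unknown> }>;

// Walks a path through nested objects; undefined if any segment is missing.
function readPath(root: unknown, path: string[]): unknown {
  let current = root;
  for (const segment of path) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[segment];
  }
  return current;
}

// Resolves "$flow.<name>(.<path>)?". Lenient mode records a warning and keeps the
// raw string (like validate); strict mode throws (like bundle).
function resolveFlowRef(
  value: string,
  flows: Flows,
  mode: 'lenient' | 'strict',
  warnings: string[],
): unknown {
  if (!value.startsWith('$flow.')) return value;
  const [name = '', ...path] = value.slice('$flow.'.length).split('.');
  const resolved = readPath(flows[name]?.config, path);
  if (resolved === undefined) {
    const message = `Unresolved reference: ${value}`;
    if (mode === 'strict') throw new Error(message);
    warnings.push(message);
    return value;
  }
  return resolved;
}

// Usage mirroring the JSON above.
const flows: Flows = {
  server: { config: { platform: 'server', url: 'https://collect.example.com' } },
  web: { config: { platform: 'web' } },
};
const warnings: string[] = [];
const apiUrl = resolveFlowRef('$flow.server.url', flows, 'strict', warnings);
```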
Each step in a flow (source, transformer, destination) can ship step examples: structured `{ in, out }` pairs that define expected input/output behavior.
Steps sit at boundaries between arbitrary formats and walkerOS events:

- Source: arbitrary `in` (HTTP request, DOM event) → walkerOS event `out`
- Transformer: walkerOS event `in` → walkerOS event `out` (or `false`)
- Destination: walkerOS event `in` → arbitrary `out` (vendor API call)

See using-step-examples for the full ASCII diagram and detailed explanation.
```json
{
  "destinations": {
    "gtag": {
      "package": "@walkeros/web-destination-gtag",
      "config": { "measurementId": "G-XXXXXX" },
      "examples": {
        "purchase": {
          "in": {
            "name": "order complete",
            "data": { "id": "ORD-123", "total": 149.97 }
          },
          "out": [
            "event",
            "purchase",
            { "transaction_id": "ORD-123", "value": 149.97 }
          ]
        }
      }
    }
  }
}
```
Step examples enable it.each testing, CLI simulation with --example, and
deep validation with --deep. See
using-step-examples for the complete
lifecycle.
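You can sketch the idea behind running step examples as table-driven tests without a test framework. `toGtagCall` is a hypothetical stand-in for the gtag destination's output, not its real implementation. The `JSON.stringify` comparison is sensitive to key order, which a real deep-equality check would not be.

```typescript
// Hypothetical shapes mirroring the { in, out } step examples shown above.
interface EventIn {
  name: string;
  data: Record<string, unknown>;
}

interface StepExample {
  in: EventIn;
  out: unknown;
}

// Hypothetical stand-in for the gtag destination: returns a gtag() argument list.
function toGtagCall(event: EventIn): unknown[] | false {
  if (event.name !== 'order complete') return false;
  return ['event', 'purchase', { transaction_id: event.data.id, value: event.data.total }];
}

// Table-driven check in the spirit of it.each: returns the names of failing examples.
function failingExamples(
  examples: Record<string, StepExample>,
  step: (event: EventIn) => unknown,
): string[] {
  return Object.entries(examples)
    .filter(([, example]) => JSON.stringify(step(example.in)) !== JSON.stringify(example.out))
    .map(([name]) => name);
}

const gtagExamples: Record<string, StepExample> = {
  purchase: {
    in: { name: 'order complete', data: { id: 'ORD-123', total: 149.97 } },
    out: ['event', 'purchase', { transaction_id: 'ORD-123', value: 149.97 }],
  },
};
```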
This section defines which components can connect to which, and how chains are resolved at runtime. Use it as the canonical reference for building flow graphs, validating configurations, and rendering UI visualizations.
| From | To | Via Field | Valid? |
|---|---|---|---|
| Source | Transformer | source.before | Yes (consent-exempt) |
| Source | Transformer | source.next | Yes (pre-collector) |
| Source | Collector | (implicit, no next) | Yes |
| Source | Source | - | No |
| Source | Destination | - | No |
| Transformer | Transformer | transformer.before | Yes (pre-transform) |
| Transformer | Transformer | transformer.next | Yes (chain continues) |
| Transformer | Collector | (implicit, pre-chain ends) | Yes |
| Transformer | Destination | (implicit, post-chain ends) | Yes |
| Collector | Destination | (implicit, no before) | Yes |
| Collector | Transformer | destination.before | Yes (post-chain) |
| Destination | Transformer | destination.next | Yes (post-push) |
| Collector | Source | - | No |
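A config validator or UI could turn the "Yes" rows of the connection table into a lookup like the one below. The type names and `isValidConnection` are hypothetical. `implicit` stands for the rows that have no field.

```typescript
type NodeKind = 'source' | 'transformer' | 'collector' | 'destination';
type Via =
  | 'source.before'
  | 'source.next'
  | 'transformer.before'
  | 'transformer.next'
  | 'destination.before'
  | 'destination.next'
  | 'implicit';

// The "Yes" rows of the table above; every other combination is rejected.
const ALLOWED: ReadonlyArray<readonly [NodeKind, NodeKind, Via]> = [
  ['source', 'transformer', 'source.before'],
  ['source', 'transformer', 'source.next'],
  ['source', 'collector', 'implicit'],
  ['transformer', 'transformer', 'transformer.before'],
  ['transformer', 'transformer', 'transformer.next'],
  ['transformer', 'collector', 'implicit'],
  ['transformer', 'destination', 'implicit'],
  ['collector', 'destination', 'implicit'],
  ['collector', 'transformer', 'destination.before'],
  ['destination', 'transformer', 'destination.next'],
];

function isValidConnection(from: NodeKind, to: NodeKind, via: Via): boolean {
  return ALLOWED.some(([f, t, v]) => f === from && t === to && v === via);
}
```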
Pre-collector chains start from `source.next`:

- `source.next: "transformerId"` or `source.next: ["t1", "t2"]`
- `transformer.next: "nextId"` walks forward; an array stops walking.
- No `next` = the source connects directly to the collector.

Post-collector chains start from `destination.before`:

- `destination.before: "transformerId"` or `destination.before: ["t1", "t2"]`
- Same `transformer.next` chain-walking logic as pre-chains.
- No `before` = the collector connects directly to the destination.

`getNextSteps` resolves chains. See packages/collector/src/transformer.ts for the implementation. `getNextSteps` is the public dispatch helper that replaces the previous `walkChain` entry point.

- Follows `transformer.next` links until the chain ends.
- An array `next` inside a chain appends the array elements and stops walking.

Note: `getNextSteps` is deterministic for the supplied event context. Static analyzers without a real event can only enumerate reachability under "match may pass or fail" speculation.
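The sketch below applies the chain-walking rules listed above to plain string and array routes. `resolveChain` is hypothetical and is not `getNextSteps`. It ignores `one`/`many` routing and adds its own cycle check, whereas the text above only mentions the 256-hop safety valve.

```typescript
// Simplified route: a transformer ID, a list of IDs, or nothing (routing objects omitted).
type ChainRoute = string | string[] | undefined;

// Hypothetical resolver following the rules above; not the real getNextSteps.
function resolveChain(
  start: ChainRoute,
  transformers: Record<string, { next?: ChainRoute }>,
): string[] {
  if (start === undefined) return []; // no next/before: connect directly
  if (Array.isArray(start)) return [...start]; // an array is used as-is, no walking
  const chain: string[] = [];
  const seen = new Set<string>();
  let current: string | undefined = start;
  while (current !== undefined) {
    if (seen.has(current)) throw new Error(`Cycle detected at "${current}"`);
    seen.add(current);
    chain.push(current);
    const next: ChainRoute = transformers[current]?.next;
    if (Array.isArray(next)) {
      chain.push(...next); // array inside the chain: append and stop walking
      break;
    }
    current = next; // string continues the walk, undefined ends it
  }
  return chain;
}
```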
The `next` and `before` properties accept a `Route` (`string | Route[] | RouteConfig`). A `RouteConfig` is a disjoint union: each config sets at most one of `next` (gated link), `one` (first-match dispatch), or `many` (all-match dispatch). The `one` operator enables conditional routing evaluated against ingest data and picks the first entry whose `match` succeeds:

```json
"next": {
  "one": [
    { "match": { "key": "ingest.path", "operator": "prefix", "value": "/api" }, "next": "api-handler" },
    { "next": "default" }
  ]
}
```
- `one` entries are evaluated in order; the first match wins.
- An entry without `match` always matches; use it as the fallback.
- Supported on `source.before`, `source.next`, `transformer.before`, `transformer.next`, `destination.before`, and `destination.next`.
- Handled by `compileNext()` and `resolveNext()`.

Use `many` when every matching entry should produce an independent parallel flow (audit-while-process, multi-decoder fan-out). `many` terminates the main chain; each branch runs to its own exit. It is available only pre-collector. Post-collector fan-out uses the destinations map.

```json
"next": {
  "many": [
    { "match": { "key": "event.consent.analytics", "operator": "eq", "value": "granted" }, "next": "ga4-pipeline" },
    { "next": "audit-log" }
  ]
}
```
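The sketch below contrasts `one` and `many` evaluation on the two example configs. It is limited to the `eq` and `prefix` operators that appear above. `Branch`, `branchMatches`, `resolveOne`, and `resolveMany` are hypothetical names, not the real `compileNext()`/`resolveNext()`.

```typescript
// Hypothetical, simplified branch shape: only the two operators shown above.
interface MatchRule {
  key: string;
  operator: 'eq' | 'prefix';
  value: string;
}

interface Branch {
  match?: MatchRule;
  next: string;
}

// Reads a dotted key such as "ingest.path" from the evaluation context.
function readKey(context: unknown, key: string): unknown {
  return key.split('.').reduce<unknown>(
    (current, segment) =>
      current !== null && typeof current === 'object'
        ? (current as Record<string, unknown>)[segment]
        : undefined,
    context,
  );
}

function branchMatches(branch: Branch, context: unknown): boolean {
  if (!branch.match) return true; // no match: always matches (fallback)
  const actual = readKey(context, branch.match.key);
  if (branch.match.operator === 'eq') return actual === branch.match.value;
  return typeof actual === 'string' && actual.startsWith(branch.match.value);
}

// one: the first matching branch wins (undefined if nothing matches).
function resolveOne(branches: Branch[], context: unknown): string | undefined {
  return branches.find((branch) => branchMatches(branch, context))?.next;
}

// many: every matching branch becomes its own parallel flow.
function resolveMany(branches: Branch[], context: unknown): string[] {
  return branches.filter((branch) => branchMatches(branch, context)).map((branch) => branch.next);
}

// The two configs from the JSON examples above.
const oneRoute: Branch[] = [
  { match: { key: 'ingest.path', operator: 'prefix', value: '/api' }, next: 'api-handler' },
  { next: 'default' },
];
const manyRoute: Branch[] = [
  { match: { key: 'event.consent.analytics', operator: 'eq', value: 'granted' }, next: 'ga4-pipeline' },
  { next: 'audit-log' },
];
```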
A path is the multi-step chain through a flow's transformers section. A
pass-through step (short: pass) is a single step inside a path that
declares no code and no package; the runtime synthesizes its push from the
operative fields the step does declare.
Pass-through steps come in three variants:
- `before` and/or `next` set: a named hop that shares a chain across multiple call sites (avoids duplicating arrays).
- `cache` set: a dedup or short-circuit step. `cache.stop: true` at a pre-collector position halts the pipeline.
- `mapping: Mapping.Config` set: a declarative event-to-event transform that mutates the event in-flight.

```json
{
  "transformers": {
    "validateThenEnrich": {
      "before": ["validate", "enrich"]
    }
  },
  "destinations": {
    "gtag": {
      "package": "@walkeros/web-destination-gtag",
      "before": "validateThenEnrich"
    },
    "meta": {
      "package": "@walkeros/web-destination-meta",
      "before": "validateThenEnrich"
    }
  }
}
```
Transformer step entries follow a closed schema: unknown top-level keys are
validation errors, and at least one operative field (code / package /
before / next / cache / mapping) must be set.
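The closed-schema rule can be checked mechanically, as sketched below. The operative fields come from the text above. The list of other allowed keys (`import`, `config`, `env`, `consent`, `validate`, `examples`) is an assumption drawn from fields mentioned elsewhere on this page, so check the real schema before relying on it.

```typescript
// Operative fields named above: at least one must be present.
const OPERATIVE_KEYS = ['code', 'package', 'before', 'next', 'cache', 'mapping'] as const;

// Assumption: the other top-level keys a transformer step may carry.
const OTHER_KNOWN_KEYS = ['import', 'config', 'env', 'consent', 'validate', 'examples'];

const KNOWN_KEYS = new Set<string>([...OPERATIVE_KEYS, ...OTHER_KNOWN_KEYS]);

// Returns validation errors for one transformer step entry (empty array = valid).
function checkTransformerStep(step: Record<string, unknown>): string[] {
  const errors: string[] = [];
  for (const key of Object.keys(step)) {
    if (!KNOWN_KEYS.has(key)) errors.push(`Unknown key "${key}"`);
  }
  if (!OPERATIVE_KEYS.some((key) => step[key] !== undefined)) {
    errors.push(`Set at least one of: ${OPERATIVE_KEYS.join(' / ')}`);
  }
  return errors;
}
```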
See
walkeros-understanding-transformers
for full depth on the three variants, the closed-schema rule, and the dual
semantic of mapping at the transformer position versus the destination
position.
A single transformer can appear in both pre-chains (source.next) and
post-chains (destination.before). The same transformer pool is shared; role
depends on which chain references it.
Steps can defer their activation with `require`:

- `source.config.require: ["consent"]` - the source is deferred until the "consent" event fires
- `destination.config.require: ["user"]` - the destination is deferred until the "user" event fires

Mapping and consent gates apply at different points:

- `source.config.mapping` and `source.config.consent` - applied before the pre-chain; they block the event entirely
- `destination.config.mapping` and `destination.config.consent` - applied after the post-chain; they skip only that destination and queue denied events

Reusing a transformer across multiple `destination.before` chains is intentional (e.g., a shared validator for monitoring).
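The sketch below shows `require`-based deferral: a step registers with the events it needs and activates once they have all fired. `RequireGate` is hypothetical. Treating a multi-entry `require` list as "all must fire" is an assumption.

```typescript
// Hypothetical registry: each step waits until every event named in its
// required list has fired (assumption: all listed events are needed).
class RequireGate {
  private pending = new Map<string, { missing: Set<string>; activate: () => void }>();

  register(stepId: string, required: string[], activate: () => void): void {
    if (required.length === 0) {
      activate(); // nothing required: start immediately
      return;
    }
    this.pending.set(stepId, { missing: new Set(required), activate });
  }

  // Called for every incoming event name; activates steps whose requirements are now met.
  onEvent(eventName: string): void {
    this.pending.forEach((entry, stepId) => {
      entry.missing.delete(eventName);
      if (entry.missing.size === 0) {
        this.pending.delete(stepId);
        entry.activate();
      }
    });
  }
}

// Usage mirroring the bullets above.
const active: string[] = [];
const gate = new RequireGate();
gate.register('browser', ['consent'], () => active.push('browser'));
gate.register('gtag', ['user'], () => active.push('gtag'));
gate.onEvent('consent');
```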