| name | temporal-e2e-validate |
| description | Full end-to-end validation of Temporal distributed execution (Phases 2-5 + v1 routing + v2 queue options / worker-runtime profiles). Two modes: (1) pytest against a real Temporal server for detailed assertions on all test cases, and (2) true 3-process setup (server + separate worker process + submitter) that validates the actual deployment topology including cross-process serialization, LibraryCrate propagation, deferred hydration, concurrent isolation, image payload storage, and cross-worker graph tracing with GraphSpec assembly. Step 8 validates v1 per-activity routing. Step 9 validates v2 per-queue submitter options (timeouts, retry, rate-limit), per-handle option overrides, named worker-runtime profiles selected via `--profile`, and the strict `--task-queue` CLI typo check with "did you mean?" suggestion. Mode 1 also folds in the error-handling suite (activity error boundary, workflow error-report full chain, local parity arm); Mode 2 Tiers 13-16 validate that a worker-side failure — from an LLM, extract, image-gen, or batched child-workflow activity — carries a structured `ErrorReport` across the activity -> workflow -> submitter boundary. Use when the user says "validate temporal", "e2e temporal", "temporal regression", "temporal validation", "3-process test", "full temporal test", "validate phases", "image temporal", "queue options", "runtime profile", "validate error handling", "error report", "error propagation", or wants comprehensive verification that distributed execution works end-to-end. Also use proactively after changes to LibraryCrate, deferred hydration, ClassRegistry scoping, graph tracing, image generation activities, content storage, queue options resolution, worker-runtime profiles, error classification / `ErrorReport` / the Temporal error boundary, or Temporal workflow code.
|
| allowed-tools | ["Bash(tmux *)","Bash(curl *)","Bash(.venv/bin/pytest *)","Bash(.venv/bin/pipelex *)","Bash(ls *)","Bash(which *)","Bash(open *)","Bash(cat *)","Bash(.venv/bin/python *)","Bash(grep *)","Bash(sort *)","Bash(head *)","Bash(tail *)","Bash(temporal *)","Bash(jq *)","Bash(timeout *)","Bash(pkill *)","Bash(sleep *)","Bash(echo *)","Bash(seq *)"] |
Temporal E2E Validation Suite
This skill validates that Pipelex pipelines execute correctly when distributed across
Temporal workers — separate processes that receive serialized work, run pipes, and return
results. It covers the full chain: shipping pipe definitions to the worker (Phase 2),
deserializing dynamic concepts the worker has never seen (Phase 3), isolating concurrent
workflows so they don't corrupt each other (Phase 4), assembling an execution graph
from trace events emitted across workers (Phase 4.5), and verifying that image-heavy
pipelines (image generation, image-to-LLM flow) don't blow up Temporal's payload
limits by ensuring images are stored at the activity level.
Important: surface results immediately
After each command completes, immediately tell the user the outcome in plain text —
PASS/FAIL, what it means, output paths. Do NOT silently move on to the next command.
The user sees collapsed tool outputs by default and relies on your text messages.
Use multi-line formatting with clean indentation — never cram everything onto one line:
Tier 1 PASS
The worker received the pipe definitions and executed a 2-step sequence correctly.
Output: results/native_text_sequence_output_01/
Graph: results/native_text_sequence_output_01/reactflow.html
For failures, include the error message and what it means:
Tier 2 FAIL
KajsonDecoderError: Class 'Greeting' not found
The worker tried to deserialize WorkingMemory before registering dynamic concepts.
Check worker output: tmux capture-pane -t temporal-worker-router -p -S -200
(or tmux capture-pane -t temporal-worker-runner / temporal-worker depending on setup)
Timeouts policy — REQUIRED for every command
A hung Temporal test will lock the session and consume the agent's wall time until the
user interrupts. Workers, workflow polls, and gRPC retries will happily wait forever.
Every .venv/bin/pytest and .venv/bin/pipelex run bundle invocation in this skill
runs under a hard shell timeout and, for pytest, under --timeout=N per test as well.
pytest --timeout=N alone is not sufficient — it caps a single test, not fixture
setup/teardown, so a broken session-scoped fixture can still hang forever without the
shell cap.
Defaults to use (apply unless a step explicitly overrides):
| Command kind | Shell cap | Per-test cap |
|---|---|---|
| Single dry-run pipeline (pipelex run bundle ... --dry-run --mock-inputs) | timeout 120 | none |
| Two concurrent dry-run pipelines (isolation pairs) | timeout 180 | none |
| Live pipeline (pipelex run bundle ... --temporal without --dry-run) | timeout 600 | none |
| Repro/diagnostic script (.venv/bin/python <path-to-script>.py) | timeout 120 | none |
| Pytest folder (dry, library_crate/ or single module) | timeout 300 | --timeout=90 |
| Pytest single test class/case (dry) | timeout 180 | --timeout=60 |
| Pytest folder (live, real LLM/img-gen) | timeout 900 | --timeout=300 |
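
As a sketch of how the defaults above combine, the following Python helper wraps an argv in the shell cap and, where the table lists one, appends the per-test cap. The names `Caps`, `DEFAULT_CAPS`, and `capped` are hypothetical and not part of Pipelex; only the numbers and flags come from the table.

```python
from typing import NamedTuple, Optional


class Caps(NamedTuple):
    shell_cap: int               # seconds passed to the outer `timeout`
    per_test_cap: Optional[int]  # seconds for pytest --timeout, None if not applicable


# Hypothetical keys; the values mirror the defaults table.
DEFAULT_CAPS = {
    "dry_single": Caps(120, None),
    "dry_pair": Caps(180, None),
    "live_pipeline": Caps(600, None),
    "repro_script": Caps(120, None),
    "pytest_folder_dry": Caps(300, 90),
    "pytest_single_dry": Caps(180, 60),
    "pytest_folder_live": Caps(900, 300),
}


def capped(kind: str, argv: list[str]) -> list[str]:
    """Prefix argv with the shell cap and append the per-test cap when one exists."""
    caps = DEFAULT_CAPS[kind]
    full = ["timeout", str(caps.shell_cap), *argv]
    if caps.per_test_cap is not None:
        full.append(f"--timeout={caps.per_test_cap}")
    return full


cmd = capped(
    "pytest_folder_dry",
    [".venv/bin/pytest", "-v", "tests/integration/pipelex/temporal/library_crate/",
     "-m", "temporal", "--temporal-server", "local"],
)
print(" ".join(cmd))
```

Keeping both caps in one place makes it harder to add a pytest invocation that carries only one of them.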
Form to use everywhere:
timeout 300 .venv/bin/pytest -x -v tests/integration/pipelex/temporal/library_crate/ \
-m temporal --temporal-server local --timeout=90 2>&1 | tail -80
timeout 120 .venv/bin/pipelex run bundle <bundle> --pipe <pipe> \
--temporal --dry-run --mock-inputs --no-logo --graph 2>&1 | tail -15
echo "EXIT=${PIPESTATUS[0]}"  # bash: exit code of the timeout stage; a plain $? here would be tail's status
If a command times out (exit 124 from timeout) or behaves as if hung, the response is
ALWAYS the same:
- Capture worker output:
tmux capture-pane -t temporal-worker-router -p -S -200
and tmux capture-pane -t temporal-worker-runner -p -S -200 (or
temporal-worker for the single-worker setup). Quote the actual error.
- Kill zombies before retrying:
pkill -f "pipelex.temporal.worker_cli",
pkill -f "pytest.*temporal". Stale workers from a previous timed-out run
will swallow new dispatches and make every subsequent step look hung.
- Diagnose root cause from the captured output. Do not raise the timeout
and rerun blind — that wastes another minute and produces the same hang. The
"Interpreting failures" table at the bottom of this skill maps common error
strings to causes.
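
The shell cap and the exit-124 check can also be reproduced from Python, which may help inside a repro script. This is a minimal sketch using only the standard library; `run_capped` and `TIMEOUT_EXIT` are illustrative names, not Pipelex or Temporal APIs. It returns the command's own exit code rather than the status of a downstream `tail`, which is what `$?` reports after a shell pipeline.

```python
import subprocess
import sys

TIMEOUT_EXIT = 124  # the exit code `timeout` reports when the cap is hit


def run_capped(argv, cap_seconds, tail_lines=15):
    """Run argv under a hard wall-clock cap; return (exit_code, last output lines)."""
    try:
        proc = subprocess.run(
            argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # same effect as 2>&1
            text=True,
            timeout=cap_seconds,
        )
        code, output = proc.returncode, proc.stdout
    except subprocess.TimeoutExpired as exc:
        code = TIMEOUT_EXIT
        raw = exc.stdout or ""
        # Partial output may arrive as bytes even in text mode.
        output = raw.decode(errors="replace") if isinstance(raw, bytes) else raw
    return code, "\n".join(output.splitlines()[-tail_lines:])


hung_code, _ = run_capped([sys.executable, "-c", "import time; time.sleep(30)"], 1)
ok_code, ok_tail = run_capped(
    [sys.executable, "-c", "print('a'); print('b'); print('c')"], 10, tail_lines=2
)
print(hung_code, ok_code, repr(ok_tail))
```

`subprocess.run` kills only the direct child when the cap is hit, so the `pkill` step above still applies to stray workers.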
Prerequisites
which tmux && echo "ok" || echo "MISSING: brew install tmux"
which temporal && echo "ok" || echo "MISSING: brew install temporal"
.venv/bin/python -c "import temporalio; print(f'temporalio {temporalio.__version__}')"
Mode 1: Automated Test Suite (pytest)
Runs integration tests against a real Temporal server (localhost:7233), but the worker
runs in-process — no separate worker needed. This is the fast path for catching
regressions.
Step 1: Ensure the Temporal dev server is running
curl -s http://localhost:8233 > /dev/null 2>&1 && echo "running" || echo "not running"
If not running:
tmux new-session -d -s temporal-server 'temporal server start-dev'
sleep 3
curl -s http://localhost:8233 > /dev/null 2>&1 && echo "running" || echo "FAILED"
Do NOT start if already running — it will fail with a bind error.
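The same liveness check can be sketched in Python with a plain TCP connect, assuming the dev server UI listens on port 8233 as in the curl command above. `is_listening` is a hypothetical helper, not part of the Temporal CLI or SDK, and it only proves that something accepts connections on the port, not that it is Temporal.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    print("running" if is_listening("localhost", 8233) else "not running")
```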
Step 2: Run the tests
The conftest auto-registers the Pipelex custom search attributes listed in
pipelex.temporal.config_temporal.BUILTIN_SEARCH_ATTRIBUTES on the first
session that hits a fresh dev server. No manual setup needed.
Dry mode (fast, no LLM costs):
timeout 300 .venv/bin/pytest -v tests/integration/pipelex/temporal/library_crate/ \
-m temporal --temporal-server local --timeout=90 2>&1 | tail -80
Live mode (real LLM calls):
timeout 900 .venv/bin/pytest -v tests/integration/pipelex/temporal/library_crate/ \
-m temporal --temporal-server local --pipe-run-mode live --timeout=300 2>&1 | tail -80
Note on -x: omitted here so we see the full failure surface, not just the first hang.
If you want the classic stop-on-first-failure behavior, add -x back — but expect to
diagnose root cause from the first failure before retrying.
Step 2b: Run the error-handling tests
These verify that a Pipelex failure crosses the Temporal boundary carrying a
structured ErrorReport (error_category / error_domain / retryable /
model / provider / user_action), with local/Temporal parity. They live
outside library_crate/, so Step 2 does not pick them up. The LLM call is
mocked in every one of these tests — no API keys, no inference cost.
timeout 300 .venv/bin/pytest -v \
tests/integration/pipelex/temporal/test_activity_error_boundary.py \
tests/integration/pipelex/temporal/test_wf_pipe_run_failure_path.py \
tests/integration/pipelex/temporal/test_workflow_error_report_full_chain.py \
-m temporal --temporal-server local --timeout=90 2>&1 | tail -60
timeout 180 .venv/bin/pytest -v \
tests/integration/pipelex/error_handling/ --timeout=60 2>&1 | tail -40
The local arm is not marked temporal, so it must run as its own
invocation — a -m temporal filter would deselect it.
Step 3: Report results
Tell the user what each suite validated and whether it passed. Here is what the suites test:
- TestWfLibraryCrate: Can a worker receive a "crate" (a portable bundle of pipe
definitions) and execute a PipeSequence from it? Also tests that submitting without a
crate correctly fails with PipeNotFoundError.
- TestWfDeferredHydration: When a pipe creates a brand-new concept type at runtime
(e.g. Greeting with message and language fields), can the worker deserialize
WorkingMemory that contains instances of that concept, even though the worker has never
seen the class before?
- TestWfConcurrentConceptIsolation: Two workflows run simultaneously on the same
worker, both defining a concept called Result, but with different fields (score, label
vs value, confidence, is_valid). Does each workflow get the right version, or do they
clobber each other?
- TestWfConcurrentPipeIsolation: Same idea, but for pipes: two workflows both define
a pipe called shared_step with different prompts. Does each workflow execute its own
version?
- TestWfMultiConceptIsolation: Worst case: two bundles each define two overlapping
concepts (Profile + Summary) with incompatible structures, exercised across 6
concurrent workflows.
- TestWfPipeParallel: A PipeParallel controller dispatches branches as concurrent
child workflows. Do the branches execute independently and merge correctly?
- TestWfPipeCondition / PipeBatch / PipeCompose / CombinedPipeline: Controller
coverage for condition routing, fan-out/fan-in, composition, and nested controller
combinations.
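
The property the isolation suites check can be illustrated without Temporal. The sketch below is not Pipelex's ClassRegistry; it is a toy, assuming per-workflow scoping is built on `contextvars` (the failure table below refers to a ContextVar leak). Two concurrent tasks each register a `Result` class with different fields and read back their own version. The names `_registry` and `run_workflow` are hypothetical.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import fields, make_dataclass

# Hypothetical stand-in for a per-workflow class registry.
_registry: ContextVar[dict] = ContextVar("registry")


async def run_workflow(name, field_names):
    # Each asyncio task runs in its own copy of the context, so this set()
    # is invisible to the other workflow.
    _registry.set({})
    _registry.get()["Result"] = make_dataclass("Result", field_names)
    await asyncio.sleep(0)  # yield so the two workflows interleave
    cls = _registry.get()["Result"]
    return name, [f.name for f in fields(cls)]


async def main():
    return await asyncio.gather(
        run_workflow("alpha", ["score", "label"]),
        run_workflow("beta", ["value", "confidence", "is_valid"]),
    )


results = asyncio.run(main())
print(results)
```

If the registry were a module-level dict instead, the second registration would overwrite the first and both workflows would see the same `Result`, which is the clobbering these tests look for.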
For Step 2b (error-handling), here is what the suites validate:
- TestActivityErrorBoundary: A real activity failure (LLM and non-LLM) is caught
by the convert_pipelex_errors boundary and surfaces a classified ErrorReport on
the workflow side, with the correct non_retryable flag derived from the error
category.
- TestWfPipeRunFailurePath: When the pipe router fails inside the workflow,
delivery still fires with DeliveryStatus.FAILED (and pipe_output unset), then the
failure re-raises.
- TestWorkflowErrorReportFullChain: The ErrorReport survives the full
activity → workflow → submitter Temporal round-trip with error_category /
retryable / model / provider / user_action intact; the recovered report
carries the real failure message, not the generic "Failed to execute workflow".
- TestErrorReportLocalFullChain: The local (non-Temporal) parity arm: the same
failing pipe run through PipeRouter directly. Both arms assert the same
ErrorReportParityTestData constants, so local/Temporal ErrorReport parity holds
by construction.
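
To make the round-trip concrete, here is a minimal sketch of packing a structured report into an error's details on one side and recovering it on the other. It is an assumption-laden illustration, not Pipelex's implementation: the field names come from the list above, but their types, the extra `message` field, the `BoundaryError` carrier, and the `pack` / `recover` helpers are all hypothetical.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class ErrorReport:
    # Field names follow the text above; the types are assumptions.
    error_category: str
    error_domain: str
    retryable: bool
    model: Optional[str]
    provider: Optional[str]
    user_action: Optional[str]
    message: str  # assumed field holding the real failure message


class BoundaryError(Exception):
    """Hypothetical carrier for an error whose details are serialized across processes."""

    def __init__(self, message, details):
        super().__init__(message)
        self.details = details


def pack(report: ErrorReport) -> BoundaryError:
    # Worker side: serialize the report into the error's details.
    return BoundaryError(report.message, [json.dumps(asdict(report))])


def recover(exc: BoundaryError) -> Optional[ErrorReport]:
    # Submitter side: rebuild the report, or None when nothing was packed.
    if not exc.details:
        return None
    return ErrorReport(**json.loads(exc.details[0]))


original = ErrorReport("rate_limit", "llm", True, "some-model", "some-provider",
                       "retry later", "provider returned 429")
recovered = recover(pack(original))
bare = recover(BoundaryError("Failed to execute workflow", []))
print(recovered == original, bare)
```

The `bare` case corresponds to the failure mode where the submitter sees only the generic message because no report was packed.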
Known xfails: Some controller execution tests fail in dry-run mode because
StuffArtefact objects can't serialize through Temporal's data converter. The crate
structure tests pass. These xfails will resolve once StuffArtefact serialization is fixed.
The test test_missing_crate_fails_pipe_resolution is a negative test — it intentionally
submits without a crate. The Temporal warning Failed activation on workflow with
RuntimeError: No current library set is expected.
Mode 2: True 3-Process Validation
The real deployment test: three separate OS processes — Temporal server, worker, and
submitter — with no shared memory. The worker has its own Python runtime, its own
sys.modules, its own ClassRegistry. This is the only way to catch bugs an in-process
worker (Mode 1) cannot — e.g. the worker failing to deserialize a PipeJob because concept
classes aren't registered, the Kajson decoder bypassing class lookup, or the Temporal
data converter silently dropping fields.
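The hydration-order bug mentioned above can be sketched in a few lines. This is a toy model, not the Kajson decoder or Pipelex's ClassRegistry: `class_registry`, `register_concept`, `decode`, and the `__class__` / `fields` payload layout are hypothetical. It shows why a worker has to register a dynamic concept before decoding data that contains it.

```python
import json
from dataclasses import make_dataclass

class_registry = {}  # hypothetical worker-local registry


def register_concept(name, field_names):
    """Build a class for a concept the worker has never seen and register it."""
    class_registry[name] = make_dataclass(name, field_names)


def decode(payload):
    """Rebuild an instance from JSON; fails if the class is not registered yet."""
    data = json.loads(payload)
    name = data["__class__"]
    cls = class_registry.get(name)
    if cls is None:
        raise LookupError(f"Class '{name}' not found")
    return cls(**data["fields"])


payload = json.dumps(
    {"__class__": "Greeting", "fields": {"message": "hello", "language": "en"}}
)

try:
    decode(payload)  # wrong order: decode before registration
    early_error = None
except LookupError as exc:
    early_error = str(exc)

register_concept("Greeting", ["message", "language"])  # hydrate first
greeting = decode(payload)  # then decode succeeds
print(early_error, greeting)
```

In a single process the class is usually already in memory when decoding happens, so the wrong order goes unnoticed; a separate worker process starts with an empty registry.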
Mode 2 is a battery of tiers and scenarios. It is long, and most invocations need only
part of it — so the procedure lives in reference files under references/, each
loaded only for the work at hand. Read them as needed:
| Reference file | Covers | Read it when |
|---|---|---|
| references/mode-2-setup.md | Steps 1–2: Temporal server + worker processes; run-mode (dry vs live) | Always; start here. Every other Mode 2 file assumes the server + worker(s) are already up. |
| references/mode-2-tiers.md | Steps 3–7: Tiers 1–16 (sequence, hydration, parallel, image gen/flow, isolation, object gen, extract, CV batch, error propagation), codec, final report | Validating core distributed execution, or running the full suite. |
| references/routing-battery.md | Step 8: v1 activity_queues per-activity / per-handle routing (Tiers 10a–10c) | Validating that activity_queues isolates workers. Opt-in. |
| references/queue-options-battery.md | Step 9: v2 per-queue options, per-handle overrides, rate limits, worker-runtime profiles, CLI typo check (Scenarios A–F) | Validating queue options / worker-runtime profiles. Opt-in. |
Choosing what to run — match the request, then read only those files:
- "validate temporal" / "full temporal test" / a broad regression check →
mode-2-setup.md, then mode-2-tiers.md; offer the two batteries as opt-in extras.
- "validate temporal error handling" / "error report" / "error propagation" → Mode 1 Step 2b above (the precise pytest assertions) plus
mode-2-setup.md + mode-2-tiers.md Step 5f (Tiers 13–16, cross-process).
- "queue options" / "runtime profile" / "routing" →
mode-2-setup.md, then routing-battery.md and/or queue-options-battery.md.
- Anything image-related →
mode-2-setup.md + mode-2-tiers.md Tiers 4–5, run live — dry-run cannot surface payload-size bugs.
The Timeouts policy and the "surface results immediately" rule above apply to
every command in every reference file. The Interpreting failures table and Bundle
reference below cover all of Mode 2 — keep them in mind while running any reference file.
Cleanup
Propose these to the user — do NOT run them automatically:
- Kill tmux sessions:
tmux kill-session -t temporal-worker-router / tmux kill-session -t temporal-worker-runner (or tmux kill-session -t temporal-worker if you used the single full worker) / tmux kill-session -t temporal-server. If any of Tiers 13–16 ran, also kill temporal-worker-err and confirm .env is clean (grep tier-err .env returns nothing). A blanket pkill -f "pipelex.temporal.worker_cli" clears any stray worker process.
- Clean results directory:
rm -rf results/
- Clean trace files:
rm -rf .pipelex/traces/
- Remove temporary override if still present:
.venv/bin/python -c "from pathlib import Path; Path('.pipelex/pipelex_temporary_override.toml').unlink(missing_ok=True)"
Leave the server running if the user plans to iterate.
Interpreting failures
| Error | What it means |
|---|---|
| PipeNotFoundError on worker | The LibraryCrate didn't arrive or wasn't loaded, so the worker can't find the pipes it's supposed to run |
| KajsonDecoderError: Class 'X' not found | The worker tried to deserialize data containing a dynamic concept before registering that concept's class (hydration order bug) |
| RuntimeError: Failed decoding arguments | Temporal's data converter can't deserialize the PipeJob on the worker, usually a serialization format issue |
| WorkflowFailureError wrapping TemporalError | The pipe itself failed during execution; read the inner error for the real cause |
| Submitter shows "Failed to execute workflow ..." with no inner detail (Tier 13–16) | The ErrorReport did not cross the boundary: submitter-side report recovery (recover_error_report / WorkflowExecutionError) is broken, or the activity bridge didn't pack the report into ApplicationError.details |
| Tier 13–16 run exits 0 (success) instead of failing | A healthy worker is still polling temporal_task_queue and stole the activity (kill all workers, keep only temporal-worker-err); or the worker booted before .env was tampered. It captures env at boot, so it must be started after the cat >> .env step and before .env is restored |
| AssertionError: StructuredContent missing field | Per-workflow ClassRegistry isolation failed: the worker used the wrong concept class (from another workflow's definitions) |
| Submitter hangs indefinitely | The worker crashed during deserialization; check tmux capture-pane -t temporal-worker-router -p -S -200 (and the runner session, or temporal-worker for the single-worker setup) |
| Both concurrent jobs succeed but wrong data | ContextVar leak between workflows: per-workflow scoping is broken, one workflow's class definitions bled into the other |
| No reactflow.html generated | GraphSpec assembly failed: either tracing is disabled in pipelex.toml or NDJSON events weren't emitted by the worker |
| PayloadSizeWarning in worker logs | Image data (base64) is being passed inline through Temporal payloads instead of being stored at the activity level; the fix is to call storage in the image generation activity before returning results |
| NotImplementedError during image generation | The image generation or content storage path isn't wired up for the Temporal execution path; activity-level storage is missing |
| Payload too large / DataConverterError on image pipes | Raw image bytes exceed Temporal's payload size limit (~2MB default); images must be stored and referenced by URI, not passed inline |
| ImageContent missing or has no uri field | The image generation activity returned raw image data instead of a stored ImageContent with a storage URI |
| Codec enabled but no storage files | Payloads are below the threshold (1MB default), which is expected in dry-run mode where mock data is small. Run live mode for realistic payload sizes |
| FileNotFoundError in codec decode | The codec is trying to load a payload from storage that doesn't exist; storage root path may be misconfigured or files were cleaned up mid-run |
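
The image rows above come down to one rule: store the bytes where they are produced and pass a reference. The sketch below illustrates the size difference under stated assumptions: `store_image` and the local `file://` storage are hypothetical stand-ins for whatever content storage the activity actually uses, and `PAYLOAD_LIMIT_BYTES` simply encodes the ~2MB default cited in the table.

```python
import base64
import hashlib
import tempfile
from pathlib import Path

PAYLOAD_LIMIT_BYTES = 2 * 1024 * 1024  # the ~2MB default cited in the table


def store_image(image_bytes: bytes, storage_root: Path) -> str:
    """Write the image under a content-addressed name and return a URI to it."""
    digest = hashlib.sha256(image_bytes).hexdigest()
    path = storage_root / f"{digest}.png"
    path.write_bytes(image_bytes)
    return path.resolve().as_uri()


fake_image = bytes(3 * 1024 * 1024)  # 3 MiB of zeros standing in for a real image
inline_size = len(base64.b64encode(fake_image))  # what an inline payload would carry

with tempfile.TemporaryDirectory() as root:
    uri = store_image(fake_image, Path(root))
    stored_size = Path(root, uri.rsplit("/", 1)[-1]).stat().st_size

reference_size = len(uri.encode("utf-8"))  # what the payload carries instead
print(inline_size > PAYLOAD_LIMIT_BYTES, reference_size < PAYLOAD_LIMIT_BYTES)
```

Base64 inflates the data by about a third, so an image that is under the limit as raw bytes can still exceed it once inlined.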
Bundle reference
Crate/isolation bundles — in tests/integration/pipelex/temporal/library_crate/:
| Bundle | What it tests | Main pipe |
|---|---|---|
| native_text_sequence.mthds | Basic crate propagation (native Text, 2 steps) | native_text_sequence |
| dynamic_concept_sequence.mthds | Deferred hydration (Greeting concept created at runtime) | dynamic_greeting_sequence |
| conflict_concept_alpha.mthds | Concept isolation: Result(score, label) | alpha_pipeline |
| conflict_concept_beta.mthds | Concept isolation: Result(value, confidence, is_valid) | beta_pipeline |
| conflict_pipe_alpha.mthds | Pipe isolation: shared_step with alpha prompt | pipe_alpha_pipeline |
| conflict_pipe_beta.mthds | Pipe isolation: shared_step with beta prompt | pipe_beta_pipeline |
| multi_concept_alpha.mthds | Multi-concept isolation: Profile(name, age) + Summary(headline, body) | multi_alpha_pipeline |
| multi_concept_beta.mthds | Multi-concept isolation: Profile(title, department, level) + Summary(content) | multi_beta_pipeline |
| temporal_parallel.mthds | PipeParallel: concurrent child workflows (ToneAnalysis + LengthAnalysis) | temporal_parallel_sequence |
| temporal_batch.mthds | PipeBatch: fan-out to per-item child workflows | temporal_batch_sequence |
| temporal_condition.mthds | PipeCondition: conditional routing via child workflow | temporal_condition_sequence |
| temporal_compose.mthds | PipeCompose: operator composition + deferred hydration | temporal_compose_sequence |
| temporal_combined.mthds | Nested PipeParallel + PipeCondition | temporal_combined_pipeline |
| large_payload_sequence.mthds | Codec stress test: 3-step verbose sequence accumulating large WorkingMemory | large_payload_sequence |
| cv_batch_screening.mthds | Deeply-nested controller stack (PipeSequence -> PipeSequence + PipeBatch -> PipeSequence) over PipeExtract + PipeLLM. Inputs JSON: cv_batch_screening_inputs.json | batch_analyze_cvs_for_job_offer |
Image payload bundles — in tests/integration/pipelex/pipes/pipelines/:
| Bundle | What it tests | Main pipe |
|---|---|---|
| crazy_image_generation.mthds | Image generation pipeline: LLM imagines scene + PipeImgGen renders it, custom ImagePrompt concept | generate_crazy_image |
| test_image_out_in.mthds | Image flow: PipeImgGen generates image, then PipeLLM (vision) describes it | image_out_in |