ingestion-pipeline-doctor-nodejs
Ingestion pipeline architecture overview and convention reference. Use when you need a quick orientation to the pipeline framework or want to know which doctor agent to use for a specific concern.
Clarify how to visualize change over a time range before building a trend. Use whenever the user asks how much something changed, grew, dropped, improved, or regressed between two points or periods — "how much did X change from A to B", "before vs after", "start vs end", "week over week", "compare this month to last", "change over time" — or mentions a "slope chart" / "slopegraph". Two readings of "change" need different charts: the whole trend (a line, every interval) versus just the two endpoints (a slope, start vs end). Ask which they want, then render it. Not for choosing a saved insight ChartDisplayType in the insight editor.
Explore PostHog MCP intent clusters — agent goals grouped by semantic similarity, with each cluster's tool distribution and error rates. Use when the user asks "what are agents trying to do with the MCP?", "group the intents", "which goals fail most?", "what does each cluster route to?", wants to recompute the clustering, or pastes an MCP analytics intent-clustering URL.
Investigate individual PostHog MCP sessions — the sequence of tool calls a single agent made in one run, what it was trying to do, and where it went wrong. Use when the user asks "what did this MCP session do?", "show me the tool calls for session X", "what was the agent's goal?", "which sessions had errors?", or pastes an MCP analytics sessions URL.
Investigate the quality of PostHog MCP tool calls — error rates, latency, reach, and which tools are failing or slow. Use when the user asks "which MCP tool has the highest error rate?", "what's the slowest tool?", "which tools fail most often?", "how reliable is tool X?", wants a tool-quality matrix, or pastes an MCP analytics tool-quality / dashboard URL and asks what it shows.
Guides PostHog engineers through dashboard widget platform work — ship a new widget_type (WIDGET_REGISTRY, catalog, run_widgets, WidgetCard) or update a shipped type (config, query, layout, RBAC, tile filter bar, list footer, titleHref, throttles). Use for WidgetSpec, widget_specs/, widget-configs.zod.ts, hogli build:openapi, error_tracking_list, session_replay_list, widgetFilters, formatWidgetListCountFooter, widget_query_throttle, or WidgetCard composition. New types need widget-intake confirmation first. Not for MCP batch-add of existing types or adding tiles to a dashboard.
Assesses what a page's heatmap is telling you and recommends concrete changes. Pulls click / rageclick / scroll-depth data for a URL, names the hot elements by cross-referencing autocapture events on the same page, and can create a saved heatmap the user opens in PostHog, then summarizes the behavior and proposes improvements. TRIGGER when: user asks what a heatmap shows, why people aren't clicking something, where users rage-click, how far they scroll, what to change on a page based on heatmap/click data, or to 'analyze/assess/review the heatmap' for a URL. DO NOT TRIGGER when: the user only wants to create a saved heatmap screenshot with no analysis (use heatmaps-saved-create directly), or is asking about session replay in general (use investigating-replay).
| Field | Value |
|---|---|
| name | ingestion-pipeline-doctor-nodejs |
| description | Ingestion pipeline architecture overview and convention reference. Use when you need a quick orientation to the pipeline framework or want to know which doctor agent to use for a specific concern. |
Quick reference for PostHog's ingestion pipeline framework and its convention-checking agents.
The ingestion pipeline processes events through a typed, composable step chain:
```
Kafka message
  → messageAware()
  → parse headers/body
  → sequentially() for preprocessing
  → filterMap() to enrich context (e.g., team lookup)
  → teamAware()
  → groupBy(token:distinctId)
  → concurrently() for per-entity processing
  → gather()
  → pipeBatch() for batch operations
  → handleIngestionWarnings()
  → handleResults()
  → handleSideEffects()
  → build()
```
See `nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts` for the real implementation.
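To make the idea of a typed, composable step chain concrete, here is a minimal self-contained sketch. It is not the framework's API: `PipelineBuilder`, `start()`, `pipe()`, `ok()`, and `drop()` are hypothetical stand-ins, and the real builder has many more stages. It only illustrates two properties: each stage's output type becomes the next stage's input type, and (as an assumption about how results flow) a non-ok result skips the remaining steps.

```typescript
// Hypothetical types for illustration; the framework's real Step and result
// types (steps.ts, results.ts) are richer than this.
type Result<T> = { type: "ok"; value: T } | { type: "drop"; reason: string };
type Step<I, O> = (input: I) => Promise<Result<O>>;

function ok<T>(value: T): Result<T> {
  return { type: "ok", value };
}

function drop(reason: string): Result<never> {
  return { type: "drop", reason };
}

// Hypothetical builder: each pipe() returns a builder typed by the new step's
// output, so feeding a step the wrong input type is a compile-time error.
class PipelineBuilder<I, O> {
  private constructor(private readonly run: Step<I, O>) {}

  static start<T>(): PipelineBuilder<T, T> {
    return new PipelineBuilder<T, T>(async (input) => ok(input));
  }

  pipe<N>(step: Step<O, N>): PipelineBuilder<I, N> {
    const previous = this.run;
    return new PipelineBuilder<I, N>(async (input) => {
      const result = await previous(input);
      // Short-circuit: a non-ok result skips every later step.
      return result.type === "ok" ? step(result.value) : result;
    });
  }

  build(): Step<I, O> {
    return this.run;
  }
}

// Usage: parse the message body, then drop events that carry no token.
interface RawMessage {
  body: string;
}
interface ParsedEvent {
  token?: string;
  name: string;
}
type WithEvent = RawMessage & { event: ParsedEvent };

const pipeline = PipelineBuilder.start<RawMessage>()
  .pipe(async (msg): Promise<Result<WithEvent>> => ok({ ...msg, event: JSON.parse(msg.body) as ParsedEvent }))
  .pipe(async (msg): Promise<Result<WithEvent>> => (msg.event.token ? ok(msg) : drop("missing_token")))
  .build();
```

Because each `pipe()` call re-types the builder, a step that expects a different input shape fails at compile time rather than at runtime.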
| What | Where |
|---|---|
| Step type | nodejs/src/ingestion/pipelines/steps.ts |
| Result types | nodejs/src/ingestion/pipelines/results.ts |
| Doc-test chapters | nodejs/src/ingestion/pipelines/docs/*.test.ts |
| Joined pipeline | nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts |
| Doctor agents | .claude/agents/ingestion/ |
| Test helpers | nodejs/src/ingestion/pipelines/docs/helpers.ts |
| Concern | Agent | When to use |
|---|---|---|
| Step structure | pipeline-step-doctor | Factory pattern, type extension, config injection, naming |
| Result handling | pipeline-result-doctor | ok/dlq/drop/redirect, side effects, ingestion warnings |
| Composition | pipeline-composition-doctor | Builder chain, concurrency, grouping, branching, retries |
| Testing | pipeline-testing-doctor | Test helpers, assertions, fake timers, doc-test style |
Steps: a factory function returns a named inner function. Use a generic `<T extends Input>` for type extension, never `any`, and inject config through the factory's closure.
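A minimal sketch of that step shape, assuming a simplified result type. `Result`, `Step`, `Team`, `TeamLookupConfig`, and `createTeamLookupStep` are hypothetical names, not taken from `steps.ts`. It shows one reading of "type extension": the step accepts any `T extends Input` and returns `T` plus the field it adds, so data attached by earlier steps is not lost.

```typescript
// Hypothetical types for illustration; see nodejs/src/ingestion/pipelines/steps.ts
// for the real Step type.
type Result<T> = { type: "ok"; value: T } | { type: "drop"; reason: string };
type Step<I, O> = (input: I) => Promise<Result<O>>;

interface Team {
  id: number;
  name: string;
}

// The minimum fields this step reads from its input.
interface Input {
  token: string;
}

// Hypothetical config, captured once by the factory's closure.
interface TeamLookupConfig {
  teamsByToken: Map<string, Team>;
}

// Factory: takes config, returns a *named* inner function. T extends Input, and
// the output is T plus `team`, so fields added by earlier steps are preserved.
function createTeamLookupStep<T extends Input>(config: TeamLookupConfig): Step<T, T & { team: Team }> {
  return async function teamLookupStep(input: T): Promise<Result<T & { team: Team }>> {
    const team = config.teamsByToken.get(input.token);
    if (!team) {
      return { type: "drop", reason: "unknown_token" };
    }
    return { type: "ok", value: { ...input, team } };
  };
}

// Usage: the caller's richer input type (here with `headers`) flows through unchanged.
const step = createTeamLookupStep<{ token: string; headers: Record<string, string> }>({
  teamsByToken: new Map([["phc_test", { id: 1, name: "Test team" }]]),
});
```

Naming the inner function means it appears as `teamLookupStep` in stack traces instead of an anonymous arrow.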
Results: build results with the `ok()`, `dlq()`, `drop()`, and `redirect()` constructors. Pass side effects as promises in `ok(value, [effects])` and ingestion warnings as the third parameter.
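The sketch below illustrates the four result kinds and where side effects and warnings go. The result shapes, the `IngestionWarning` fields, the `dlq`/`drop`/`redirect` parameter lists, `classifyEvent`, and the topic name are all hypothetical; only the `ok(value, [effects], warnings)` argument order follows the convention above. The side-effect promise is returned to the pipeline rather than awaited inside the step, on the assumption that a later stage such as `handleSideEffects()` settles it.

```typescript
// Hypothetical result shapes for illustration; the real constructors live in
// nodejs/src/ingestion/pipelines/results.ts and their signatures may differ.
interface IngestionWarning {
  type: string;
  details: Record<string, unknown>;
}

type PipelineResult<T> =
  | { type: "ok"; value: T; sideEffects: Promise<unknown>[]; warnings: IngestionWarning[] }
  | { type: "dlq"; reason: string; error?: unknown }
  | { type: "drop"; reason: string }
  | { type: "redirect"; reason: string; topic: string };

// ok(value, [effects], warnings): side effects are promises, warnings the third argument.
function ok<T>(value: T, sideEffects: Promise<unknown>[] = [], warnings: IngestionWarning[] = []): PipelineResult<T> {
  return { type: "ok", value, sideEffects, warnings };
}
// Non-ok constructors carry no value, so they fit any PipelineResult<T>.
function dlq(reason: string, error?: unknown): PipelineResult<never> {
  return { type: "dlq", reason, error };
}
function drop(reason: string): PipelineResult<never> {
  return { type: "drop", reason };
}
function redirect(reason: string, topic: string): PipelineResult<never> {
  return { type: "redirect", reason, topic };
}

interface IngestEvent {
  name: string;
  properties: Record<string, unknown>;
}

// Hypothetical step body showing when each constructor applies.
function classifyEvent(event: IngestEvent, recordMetric: () => Promise<void>): PipelineResult<IngestEvent> {
  if (event.name === "") {
    return dlq("empty_event_name"); // malformed: keep it for inspection
  }
  if (event.name.startsWith("$$internal")) {
    return drop("internal_event"); // intentionally discarded
  }
  if (event.name === "$exception") {
    return redirect("exception_event", "hypothetical_exceptions_topic"); // routed elsewhere
  }
  const warnings: IngestionWarning[] =
    "distinct_id" in event.properties ? [] : [{ type: "missing_distinct_id", details: { event: event.name } }];
  // The metric promise is started here but handed back, not awaited in the step.
  return ok(event, [recordMetric()], warnings);
}
```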
Composition: `messageAware()` wraps the pipeline. `handleResults()` runs inside `messageAware()`, and `handleSideEffects()` runs after it. Use `groupBy()` + `concurrently()` for per-entity work, and call `gather()` before batch steps.
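Assuming `groupBy()` + `concurrently()` means "different entities in parallel, each entity's events in order", the pattern can be sketched with plain promises. `groupByKey`, `processPerEntity`, and `entityKey` are hypothetical helpers, not framework functions, and the flatten at the end stands in for `gather()`.

```typescript
// Hypothetical sketch of the groupBy + concurrently + gather idea, not the
// framework's implementation.
interface IngestEvent {
  token: string;
  distinctId: string;
  uuid: string;
}

// groupBy: bucket a batch by key, keeping arrival order inside each bucket.
function groupByKey<T>(items: T[], key: (item: T) => string): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const item of items) {
    const k = key(item);
    const group = groups.get(k);
    if (group) {
      group.push(item);
    } else {
      groups.set(k, [item]);
    }
  }
  return groups;
}

// concurrently: different entities run in parallel; one entity's events run
// sequentially. gather: flatten back into one array for batch-level steps.
async function processPerEntity<T, R>(
  items: T[],
  key: (item: T) => string,
  processOne: (item: T) => Promise<R>,
): Promise<R[]> {
  const groups = Array.from(groupByKey(items, key).values());
  const perGroup = await Promise.all(
    groups.map(async (group) => {
      const out: R[] = [];
      for (const item of group) {
        out.push(await processOne(item));
      }
      return out;
    }),
  );
  // Output is in group order, not the original batch order.
  return ([] as R[]).concat(...perGroup);
}

// Mirrors the groupBy(token:distinctId) stage of the chain.
const entityKey = (event: IngestEvent): string => `${event.token}:${event.distinctId}`;
```

In this sketch, a slow entity only delays its own events; other entities in the batch keep making progress.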
Testing: step tests call the factory directly. Use the `consumeAll()`/`collectBatches()` helpers, fake timers for async behavior, and type guards for result assertions. No `any`.
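A minimal sketch of a step test in that style. The real doc-test chapters use Jest together with the `consumeAll()`/`collectBatches()` helpers in `helpers.ts` and fake timers, none of which are reproduced here; to stay self-contained, this version throws plain errors instead of using Jest matchers. `Ok`, `Drop`, `isOk`, `createRequireNameStep`, and `testRequireNameStep` are hypothetical.

```typescript
// Hypothetical result type and step, for illustration only.
type Ok<T> = { type: "ok"; value: T };
type Drop = { type: "drop"; reason: string };
type Result<T> = Ok<T> | Drop;

// Type guard: once it returns true, TypeScript knows `value` exists,
// so assertions need neither casts nor `any`.
function isOk<T>(result: Result<T>): result is Ok<T> {
  return result.type === "ok";
}

// A step built with the usual factory shape.
function createRequireNameStep<T extends { name: string }>() {
  return async function requireNameStep(input: T): Promise<Result<T>> {
    return input.name.length > 0 ? { type: "ok", value: input } : { type: "drop", reason: "empty_name" };
  };
}

// Test body: call the factory directly; no Kafka consumer or pipeline builder needed.
async function testRequireNameStep(): Promise<void> {
  const step = createRequireNameStep<{ name: string; uuid: string }>();

  const kept = await step({ name: "$pageview", uuid: "1" });
  if (!isOk(kept)) {
    throw new Error(`expected ok, got ${kept.type}`);
  }
  if (kept.value.uuid !== "1") {
    throw new Error("step should pass the input through unchanged");
  }

  const dropped = await step({ name: "", uuid: "2" });
  if (isOk(dropped)) {
    throw new Error("expected the empty-name event to be dropped");
  }
}
```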
Ask Claude to "run all pipeline doctors on my recent changes" to get a comprehensive review across all 4 concern areas.