core-sdk
// Kailash Core SDK — workflows, 110+ nodes, runtime, async, cycles, MCP, OpenTelemetry. Use for WorkflowBuilder + connections + runtime patterns.
| name | core-sdk |
| description | Kailash Core SDK — workflows, 110+ nodes, runtime, async, cycles, MCP, OpenTelemetry. Use for WorkflowBuilder + connections + runtime patterns. |
Comprehensive guide to Kailash Core SDK fundamentals for workflow automation and integration.
The Core SDK provides the foundational building blocks for creating custom workflows with fine-grained control:
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime
workflow = WorkflowBuilder()
workflow.add_node("NodeName", "id", {"param": "value"})
# Use context manager for proper resource cleanup (recommended)
with LocalRuntime() as runtime:
results, run_id = runtime.execute(workflow.build())
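The execute call returns a (results, run_id) tuple. As a sketch of consuming it, assuming results is a dict keyed by node ID (the exact shape of each node's output depends on the node type):

# results: dict keyed by node ID; run_id: identifier for this execution
node_output = results["id"]  # outputs of the node registered above as "id"
print(run_id)                # correlate this run in logs/telemetry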
Beyond the basic pattern, the runtime layer includes:
- Runtime hardening: __del__ hardening, double-check locking, pool lifecycle, static analysis guardrails
- WorkflowScheduler (kailash.runtime.scheduler): cron + interval + one-shot scheduling for recurring workflow execution, backed by an APScheduler SQLite jobstore. See 15-enterprise-infrastructure/scheduler-patterns; a hedged sketch follows this list.
- ExecutionTracker (kailash.runtime.execution_tracker): per-node checkpoint primitive consumed by DurableRequest for resume-on-restart workflows. See 15-enterprise-infrastructure/durability-patterns.
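A minimal scheduling sketch. The WorkflowScheduler constructor and the schedule call below are assumptions for illustration, not the verified interface; consult 15-enterprise-infrastructure/scheduler-patterns for the real API:

from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.scheduler import WorkflowScheduler  # module path per the list above

workflow = WorkflowBuilder()
workflow.add_node("NodeName", "id", {"param": "value"})

scheduler = WorkflowScheduler()
# Hypothetical call: cron-style recurring execution (signature is an assumption)
scheduler.schedule(workflow.build(), cron="0 * * * *")

The next section is the single source of truth for node configuration; all other skills reference it.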
workflow.add_node(
"NodeClassName", # 1. Node type (PascalCase, string)
"unique_node_id", # 2. Unique ID (snake_case, string)
{ # 3. Configuration dict
"param1": "value",
"param2": 123
},
connections=[] # 4. Optional: input connections
)
| Parameter | Type | Description | Example |
|---|---|---|---|
| Node type | str | The node class name (PascalCase) | "LLMNode", "HTTPRequest" |
| Node ID | str | Unique identifier (snake_case) | "fetch_data", "process_1" |
| Config | dict | Node-specific configuration | {"url": "..."} |
| Connections | list | Optional input connections (4-tuple) | [("src", "out", "dst", "in")] |
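Putting the four parameters together, using the example values from the table (the config key and the connection source are illustrative, not a verified HTTPRequest signature):

workflow.add_node(
    "HTTPRequest",                       # node type from the table above
    "fetch_data",                        # unique node ID (snake_case)
    {"url": "https://example.com/api"},  # config: the url key is illustrative
    connections=[("src", "out", "fetch_data", "in")],  # optional 4-tuples
)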
Connection Methods:
# Method 1: add_connection (four positional args, explicit)
workflow.add_connection("read_file", "content", "transform", "input")
# Method 2: connect (flexible API with keyword args)
workflow.connect("read_file", "transform", from_output="content", to_input="input")
# Method 3: connect with mapping (multiple outputs)
workflow.connect("node1", "node2", mapping={"content": "input", "meta": "metadata"})
workflow.add_node("NodeName", "id", {}).build() before executionworkflow.execute(runtime) - always runtime.execute(workflow.build())Both runtimes return identical structure: (results, run_id) tuple.
Both LocalRuntime and AsyncLocalRuntime inherit from BaseRuntime; their capabilities fall into three groups:
- BaseRuntime Foundation
- Shared Mixins
- AsyncLocalRuntime-Specific
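For the async path, a minimal sketch. The import path and the awaitable execute call are assumptions; the (results, run_id) contract matches the note above:

import asyncio

from kailash.runtime.async_local import AsyncLocalRuntime  # import path is an assumption
from kailash.workflow.builder import WorkflowBuilder

async def main():
    workflow = WorkflowBuilder()
    workflow.add_node("NodeName", "id", {"param": "value"})
    runtime = AsyncLocalRuntime()
    # Assumed awaitable form of the same execute contract
    results, run_id = await runtime.execute(workflow.build())

asyncio.run(main())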
Quick reference:
- Execute: runtime.execute(workflow.build())
- Add a node: workflow.add_node("NodeName", "id", {})
- Connection tuple: (source_id, source_param, target_id, target_param)
- Never: workflow.execute(runtime)

Use this skill when you need to:
- Build workflows with WorkflowBuilder and wire node connections
- Run them with LocalRuntime or AsyncLocalRuntime patterns
For complex workflows or debugging, invoke:
- pattern-expert: Workflow patterns and cyclic debugging
- testing-specialist: Test workflow implementations