---
name: dataflow
description: Kailash DataFlow — MANDATORY for DB/CRUD/bulk/migrations/multi-tenancy. Raw SQL/ORMs BLOCKED.
---

# Kailash DataFlow - Zero-Config Database Framework
DataFlow is a zero-config database framework built on the Kailash Core SDK that automatically generates workflow nodes from your database models.
## Overview

- **Automatic Node Generation**: 11 nodes per model via the `@db.model` decorator
- **Multi-Database Support**: PostgreSQL, MySQL, SQLite (SQL) + MongoDB (Document) + pgvector (Vector Search)
- **Enterprise Features**: Multi-tenancy, multi-instance isolation, transactions
- **Zero Configuration**: String IDs preserved, deferred schema operations
- **Developer Experience**: Enhanced errors (DF-XXX codes), strict-mode validation, debug agent, CLI tools
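For example, "multi-instance isolation" means one `DataFlow` object per database rather than one shared instance; a minimal sketch (the connection strings are illustrative):

```python
from dataflow import DataFlow

# One DataFlow instance per database: each instance owns the models
# registered on it and the nodes generated from them.
users_db = DataFlow("sqlite:///users.db", auto_migrate=True)
orders_db = DataFlow("sqlite:///orders.db", auto_migrate=True)
```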
## Quick Start

### Express API (Recommended for Simple CRUD)
```python
from dataflow import DataFlow

db = DataFlow("sqlite:///app.db", auto_migrate=True)

@db.model
class User:
    name: str
    email: str
    active: bool = True

# Inside an async context
await db.initialize()

# One-line async CRUD operations
result = await db.express.create("User", {"name": "Alice", "email": "alice@example.com"})
user = await db.express.read("User", result["id"])
users = await db.express.list("User", {"active": True})
count = await db.express.count("User")
await db.express.update("User", result["id"], {"name": "Bob"})
await db.express.delete("User", result["id"])

# Synchronous variants for non-async code
result = db.express_sync.create("User", {"name": "Alice", "email": "alice@example.com"})
users = db.express_sync.list("User", {"active": True})
```
### Workflow API (For Multi-Step Operations)

Use `WorkflowBuilder` only when you need multiple nodes with data flowing between them.
```python
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime

workflow = WorkflowBuilder()
workflow.add_node("User_Create", "create_user", {
    "data": {"name": "John", "email": "john@example.com"}
})

with LocalRuntime() as runtime:
    results, run_id = runtime.execute(workflow.build())
    user_id = results["create_user"]["result"]
```
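The snippet above has only one node; here is a sketch of a genuinely multi-step workflow. The four-argument `add_connection(source_id, source_key, target_id, target_key)` call and the `User_Read` node's `id` parameter are assumptions drawn from common Kailash patterns, not confirmed API:

```python
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime

workflow = WorkflowBuilder()
workflow.add_node("User_Create", "create_user", {
    "data": {"name": "John", "email": "john@example.com"}
})
workflow.add_node("User_Read", "read_user", {})

# ASSUMPTION: route the created record's result into the read node's
# "id" input; verify the exact connection signature against the SDK.
workflow.add_connection("create_user", "result", "read_user", "id")

with LocalRuntime() as runtime:
    results, run_id = runtime.execute(workflow.build())
    user = results["read_user"]["result"]
```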
## Generated Nodes (11 per model)

Each `@db.model` class generates:

- `{Model}_Create` - Create single record
- `{Model}_Read` - Read by ID
- `{Model}_Update` - Update record
- `{Model}_Delete` - Delete record
- `{Model}_List` - List with filters
- `{Model}_Upsert` - Insert or update (atomic)
- `{Model}_Count` - Efficient COUNT(*) queries
- `{Model}_BulkCreate` - Bulk insert (see the sketch after this list)
- `{Model}_BulkUpdate` - Bulk update
- `{Model}_BulkDelete` - Bulk delete
- `{Model}_BulkUpsert` - Bulk upsert
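The bulk nodes plug into the same `add_node` pattern shown in the Quick Start. A minimal sketch, assuming `{Model}_BulkCreate` accepts a list of records under the same `data` parameter (that shape is inferred from the single-record example, not confirmed API):

```python
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime

workflow = WorkflowBuilder()
# ASSUMPTION: BulkCreate takes a list under "data"; the real parameter
# name/shape may differ - check the node's docs before relying on this.
workflow.add_node("User_BulkCreate", "bulk_users", {
    "data": [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Bob", "email": "bob@example.com"},
    ]
})

with LocalRuntime() as runtime:
    results, run_id = runtime.execute(workflow.build())
    created = results["bulk_users"]["result"]
```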
## Critical Rules

- ✅ String IDs preserved (no UUID conversion)
- ✅ Deferred schema operations (safe for Docker/async)
- ✅ Multi-instance isolation (one DataFlow per database)
- ✅ Result access: `results["node_id"]["result"]`
- ❌ NEVER use truthiness checks on filter/data parameters (an empty dict `{}` is falsy)
- ✅ ALWAYS use key existence checks: `if "filter" in kwargs` instead of `if kwargs.get("filter")` (see the sketch after this list)
- ❌ NEVER use direct SQL when DataFlow nodes exist
- ❌ NEVER use SQLAlchemy/Django ORM alongside DataFlow
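A self-contained illustration of the truthiness pitfall (the `count_users_*` helpers are hypothetical, written only to demonstrate the rule):

```python
def count_users_wrong(**kwargs):
    # WRONG: {} is falsy, so an explicitly supplied empty filter
    # is silently treated as "no filter given".
    if kwargs.get("filter"):
        return f"count with filter {kwargs['filter']}"
    return "count of all rows"

def count_users_right(**kwargs):
    # CORRECT: key existence distinguishes "filter supplied" from
    # "no filter supplied", even when the filter is {}.
    if "filter" in kwargs:
        return f"count with filter {kwargs['filter']}"
    return "count of all rows"

print(count_users_wrong(filter={}))  # -> "count of all rows"  (bug!)
print(count_users_right(filter={}))  # -> "count with filter {}"
```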
## Reference Documentation

### Getting Started

### Core Operations

### Advanced Features

### Data Fabric Engine

- `dataflow-fabric-engine` - External data sources (`db.source()`), derived products (`@db.product()`), fabric runtime (`db.start()`), 5 source adapters, webhooks, SSRF protection, observability

### Enterprise Features
### Developer Experience Tools

- `dataflow-strict-mode` - Build-time validation (4-layer, OFF/WARN/STRICT)
- `dataflow-debug-agent` - Intelligent error analysis (5-stage pipeline)
- `ErrorEnhancer` - Automatic error enhancement (40+ DF-XXX codes)
- Inspector API - Self-service debugging (18 introspection methods)
- CLI Tools - `dataflow-validate`, `dataflow-analyze`, `dataflow-debug` (5 commands)
### Connection Pool & Monitoring

### ML Integration

- `dataflow-ml-integration` - kailash-ml FeatureStore integration (ConnectionManager, point-in-time queries, polars interop)

### Provenance & Audit

### Cache Patterns

- `cache-cas-fail-closed` - CAS (compare-and-swap) fail-closed pattern for when a primitive can only be satisfied by one backend

### Troubleshooting
## Database Support Matrix

| Database | Type | Nodes/Model | Driver |
|---|---|---|---|
| PostgreSQL | SQL | 11 | asyncpg |
| MySQL | SQL | 11 | aiomysql |
| SQLite | SQL | 11 | aiosqlite |
| MongoDB | Document | 8 | Motor |
| pgvector | Vector | 3 | pgvector |
**Not an ORM**: DataFlow generates workflow nodes, not ORM models. It uses string-based result access and integrates with Kailash's workflow execution model.
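For contrast, a sketch of the two access styles (the ORM lines are generic SQLAlchemy-style pseudocode shown only for comparison, not part of DataFlow):

```python
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime

# ORM style (blocked alongside DataFlow): mapped classes, attribute access.
#   user = session.query(User).get(user_id)
#   print(user.name)

# DataFlow style: nodes and results are addressed by plain strings.
workflow = WorkflowBuilder()
workflow.add_node("User_Create", "create_user", {
    "data": {"name": "Alice", "email": "alice@example.com"}
})
with LocalRuntime() as runtime:
    results, run_id = runtime.execute(workflow.build())
    print(results["create_user"]["result"])
```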
## Integration Patterns

### With Nexus (Multi-Channel)
```python
from dataflow import DataFlow
from nexus import Nexus

db = DataFlow(connection_string="...")

@db.model
class User:
    id: str
    name: str

# Expose the generated workflows through Nexus's channels
nexus = Nexus(db.get_workflows())
nexus.run()
```
### With Core SDK (Custom Workflows)

```python
from dataflow import DataFlow
from kailash.workflow.builder import WorkflowBuilder

db = DataFlow(connection_string="...")

workflow = WorkflowBuilder()
workflow.add_node("User_Create", "user1", {...})  # node parameters elided
```
## When to Use This Skill

Use DataFlow when you need to:
- Perform database operations in workflows
- Generate CRUD APIs automatically (with Nexus)
- Implement multi-tenant systems
- Work with existing databases
- Build database-first applications
- Handle bulk data operations
## Related Skills
## Support

For DataFlow-specific questions, invoke:

- `dataflow-specialist` - DataFlow implementation and patterns
- `testing-specialist` - DataFlow testing strategies (NO MOCKING policy)
- `decide-framework` skill - Choose between Core SDK and DataFlow