| name | di-agent-flow-pyflow |
| description | Complete API spec for pyflow, IBM's LLM-only Python DSL for authoring new batch or streaming flows on DataStage or StreamSets. The compact surface is designed for reliable LLM authoring: start here, and fall back to the verbose engine-specific SDK only when pyflow cannot express a needed feature. |
Pyflow API Spec
The runtime provides q; do not import or instantiate it. Every flow:
- Declares sources with q.source(), listing only referenced columns, with exact names and types from the asset metadata.
- Calls q.name("<snake_case_name>") exactly once.
- Ends with exactly one sink: q.output(frame), or q.write(frame, "symbol", operation="insert" | "overwrite" | "update") when writing to a destination asset.
Code must contain no imports or print().
Engine Targets
The caller passes the target engine to create_pyflow(engine=...); do not declare it in the code. The engine determines which Frame operations are allowed.
| Op | DataStage | StreamSets |
|---|---|---|
| q.source() | any count | exactly one |
| q.output() / q.write() | yes | yes |
| .filter(), .sort() | yes | yes |
| .lookup() | no | yes |
| .tumble() / .slide().agg() | no | at most one |
| .select() / .with_columns() | yes | yes |
| .head() / .fetch(), .unique() | yes | no |
| .union(), .intersect() | yes | no |
| .group_by().agg() | yes | no |
| .join(), .cross() | yes | no |
StreamSets flows must be a single linear chain:
q.source() -> [.filter() | .lookup()]* -> [.tumble()/.slide().agg()]? -> q.output() | q.write()
StreamSets windowed-agg measures support only .sum().
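The linear-chain rule can be checked mechanically. The sketch below is ordinary Python, not pyflow flow code, and nothing like it is part of the pyflow runtime. It assumes a flow has already been reduced to a list of hypothetical op names ("source", "filter", "lookup", "tumble", "slide", "output", "write"). Here "tumble" and "slide" each stand for a full .tumble()/.slide().agg() step.

```python
import re

# Hypothetical op names; each token stands for one pyflow call in the chain.
_STREAMSETS_CHAIN = re.compile(
    r"source"                   # exactly one q.source()
    r"(?: (?:filter|lookup))*"  # any number of .filter() / .lookup()
    r"(?: (?:tumble|slide))?"   # at most one windowed .agg()
    r" (?:output|write)"        # exactly one sink
)


def is_valid_streamsets_chain(ops: list[str]) -> bool:
    """Return True if the op sequence matches the StreamSets linear-chain grammar."""
    return _STREAMSETS_CHAIN.fullmatch(" ".join(ops)) is not None
```

Any DataStage-only op, such as "join" or "group_by", fails the match because the pattern has no branch for it.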
Symbols And Bindings
Strings passed to q.source(), .lookup(), and q.write() are local symbols. The caller binds each symbol to a catalog asset via create_pyflow(bindings=...); symbols need not match catalog names. Every used symbol must be bound.
Types
| Type | Kind |
|---|---|
| i8, i16, i32, i64 | signed integers |
| f32, f64 | floating point |
| string | text |
| boolean | true/false |
| date, time, timestamp | temporal |
Python literals auto-convert: int -> i64, float -> f64, str -> string, bool -> boolean. Never write nullable suffixes (?) in DSL code; suffixes appear only in catalog metadata.
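Here is a minimal sketch of the literal rules, written as a hypothetical plain-Python helper (not a pyflow API). It makes one Python subtlety explicit: bool is a subclass of int, so the bool check must run first, or True would be typed as i64. The spec states only the resulting mapping; how pyflow orders its checks internally is an assumption.

```python
def literal_type(value: object) -> str:
    """Map a Python literal to the pyflow type name it auto-converts to."""
    # bool must be tested before int: isinstance(True, int) is True in Python.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "i64"
    if isinstance(value, float):
        return "f64"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"no automatic pyflow type for {type(value).__name__}")
```

Values outside these four kinds, such as a date, need an explicit q.cast(), as in q.cast("2024-01-01", "date").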
q Namespace
q.source(symbol, {"col": "type", ...}) -> Frame
q.source(symbol, col="type", ...) -> Frame
q.name(name)
q.output(frame)
q.write(frame, symbol, operation="insert")
q.col(name) -> Expr
q.count_star() -> Expr
q.cast(value, type) -> Expr
q.when(cond).then(val)...
q.concat(*exprs) -> Expr
q.date_diff(d1, d2) -> Expr
q.strptime_time(expr, fmt) -> Expr
q.strftime(expr, fmt, tz?) -> Expr
Write Operations
q.write() supports row-level destination operations:
q.write(frame, "target")
q.write(frame, "target", operation="insert")
q.write(frame, "target", operation="overwrite")
q.write(frame, "target", operation="update")
operation: "insert" (the default) | "overwrite" | "update".
"overwrite" (DataStage only) truncates the destination table before writing, so re-running the flow is idempotent. Use it when the destination should hold exactly this run's output.
- Unsupported operations such as "upsert" are rejected; do not approximate them with insert or update.
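The following toy in-memory model shows why "overwrite" makes re-runs idempotent. It is hypothetical plain Python, not the q.write() implementation. It treats the destination as a list of rows and covers only "insert" and "overwrite", since the spec does not say how "update" matches rows.

```python
Row = dict[str, object]


def write(table: list[Row], rows: list[Row], operation: str = "insert") -> list[Row]:
    """Toy model of q.write() semantics on an in-memory destination table."""
    if operation == "insert":
        return table + rows  # append: re-running duplicates rows
    if operation == "overwrite":
        return list(rows)  # truncate, then write only this run's rows
    # Like the spec's rejection of "upsert", anything else is refused.
    raise ValueError(f"unsupported operation in this sketch: {operation!r}")


run_output = [{"region": "EU", "revenue": 10.0}]
after_two_inserts = write(write([], run_output), run_output)
after_two_overwrites = write(write([], run_output, "overwrite"), run_output, "overwrite")
```

Running the same flow twice doubles the rows under "insert" but leaves the table unchanged under "overwrite".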
Expression Methods
Operators return Expr, not Python bools. Use &/|/~, never and/or/not. Parenthesize each comparison: (q.col("a") > 1) & (q.col("b") < 2).
== != > < >= <= comparison -> boolean
+ - * / arithmetic
& | ~ logical AND / OR / NOT
.alias(name)
.cast(type)
.sum()
.mean()/.avg() .count() .min() .max()
.is_in(v1, v2, ...)
.is_null() .is_not_null()
.asc() .desc()
.nulls_first() .nulls_last()
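Both operator rules follow from how Python parses and evaluates these operators. First, and/or/not call bool() on their operands. Second, & and | bind more tightly than comparisons. The toy Expr class below is hypothetical (it is not pyflow's Expr). It only records expressions as text, and its __bool__ raises, so both mistakes surface as errors.

```python
class Expr:
    """Toy stand-in for an expression node; it only records text."""

    def __init__(self, text: str) -> None:
        self.text = text

    @staticmethod
    def _fmt(value: object) -> str:
        return value.text if isinstance(value, Expr) else repr(value)

    def _op(self, op: str, other: object) -> "Expr":
        return Expr(f"({self.text} {op} {Expr._fmt(other)})")

    def __gt__(self, other: object) -> "Expr":
        return self._op(">", other)

    def __lt__(self, other: object) -> "Expr":
        return self._op("<", other)

    def __and__(self, other: object) -> "Expr":
        return self._op("&", other)

    def __rand__(self, other: object) -> "Expr":
        # Called for `1 & expr`; keeps the operands in source order.
        return Expr(f"({Expr._fmt(other)} & {self.text})")

    def __bool__(self) -> bool:
        # An expression has no truth value until the engine evaluates it.
        raise TypeError("use & | ~ and parenthesize each comparison")


def col(name: str) -> Expr:
    return Expr(name)


good = (col("a") > 1) & (col("b") < 2)
# col("a") > 1 and col("b") < 2  -> TypeError: `and` calls bool()
# col("a") > 1 & col("b") < 2    -> parsed as col("a") > (1 & col("b")) < 2,
#                                   a chained comparison that also calls bool()
```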
Conditional
q.when(cond).then(val)
q.when(cond).then(val).otherwise(else_val)
q.when(c1).then(v1).when(c2).then(v2).otherwise(else_val)
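The sketch below models how a when/then chain is evaluated, using plain Python over a single row. The spec does not spell out these semantics, so the sketch assumes SQL CASE behavior. Branches are tried top to bottom, and the first true condition wins. A chain without .otherwise() yields NULL (None here). evaluate_when and tier are hypothetical names.

```python
from typing import Callable

# One (condition, value) pair per .when(...).then(...) call.
Branch = tuple[Callable[[dict], bool], object]


def evaluate_when(row: dict, branches: list[Branch], otherwise: object = None) -> object:
    """Return the value of the first branch whose condition holds for `row`.

    Assumption (SQL CASE semantics): branches are tried in order, and an
    unmatched row yields `otherwise`, which is None (NULL) when .otherwise()
    is omitted.
    """
    for condition, value in branches:
        if condition(row):
            return value
    return otherwise


# Models q.when(q.col("amount") > 1000).then("high")
#         .when(q.col("amount") > 100).then("medium").otherwise("low")
tier = [
    (lambda r: r["amount"] > 1000, "high"),
    (lambda r: r["amount"] > 100, "medium"),
]
```

A row with amount 5000 satisfies both conditions but gets "high", because the first matching branch wins.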
.str Accessor
.str.upper()
.str.contains(s) .str.starts_with(p) .str.ends_with(s)
.str.like(pattern) .str.replace(old, new)
.str.trim(chars?) .str.rtrim(chars?)
.str.substring(start_1based, length?)
Best practice: apply .str.trim() to string columns in the final output to remove leading and trailing whitespace, unless the task requires preserving spacing or the user explicitly asks otherwise.
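The helpers below mirror the 1-based position convention and the trim accessors on plain Python strings. This is a hypothetical sketch, not pyflow code. It assumes chars is a set of characters to strip (as in Python's str.strip) rather than a literal prefix or suffix. Raising on a start position below 1 is this sketch's choice; the spec does not specify that case.

```python
from typing import Optional


def substring(s: str, start_1based: int, length: Optional[int] = None) -> str:
    """Plain-Python analogue of .str.substring(start_1based, length?)."""
    if start_1based < 1:
        raise ValueError("positions are 1-based; start must be >= 1")
    begin = start_1based - 1  # 1-based position -> 0-based index
    end = None if length is None else begin + length
    return s[begin:end]


def trim(s: str, chars: Optional[str] = None) -> str:
    """Analogue of .str.trim(chars?): strip both ends, whitespace by default."""
    return s.strip(chars)


def rtrim(s: str, chars: Optional[str] = None) -> str:
    """Analogue of .str.rtrim(chars?): strip the right end only."""
    return s.rstrip(chars)
```

Under this convention, .str.substring(1, 4) on "2024-06-15" yields the year, because position 1 is the first character.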
Frame Methods
.filter(expr) -> Frame
.select(*exprs) -> Frame
.with_columns(*exprs) -> Frame
.sort(*col_refs) -> Frame
.head(count) -> Frame
.unique(*subset) -> Frame
.union(other) -> Frame
.intersect(other) -> Frame
Aggregates may appear only in .select() or .group_by().agg(). To filter on an aggregated value, aggregate first, then .filter(...).
Analytic window functions (OVER clauses) are not supported. Use .tumble() / .slide() for time-windowed aggregates on StreamSets.
Join [datastage]
a.join(b, on=, how="inner", suffix="_right") -> Frame
a.join(b, left_on=, right_on=, how="inner", suffix="_right") -> Frame
a.cross(b, suffix="_right") -> Frame
how: inner | left | right | outer | cross.
on= (same-name keys): right key columns are dropped.
left_on / right_on (different-name keys): both key columns are kept.
- Duplicate non-key right columns get the suffix (collisions stack: _right_right). Rename them via q.col("x_right").alias(...).
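The naming rules above can be modeled as a small function that predicts a join's output columns. join_output_columns is a hypothetical plain-Python helper, not part of pyflow. It covers only the rules stated here and assumes the left columns come first, followed by the surviving right columns.

```python
from typing import Sequence, Union


def join_output_columns(
    left: Sequence[str],
    right: Sequence[str],
    on: Union[str, Sequence[str], None] = None,
    suffix: str = "_right",
) -> list[str]:
    """Predict the column names produced by a.join(b, ...).

    on=...  : same-name keys, so the right key columns are dropped.
    on=None : models left_on=/right_on=, so every right column is kept.
    A right column whose name is already taken gets `suffix` appended,
    repeatedly, so collisions stack (x -> x_right -> x_right_right).
    """
    keys = [on] if isinstance(on, str) else list(on or [])
    out = list(left)
    for name in right:
        if name in keys:
            continue  # same-name join key: kept once, from the left side
        while name in out:
            name += suffix
        out.append(name)
    return out
```

With the schemas from the chained-join example in this spec, the first join yields custkey, name, nationkey, name_right, regionkey. That is why the example immediately aliases name_right.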
Lookup [streamsets]
m.lookup(symbol, {col: type, ...}, on=, suffix="_right") -> Frame
m.lookup(symbol, {col: type, ...}, left_on=, right_on=, suffix="_right") -> Frame
m.lookup(symbol, col=type, ..., on=) -> Frame
- Enriches m with columns read inline from the reference symbol. Do not declare the reference via a separate q.source().
- The declared reference columns are the reference stage's full schema: include the on= / right_on= key columns plus every column to pull through.
- Semantics: left-join-like. Unmatched rows are kept, with reference columns set to NULL. Only the first match is used. There is no how= parameter.
- Key and suffix rules match .join() above.
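The lookup semantics can be modeled on in-memory rows. This hypothetical plain-Python function is not how StreamSets executes a lookup. It assumes "first match" means the first matching reference row in list order, and it leaves out suffix handling. The sample rows are made up.

```python
def lookup(rows: list[dict], reference: list[dict], ref_columns: list[str], on: str) -> list[dict]:
    """Toy model of m.lookup(symbol, {...}, on=key) over in-memory rows.

    Left-join-like: every input row is kept. The first reference row whose
    key matches (in list order) supplies the pulled-through columns; an
    unmatched row gets None (NULL) for them. Suffix handling is omitted.
    """
    pulled = [c for c in ref_columns if c != on]  # on= drops the reference key
    out = []
    for row in rows:
        match = next((ref for ref in reference if ref[on] == row[on]), None)
        enriched = dict(row)
        for column in pulled:
            enriched[column] = None if match is None else match[column]
        out.append(enriched)
    return out


orders = [{"cust_id": 1, "amount": 5.0}, {"cust_id": 9, "amount": 2.0}]
customer = [{"cust_id": 1, "name": "Ada"}, {"cust_id": 1, "name": "Grace"}]
enriched_orders = lookup(orders, customer, ["cust_id", "name"], on="cust_id")
```

The output keeps both orders: the unmatched one gets name None, and the duplicate reference key contributes only its first row.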
Windowed Aggregates [streamsets]
m.tumble(length, group_by=?, tz=?, on=?).agg(*measures) -> Frame
m.slide(length, group_by=?, tz=?, on=?).agg(*measures) -> Frame
length: <number><unit> where unit is s | m | h | d (e.g. "30s", "15m", "1h").
group_by: str or list of column names; omit for one global row per window.
tz: IANA timezone. on: event-time column; omit to use processing time.
- Output columns: [*group_by, window_start, window_end, *measure_aliases]; window_start and window_end are of type timestamp.
- Measures: the initial StreamSets windowing compiler supports only .sum(). Each measure must be named with .alias(), and measures cannot be nested.
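The sketch below shows how an event could be assigned to a tumbling window, in plain Python. parse_length and tumble_window are hypothetical helpers. They assume integer lengths, half-open windows [window_start, window_end), and alignment to the Unix epoch in UTC. The spec states none of these, and tz= is not modeled.

```python
import re
from datetime import datetime, timedelta, timezone

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_length(length: str) -> timedelta:
    """Parse a window length such as "30s", "15m", "1h", or "1d"."""
    match = re.fullmatch(r"(\d+)([smhd])", length)
    if match is None:
        raise ValueError(f"bad window length: {length!r}")
    count, unit = match.groups()
    return timedelta(seconds=int(count) * _UNIT_SECONDS[unit])


def tumble_window(ts: datetime, length: str) -> tuple[datetime, datetime]:
    """Return (window_start, window_end) of the tumbling window holding ts.

    ts must be timezone-aware. Windows are assumed to be aligned to the
    Unix epoch in UTC.
    """
    size = parse_length(length)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    offset = (ts - epoch) // size  # whole windows elapsed since the epoch
    start = epoch + offset * size
    return start, start + size
```

Under these assumptions, an event at 10:07:30 UTC falls into the "15m" window from 10:00 to 10:15.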
Group-By And Aggregates-In-Select [datastage]
.group_by(*col_refs).agg(*measures) -> Frame
Inside .select(), mixing plain column refs with aggregates turns the plain refs into implicit grouping keys:
t.select(q.col("x").sum().alias("total"))
t.select("status", q.col("x").count().alias("n"))
For a computed grouping key, materialize it with .with_columns() first, then group by that column name.
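The implicit-grouping rule can be modeled over plain Python rows. The plain refs passed to .select() become grouping keys, so the result equals .group_by(*keys).agg(...). select_with_aggregates is hypothetical. Python's sum and len stand in for .sum() and .count(), with null handling ignored.

```python
from collections import defaultdict
from typing import Callable, Sequence


def select_with_aggregates(
    rows: list[dict],
    keys: Sequence[str],
    measures: dict[str, tuple[str, Callable[[list], object]]],
) -> list[dict]:
    """Toy model of .select() mixing plain refs (keys) with aggregates.

    The plain refs act as implicit grouping keys; with no keys, all rows
    form a single group. `measures` maps each output alias to
    (source column, aggregate function).
    """
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    result = []
    for key_values, members in groups.items():
        out = dict(zip(keys, key_values))
        for alias, (column, fn) in measures.items():
            out[alias] = fn([m[column] for m in members])
        result.append(out)
    return result


rows = [{"status": "open", "x": 2}, {"status": "done", "x": 3}, {"status": "open", "x": 5}]
```

With no keys, every row falls into one group, mirroring the t.select(q.col("x").sum().alias("total")) example. Passing "status" as a key yields one row per status.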
Examples
DataStage aggregate: typed-literal filter, aggregate-in-select, then filter on the aggregated column:
orders = q.source("orders", {"region": "string", "amount": "f64", "orderdate": "date"})
q.name("top_regions_2024")
q.output(
orders
.filter(q.col("orderdate") >= q.cast("2024-01-01", "date"))
.select("region", q.col("amount").sum().alias("revenue"), q.count_star().alias("n_orders"))
.filter(q.col("n_orders") > 100)
.sort(q.col("revenue").desc())
.head(10)
)
DataStage chained joins; duplicate right-side names get "_right" and are renamed via .alias():
customer = q.source("customer", {"custkey": "i64", "name": "string", "nationkey": "i64"})
nation = q.source("nation", {"nationkey": "i64", "name": "string", "regionkey": "i64"})
region = q.source("region", {"regionkey": "i64", "name": "string"})
q.name("customer_geography")
q.output(
customer.join(nation, on="nationkey")
.select("custkey", "name", "regionkey", q.col("name_right").alias("nation"))
.join(region, on="regionkey")
.select("custkey", "name", "nation", q.col("name_right").alias("region"))
)
StreamSets lookup; reference schema is declared inline, not via a separate q.source():
orders = q.source("orders", {"cust_id": "i64", "amount": "f64"})
q.name("orders_with_customer")
q.output(
orders
.filter(q.col("amount") > 0)
.lookup("customer", {"cust_id": "i64", "name": "string"}, on="cust_id")
)
StreamSets windowed aggregate on an event-time column:
events = q.source("events", {"region": "string", "amount": "f64", "ts": "timestamp"})
q.name("revenue_15m")
q.output(
events
.tumble("15m", group_by="region", on="ts")
.agg(q.col("amount").sum().alias("revenue"))
)