implement-connector
| Field | Value |
|---|---|
| name | implement-connector |
| description | Single step only: implement the connector in Python when the API doc already exists. Do NOT use for full connector creation — use the create-connector agent instead. |
| disable-model-invocation | true |
Implement the Python connector for {{source_name}} that conforms exactly to the LakeflowConnect interface defined in
lakeflow_connect.py. The implementation should be based on the source API documentation in src/databricks/labs/community_connector/sources/{source_name}/{source_name}_api_doc.md produced by the research-source-api skill.
CRITICAL REQUIREMENT: Refer to src/databricks/labs/community_connector/sources/example/example.py for concrete examples of different patterns. You should follow the patterns demonstrated in the example connector.
For simple connectors, keeping everything in a single {source_name}.py file is perfectly fine. If the main file grows beyond 1000 lines, split it into multiple files for better maintainability.
- In get_table_schema, prefer StructType over MapType to enforce explicit typing. Avoid flattening nested fields. Prefer LongType over IntegerType to avoid overflow. Do not convert the JSON into dictionaries based on the schema before returning it from read_table; return the raw parsed JSON and let the framework handle type coercion.
- If the ingestion_type returned from read_table_metadata is cdc or cdc_with_deletes, both primary_keys and cursor_field are required.
- If ingestion_type is cdc_with_deletes, you must implement read_table_deletes(). This method should return records with at least the primary key fields and the cursor field populated.
- Use None as the default value instead of an empty dictionary {}.
- … table_options instead.

For incremental ingestion of tables (cdc and append_only), the framework calls read_table repeatedly within a single trigger run. Each call produces one microbatch. A trigger run stops when the returned end_offset equals start_offset.
This termination condition is critical for Trigger AvailableNow. The connector runs under Trigger.AvailableNow, which issues microbatches until the source reports "no more data available" — signalled by read_table returning end_offset == start_offset. If the connector keeps advancing the offset (e.g. by chasing continuously-arriving new data), the trigger never terminates and the pipeline hangs. Every incremental connector must guarantee this condition is reached — see Guaranteeing Termination below.
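To make the termination rule concrete, here is a minimal, self-contained simulation of a trigger run. ToyConnector, SOURCE and run_trigger are hypothetical stand-ins, not part of LakeflowConnect. The sketch assumes read_table takes (table_name, start_offset, table_options), returns (records_iterator, end_offset), receives None as the first start_offset, and gets table options as strings.

```python
from typing import Iterator, Optional, Tuple

# Hypothetical source: cursor values are unique and sorted ascending,
# so a strict ">" resume filter neither skips nor repeats records here.
SOURCE = [{"id": i, "updated_at": i * 10} for i in range(1, 8)]


class ToyConnector:
    """Illustrative stand-in for a LakeflowConnect implementation."""

    def read_table(
        self, table_name: str, start_offset: Optional[dict], table_options: dict
    ) -> Tuple[Iterator[dict], Optional[dict]]:
        cursor = (start_offset or {}).get("cursor", 0)
        limit = int(table_options.get("max_records_per_batch", "3"))
        batch = [r for r in SOURCE if r["updated_at"] > cursor][:limit]
        if not batch:
            # Nothing new: echo start_offset back so the trigger can finish.
            return iter([]), start_offset
        return iter(batch), {"cursor": batch[-1]["updated_at"]}


def run_trigger(connector, table_name: str, table_options: dict):
    """Mimic the framework: one read_table call per microbatch until offsets match."""
    offset, batches = None, []
    while True:
        records, end_offset = connector.read_table(table_name, offset, table_options)
        if end_offset == offset:
            break
        batches.append(list(records))
        offset = end_offset
    return batches, offset


batches, final_offset = run_trigger(ToyConnector(), "events", {"max_records_per_batch": "3"})
print([len(b) for b in batches], final_offset)  # [3, 3, 1] {'cursor': 70}
```

If the toy connector instead returned a fresh offset on every call (for example, the current time), end_offset would never equal start_offset and the loop would never exit, which is the hang described above.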
max_records_per_batch

Every incremental table must support a max_records_per_batch table option. This caps how many records a single read_table call returns to the framework, giving Spark a bounded microbatch to process. Without it, a single call could return millions of rows and overwhelm the Spark driver.
This is orthogonal to the query-scoping strategies below. Regardless of whether you use a sliding window, a server-side limit, or neither, you must still respect max_records_per_batch by stopping record accumulation once the count is reached.
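A minimal sketch of respecting the cap while paging, assuming a hypothetical fetch_pages(since) helper that yields pages sorted ascending by an integer cursor. It truncates at the exact record where the count is reached, which is the client-side truncation this document treats as safe for cdc tables; append-only tables should instead stop on page boundaries.

```python
from typing import Iterator, List, Tuple

# Hypothetical dataset standing in for a paginated API, sorted ascending by cursor.
RECORDS = [{"id": i, "updated_at": i} for i in range(1, 26)]


def fetch_pages(since: int, page_size: int = 10) -> Iterator[List[dict]]:
    """Pretend API: lazily yields pages of records with updated_at > since."""
    matching = [r for r in RECORDS if r["updated_at"] > since]
    for start in range(0, len(matching), page_size):
        yield matching[start:start + page_size]


def read_capped(since: int, max_records_per_batch: int) -> Tuple[List[dict], int]:
    """Accumulate records across pages and stop once the cap is reached."""
    batch: List[dict] = []
    for page in fetch_pages(since):
        for record in page:
            batch.append(record)
            if len(batch) >= max_records_per_batch:
                # Cap reached mid-page: the next call resumes after this cursor.
                return batch, record["updated_at"]
    # Pages exhausted before the cap; keep the old cursor if nothing was read.
    return batch, batch[-1]["updated_at"] if batch else since
```

Because fetch_pages is a generator, returning early also means the remaining pages are never requested.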
Exception — best-effort enforcement: When the API does not support ascending sort and a sliding time-window is used (see below), max_records_per_batch becomes best-effort. The window must be drained completely to avoid skipping or duplicating records, so the actual batch size is governed by window_seconds rather than a strict record count.
When a connector stops mid-way (due to max_records_per_batch or page limits), it checkpoints the max cursor value and resumes with since=<that value>. This only works if the API returns records sorted ascending by the cursor field. With descending order the max cursor comes from the first record, so since never advances — creating an infinite loop of re-fetching the same data.
When the API does not support ascending sort, use one of:

- Sliding time-window (Strategy A below): query with both since and until so sort order within the window doesn't matter; the cursor advances to the window end regardless. In this mode max_records_per_batch is best-effort, since the entire window must be drained.
- Full fetch: read all records and use the maximum cursor value as the since filter for the next call. This avoids the sort-order problem entirely but only works for smaller datasets where fetching everything is practical.

A common problem with incremental reads is that the query to the source API is too broad. For example, an API that accepts a since parameter but no until parameter forces the server to scan from since all the way to "now", which can be slow or time out on large accounts. Two strategies exist to scope the query and avoid this:
Strategy A — Sliding time-window (for APIs with since/until or equivalent start/end time parameters):
Add a bounded end-time to the query. Instead of querying from since to now, query from since to since + window_seconds. Paginate all records within that window, then advance the cursor to the window end. If the window is empty, still advance the cursor so the next call slides forward. The window_seconds table option controls the window size. Provide a sensible default (e.g. 86400), but note that the optimal value depends on the data volume and it is up to the user to adjust it for their specific use case. For testing, always start with a very small value. Use this when the API supports both start and end time filters.
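A minimal sketch of one sliding-window read. The query helper, EVENTS, and the half-open [since, until) semantics are assumptions standing in for a real API's start/end filters. Capping the window end at a now argument is also an added assumption, so the window never runs ahead of the clock.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

BASE = datetime(2024, 1, 1, tzinfo=timezone.utc)
# Hypothetical event timestamps, as returned by an API with since/until filters.
EVENTS = [BASE + timedelta(minutes=m) for m in (5, 20, 70, 200)]


def query(since: datetime, until: datetime) -> List[datetime]:
    """Pretend API call over [since, until); deliberately returns newest first."""
    return sorted((e for e in EVENTS if since <= e < until), reverse=True)


def read_window(cursor: datetime, window_seconds: int, now: datetime) -> Tuple[List[datetime], datetime]:
    """Drain one bounded window, then advance the cursor to the window end."""
    window_end = min(cursor + timedelta(seconds=window_seconds), now)
    records = query(cursor, window_end)  # a real connector paginates the whole window
    # The cursor moves to window_end even if the window was empty, so the next
    # call slides forward no matter how the API sorts results within the window.
    return records, window_end
```

Calling read_window repeatedly from BASE with window_seconds=3600 visits each event exactly once, including across the empty 2h to 3h window, and stops advancing once the cursor reaches now.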
Critical: the first call must have a bounded since. Without it, the query is unbounded (everything up to until), which defeats the purpose of the window on large datasets. Resolve the starting cursor in this order:
1. start_offset["cursor"] from a previous read.
2. The start_timestamp table option the user provides. This is the fallback when auto-discovery is not possible.
3. Auto-discovery: query with per_page=1 (or an equivalent small limit) and no time filters. If the API sorts ascending, the first record is the oldest. If it sorts descending, use the Link header or equivalent to find the last page, then read the last record from that page. This adds 1–2 lightweight API calls but avoids requiring the user to know when data starts.

Strategy B — Server-side limit (for APIs with limit/max_results/page_size parameters):
Pass a small limit parameter directly to the API so the server returns a bounded page. This keeps each individual API call small. Calls repeat until max_records_per_batch is reached.
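A minimal sketch of Strategy B, assuming a hypothetical endpoint api_list that accepts a limit and returns a next_page_token. Each call stays small because the server enforces limit, and whole pages are kept so the server, not the client, decides where each page ends.

```python
from typing import List, Optional, Tuple

# Hypothetical backing data for the pretend endpoint below.
DATA = [{"id": i} for i in range(23)]


def api_list(limit: int, page_token: Optional[int] = None) -> dict:
    """Pretend endpoint: returns at most `limit` records plus a next-page token."""
    start = page_token or 0
    items = DATA[start:start + limit]
    next_token = start + limit if start + limit < len(DATA) else None
    return {"items": items, "next_page_token": next_token}


def read_with_limit(
    max_records_per_batch: int, limit: int = 5, page_token: Optional[int] = None
) -> Tuple[List[dict], Optional[int]]:
    """Repeat small server-bounded calls until the cap is reached or pages run out."""
    batch: List[dict] = []
    while True:
        resp = api_list(limit=limit, page_token=page_token)
        batch.extend(resp["items"])  # keep whole pages; the server sets the boundary
        page_token = resp["next_page_token"]
        if page_token is None or len(batch) >= max_records_per_batch:
            return batch, page_token
```

Because only whole pages are kept, a batch can slightly exceed max_records_per_batch (15 records for a cap of 12 with limit=5), matching the "reach or slightly exceed" behaviour described for append-only tables.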
Choosing a strategy: Start with the simplest approach (standard pagination + max_records_per_batch). If the source API doc warns about slow unbounded queries, or testing reveals timeouts/hangs, add Strategy A or B. If the API supports both time-range and limit parameters, prefer the sliding window (Strategy A) as it provides more predictable bounds.
When accumulating records to reach max_records_per_batch, how you stop depends on the table's ingestion type. The issue is what happens if you stop in the middle of a set of records that all share the exact same cursor timestamp.
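A small, self-contained illustration of the boundary problem, using hypothetical in-memory records where three share updated_at == 20. With a strict > resume filter, truncating inside that group silently skips the rest of it; with an inclusive >= filter, the boundary records are read again. Re-reads collapse under a primary-key merge (CDC) but would become duplicate rows in an append-only table, which is one way to see why the two ingestion types are handled differently below.

```python
from typing import List, Optional, Tuple

# Hypothetical records sorted ascending by cursor; b, c and d share updated_at == 20.
RECORDS = [
    {"id": "a", "updated_at": 10},
    {"id": "b", "updated_at": 20},
    {"id": "c", "updated_at": 20},
    {"id": "d", "updated_at": 20},
    {"id": "e", "updated_at": 30},
]


def read_batch(since: Optional[int], max_records: int, inclusive: bool) -> Tuple[List[dict], Optional[int]]:
    """Truncate client-side at max_records; resume with >= (inclusive) or > since."""
    if since is None:
        matching = RECORDS
    elif inclusive:
        matching = [r for r in RECORDS if r["updated_at"] >= since]
    else:
        matching = [r for r in RECORDS if r["updated_at"] > since]
    batch = matching[:max_records]
    return batch, (batch[-1]["updated_at"] if batch else since)


def ids_seen(inclusive: bool, max_records: int, calls: int = 3) -> List[str]:
    """Run a few consecutive batches and collect the ids that were emitted."""
    since: Optional[int] = None
    seen: List[str] = []
    for _ in range(calls):
        batch, since = read_batch(since, max_records, inclusive)
        seen.extend(r["id"] for r in batch)
    return seen


print(ids_seen(inclusive=False, max_records=2))  # ['a', 'b', 'e']: c and d are skipped
print(ids_seen(inclusive=True, max_records=4))   # ['a', 'b', 'c', 'd', 'b', 'c', 'd', 'e', 'e']
```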
For CDC / cdc_with_deletes tables (Client-side truncation is safe):
- Stop accumulating records as soon as you reach max_records_per_batch. The client decides exactly when to stop, and you use the last processed record's timestamp as the offset.

For Append-Only tables (NO client-side truncation):
- … max_records_per_batch.
- … > filtering).
- … limit=50, directly to the API) so the server controls the boundary. You accumulate these small full pages until you reach or slightly exceed max_records_per_batch.

Guaranteeing Termination

The connector must ensure read_table eventually returns end_offset == start_offset to signal that all currently available data has been read. This happens in two cases:
- … datetime.now(UTC) in __init__ (self._init_ts). At the top of each incremental read, if start_offset already has a cursor >= _init_ts, return immediately with (iter([]), start_offset). Do not cap the cursor itself to _init_ts when yielding records; let the cursor be the last record's actual value so it advances naturally until it hits the short-circuit condition.

If the source API uses timestamp-based cursors (e.g. since/updated_at), apply a lookback window at read time (subtract N seconds from the cursor when building the API query), not in the stored offset. This avoids drift in the checkpointed cursor while still catching concurrently updated records. Important: apply the lookback subtraction only once, at the beginning of the trigger (e.g. by tracking state on self during the first read), rather than re-applying it on every subsequent read within the same trigger. Store the raw max_updated_at as the offset.
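A minimal sketch combining the _init_ts short-circuit with a once-per-trigger lookback. IncrementalReader, its fetch callable (assumed to return every record with updated_at > since, sorted ascending, with no client-side cap), and lookback_seconds are hypothetical. It uses timezone.utc, which is equivalent to datetime.UTC on Python 3.11+. Taking the max of the old cursor and the last record is an added safeguard so re-reading the lookback range never moves the checkpoint backwards.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Iterator, List, Optional, Tuple


class IncrementalReader:
    """Sketch: short-circuit at _init_ts and apply the lookback once per trigger."""

    def __init__(self, fetch: Callable[[Optional[datetime]], List[dict]], lookback_seconds: int = 300):
        self._init_ts = datetime.now(timezone.utc)  # "now" captured once per trigger
        self._fetch = fetch
        self._lookback = timedelta(seconds=lookback_seconds)
        self._lookback_applied = False

    def read_table(self, start_offset: Optional[dict]) -> Tuple[Iterator[dict], Optional[dict]]:
        cursor = (start_offset or {}).get("cursor")
        if cursor is not None and cursor >= self._init_ts:
            # Caught up with the trigger start: report "no more data".
            return iter([]), start_offset
        since = cursor
        if not self._lookback_applied:
            self._lookback_applied = True  # only the first read of this trigger
            if since is not None:
                since -= self._lookback
        records = self._fetch(since)
        if not records:
            return iter([]), start_offset
        last = records[-1]["updated_at"]
        # Store the raw cursor, never the lookback-adjusted query bound.
        new_cursor = last if cursor is None else max(cursor, last)
        return iter(records), {"cursor": new_cursor}
```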
These options are critical for testing. Without them, tests may hang or take forever by attempting to read the entire dataset from the source.
- Set max_records_per_batch in the test's dev_table_config.json (e.g., 100).
- Set a small window_seconds (e.g., 300 or 3600).
- Set a small limit (e.g., 5) or max_pages (e.g., 1).
- Pass a timeout parameter on HTTP requests (e.g., timeout=20). Without it, a slow API hangs the connector and tests indefinitely with no error output.
- Push filters (since/until etc.) to the API instead of fetching everything and filtering in Python. Client-side filtering still forces the server to scan the full dataset, which can cause timeouts on large accounts.
- Avoid unbounded parameters such as date_range=all. Always scope queries to a bounded range.

The framework exposes an experimental ingestion-agent operation surface
(spark.read.format("lakeflow_connect").option("operation", ...))
that's derived automatically from the LakeflowConnect methods you
implement. Do not subclass SupportsIngestionAgent, AgentOperation,
or any of the built-in op classes (ListObjectsOp, ReadTableOp,
…); do not override agent_operations(). The shape of the
customisation API is still being finalised. Just implement
LakeflowConnect — the agent surface comes along for free.
After completion, run python tools/scripts/merge_python_source.py {source_name} to generate the merged connector file _generated_{source_name}_python_source.py.