write-back-testing
// Implement test utilities that write test data to the source system and validate end-to-end read cycles.
| name | write-back-testing |
| description | Implement test utilities that write test data to the source system and validate end-to-end read cycles. |
| disable-model-invocation | true |
This step requires the write-back API documentation for the source system (typically found at src/databricks/labs/community_connector/sources/{source_name}/{source_name}_api_doc.md). If no write-back API doc is available, this step can be skipped.
Implement test utilities that write test data to the source system, then validate your connector correctly reads and ingests that data. This creates a complete write → read → verify cycle.
Only test against non-production environments. Write operations create real data in the source system.
Create tests/unit/sources/{source_name}/{source_name}_test_utils.py implementing the interface defined in tests/unit/sources/lakeflow_connect_test_utils.py.
The base class LakeflowConnectWriteTestUtils provides default no-op implementations for every method (returning empty lists and (False, [], {})). You only need to override the methods your source supports.
Use the write-back API documentation as your implementation guide:
Key Methods to Implement:
- list_insertable_tables(): Return table names that support write operations (only those documented in the write-back API section)
- generate_rows_and_write(table_name, number_of_rows): Generate test data and write it to the source system using documented endpoints. Returns (success, written_rows, column_mapping)
- list_deletable_tables(): Return table names that support delete testing, limited to tables with the cdc_with_deletes ingestion type
- delete_rows(table_name, number_of_rows): Delete records and return deleted row info for verification via read_table_deletes. Returns (success, deleted_rows, column_mapping)

Reference Implementation: See tests/unit/sources/example/example_test_utils.py for a complete working example.
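As a rough illustration of the override pattern, the sketch below defines a stand-in base class with the same no-op defaults described above, so it runs on its own. It is not the real LakeflowConnectWriteTestUtils, and the constructor signature is an assumption. FakeSourceClient, ExampleWriteTestUtils, the contacts table, and its fields are all hypothetical; a real implementation would call the documented write-back endpoints instead.

```python
import uuid
from typing import Dict, List, Tuple

WriteResult = Tuple[bool, List[Dict], Dict[str, str]]


class LakeflowConnectWriteTestUtils:
    """Stand-in for the real base class: every method is a no-op by default."""

    def __init__(self, options: Dict[str, str]) -> None:
        self.options = options

    def list_insertable_tables(self) -> List[str]:
        return []

    def generate_rows_and_write(self, table_name: str, number_of_rows: int) -> WriteResult:
        return False, [], {}

    def list_deletable_tables(self) -> List[str]:
        return []

    def delete_rows(self, table_name: str, number_of_rows: int) -> WriteResult:
        return False, [], {}


class FakeSourceClient:
    """Hypothetical in-memory source; a real client would call the write-back API."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[Dict]] = {}

    def create(self, table_name: str, row: Dict) -> None:
        self.tables.setdefault(table_name, []).append(row)


class ExampleWriteTestUtils(LakeflowConnectWriteTestUtils):
    """Overrides only the insert-related methods; delete methods keep the defaults."""

    def __init__(self, options: Dict[str, str]) -> None:
        super().__init__(options)
        self.client = FakeSourceClient()

    def list_insertable_tables(self) -> List[str]:
        return ["contacts"]

    def generate_rows_and_write(self, table_name: str, number_of_rows: int) -> WriteResult:
        if table_name not in self.list_insertable_tables():
            return False, [], {}
        rows = []
        for _ in range(number_of_rows):
            suffix = uuid.uuid4().hex[:8]
            row = {"contact_id": f"test_{suffix}", "email": f"test_{suffix}@example.com"}
            self.client.create(table_name, row)
            rows.append(row)
        # Assumed mapping: the fake source returns fields flat, under the same names.
        return True, rows, {"contact_id": "contact_id", "email": "email"}


utils = ExampleWriteTestUtils({"api_key": "placeholder"})
success, written_rows, column_mapping = utils.generate_rows_and_write("contacts", 2)
```

Because the delete methods are not overridden here, they fall back to the no-op defaults, which is the intended behavior for sources that cannot delete.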
The column_mapping Return Value:
The third element of the tuple returned by generate_rows_and_write and delete_rows maps field names in written_rows/deleted_rows to field paths in records returned by the connector's read_table / read_table_deletes. The test suite uses this to verify written values appear correctly when read back.
Common patterns:
- {"order_id": "order_id"}: field name is identical on both sides
- {"email": "properties.email"}: source nests fields under a parent object (e.g., HubSpot)
- {"language": "user_language"}: connector normalizes the field name (e.g., Qualtrics userLanguage → user_language)

Use dot notation for nested paths. The test suite resolves them by traversing nested dicts.
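The traversal can be pictured with a small sketch. This is not the test suite's actual code; resolve_path and row_matches are hypothetical helper names, and the sample records are made up for illustration.

```python
from typing import Any, Dict, Optional


def resolve_path(record: Dict[str, Any], path: str) -> Optional[Any]:
    """Follow a dot-separated path through nested dicts; None if any step is missing."""
    current: Any = record
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def row_matches(
    written_row: Dict[str, Any],
    record: Dict[str, Any],
    column_mapping: Dict[str, str],
) -> bool:
    """True when every mapped field is present in the written row and equals the value read back."""
    return all(
        written_field in written_row
        and resolve_path(record, read_path) == written_row[written_field]
        for written_field, read_path in column_mapping.items()
    )


# Hypothetical data: the source nests "email" under "properties".
written = {"email": "test_ab12cd34@example.com"}
read_back = {"id": "1", "properties": {"email": "test_ab12cd34@example.com"}}
mapping = {"email": "properties.email"}
found = row_matches(written, read_back, mapping)
```

A mapping that points at the wrong path resolves to None and therefore fails the comparison, which is how a stale column_mapping typically shows up as "written row not found".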
Implementation Tips:
- Initialize the API client in __init__ using the options dict (same credentials passed to the connector)
- Mark generated data with recognizable prefixes (test_, generated_)
- Wait after writes so the source can commit them (time.sleep(15) for Qualtrics, time.sleep(60) for HubSpot)

Modify tests/unit/sources/{source_name}/test_{source_name}_lakeflow_connect.py to mix in the write-back test class before the base class and set the test_utils_class attribute. The write-back tests live in their own suite (test_write_back_suite.py), so they only run when explicitly mixed in. Most of them auto-skip in simulate mode (the default), so they don't run in CI:
from databricks.labs.community_connector.sources.{source_name}.{source_name} import {SourceName}LakeflowConnect
from tests.unit.sources.{source_name}.{source_name}_test_utils import LakeflowConnectWriteTestUtils
from tests.unit.sources.test_suite import LakeflowConnectTests
from tests.unit.sources.test_write_back_suite import LakeflowConnectWriteBackTests


class Test{SourceName}Connector(LakeflowConnectWriteBackTests, LakeflowConnectTests):
    connector_class = {SourceName}LakeflowConnect
    test_utils_class = LakeflowConnectWriteTestUtils
The order of the base classes matters: LakeflowConnectWriteBackTests must come first in the MRO so that its setup_class runs and chains via super() to the base.
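The effect of base-class order can be seen with stand-in classes. The sketch below reuses the two suite names purely for illustration; these are not the real suites, and it assumes the base suite's setup_class does not itself call super(). MixinFirst, BaseFirst, and call_log are hypothetical.

```python
from typing import List

call_log: List[str] = []


class LakeflowConnectTests:
    """Stand-in for the base suite (assumed not to call super() in setup_class)."""

    @classmethod
    def setup_class(cls) -> None:
        call_log.append(f"{cls.__name__}: base setup")


class LakeflowConnectWriteBackTests:
    """Stand-in for the write-back mixin, which chains to the next class in the MRO."""

    @classmethod
    def setup_class(cls) -> None:
        call_log.append(f"{cls.__name__}: write-back setup")
        super().setup_class()


class MixinFirst(LakeflowConnectWriteBackTests, LakeflowConnectTests):
    pass


class BaseFirst(LakeflowConnectTests, LakeflowConnectWriteBackTests):
    pass


MixinFirst.setup_class()  # runs the mixin setup, then the base setup
BaseFirst.setup_class()   # runs only the base setup; the mixin setup is never reached

for entry in call_log:
    print(entry)
```

With the mixin listed second, lookup finds the base setup_class first, and since that method does not chain further in this sketch, the write-back setup is silently skipped.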
Reference: See tests/unit/sources/example/test_example_lakeflow_connect.py.
Write-back tests that mutate the source (test_write_to_source, test_incremental_after_write, test_delete_and_read_deletes) auto-skip unless you set CONNECTOR_TEST_MODE=live. To run them against a real source:
source .venv/bin/activate # or: python3.10 -m venv .venv && pip install -e ".[dev]"
CONNECTOR_TEST_MODE=live \
CONNECTOR_TEST_CONFIG_PATH=~/secrets/{source_name}.json \
pytest tests/unit/sources/{source_name}/test_{source_name}_lakeflow_connect.py -v
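How the suite reads these two variables is not shown here. As a rough sketch only, assuming the mode defaults to simulate and the config file is a JSON object of connector options, the gating might look like the following. The helper names is_live_mode and load_test_config are hypothetical and are not part of the test suite.

```python
import json
import os
from pathlib import Path
from typing import Any, Dict, Mapping, Optional


def is_live_mode(env: Mapping[str, str] = os.environ) -> bool:
    """True only when CONNECTOR_TEST_MODE is explicitly set to 'live'."""
    return env.get("CONNECTOR_TEST_MODE", "simulate").strip().lower() == "live"


def load_test_config(env: Mapping[str, str] = os.environ) -> Optional[Dict[str, Any]]:
    """Read the JSON file named by CONNECTOR_TEST_CONFIG_PATH, or None if it is unset."""
    raw_path = env.get("CONNECTOR_TEST_CONFIG_PATH")
    if not raw_path:
        return None
    # expanduser() handles the leading "~" used in the command above.
    return json.loads(Path(raw_path).expanduser().read_text())


mode = "live" if is_live_mode() else "simulate"
print(f"write-back tests would run in {mode} mode")
```

Passing the environment as a parameter keeps the helpers easy to exercise without touching the real process environment.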
When LakeflowConnectWriteBackTests is mixed in and test_utils_class is set, these tests are added to the class:
| Test | What it does |
|---|---|
| test_list_insertable_tables | Validates that every insertable table also appears in list_tables() |
| test_write_to_source | Calls generate_rows_and_write for each insertable table, verifies the 3-tuple return shape, success=True, non-empty rows, and non-empty column_mapping |
| test_incremental_after_write | Does an initial read to capture the offset, writes 1 row, creates a fresh connector instance, reads from the captured offset, and verifies the written row appears using column_mapping |
Delete testing applies to connectors with cdc_with_deletes tables whose source API supports deleting records.
Methods to Override:
list_deletable_tables(): Return tables that support delete testing. Every table returned must have ingestion_type: "cdc_with_deletes" — the test suite validates this.
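delete_rows(table_name, number_of_rows): Recommended approach:
1. Write new rows first (via generate_rows_and_write) to maintain data balance
2. Fetch and delete existing records via the source API
3. Return (success, deleted_rows, column_mapping) where deleted_rows contains primary key values

import time
from typing import Dict, List, Tuple

def delete_rows(self, table_name: str, number_of_rows: int) -> Tuple[bool, List[Dict], Dict[str, str]]:
    self.generate_rows_and_write(table_name, number_of_rows)  # replenish rows before deleting
    # Fetch and delete existing records via source API
    time.sleep(60)  # let the source register the deletes before they are read back
    return True, [{"id": "123"}], {"id": "properties.id"}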
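To make the write-then-delete pattern concrete, here is a self-contained sketch against an in-memory stand-in. FakeSource and DeleteTestUtils are hypothetical; a real implementation would call the source's delete endpoint and would usually need a wait before the deletes become readable.

```python
import uuid
from typing import Dict, List, Tuple

WriteResult = Tuple[bool, List[Dict], Dict[str, str]]


class FakeSource:
    """Hypothetical in-memory source keyed by primary key."""

    def __init__(self) -> None:
        self.records: Dict[str, Dict] = {}

    def create(self, record: Dict) -> None:
        self.records[record["id"]] = record

    def delete(self, record_id: str) -> None:
        del self.records[record_id]


class DeleteTestUtils:
    def __init__(self) -> None:
        self.source = FakeSource()

    def generate_rows_and_write(self, table_name: str, number_of_rows: int) -> WriteResult:
        rows = [{"id": f"test_{uuid.uuid4().hex[:8]}"} for _ in range(number_of_rows)]
        for row in rows:
            self.source.create(row)
        return True, rows, {"id": "id"}

    def delete_rows(self, table_name: str, number_of_rows: int) -> WriteResult:
        # Write first so the deletes below do not shrink the test data set.
        ok, _, _ = self.generate_rows_and_write(table_name, number_of_rows)
        if not ok:
            return False, [], {}
        # Dicts keep insertion order, so the first keys are the oldest records.
        oldest_ids = list(self.source.records)[:number_of_rows]
        for record_id in oldest_ids:
            self.source.delete(record_id)
        # Only primary keys are returned; the mapping is flat for this fake source.
        return True, [{"id": record_id} for record_id in oldest_ids], {"id": "id"}


delete_utils = DeleteTestUtils()
delete_utils.generate_rows_and_write("contacts", 3)  # seed some existing data
ok, deleted_rows, delete_mapping = delete_utils.delete_rows("contacts", 2)
```

After the call, the fake source holds the same number of records as before, which is the "data balance" the recommended approach aims for.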
Tests added:
| Test | What it does |
|---|---|
| test_list_deletable_tables | Validates that every deletable table appears in list_tables() and has ingestion_type: "cdc_with_deletes" |
| test_delete_and_read_deletes | Deletes 1 row from the first deletable table, then verifies it appears in read_table_deletes results |
Write Operation Fails (400/403)

Incremental Sync Doesn't Pick Up New Data
- Add time.sleep() after the write to allow the source to commit (5–60s depending on the source)

Column Mapping Errors (written row not found in read/delete results)
- Update column_mapping to reflect transformations (nesting, renaming)
- Nesting: {"email": "properties.email"}
- Renaming: {"language": "user_language"}

Test Data Conflicts
- Use uuid.uuid4().hex[:8] in generated IDs to avoid collisions
- Mark generated data with recognizable prefixes (test_, generated_)
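A small helper can combine both collision-avoidance ideas. This is a sketch only; make_test_id and is_test_value are hypothetical names, and the prefixes are the two mentioned above.

```python
import uuid
from typing import Tuple

TEST_PREFIXES: Tuple[str, ...] = ("test_", "generated_")


def make_test_id(prefix: str = "test_") -> str:
    """Build an ID such as 'test_1a2b3c4d': a known prefix plus 8 random hex characters."""
    if prefix not in TEST_PREFIXES:
        raise ValueError(f"prefix must be one of {TEST_PREFIXES}")
    return f"{prefix}{uuid.uuid4().hex[:8]}"


def is_test_value(value: str) -> bool:
    """True when a value carries one of the test prefixes (handy when cleaning up)."""
    return value.startswith(TEST_PREFIXES)


first_id = make_test_id()
second_id = make_test_id("generated_")
```

Eight hex characters give roughly 4.3 billion possible suffixes, which is ample for test runs but not a guarantee of uniqueness.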