validate-incremental-sync
Validate that ADME domain tables (wellbore, reservoir, rock_and_fluid) correctly implement CDC incremental sync: offset tracking, watermark filtering, and pagination termination.
| Field | Value |
|---|---|
| name | validate-incremental-sync |
| description | Validate that ADME domain tables (wellbore, reservoir, rock_and_fluid) correctly implement CDC incremental sync: offset tracking, watermark filtering, and pagination termination. |
Confirm that:

- `read_table(table, None, {})` returns records and an offset.
- `read_table(table, offset, {})`, called with the returned offset, returns no new records (a stable offset means the read is done).
- The watermark filter (`modifyTime:[{watermark} TO *]`) is correctly sent on watermarked reads.

Only the domain tables are CDC: `wellbore`, `reservoir`, `rock_and_fluid`.
Governance tables (`legal_tags`, `entitlements`) are snapshot tables and always return `offset=None`.
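The CDC/snapshot contract above can be expressed as a small offset-shape check. This is a minimal sketch, assuming offsets are plain dicts shaped like the connector excerpt that follows; `CDC_TABLES`, `SNAPSHOT_TABLES`, and `check_offset_shape` are hypothetical names for illustration, not part of the connector.

```python
# Hypothetical helper (not part of the connector): checks that an offset
# returned by read_table matches the CDC/snapshot contract for its table.
CDC_TABLES = {"wellbore", "reservoir", "rock_and_fluid"}
SNAPSHOT_TABLES = {"legal_tags", "entitlements"}


def check_offset_shape(table: str, offset) -> None:
    """Raise AssertionError if `offset` violates the contract for `table`."""
    if table in SNAPSHOT_TABLES:
        # Governance tables are snapshot reads: they never return an offset.
        assert offset is None, f"{table}: snapshot table returned offset {offset!r}"
    elif table in CDC_TABLES:
        # Domain tables carry both a page cursor and a modifyTime watermark.
        assert isinstance(offset, dict), f"{table}: offset must be a dict"
        missing = {"cursor", "watermark"} - set(offset)
        assert not missing, f"{table}: offset missing keys {sorted(missing)}"
    else:
        raise ValueError(f"unknown table: {table}")
```

For example, `check_offset_shape("wellbore", off1)` can be called right after a full load, and `check_offset_shape("legal_tags", off)` after a governance read.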
From `connector/lakeflow/adme_osdu.py`:

```python
# After first page
new_offset = {"cursor": next_cursor, "watermark": max_wm}
# next_cursor: None if no next page from ADME cursor field
# max_wm: max modifyTime across returned records
```
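A minimal sketch of how such an offset could be assembled from one page of results. `build_offset` is a hypothetical helper, and its `prev_watermark` argument (carrying the previous watermark forward when a page is empty) is an assumption rather than confirmed connector behavior. It relies on every `modifyTime` sharing one ISO-8601 UTC format, so that string order matches time order.

```python
from typing import Any, Optional


def build_offset(records: list[dict[str, Any]],
                 next_cursor: Optional[str],
                 prev_watermark: Optional[str] = None) -> dict[str, Optional[str]]:
    """Hypothetical: build a CDC offset from one page of search results."""
    times = [r["modifyTime"] for r in records if r.get("modifyTime")]
    if prev_watermark:
        # Assumption: never move the watermark backwards, e.g. on an empty page.
        times.append(prev_watermark)
    # Same-format ISO-8601 UTC strings sort lexicographically in time order.
    max_wm = max(times) if times else None
    return {"cursor": next_cursor, "watermark": max_wm}
```

Carrying the previous watermark forward is what keeps the offset stable when an incremental read finds nothing new.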
On the second call with this offset:

- `cursor = None` (no next page)
- `watermark = "2024-03-17T12:15:00.000Z"` (max from the first batch)
- `load_full = False` (watermark is set)
- `query = "modifyTime:[2024-03-17T12:15:00.000Z TO *]"` via `incremental_filter_template`
- `end_offset == start_offset` → done

Run the unit tests:

```bash
PYTHONPATH=. /Users/gokul.pillai/.ai-dev-kit/.venv/bin/pytest \
  tests/unit/adme_osdu/test_adme_osdu_lakeflow_connect.py::TestAdmeOsduConnector::test_read_wellbore_twice_returns_same_stable_offset \
  tests/unit/adme_osdu/test_adme_osdu_lakeflow_connect.py::TestAdmeOsduConnector::test_read_terminates \
  -v
```
Both must pass.
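The second-call behavior listed above (the watermark decides `load_full`, the template produces the query, and a stable offset means done) can be sketched as pure functions. This is a sketch under assumptions: `plan_read` and `is_done` are hypothetical, and the template string is taken from this skill's description rather than from `_default_domain_config`, which may differ. The full-load query is not shown in the source, so it is returned as `None` here.

```python
from typing import Optional

# Assumed template shape; the connector's real incremental_filter_template
# is defined in _default_domain_config and may differ.
INCREMENTAL_FILTER_TEMPLATE = "modifyTime:[{watermark} TO *]"


def plan_read(offset: Optional[dict],
              template: str = INCREMENTAL_FILTER_TEMPLATE) -> tuple[bool, Optional[str]]:
    """Hypothetical: return (load_full, query) for a read starting at `offset`."""
    watermark = (offset or {}).get("watermark")
    load_full = watermark is None
    # Full load: no watermark filter (the connector's base query is not shown here).
    query = None if load_full else template.format(watermark=watermark)
    return load_full, query


def is_done(start_offset: Optional[dict], end_offset: Optional[dict]) -> bool:
    """A CDC read has caught up once the offset stops moving."""
    return end_offset == start_offset
```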
Then check incremental behavior against the configured environment (run from the repo root, with `dev_config.json` in place):

```python
import json
import sys

# Make the connector package and the test stubs importable from the repo root.
sys.path.insert(0, '.')
sys.path.insert(0, 'tests/stubs')

with open('tests/unit/adme_osdu/dev_config.json') as f:
    opts = json.load(f)

from connector.lakeflow.adme_osdu import AdmeOsduLakeflowConnect

c = AdmeOsduLakeflowConnect(opts)

for table in ('wellbore', 'reservoir', 'rock_and_fluid'):
    # Full load
    recs1, off1 = c.read_table(table, None, {})
    all_recs = list(recs1)
    print(f"{table}: {len(all_recs)} records, offset={off1}")
    if not all_recs:
        print(f"  ⚠️ No records in {table}; skipping incremental check")
        continue

    # Incremental read from the returned watermark
    recs2, off2 = c.read_table(table, off1, {})
    new_recs = list(recs2)
    if off2 == off1:
        print(f"  ✅ {table}: offset stable (no new records, as expected for the same watermark)")
    else:
        print(f"  ℹ️ {table}: new offset {off2}; {len(new_recs)} additional records found")

    # Verify the cursor field is present
    for r in all_recs[:3]:
        assert 'modifyTime' in r, f"{table}: missing modifyTime in record"
    print(f"  ✅ {table}: modifyTime present in all sampled records")
```
Midpoint watermark test (live only). The snippet uses `c`, `table`, and `all_recs`, so run it inside the per-table loop, after the `modifyTime` check:

```python
# Use the median modifyTime from the full load as a synthetic watermark.
all_modify_times = sorted(r['modifyTime'] for r in all_recs if r.get('modifyTime'))
if len(all_modify_times) >= 2:
    midpoint = all_modify_times[len(all_modify_times) // 2]
    recs_mid, _ = c.read_table(table, {"cursor": None, "watermark": midpoint}, {})
    mid_recs = list(recs_mid)
    # Every record returned for this watermark must be at or after it.
    violations = [r for r in mid_recs if r.get('modifyTime', '') < midpoint]
    if not violations:
        print(f"  ✅ {table}: midpoint filter correct; no records before {midpoint}")
    else:
        print(f"  ❌ {table}: {len(violations)} records before midpoint; incremental filter may be wrong")
```
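The midpoint check compares `modifyTime` values as strings, which is only correct while every timestamp uses exactly the same ISO-8601 layout. If the API ever mixes `...00Z` and `...00.000Z` forms, a parsed comparison is safer. A minimal sketch; `parse_ts` and `midpoint_violations` are hypothetical helpers, and records without `modifyTime` are skipped here because the separate presence check already covers them.

```python
from datetime import datetime


def parse_ts(value: str) -> datetime:
    """Parse an ISO-8601 UTC timestamp such as '2024-03-17T12:15:00.000Z'."""
    # datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11 on.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def midpoint_violations(records, midpoint: str):
    """Hypothetical: return records whose modifyTime is earlier than `midpoint`."""
    cutoff = parse_ts(midpoint)
    return [r for r in records
            if r.get("modifyTime") and parse_ts(r["modifyTime"]) < cutoff]
```

For example, `"2024-03-17T12:15:00Z"` sorts after `"2024-03-17T12:15:00.500Z"` as a string, even though it is half a second earlier; the parsed comparison flags it correctly.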
| Check | Expected |
|---|---|
| First read returns > 0 records | ✅ |
| Returned offset has `cursor` and `watermark` keys | ✅ |
| `modifyTime` present in all domain records | ✅ |
| Second read with same offset returns stable offset | ✅ |
| `test_read_terminates` completes in < 20 iterations | ✅ |
| No records before midpoint watermark (live) | ✅ |
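The termination row can be checked outside pytest with a bounded read loop. This is a sketch in the spirit of `test_read_terminates`, not its actual implementation: `read_until_stable` is a hypothetical helper that accepts anything with the `read_table(table, offset, options)` signature, such as `c.read_table`, and applies the same 20-iteration cap.

```python
from typing import Any, Callable, Iterable, Optional, Tuple

ReadFn = Callable[[str, Optional[dict], dict], Tuple[Iterable[Any], Optional[dict]]]


def read_until_stable(read_table: ReadFn, table: str, max_iters: int = 20) -> Tuple[int, int]:
    """Hypothetical: call read_table until the offset stops changing.

    Returns (total_records, iterations). Raises RuntimeError if the offset is
    still moving after `max_iters` calls, which usually means an infinite loop.
    """
    offset: Optional[dict] = None
    total = 0
    for i in range(1, max_iters + 1):
        records, new_offset = read_table(table, offset, {})
        total += len(list(records))
        if new_offset == offset:
            # Stable offset: snapshot tables stop after 1 call, CDC tables after 2.
            return total, i
        offset = new_offset
    raise RuntimeError(f"{table}: offset still changing after {max_iters} reads")
```

Usage: `read_until_stable(c.read_table, "wellbore")`.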
| Symptom | Cause | Fix |
|---|---|---|
| Second read returns same records again | `incremental_filter_template` missing | Add it to `_default_domain_config` |
| `test_read_terminates` loops infinitely | Mock not returning empty on incremental query | Check `body.get("query")` in `_search_handler` |
| `modifyTime` missing | Wrong path in `_flatten_record` | Check `r.get("modifyTime")` (top-level field) |
| Offset keys differ between reads | Inconsistent offset dict construction | Use the same keys in `_read_domain` |
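For the infinite-loop row, the simulator's search handler has to honor the incremental query instead of returning every record. A minimal sketch, assuming the request body carries the filter in `"query"` as described above; `search_handler` is a hypothetical stand-in for the simulator's `_search_handler`, and the `{"results", "totalCount"}` response shape is an assumption modeled on OSDU search responses. Because `[` is an inclusive bound in Lucene range syntax, the record sitting exactly at the watermark comes back again, but the watermark does not move, so the offset stays stable.

```python
import re
from typing import Any

# Matches the incremental filter, e.g. "modifyTime:[2024-03-17T12:15:00.000Z TO *]".
_RANGE = re.compile(r"modifyTime:\[(?P<start>[^\s\]]+) TO \*\]")


def search_handler(body: dict[str, Any], records: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical simulator handler for a search request body."""
    query = body.get("query") or ""
    match = _RANGE.search(query)
    if match:
        start = match.group("start")
        # Inclusive lower bound; assumes one shared ISO-8601 format for string comparison.
        results = [r for r in records
                   if r.get("modifyTime") and r["modifyTime"] >= start]
    else:
        # No watermark filter: behave like a full load.
        results = list(records)
    return {"results": results, "totalCount": len(results)}
```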