with one click
plugin-development
// Create and update DSOA plugins — full development lifecycle from planning through validation
// Create and update DSOA plugins — full development lifecycle from planning through validation
| name | plugin-development |
| description | Create and update DSOA plugins — full development lifecycle from planning through validation |
| license | MIT |
| compatibility | opencode |
| metadata | {"audience":"developers"} |
Use this skill when creating a new plugin, modifying an existing plugin, or reviewing plugin-related changes for the Dynatrace Snowflake Observability Agent (DSOA).
Plugin development follows a structured sequence. The first two phases are recommended but not enforced by this skill; phases 3-7 are mandatory for every plugin.
| Phase | Name | Output |
|---|---|---|
| 1 | Product planning (recommended) | Use cases, executive summary, DPO theme mapping |
| 2 | Implementation plan (recommended) | Ordered task list, reviewer-approved decisions |
| 3 | Scaffolding | Directory triad, boilerplate files |
| 4 | SQL & Python implementation | Views/procedures, Python class, config, semantics |
| 5 | Testing | Fixtures, mocked tests, all disabled_telemetry combos |
| 6 | Documentation | readme.md, config.md, USECASES.md entries, CHANGELOG/DEVLOG |
| 7 | Build, deploy, validate | build.sh, deploy to test-qa, verify telemetry in Dynatrace |
After plugin validation, the next step is typically dashboard/workflow creation
(see dynatrace-dashboard and dynatrace-workflow skills).
Before writing code, answer these questions:
docs/USECASES.md.Store planning artifacts in .github/context/proposals/.
Create an ordered task breakdown with:
disabled_telemetry combos, edge cases)Store alongside the proposal. Get approval before Phase 3.
Every plugin is a triad: Python module + SQL directory + config directory.
src/dtagent/plugins/
{name}.py
{name}.sql/
init/ # optional: ACCOUNTADMIN setup
admin/ # optional: admin-scope scripts
0xx_*.sql # views, procedures (prefix 0-69)
801_{name}_task.sql # task (fast schedule)
802_{name}_*_task.sql # optional: second task (deep schedule)
8xx_{name}_grants_task.sql # optional: admin grants task
901_update_{name}_conf.sql # config update procedure
{name}.config/
{name}-config.yml
instruments-def.yml
bom.yml
readme.md
config.md # optional: extended config docs
| Component | Convention | Example (snowpipes) |
|---|---|---|
| Plugin name | snake_case | snowpipes |
| Python file | {name}.py | snowpipes.py |
| Python class | {CamelCase}Plugin | SnowpipesPlugin |
PLUGIN_NAME | lowercase with underscores | "snowpipes" |
| SQL directory | {name}.sql/ | snowpipes.sql/ |
| Config directory | {name}.config/ | snowpipes.config/ |
| Config YAML | {name}-config.yml | snowpipes-config.yml |
| SQL objects | UPPERCASE | V_SNOWPIPES_INSTRUMENTED |
| Task name | TASK_DTAGENT_{UPPER} | TASK_DTAGENT_SNOWPIPES |
| Config procedure | UPDATE_{UPPER}_CONF() | UPDATE_SNOWPIPES_CONF() |
| Semantic fields | snowflake.{domain}.{field} | snowflake.pipe.name |
Create the directory structure:
mkdir -p src/dtagent/plugins/{name}.sql
mkdir -p src/dtagent/plugins/{name}.config
touch src/dtagent/plugins/{name}.py
Use a VIEW for straightforward data collection (most plugins). Use a procedure only when you need error handling, temp tables, conditional logic, or multi-step processing.
Required columns for log/metric plugins:
| Column | Type | Purpose |
|---|---|---|
TIMESTAMP | TIMESTAMP_LTZ | When data was collected/event occurred |
_MESSAGE | VARCHAR | Log content (auto-mapped to content in Dynatrace) |
DIMENSIONS | OBJECT | Low-cardinality fields for grouping; used with metrics |
ATTRIBUTES | OBJECT | High-cardinality context; NOT sent with metrics |
METRICS | OBJECT | Numerical measurements |
Optional columns:
| Column | Type | Purpose |
|---|---|---|
| Identifier cols | VARCHAR | Reference/logging (e.g. PIPE_NAME) |
EVENT_TIMESTAMPS | OBJECT | Timestamp fields that generate events |
Additional columns for span plugins:
| Column | Type | Purpose |
|---|---|---|
QUERY_ID | VARCHAR | Unique span ID |
PARENT_QUERY_ID | VARCHAR | Parent span (for traces) |
START_TIME / END_TIME | NUMBER | Epoch nanoseconds |
NAME | VARCHAR | Span name |
STATUS_CODE | VARCHAR | 'OK', 'ERROR', 'UNSET' |
SQL template:
use role DTAGENT_OWNER; use database DTAGENT_DB; use warehouse DTAGENT_WH;
create or replace view DTAGENT_DB.APP.V_{UPPER}_INSTRUMENTED as
with cte_source as (
select * from SNOWFLAKE.ACCOUNT_USAGE.SOME_VIEW
where SOME_TIME > DTAGENT_DB.STATUS.F_LAST_PROCESSED_TS('{name}')
)
select
current_timestamp() as TIMESTAMP,
ENTITY_NAME,
concat('Entity: ', ENTITY_NAME, ' in ', DB_NAME) as _MESSAGE,
object_construct(
'db.namespace', DB_NAME,
'snowflake.warehouse.name', WH_NAME
) as DIMENSIONS,
object_construct(
'snowflake.entity.id', ENTITY_ID,
'snowflake.entity.comment', COMMENT
) as ATTRIBUTES,
object_construct(
'snowflake.entity.size', SIZE_BYTES,
'snowflake.entity.rows', ROW_COUNT
) as METRICS
from cte_source;
grant select on view DTAGENT_DB.APP.V_{UPPER}_INSTRUMENTED to role DTAGENT_VIEWER;
When using a procedure instead:
create or replace procedure DTAGENT_DB.APP.F_{UPPER}_INSTRUMENTED()
returns table (TIMESTAMP timestamp_ltz, ...)
language sql
execute as caller
as $$
declare
c_result cursor for
with cte_data as (select ...)
select ... from cte_data;
begin
open c_result;
return table(resultset_from_cursor(c_result));
exception
when statement_error then
SYSTEM$LOG_ERROR(SQLERRM);
return table(select null as TIMESTAMP);
end;
$$;
grant usage on procedure DTAGENT_DB.APP.F_{UPPER}_INSTRUMENTED() to role DTAGENT_VIEWER;
SQL pitfalls:
snow sql CLI misparses cursor field access (r.name) inside $$ blocks.
Always capture into LET variables first.F_LAST_PROCESSED_TS('{name}') for incremental data to avoid duplicates.Single-schedule (most plugins):
use role DTAGENT_OWNER; use database DTAGENT_DB; use warehouse DTAGENT_WH;
create or replace task DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
warehouse = DTAGENT_WH
schedule = 'USING CRON 0 */12 * * * UTC'
allow_overlapping_execution = FALSE
as
call DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}'));
grant ownership on task DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
to role DTAGENT_VIEWER revoke current grants;
grant operate, monitor on task DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
to role DTAGENT_VIEWER;
Dual-schedule (context-selective, like snowpipes):
-- 801: Fast task (every 5 min) — lightweight contexts only
call DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}:{fast_context}'));
-- 802: Deep task (hourly) — heavy contexts
call DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}:{ctx1},{ctx2}'));
The syntax '{plugin}:{context1},{context2}' triggers only the listed contexts.
Without the colon, all contexts run.
use role DTAGENT_OWNER; use schema DTAGENT_DB.CONFIG; use warehouse DTAGENT_WH;
create or replace procedure DTAGENT_DB.CONFIG.UPDATE_{UPPER}_CONF()
returns text language SQL execute as caller
as $$
begin
-- Single schedule:
call DTAGENT_DB.CONFIG.UPDATE_PLUGIN_SCHEDULE('{name}');
-- Dual schedule (pass additional schedule suffixes):
-- call DTAGENT_DB.CONFIG.UPDATE_PLUGIN_SCHEDULE('{name}', ARRAY_CONSTRUCT('history', 'grants'));
return '{name} plugin config updated';
exception
when statement_error then SYSTEM$LOG_WARN(SQLERRM); return SQLERRM;
end;
$$;
Plugins operating on user-created objects need include/exclude patterns
(DB.SCHEMA.OBJECT with % wildcards).
In views — compare full QUALIFIED_NAME against raw pattern VALUE:
where QUALIFIED_NAME LIKE ANY (select include_pattern from cte_includes)
and not QUALIFIED_NAME LIKE ANY (select exclude_pattern from cte_excludes)
In admin grant procedures — match at the same tier as the grant:
| Tier | Suppress when |
|---|---|
| DB-level grant | Exclude's split_part(VALUE, '.', 2) = '%' |
| Schema-level grant | (db.schema.%) LIKE ANY (raw excludes) |
| Object-level grant | Include VALUE itself LIKE ANY (raw excludes) |
Never collapse a fine-grained exclude to DB-level only.
Simple plugin (single context):
"""Plugin file for processing {name} plugin data."""
# MIT license header + region markers (see existing plugins)
from typing import Dict, List, Optional
from dtagent.plugins import Plugin
from dtagent.context import RUN_PLUGIN_KEY, RUN_RESULTS_KEY, RUN_ID_KEY # COMPILE_REMOVE
class {CamelCase}Plugin(Plugin):
"""{Name} plugin class."""
PLUGIN_NAME = "{name}"
def process(self, run_id: str, run_proc: bool = True,
contexts: Optional[List[str]] = None) -> Dict[str, Dict[str, int]]:
t_data = "APP.V_{UPPER}_INSTRUMENTED"
entries, logs, metrics, events = self._log_entries(
lambda: self._get_table_rows(t_data),
"{name}",
run_uuid=run_id,
report_metrics=True,
log_completion=run_proc,
)
return self._report_results(
{"{name}": {"entries": entries, "log_lines": logs,
"metrics": metrics, "events": events}},
run_id,
)
Multi-context plugin (dual schedule):
def process(self, run_id, run_proc=True, contexts=None):
results = {}
if not contexts or "fast_ctx" in contexts:
e, l, m, ev = self._log_entries(
lambda: self._get_table_rows(t_fast), "fast_ctx",
run_uuid=run_id, report_timestamp_events=True,
report_metrics=True, log_completion=run_proc)
results["fast_ctx"] = {"entries": e, "log_lines": l, "metrics": m, "events": ev}
if not contexts or "deep_ctx" in contexts:
e, l, m, ev = self._log_entries(
lambda: self._get_table_rows(t_deep), "deep_ctx",
run_uuid=run_id, report_metrics=True, log_completion=run_proc)
results["deep_ctx"] = {"entries": e, "log_lines": l, "metrics": m, "events": ev}
return self._report_results(results, run_id)
Span plugin — use _process_span_rows() instead of _log_entries(). See
query_history.py or login_history.py for real examples.
| Method | Purpose |
|---|---|
_log_entries(fn, context, ...) | Primary for log/metric/event plugins |
_process_span_rows(fn, ...) | For span/trace plugins |
_get_table_rows(query) | Generator over Snowflake results |
_report_results(dict, run_id) | Format return value |
_report_execution(name, ts, ...) | Log completion (multi-context manual) |
Key _log_entries() flags:
| Flag | Default | Effect |
|---|---|---|
report_metrics | True | Emit metrics from METRICS column |
report_timestamp_events | True | Emit events from EVENT_TIMESTAMPS |
report_all_as_events | False | Emit every row as an event |
log_completion | True | Auto-log run completion |
{name}-config.ymlplugins:
{name}:
include: # optional, for filterable plugins
- '%.%.%'
exclude: # optional
- DTAGENT_DB.%.%
schedule: USING CRON 0 */12 * * * UTC
# schedule_history: USING CRON 0 * * * * UTC # for dual-schedule
# schedule_grants: USING CRON 30 */12 * * * UTC # for admin grants task
# lookback_hours: 4 # custom settings
is_disabled: false
telemetry:
- metrics
- logs
- events
- biz_events
instruments-def.ymlFour sections: dimensions, attributes, metrics, event_timestamps.
dimensions:
db.namespace:
__context_names: ["{name}"]
__example: analytics_db
__description: The database name.
attributes:
snowflake.entity.id:
__context_names: ["{name}"]
__example: "12345"
__description: Unique entity identifier.
metrics:
snowflake.entity.size:
__context_names: ["{name}"]
__example: "1048576"
__description: Entity size.
displayName: Entity Size
unit: bytes
event_timestamps: # only if report_timestamp_events=True
snowflake.event.trigger:
__context_names: ["{name}"]
__example: "snowflake.entity.created_time"
__description: Event trigger key.
snowflake.entity.created_time:
__context_names: ["{name}"]
__example: 1639051180946000000
__description: Entity creation timestamp (epoch nanoseconds).
Field naming rules:
snake_case, custom fields start with snowflake.duration, not duration_ms).count suffix (implied for counters)is_ or has_ prefixbom.ymldelivers:
- name: DTAGENT_DB.APP.V_{UPPER}_INSTRUMENTED
type: view
- name: DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
type: task
- name: DTAGENT_DB.CONFIG.UPDATE_{UPPER}_CONF()
type: procedure
references:
- name: SNOWFLAKE.ACCOUNT_USAGE.SOME_VIEW
type: view
privileges: SELECT
readme.mdBrief description (1-2 sentences).
What data is collected:
- ...
Key use cases:
- ...
## Configuration
Default schedule and customization examples.
## Querying in Dynatrace
Example DQL queries for logs, metrics, events.
Create test/plugins/test_{name}.py:
class Test{CamelCase}:
import pytest
FIXTURES = {
"APP.V_{UPPER}_INSTRUMENTED": "test/test_data/{name}.ndjson",
}
@pytest.mark.xdist_group(name="test_telemetry")
def test_{name}(self):
from typing import Dict, Generator
from dtagent.plugins.{name} import {CamelCase}Plugin
import test._utils as utils
from test import TestDynatraceSnowAgent, _get_session
utils._generate_all_fixtures(_get_session(), self.FIXTURES)
class Test{CamelCase}Plugin({CamelCase}Plugin):
def _get_table_rows(self, t_data: str) -> Generator[Dict, None, None]:
return utils._safe_get_fixture_entries(
Test{CamelCase}.FIXTURES, t_data, limit=2)
def __local_get_plugin_class(source: str):
return Test{CamelCase}Plugin
from dtagent import plugins
plugins._get_plugin_class = __local_get_plugin_class
disabled_combinations = [
[],
["metrics"],
["logs"],
["metrics", "logs"],
["logs", "spans", "metrics", "events"],
]
for disabled_telemetry in disabled_combinations:
utils.execute_telemetry_test(
TestDynatraceSnowAgent,
test_name="test_{name}",
disabled_telemetry=disabled_telemetry,
affecting_types_for_entries=["logs", "metrics"],
base_count={{
"{name}": {{"entries": N, "log_lines": N, "metrics": M}},
}},
)
For multi-context plugins, add FIXTURES entries for each view/procedure and
separate context tests (see test_snowpipes.py for the pattern — context-None,
context-subset tests).
test/test_data/test/test_results/test_{name}/disabled_telemetry combos tested: [], ["metrics"], ["logs"], all-disabledaffecting_types_for_entries matches the telemetry types that affect entry countscontexts=None and individual context selection.venv/bin/pytest test/plugins/test_{name}.py -v# From live Snowflake (requires test/credentials.yml):
.venv/bin/pytest test/plugins/test_{name}.py -p
# Run mocked tests:
.venv/bin/pytest test/plugins/test_{name}.py -v
| Artifact | Action |
|---|---|
{name}.config/readme.md | Write plugin description, use cases, config examples, DQL queries |
{name}.config/config.md | Optional: extended configuration documentation |
docs/USECASES.md | Add use cases under appropriate DPO theme(s) and tier(s) |
docs/CHANGELOG.md | User-facing: new plugin summary (1-2 sentences) |
docs/DEVLOG.md | Developer-facing: implementation details, decisions, patterns used |
docs/PLUGINS.md | Do not edit — autogenerated by build_docs.sh |
docs/SEMANTICS.md | Do not edit — autogenerated by build_docs.sh |
Run ./scripts/dev/build_docs.sh after documentation changes.
# 1. Build
./scripts/dev/build.sh
# 2. Run tests
.venv/bin/pytest test/plugins/test_{name}.py -v
# 3. Run full suite + lint
.venv/bin/pytest && make lint
# 4. Deploy to test-qa
./scripts/deploy/deploy.sh test-qa --scope=plugins,config --options=skip_confirm
# 5. Verify in Snowflake
# SHOW TASKS LIKE 'TASK_DTAGENT_{UPPER}%' IN SCHEMA DTAGENT_DB.APP;
# CALL DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}'));
# 6. Verify in Dynatrace (DQL)
# fetch logs | filter db.system == "snowflake" | filter dsoa.run.context == "{name}"
Deploy scope rules:
| Change | Scope |
|---|---|
| Plugin SQL only | plugins,config |
| Python agent code changed | plugins,agents,config |
New plugin toggled on/off with deploy_disabled_plugins: false | plugins,agents,config |
Always include config — omitting it leaves tasks suspended.
Single view, single task, _log_entries() with report_metrics=True.
Examples: budgets, data_schemas, dynamic_tables, resource_monitors.
Multiple views processed in one process() call, single task. Guard each
context with if not contexts or "ctx" in contexts:. Use _report_execution()
for manual completion logging.
Examples: budgets (budgets + spendings), warehouse_usage.
Multiple views on different cadences. Fast task (801) invokes lightweight
contexts; deep task (802) invokes heavy contexts. Context-selective syntax:
ARRAY_CONSTRUCT('{plugin}:{ctx1},{ctx2}').
Example: snowpipes (fast: pipe status; deep: copy history, usage history).
Uses _process_span_rows() for hierarchical traces with parent-child
relationships. Returns span IDs, events, errors.
Examples: query_history, login_history.
_log_entries() with report_timestamp_events=True. Requires EVENT_TIMESTAMPS
column in SQL and event_timestamps section in instruments-def.yml.
Example: snowpipes (pipe creation/modification events).
Call a stored procedure before processing contexts (e.g. refresh materialized
data). Example: budgets calls P_GET_BUDGETS before reading views.
| Symptom | Likely cause | Fix |
|---|---|---|
| Plugin not found / not loaded | Class naming wrong, missing PLUGIN_NAME | Check {CamelCase}Plugin, rebuild |
| SQL deployment errors | Lowercase object names, missing USE statements | Ensure UPPERCASE, add use role/database/warehouse |
| No data in Dynatrace | Plugin disabled, telemetry type not in config | Check is_disabled, telemetry list |
| Task not running | Task suspended, config not deployed | Deploy with --scope=config, call UPDATE_{UPPER}_CONF() |
| Mismatched test counts | Stale fixtures, wrong base_count | Regenerate with -p, verify expected counts |
| Semantic fields missing | Mismatch between SQL and instruments-def.yml | Align field names, rebuild docs |
| Config not applied | Config scope not deployed | deploy.sh --scope=config |
| Procedure overload error | Signature changed without upgrade script | Create DROP PROCEDURE IF EXISTS in upgrade/ |
UPPER() or LOWER() for comparisons.contexts=None, individual contexts, and context subsets.-p flag._report_execution() for multi-context manual completion. When
log_completion=False on individual contexts, call _report_execution() at
the end to log combined results.-- Check plugin logs
fetch logs
| filter db.system == "snowflake" and dsoa.run.context == "{name}"
| sort timestamp desc | limit 50
-- Check self-monitoring
fetch bizevents
| filter db.system == "snowflake" and dsoa.run.context == "self_monitoring"
| filter dsoa.run.plugin == "{name}"
| fields timestamp, dsoa.run.results | sort timestamp desc
-- Count telemetry by context
fetch logs
| filter db.system == "snowflake" and dsoa.run.plugin == "{name}"
| summarize count(), by: {dsoa.run.context}
-- Query metrics
timeseries avg(snowflake.{domain}.{metric}),
by: {db.namespace}
| filter db.system == "snowflake"
-- Monitor plugin performance (self-monitoring field-level breakdown)
fetch logs
| filter db.system == "snowflake" and dsoa.run.context == "self_monitoring"
| filter dsoa.run.plugin == "{name}"
| fields timestamp, dsoa.run.plugin, dsoa.run.id,
{name}.entries, {name}.log_lines, {name}.metrics,
{name}.spans, {name}.span_events, {name}.events
| sort timestamp desc
-- Check errors
fetch logs
| filter db.system == "snowflake" and dsoa.run.context == "{name}"
| filter loglevel == "ERROR"
| sort timestamp desc
Use plugin/option guards for conditional code inclusion:
--%PLUGIN:{name}:
-- Only included when {name} plugin is enabled
create or replace view ...
--%:PLUGIN:{name}
--%OPTION:dtagent_admin:
-- Only included when admin role is enabled
grant role ... to role DTAGENT_ADMIN;
--%:OPTION:dtagent_admin
For cross-plugin dependencies, wrap both column references AND join clauses in guards. Test with the dependency both enabled and disabled.
When a stored procedure's parameter list changes, Snowflake won't replace it — it raises an ambiguous overload error. Create an upgrade script:
src/dtagent.sql/upgrade/<new-version>/xxx_drop_{name}_old_proc.sql
--%PLUGIN:{name}:
DROP PROCEDURE IF EXISTS DTAGENT_DB.APP.F_{UPPER}_INSTRUMENTED(<old-param-types>);
--%:PLUGIN:{name}
Deploy upgrade before main deploy:
./scripts/deploy/deploy.sh test-qa --scope=upgrade --from-version=<prev> --options=skip_confirm
./scripts/deploy/deploy.sh test-qa --scope=plugins,admin,config --options=skip_confirm
Create and update dashboard and workflow documentation
Create and update Dynatrace dashboards for DSOA telemetry
Create and update Dynatrace workflows for DSOA automation
Review a pull request and process review comments left by others
AI-guided QA walkthrough for DSOA releases. Automates version detection, deployment commands, notebook deployment, and interactive test walkthrough. Use when a QA engineer needs to execute the DSOA release test suite.
Create and update Snowflake synthetic test setups for DSOA telemetry validation