plugin-development
// Create and update DSOA plugins — full development lifecycle from planning through validation
| name | plugin-development |
| description | Create and update DSOA plugins — full development lifecycle from planning through validation |
| license | MIT |
| compatibility | opencode |
| metadata | {"audience":"developers"} |
Use this skill when creating a new plugin, modifying an existing plugin, or reviewing plugin-related changes for the Dynatrace Snowflake Observability Agent (DSOA).
Plugin development follows a structured sequence. The first two phases are recommended but not enforced by this skill; phases 3-7 are mandatory for every plugin.
| Phase | Name | Output |
|---|---|---|
| 1 | Product planning (recommended) | Use cases, executive summary, DPO theme mapping |
| 2 | Implementation plan (recommended) | Ordered task list, reviewer-approved decisions |
| 3 | Scaffolding | Directory triad, boilerplate files |
| 4 | SQL & Python implementation | Views/procedures, Python class, config, semantics |
| 5 | Testing | Fixtures, mocked tests, all disabled_telemetry combos |
| 6 | Documentation | readme.md, config.md, USECASES.md entries, CHANGELOG/DEVLOG |
| 7 | Build, deploy, validate | build.sh, deploy to test-qa, verify telemetry in Dynatrace |
After plugin validation, the next step is typically dashboard/workflow creation
(see dynatrace-dashboard and dynatrace-workflow skills).
Before writing code, answer these questions:
docs/USECASES.md.
Store planning artifacts in .github/context/proposals/.
Create an ordered task breakdown with:
disabled_telemetry combos, edge cases)
Store alongside the proposal. Get approval before Phase 3.
Every plugin is a triad: Python module + SQL directory + config directory.
src/dtagent/plugins/
  {name}.py
  {name}.sql/
    init/                         # optional: ACCOUNTADMIN setup
    admin/                        # optional: admin-scope scripts
    0xx_*.sql                     # views, procedures (prefix 0-69)
    801_{name}_task.sql           # task (fast schedule)
    802_{name}_*_task.sql         # optional: second task (deep schedule)
    8xx_{name}_grants_task.sql    # optional: admin grants task
    901_update_{name}_conf.sql    # config update procedure
  {name}.config/
    {name}-config.yml
    instruments-def.yml
    bom.yml
    readme.md
    config.md                     # optional: extended config docs
| Component | Convention | Example (snowpipes) |
|---|---|---|
| Plugin name | snake_case | snowpipes |
| Python file | {name}.py | snowpipes.py |
| Python class | {CamelCase}Plugin | SnowpipesPlugin |
| PLUGIN_NAME | lowercase with underscores | "snowpipes" |
| SQL directory | {name}.sql/ | snowpipes.sql/ |
| Config directory | {name}.config/ | snowpipes.config/ |
| Config YAML | {name}-config.yml | snowpipes-config.yml |
| SQL objects | UPPERCASE | V_SNOWPIPES_INSTRUMENTED |
| Task name | TASK_DTAGENT_{UPPER} | TASK_DTAGENT_SNOWPIPES |
| Config procedure | UPDATE_{UPPER}_CONF() | UPDATE_SNOWPIPES_CONF() |
| Semantic fields | snowflake.{domain}.{field} | snowflake.pipe.name |
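The conventions in the table can be derived mechanically from the plugin name. The sketch below is illustrative only: `derive_names` and `PluginNames` are hypothetical helpers, not part of the DSOA codebase, and the CamelCase rule for multi-word names (`data_schemas` to `DataSchemasPlugin`) is an assumption based on the `{CamelCase}Plugin` convention.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class PluginNames:
    """Artifact names derived from one snake_case plugin name."""
    python_file: str
    python_class: str
    sql_dir: str
    config_dir: str
    config_yaml: str
    view: str
    task: str
    config_procedure: str


def derive_names(plugin_name: str) -> PluginNames:
    """Apply the naming table to a plugin name (hypothetical helper)."""
    if not re.fullmatch(r"[a-z][a-z0-9]*(_[a-z0-9]+)*", plugin_name):
        raise ValueError(f"plugin name must be snake_case: {plugin_name!r}")
    # Assumption: CamelCase is built by capitalizing each underscore-separated part.
    camel = "".join(part.capitalize() for part in plugin_name.split("_"))
    upper = plugin_name.upper()
    return PluginNames(
        python_file=f"{plugin_name}.py",
        python_class=f"{camel}Plugin",
        sql_dir=f"{plugin_name}.sql/",
        config_dir=f"{plugin_name}.config/",
        config_yaml=f"{plugin_name}-config.yml",
        view=f"V_{upper}_INSTRUMENTED",
        task=f"TASK_DTAGENT_{upper}",
        config_procedure=f"UPDATE_{upper}_CONF()",
    )
```

Calling `derive_names("snowpipes")` reproduces the example column of the table, which makes it a quick sanity check before scaffolding.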
Create the directory structure:
mkdir -p src/dtagent/plugins/{name}.sql
mkdir -p src/dtagent/plugins/{name}.config
touch src/dtagent/plugins/{name}.py
Use a VIEW for straightforward data collection (most plugins). Use a procedure only when you need error handling, temp tables, conditional logic, or multi-step processing.
Required columns for log/metric plugins:
| Column | Type | Purpose |
|---|---|---|
| TIMESTAMP | TIMESTAMP_LTZ | When data was collected/event occurred |
| _MESSAGE | VARCHAR | Log content (auto-mapped to content in Dynatrace) |
| DIMENSIONS | OBJECT | Low-cardinality fields for grouping; used with metrics |
| ATTRIBUTES | OBJECT | High-cardinality context; NOT sent with metrics |
| METRICS | OBJECT | Numerical measurements |
Optional columns:
| Column | Type | Purpose |
|---|---|---|
| Identifier cols | VARCHAR | Reference/logging (e.g. PIPE_NAME) |
| EVENT_TIMESTAMPS | OBJECT | Timestamp fields that generate events |
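A quick way to catch a malformed view before it reaches the agent is to check one fetched row against the column tables above. This is a minimal sketch, assuming rows arrive as Python dicts with OBJECT columns already decoded to dicts; `check_row` is a hypothetical helper, not an agent API.

```python
from typing import Any, Dict, List

# Column names are taken from the tables above; the check itself is illustrative.
REQUIRED_COLUMNS = ("TIMESTAMP", "_MESSAGE", "DIMENSIONS", "ATTRIBUTES", "METRICS")
OBJECT_COLUMNS = ("DIMENSIONS", "ATTRIBUTES", "METRICS", "EVENT_TIMESTAMPS")


def check_row(row: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one row of an instrumented view."""
    problems = [f"missing column {col}" for col in REQUIRED_COLUMNS if col not in row]
    for col in OBJECT_COLUMNS:
        # Assumption: OBJECT columns are decoded to dicts before this check runs.
        if col in row and not isinstance(row[col], dict):
            problems.append(f"{col} should be an OBJECT, got {type(row[col]).__name__}")
    return problems
```

An empty list means the row has every required column and each OBJECT column is dict-shaped.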
Additional columns for span plugins:
| Column | Type | Purpose |
|---|---|---|
| QUERY_ID | VARCHAR | Unique span ID |
| PARENT_QUERY_ID | VARCHAR | Parent span (for traces) |
| START_TIME / END_TIME | NUMBER | Epoch nanoseconds |
| NAME | VARCHAR | Span name |
| STATUS_CODE | VARCHAR | 'OK', 'ERROR', 'UNSET' |
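START_TIME and END_TIME are expressed in epoch nanoseconds, which is easy to get wrong by a factor of 1000. The helpers below are a sketch for building or inspecting test fixtures; they are hypothetical and use integer arithmetic to avoid float rounding at nanosecond scale.

```python
from datetime import datetime, timezone

NANOS_PER_SECOND = 1_000_000_000
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_nanos(moment: datetime) -> int:
    """Convert a timezone-aware datetime to integer epoch nanoseconds."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    delta = moment - _EPOCH
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * NANOS_PER_SECOND + delta.microseconds * 1_000


def from_epoch_nanos(nanos: int) -> datetime:
    """Convert epoch nanoseconds to a UTC datetime (microsecond precision)."""
    seconds, remainder = divmod(nanos, NANOS_PER_SECOND)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base.replace(microsecond=remainder // 1_000)
```

Python datetimes only carry microseconds, so sub-microsecond digits are dropped on the way back.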
SQL template:
use role DTAGENT_OWNER; use database DTAGENT_DB; use warehouse DTAGENT_WH;
create or replace view DTAGENT_DB.APP.V_{UPPER}_INSTRUMENTED as
with cte_source as (
    select * from SNOWFLAKE.ACCOUNT_USAGE.SOME_VIEW
    where SOME_TIME > DTAGENT_DB.STATUS.F_LAST_PROCESSED_TS('{name}')
)
select
    current_timestamp() as TIMESTAMP,
    ENTITY_NAME,
    concat('Entity: ', ENTITY_NAME, ' in ', DB_NAME) as _MESSAGE,
    object_construct(
        'db.namespace', DB_NAME,
        'snowflake.warehouse.name', WH_NAME
    ) as DIMENSIONS,
    object_construct(
        'snowflake.entity.id', ENTITY_ID,
        'snowflake.entity.comment', COMMENT
    ) as ATTRIBUTES,
    object_construct(
        'snowflake.entity.size', SIZE_BYTES,
        'snowflake.entity.rows', ROW_COUNT
    ) as METRICS
from cte_source;
grant select on view DTAGENT_DB.APP.V_{UPPER}_INSTRUMENTED to role DTAGENT_VIEWER;
When using a procedure instead:
create or replace procedure DTAGENT_DB.APP.F_{UPPER}_INSTRUMENTED()
returns table (TIMESTAMP timestamp_ltz, ...)
language sql
execute as caller
as $$
declare
    c_result cursor for
        with cte_data as (select ...)
        select ... from cte_data;
begin
    open c_result;
    return table(resultset_from_cursor(c_result));
exception
    when statement_error then
        SYSTEM$LOG_ERROR(SQLERRM);
        return table(select null as TIMESTAMP);
end;
$$;
grant usage on procedure DTAGENT_DB.APP.F_{UPPER}_INSTRUMENTED() to role DTAGENT_VIEWER;
SQL pitfalls:
- snow sql CLI misparses cursor field access (r.name) inside $$ blocks. Always capture into LET variables first.
- Use F_LAST_PROCESSED_TS('{name}') for incremental data to avoid duplicates.
Single-schedule (most plugins):
use role DTAGENT_OWNER; use database DTAGENT_DB; use warehouse DTAGENT_WH;
create or replace task DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
    warehouse = DTAGENT_WH
    schedule = 'USING CRON 0 */12 * * * UTC'
    allow_overlapping_execution = FALSE
as
    call DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}'));
grant ownership on task DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
    to role DTAGENT_VIEWER revoke current grants;
grant operate, monitor on task DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
    to role DTAGENT_VIEWER;
Dual-schedule (context-selective, like snowpipes):
-- 801: Fast task (every 5 min) — lightweight contexts only
call DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}:{fast_context}'));
-- 802: Deep task (hourly) — heavy contexts
call DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}:{ctx1},{ctx2}'));
The syntax '{plugin}:{context1},{context2}' triggers only the listed contexts.
Without the colon, all contexts run.
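The context-selective argument can be read as a plugin name plus an optional comma-separated context list. The parser below is only a sketch of that reading; `parse_plugin_spec` is a hypothetical name, and the agent's real parsing may differ (for example in how it treats whitespace or an empty list after the colon).

```python
from typing import List, Optional, Tuple


def parse_plugin_spec(spec: str) -> Tuple[str, Optional[List[str]]]:
    """Split '{plugin}:{ctx1},{ctx2}' into (plugin, contexts).

    contexts is None when no colon is present, meaning "run all contexts".
    """
    plugin, separator, context_part = spec.partition(":")
    plugin = plugin.strip()
    if not plugin:
        raise ValueError(f"missing plugin name in {spec!r}")
    if not separator:
        return plugin, None
    contexts = [ctx.strip() for ctx in context_part.split(",") if ctx.strip()]
    # Assumption: a trailing colon with no contexts falls back to "all contexts".
    return plugin, contexts or None
```

The returned contexts value maps directly onto the `contexts` argument of `process()`.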
use role DTAGENT_OWNER; use schema DTAGENT_DB.CONFIG; use warehouse DTAGENT_WH;
create or replace procedure DTAGENT_DB.CONFIG.UPDATE_{UPPER}_CONF()
returns text language SQL execute as caller
as $$
begin
    -- Single schedule:
    call DTAGENT_DB.CONFIG.UPDATE_PLUGIN_SCHEDULE('{name}');
    -- Dual schedule (pass additional schedule suffixes):
    -- call DTAGENT_DB.CONFIG.UPDATE_PLUGIN_SCHEDULE('{name}', ARRAY_CONSTRUCT('history', 'grants'));
    return '{name} plugin config updated';
exception
    when statement_error then SYSTEM$LOG_WARN(SQLERRM); return SQLERRM;
end;
$$;
Plugins operating on user-created objects need include/exclude patterns
(DB.SCHEMA.OBJECT with % wildcards).
In views — compare full QUALIFIED_NAME against raw pattern VALUE:
where QUALIFIED_NAME LIKE ANY (select include_pattern from cte_includes)
and not QUALIFIED_NAME LIKE ANY (select exclude_pattern from cte_excludes)
In admin grant procedures — match at the same tier as the grant:
| Tier | Suppress when |
|---|---|
| DB-level grant | Exclude's split_part(VALUE, '.', 2) = '%' |
| Schema-level grant | (db.schema.%) LIKE ANY (raw excludes) |
| Object-level grant | Include VALUE itself LIKE ANY (raw excludes) |
Never collapse a fine-grained exclude to DB-level only.
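To reason about which objects a set of include/exclude patterns selects on the view side, it can help to emulate `LIKE ANY` locally. The sketch below translates SQL LIKE patterns to regular expressions (`%` matches any sequence, `_` matches one character, matching is case-sensitive). The function names are hypothetical and this is not how the agent evaluates patterns; Snowflake does that in SQL.

```python
import re
from typing import Iterable, List


def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern into an anchored regular expression."""
    parts = []
    for char in pattern:
        if char == "%":
            parts.append(".*")   # any sequence of characters
        elif char == "_":
            parts.append(".")    # exactly one character
        else:
            parts.append(re.escape(char))
    return re.compile("".join(parts), re.DOTALL)


def like_any(value: str, patterns: Iterable[str]) -> bool:
    """Emulate `value LIKE ANY (patterns)`."""
    return any(like_to_regex(p).fullmatch(value) for p in patterns)


def select_objects(names: Iterable[str], includes: List[str], excludes: List[str]) -> List[str]:
    """Keep names matching an include pattern and no exclude pattern."""
    return [n for n in names if like_any(n, includes) and not like_any(n, excludes)]
```

Note that `_` in a pattern such as `DTAGENT_DB.%.%` is itself a single-character wildcard under LIKE semantics, so it also matches names like `DTAGENTXDB.APP.T`.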
Simple plugin (single context):
"""Plugin file for processing {name} plugin data."""
# MIT license header + region markers (see existing plugins)
from typing import Dict, List, Optional

from dtagent.plugins import Plugin
from dtagent.context import RUN_PLUGIN_KEY, RUN_RESULTS_KEY, RUN_ID_KEY  # COMPILE_REMOVE


class {CamelCase}Plugin(Plugin):
    """{Name} plugin class."""

    PLUGIN_NAME = "{name}"

    def process(self, run_id: str, run_proc: bool = True,
                contexts: Optional[List[str]] = None) -> Dict[str, Dict[str, int]]:
        t_data = "APP.V_{UPPER}_INSTRUMENTED"
        entries, logs, metrics, events = self._log_entries(
            lambda: self._get_table_rows(t_data),
            "{name}",
            run_uuid=run_id,
            report_metrics=True,
            log_completion=run_proc,
        )
        return self._report_results(
            {"{name}": {"entries": entries, "log_lines": logs,
                        "metrics": metrics, "events": events}},
            run_id,
        )
Multi-context plugin (dual schedule):
def process(self, run_id, run_proc=True, contexts=None):
    # t_fast / t_deep hold the view names for each context, defined like t_data above
    results = {}
    if not contexts or "fast_ctx" in contexts:
        e, l, m, ev = self._log_entries(
            lambda: self._get_table_rows(t_fast), "fast_ctx",
            run_uuid=run_id, report_timestamp_events=True,
            report_metrics=True, log_completion=run_proc)
        results["fast_ctx"] = {"entries": e, "log_lines": l, "metrics": m, "events": ev}
    if not contexts or "deep_ctx" in contexts:
        e, l, m, ev = self._log_entries(
            lambda: self._get_table_rows(t_deep), "deep_ctx",
            run_uuid=run_id, report_metrics=True, log_completion=run_proc)
        results["deep_ctx"] = {"entries": e, "log_lines": l, "metrics": m, "events": ev}
    return self._report_results(results, run_id)
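The guard `if not contexts or "ctx" in contexts` can be exercised in isolation. The class below is a stand-in with canned rows and no Snowflake or Dynatrace access; `FakeMultiContextPlugin` and its data are hypothetical and only demonstrate which contexts run for a given `contexts` argument.

```python
from typing import Dict, List, Optional


class FakeMultiContextPlugin:
    """Stand-in for a multi-context plugin; rows are canned, nothing is sent."""

    # Hypothetical per-context rows replacing the real views.
    ROWS = {
        "fast_ctx": [{"id": 1}, {"id": 2}],
        "deep_ctx": [{"id": 3}],
    }

    def process(self, run_id: str,
                contexts: Optional[List[str]] = None) -> Dict[str, Dict[str, int]]:
        results: Dict[str, Dict[str, int]] = {}
        for ctx, rows in self.ROWS.items():
            # Same guard as in the plugin template: None or [] means "all contexts".
            if not contexts or ctx in contexts:
                results[ctx] = {"entries": len(rows)}
        return results
```

Because the guard uses `not contexts`, an empty list behaves like `None` and runs every context; tests for context selection should cover that case too.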
Span plugin — use _process_span_rows() instead of _log_entries(). See
query_history.py or login_history.py for real examples.
| Method | Purpose |
|---|---|
| _log_entries(fn, context, ...) | Primary for log/metric/event plugins |
| _process_span_rows(fn, ...) | For span/trace plugins |
| _get_table_rows(query) | Generator over Snowflake results |
| _report_results(dict, run_id) | Format return value |
| _report_execution(name, ts, ...) | Log completion (multi-context manual) |
Key _log_entries() flags:
| Flag | Default | Effect |
|---|---|---|
| report_metrics | True | Emit metrics from METRICS column |
| report_timestamp_events | True | Emit events from EVENT_TIMESTAMPS |
| report_all_as_events | False | Emit every row as an event |
| log_completion | True | Auto-log run completion |
{name}-config.yml
plugins:
  {name}:
    include:  # optional, for filterable plugins
      - '%.%.%'
    exclude:  # optional
      - DTAGENT_DB.%.%
    schedule: USING CRON 0 */12 * * * UTC
    # schedule_history: USING CRON 0 * * * * UTC      # for dual-schedule
    # schedule_grants: USING CRON 30 */12 * * * UTC   # for admin grants task
    # lookback_hours: 4                               # custom settings
    is_disabled: false
    telemetry:
      - metrics
      - logs
      - events
      - biz_events
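The `is_disabled` and `telemetry` keys together decide what a plugin may emit. The sketch below shows one way to read them from an already-parsed config (a plain dict mirroring the YAML above). `enabled_telemetry` is a hypothetical helper, and treating a missing `telemetry` list as "nothing enabled" is an assumption, not documented agent behavior.

```python
from typing import Any, Dict, Set


def enabled_telemetry(config: Dict[str, Any], plugin: str) -> Set[str]:
    """Return the telemetry types a plugin is configured to emit."""
    plugin_cfg = config.get("plugins", {}).get(plugin, {})
    if plugin_cfg.get("is_disabled", False):
        return set()
    # Assumption: an absent telemetry list means no telemetry types are enabled.
    return set(plugin_cfg.get("telemetry", []))


example_config = {
    "plugins": {
        "snowpipes": {
            "schedule": "USING CRON 0 */12 * * * UTC",
            "is_disabled": False,
            "telemetry": ["metrics", "logs", "events", "biz_events"],
        }
    }
}
```

This mirrors the first two checks in the troubleshooting flow for "no data in Dynatrace": is the plugin disabled, and is the telemetry type listed.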
instruments-def.yml
Four sections: dimensions, attributes, metrics, event_timestamps.
dimensions:
  db.namespace:
    __context_names: ["{name}"]
    __example: analytics_db
    __description: The database name.
attributes:
  snowflake.entity.id:
    __context_names: ["{name}"]
    __example: "12345"
    __description: Unique entity identifier.
metrics:
  snowflake.entity.size:
    __context_names: ["{name}"]
    __example: "1048576"
    __description: Entity size.
    displayName: Entity Size
    unit: bytes
event_timestamps:  # only if report_timestamp_events=True
  snowflake.event.trigger:
    __context_names: ["{name}"]
    __example: "snowflake.entity.created_time"
    __description: Event trigger key.
  snowflake.entity.created_time:
    __context_names: ["{name}"]
    __example: 1639051180946000000
    __description: Entity creation timestamp (epoch nanoseconds).
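Each entry above carries the same three double-underscore keys, and the metric entry adds `displayName` and `unit`. A small structural check over the parsed definition can catch omissions early. This is a sketch on a plain dict; `check_instruments` is hypothetical, and requiring `displayName` and `unit` for every metric is an assumption drawn from the example rather than a documented rule.

```python
from typing import Any, Dict, List

SECTIONS = ("dimensions", "attributes", "metrics", "event_timestamps")
COMMON_KEYS = ("__context_names", "__example", "__description")
METRIC_KEYS = ("displayName", "unit")  # assumption: required for every metric


def check_instruments(definition: Dict[str, Dict[str, Dict[str, Any]]]) -> List[str]:
    """Return a list of structural problems in a parsed instruments-def.yml."""
    problems: List[str] = []
    for section, fields in definition.items():
        if section not in SECTIONS:
            problems.append(f"unknown section {section}")
            continue
        required = COMMON_KEYS + (METRIC_KEYS if section == "metrics" else ())
        for name, meta in fields.items():
            for key in required:
                if key not in meta:
                    problems.append(f"{section}.{name}: missing {key}")
    return problems
```

An empty result means every field has the expected metadata keys; it does not validate field names or values.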
Field naming rules:
- snake_case; custom fields start with the snowflake. prefix
- No units in field names (duration, not duration_ms)
- No .count suffix (implied for counters)
- Boolean fields use an is_ or has_ prefix
bom.yml
delivers:
  - name: DTAGENT_DB.APP.V_{UPPER}_INSTRUMENTED
    type: view
  - name: DTAGENT_DB.APP.TASK_DTAGENT_{UPPER}
    type: task
  - name: DTAGENT_DB.CONFIG.UPDATE_{UPPER}_CONF()
    type: procedure
references:
  - name: SNOWFLAKE.ACCOUNT_USAGE.SOME_VIEW
    type: view
    privileges: SELECT
readme.md
Brief description (1-2 sentences).
What data is collected:
- ...
Key use cases:
- ...
## Configuration
Default schedule and customization examples.
## Querying in Dynatrace
Example DQL queries for logs, metrics, events.
Create test/plugins/test_{name}.py:
class Test{CamelCase}:
    import pytest

    FIXTURES = {
        "APP.V_{UPPER}_INSTRUMENTED": "test/test_data/{name}.ndjson",
    }

    @pytest.mark.xdist_group(name="test_telemetry")
    def test_{name}(self):
        from typing import Dict, Generator
        from dtagent.plugins.{name} import {CamelCase}Plugin
        import test._utils as utils
        from test import TestDynatraceSnowAgent, _get_session

        utils._generate_all_fixtures(_get_session(), self.FIXTURES)

        # Subclass that reads rows from fixtures instead of Snowflake
        class Test{CamelCase}Plugin({CamelCase}Plugin):
            def _get_table_rows(self, t_data: str) -> Generator[Dict, None, None]:
                return utils._safe_get_fixture_entries(
                    Test{CamelCase}.FIXTURES, t_data, limit=2)

        def __local_get_plugin_class(source: str):
            return Test{CamelCase}Plugin

        from dtagent import plugins
        plugins._get_plugin_class = __local_get_plugin_class

        disabled_combinations = [
            [],
            ["metrics"],
            ["logs"],
            ["metrics", "logs"],
            ["logs", "spans", "metrics", "events"],
        ]
        for disabled_telemetry in disabled_combinations:
            utils.execute_telemetry_test(
                TestDynatraceSnowAgent,
                test_name="test_{name}",
                disabled_telemetry=disabled_telemetry,
                affecting_types_for_entries=["logs", "metrics"],
                base_count={
                    "{name}": {"entries": N, "log_lines": N, "metrics": M},
                },
            )
For multi-context plugins, add FIXTURES entries for each view/procedure and
separate context tests (see test_snowpipes.py for the pattern — context-None,
context-subset tests).
- test/test_data/
- test/test_results/test_{name}/
- disabled_telemetry combos tested: [], ["metrics"], ["logs"], all-disabled
- affecting_types_for_entries matches the telemetry types that affect entry counts
- contexts=None and individual context selection
- .venv/bin/pytest test/plugins/test_{name}.py -v
# From live Snowflake (requires test/credentials.yml):
.venv/bin/pytest test/plugins/test_{name}.py -p
# Run mocked tests:
.venv/bin/pytest test/plugins/test_{name}.py -v
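Fixtures are stored as NDJSON (one JSON object per line), and the mocked tests read only the first few rows (`limit=2` in the test template). The reader below sketches that behavior for inspecting a fixture by hand. `read_ndjson` is a hypothetical helper and is not the repository's `_safe_get_fixture_entries`.

```python
import json
from pathlib import Path
from typing import Dict, Iterator, Optional


def read_ndjson(path: Path, limit: Optional[int] = None) -> Iterator[Dict]:
    """Yield up to `limit` JSON objects from an NDJSON file, skipping blank lines."""
    yielded = 0
    with path.open(encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            if limit is not None and yielded >= limit:
                break
            yield json.loads(line)
            yielded += 1
```

Counting the rows this yields for a fixture is a quick cross-check against the `entries` value expected in `base_count`.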
| Artifact | Action |
|---|---|
| {name}.config/readme.md | Write plugin description, use cases, config examples, DQL queries |
| {name}.config/config.md | Optional: extended configuration documentation |
| docs/USECASES.md | Add use cases under appropriate DPO theme(s) and tier(s) |
| docs/CHANGELOG.md | User-facing: new plugin summary (1-2 sentences) |
| docs/DEVLOG.md | Developer-facing: implementation details, decisions, patterns used |
| docs/PLUGINS.md | Do not edit — autogenerated by build_docs.sh |
| docs/SEMANTICS.md | Do not edit — autogenerated by build_docs.sh |
Run ./scripts/dev/build_docs.sh after documentation changes.
# 1. Build
./scripts/dev/build.sh
# 2. Run tests
.venv/bin/pytest test/plugins/test_{name}.py -v
# 3. Run full suite + lint
.venv/bin/pytest && make lint
# 4. Deploy to test-qa
./scripts/deploy/deploy.sh test-qa --scope=plugins,config --options=skip_confirm
# 5. Verify in Snowflake
# SHOW TASKS LIKE 'TASK_DTAGENT_{UPPER}%' IN SCHEMA DTAGENT_DB.APP;
# CALL DTAGENT_DB.APP.DTAGENT(ARRAY_CONSTRUCT('{name}'));
# 6. Verify in Dynatrace (DQL)
# fetch logs | filter db.system == "snowflake" | filter dsoa.run.context == "{name}"
Deploy scope rules:
| Change | Scope |
|---|---|
| Plugin SQL only | plugins,config |
| Python agent code changed | plugins,agents,config |
| New plugin toggled on/off with deploy_disabled_plugins: false | plugins,agents,config |
Always include config — omitting it leaves tasks suspended.
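The scope table reduces to a simple rule: start from `plugins`, add `agents` when Python agent code changed or a new plugin is being toggled, and always end with `config`. The sketch below encodes that rule and assembles the deploy command used earlier in this section. Both function names are hypothetical, and upgrade or admin scopes are deliberately not covered.

```python
from typing import List


def deploy_scope(python_changed: bool = False, new_plugin: bool = False) -> str:
    """Pick the --scope value according to the deploy scope rules table."""
    scopes = ["plugins"]
    if python_changed or new_plugin:
        scopes.append("agents")
    scopes.append("config")  # always included, otherwise tasks stay suspended
    return ",".join(scopes)


def deploy_command(environment: str, python_changed: bool = False,
                   new_plugin: bool = False) -> List[str]:
    """Build the deploy.sh argument list for the given environment."""
    scope = deploy_scope(python_changed=python_changed, new_plugin=new_plugin)
    return ["./scripts/deploy/deploy.sh", environment,
            f"--scope={scope}", "--options=skip_confirm"]
```

The list form can be passed to `subprocess.run` without shell quoting concerns.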
Single view, single task, _log_entries() with report_metrics=True.
Examples: budgets, data_schemas, dynamic_tables, resource_monitors.
Multiple views processed in one process() call, single task. Guard each
context with if not contexts or "ctx" in contexts:. Use _report_execution()
for manual completion logging.
Examples: budgets (budgets + spendings), warehouse_usage.
Multiple views on different cadences. Fast task (801) invokes lightweight
contexts; deep task (802) invokes heavy contexts. Context-selective syntax:
ARRAY_CONSTRUCT('{plugin}:{ctx1},{ctx2}').
Example: snowpipes (fast: pipe status; deep: copy history, usage history).
Uses _process_span_rows() for hierarchical traces with parent-child
relationships. Returns span IDs, events, errors.
Examples: query_history, login_history.
_log_entries() with report_timestamp_events=True. Requires EVENT_TIMESTAMPS
column in SQL and event_timestamps section in instruments-def.yml.
Example: snowpipes (pipe creation/modification events).
Call a stored procedure before processing contexts (e.g. refresh materialized
data). Example: budgets calls P_GET_BUDGETS before reading views.
| Symptom | Likely cause | Fix |
|---|---|---|
| Plugin not found / not loaded | Class naming wrong, missing PLUGIN_NAME | Check {CamelCase}Plugin, rebuild |
| SQL deployment errors | Lowercase object names, missing USE statements | Ensure UPPERCASE, add use role/database/warehouse |
| No data in Dynatrace | Plugin disabled, telemetry type not in config | Check is_disabled, telemetry list |
| Task not running | Task suspended, config not deployed | Deploy with --scope=config, call UPDATE_{UPPER}_CONF() |
| Mismatched test counts | Stale fixtures, wrong base_count | Regenerate with -p, verify expected counts |
| Semantic fields missing | Mismatch between SQL and instruments-def.yml | Align field names, rebuild docs |
| Config not applied | Config scope not deployed | deploy.sh --scope=config |
| Procedure overload error | Signature changed without upgrade script | Create DROP PROCEDURE IF EXISTS in upgrade/ |
- Use UPPER() or LOWER() for comparisons.
- Test contexts=None, individual contexts, and context subsets.
- Regenerate fixtures with the -p flag.
- Use _report_execution() for multi-context manual completion. When log_completion=False on individual contexts, call _report_execution() at the end to log combined results.
-- Check plugin logs
fetch logs
| filter db.system == "snowflake" and dsoa.run.context == "{name}"
| sort timestamp desc | limit 50
-- Check self-monitoring
fetch bizevents
| filter db.system == "snowflake" and dsoa.run.context == "self_monitoring"
| filter dsoa.run.plugin == "{name}"
| fields timestamp, dsoa.run.results | sort timestamp desc
-- Count telemetry by context
fetch logs
| filter db.system == "snowflake" and dsoa.run.plugin == "{name}"
| summarize count(), by: {dsoa.run.context}
-- Query metrics
timeseries avg(snowflake.{domain}.{metric}),
by: {db.namespace}
| filter db.system == "snowflake"
-- Monitor plugin performance (self-monitoring field-level breakdown)
fetch logs
| filter db.system == "snowflake" and dsoa.run.context == "self_monitoring"
| filter dsoa.run.plugin == "{name}"
| fields timestamp, dsoa.run.plugin, dsoa.run.id,
{name}.entries, {name}.log_lines, {name}.metrics,
{name}.spans, {name}.span_events, {name}.events
| sort timestamp desc
-- Check errors
fetch logs
| filter db.system == "snowflake" and dsoa.run.context == "{name}"
| filter loglevel == "ERROR"
| sort timestamp desc
Use plugin/option guards for conditional code inclusion:
--%PLUGIN:{name}:
-- Only included when {name} plugin is enabled
create or replace view ...
--%:PLUGIN:{name}
--%OPTION:dtagent_admin:
-- Only included when admin role is enabled
grant role ... to role DTAGENT_ADMIN;
--%:OPTION:dtagent_admin
For cross-plugin dependencies, wrap both column references AND join clauses in guards. Test with the dependency both enabled and disabled.
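To preview what a guarded SQL file looks like with a given plugin or option switched off, the guard markers can be evaluated with a few lines of Python. This is a sketch of the idea only: `apply_guards` is hypothetical, it drops the marker lines themselves, and the real build tooling may handle nesting, whitespace, or marker retention differently.

```python
import re
from typing import Iterable, List, Set

_OPEN = re.compile(r"^--%(PLUGIN|OPTION):([A-Za-z0-9_]+):$")
_CLOSE = re.compile(r"^--%:(PLUGIN|OPTION):([A-Za-z0-9_]+)$")


def apply_guards(lines: Iterable[str], plugins: Set[str], options: Set[str]) -> List[str]:
    """Keep only lines whose enclosing guards are all enabled."""
    enabled = {"PLUGIN": plugins, "OPTION": options}
    stack: List[bool] = []  # one flag per open guard: is that guard enabled?
    output: List[str] = []
    for line in lines:
        stripped = line.strip()
        opened = _OPEN.match(stripped)
        closed = _CLOSE.match(stripped)
        if opened:
            kind, name = opened.groups()
            stack.append(name in enabled[kind])
        elif closed:
            if not stack:
                raise ValueError(f"unmatched guard close: {stripped}")
            stack.pop()
        elif all(stack):
            # all([]) is True, so unguarded lines are always kept
            output.append(line)
    if stack:
        raise ValueError("unclosed guard at end of input")
    return output
```

Running it twice, once with the dependency plugin in `plugins` and once without, gives a quick preview of both variants that the cross-plugin advice above asks you to test.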
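When a stored procedure's parameter list changes, Snowflake does not replace the old procedure: the new signature is created as a separate overload, and calls can then fail with an ambiguous overload error. Create an upgrade script that drops the old signature: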
src/dtagent.sql/upgrade/<new-version>/xxx_drop_{name}_old_proc.sql
--%PLUGIN:{name}:
DROP PROCEDURE IF EXISTS DTAGENT_DB.APP.F_{UPPER}_INSTRUMENTED(<old-param-types>);
--%:PLUGIN:{name}
Deploy upgrade before main deploy:
./scripts/deploy/deploy.sh test-qa --scope=upgrade --from-version=<prev> --options=skip_confirm
./scripts/deploy/deploy.sh test-qa --scope=plugins,admin,config --options=skip_confirm