| name | agent-governance-patterns |
| description | Design and implement governance controls for tool-using and multi-agent AI systems, including policy enforcement, approval gates, audit trails, trust scoring, rate limits, and safe tool execution.
|
| metadata | {"skill-author":"Marie-Lynne Block"} |
Agent Governance Patterns
Patterns for adding safety, trust, and deterministic policy enforcement to tool-using and multi-agent AI systems.
Overview
Governance patterns ensure AI agents operate within defined boundaries — controlling which tools they can call, what content they can process, how much they can do, and maintaining accountability through audit trails. Use this skill to design or implement controls; use a compliance-focused skill when the task is to score an existing system against a formal standard.
User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
↓ ↓ ↓
Threat Detection Allow/Deny Trust Update
Use This Skill When
- Agents with tool access: Any agent that calls external tools (APIs, databases, shell commands)
- Multi-agent systems: Agents delegating to other agents need trust boundaries
- Production deployments: Compliance, audit, and safety requirements
- Sensitive operations: Financial transactions, data access, infrastructure management
- Policy enforcement: Tool allowlists, blocklists, approval gates, rate limits, or content filters are needed
- Operational accountability: Tool calls need structured audit trails, denial reasons, or replayable records
Do Not Use This Skill For
- General security reviews where agent governance is not the focus
- OWASP ASI compliance scoring or gap reporting
- Supply-chain integrity, signing, plugin provenance, or manifest verification
- Broad agent architecture design unless policy enforcement, trust boundaries, or auditability are central
Pattern 1: Governance Policy
Define what an agent is allowed to do as a composable, serializable policy object.
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import re
class PolicyAction(Enum):
ALLOW = "allow"
DENY = "deny"
REVIEW = "review"
@dataclass
class GovernancePolicy:
"""Declarative policy controlling agent behavior."""
name: str
allowed_tools: list[str] = field(default_factory=list)
blocked_tools: list[str] = field(default_factory=list)
blocked_patterns: list[str] = field(default_factory=list)
max_calls_per_request: int = 100
require_human_approval: list[str] = field(default_factory=list)
def check_tool(self, tool_name: str) -> PolicyAction:
"""Check if a tool is allowed by this policy."""
if tool_name in self.blocked_tools:
return PolicyAction.DENY
if tool_name in self.require_human_approval:
return PolicyAction.REVIEW
if self.allowed_tools and tool_name not in self.allowed_tools:
return PolicyAction.DENY
return PolicyAction.ALLOW
def check_content(self, content: str) -> Optional[str]:
"""Check content against blocked patterns. Returns matched pattern or None."""
for pattern in self.blocked_patterns:
if re.search(pattern, content, re.IGNORECASE):
return pattern
return None
Policy Composition
Combine multiple policies (e.g., org-wide + team + agent-specific):
def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
"""Merge policies with most-restrictive-wins semantics."""
combined = GovernancePolicy(name="composed")
for policy in policies:
combined.blocked_tools.extend(policy.blocked_tools)
combined.blocked_patterns.extend(policy.blocked_patterns)
combined.require_human_approval.extend(policy.require_human_approval)
combined.max_calls_per_request = min(
combined.max_calls_per_request,
policy.max_calls_per_request
)
if policy.allowed_tools:
if combined.allowed_tools:
combined.allowed_tools = [
t for t in combined.allowed_tools if t in policy.allowed_tools
]
else:
combined.allowed_tools = list(policy.allowed_tools)
return combined
org_policy = GovernancePolicy(
name="org-wide",
blocked_tools=["shell_exec", "delete_database"],
blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"],
max_calls_per_request=50
)
team_policy = GovernancePolicy(
name="data-team",
allowed_tools=["query_db", "read_file", "write_report"],
require_human_approval=["write_report"]
)
agent_policy = compose_policies(org_policy, team_policy)
Policy as YAML
Store policies as configuration, not code:
name: production-agent
allowed_tools:
- search_documents
- query_database
- send_email
blocked_tools:
- shell_exec
- delete_record
blocked_patterns:
- "(?i)(api[_-]?key|secret|password)\\s*[:=]"
- "(?i)(drop|truncate|delete from)\\s+\\w+"
max_calls_per_request: 25
require_human_approval:
- send_email
import yaml
def load_policy(path: str) -> GovernancePolicy:
with open(path) as f:
data = yaml.safe_load(f)
return GovernancePolicy(**data)
Pattern 2: Semantic Intent Classification
Detect dangerous intent in prompts before they reach the agent, using pattern-based signals.
from dataclasses import dataclass
@dataclass
class IntentSignal:
category: str
confidence: float
evidence: str
THREAT_SIGNALS = [
(r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+", "data_exfiltration", 0.8),
(r"(?i)export\s+.*\s+to\s+(external|outside|third.?party)", "data_exfiltration", 0.9),
(r"(?i)curl\s+.*\s+-d\s+", "data_exfiltration", 0.7),
(r"(?i)(sudo|as\s+root|admin\s+access)", "privilege_escalation", 0.8),
(r"(?i)chmod\s+777", "privilege_escalation", 0.9),
(r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)", "system_destruction", 0.95),
(r"(?i)(drop\s+database|truncate\s+table)", "system_destruction", 0.9),
(r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)", "prompt_injection", 0.9),
(r"(?i)you\s+are\s+now\s+(a|an)\s+", "prompt_injection", 0.7),
]
def classify_intent(content: str) -> list[IntentSignal]:
"""Classify content for threat signals."""
signals = []
for pattern, category, weight in THREAT_SIGNALS:
match = re.search(pattern, content)
if match:
signals.append(IntentSignal(
category=category,
confidence=weight,
evidence=match.group()
))
return signals
def is_safe(content: str, threshold: float = 0.7) -> bool:
"""Quick check: is the content safe above the given threshold?"""
signals = classify_intent(content)
return not any(s.confidence >= threshold for s in signals)
Key insight: Intent classification happens before tool execution, acting as a pre-flight safety check. This is fundamentally different from output guardrails which only check after generation.
Pattern-based classification is a baseline control, not complete sensitive-data detection. For regulated or high-risk systems, combine it with structured validators, data-loss prevention controls, and human approval for high-impact actions.
Pattern 3: Tool-Level Governance Decorator
Wrap individual tool functions with governance checks:
import functools
import time
import contextvars
_call_counters: contextvars.ContextVar[dict[str, int] | None] = contextvars.ContextVar(
"governance_call_counters",
default=None,
)
def _request_call_counters() -> dict[str, int]:
counters = _call_counters.get()
if counters is None:
counters = {}
_call_counters.set(counters)
return counters
def govern(policy: GovernancePolicy, audit_trail=None):
"""Decorator that enforces governance policy on a tool function."""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tool_name = func.__name__
start = time.monotonic()
def audit(action: str, **details):
if audit_trail is not None:
audit_trail.append({
"tool": tool_name,
"action": action,
"policy": policy.name,
"timestamp": time.time(),
**details,
})
try:
action = policy.check_tool(tool_name)
if action == PolicyAction.DENY:
raise PermissionError(f"Policy '{policy.name}' blocks tool '{tool_name}'")
if action == PolicyAction.REVIEW:
raise PermissionError(f"Tool '{tool_name}' requires human approval")
counters = _request_call_counters()
counters[policy.name] = counters.get(policy.name, 0) + 1
if counters[policy.name] > policy.max_calls_per_request:
raise PermissionError(f"Rate limit exceeded: {policy.max_calls_per_request} calls")
for arg in list(args) + list(kwargs.values()):
if isinstance(arg, str):
matched = policy.check_content(arg)
if matched:
raise PermissionError(f"Blocked pattern detected: {matched}")
except PermissionError as error:
audit("denied", reason=str(error))
raise
except Exception as error:
audit("denied", reason="governance_check_failed", error=str(error))
raise PermissionError("Governance check failed closed") from error
try:
result = await func(*args, **kwargs)
audit("allowed", duration_ms=(time.monotonic() - start) * 1000)
return result
except Exception as e:
audit("error", error=str(e))
raise
return wrapper
return decorator
audit_log = []
policy = GovernancePolicy(
name="search-agent",
allowed_tools=["search", "summarize"],
blocked_patterns=[r"(?i)password"],
max_calls_per_request=10
)
@govern(policy, audit_trail=audit_log)
async def search(query: str) -> str:
"""Search documents — governed by policy."""
return f"Results for: {query}"
Pattern 4: Trust Scoring
Track agent reliability over time with decay-based trust scores:
from dataclasses import dataclass, field
import math
import time
@dataclass
class TrustScore:
"""Trust score with temporal decay."""
score: float = 0.5
successes: int = 0
failures: int = 0
last_updated: float = field(default_factory=time.time)
def record_success(self, reward: float = 0.05):
self.successes += 1
self.score = min(1.0, self.score + reward * (1 - self.score))
self.last_updated = time.time()
def record_failure(self, penalty: float = 0.15):
self.failures += 1
self.score = max(0.0, self.score - penalty * self.score)
self.last_updated = time.time()
def current(self, decay_rate: float = 0.001) -> float:
"""Get score with temporal decay — trust erodes without activity."""
elapsed = time.time() - self.last_updated
decay = math.exp(-decay_rate * elapsed)
return self.score * decay
@property
def reliability(self) -> float:
total = self.successes + self.failures
return self.successes / total if total > 0 else 0.0
trust = TrustScore()
trust.record_success()
trust.record_success()
trust.record_failure()
if trust.current() >= 0.7:
pass
elif trust.current() >= 0.4:
pass
else:
pass
Multi-agent trust: In systems where agents delegate to other agents, each agent maintains trust scores for its delegates:
class AgentTrustRegistry:
def __init__(self):
self.scores: dict[str, TrustScore] = {}
def get_trust(self, agent_id: str) -> TrustScore:
if agent_id not in self.scores:
self.scores[agent_id] = TrustScore()
return self.scores[agent_id]
def most_trusted(self, agents: list[str]) -> str:
return max(agents, key=lambda a: self.get_trust(a).current())
def meets_threshold(self, agent_id: str, threshold: float) -> bool:
return self.get_trust(agent_id).current() >= threshold
Pattern 5: Audit Trail
Append-only audit log for all agent actions — critical for compliance and debugging:
from dataclasses import dataclass, field
import json
import time
@dataclass
class AuditEntry:
timestamp: float
agent_id: str
tool_name: str
action: str
policy_name: str
details: dict = field(default_factory=dict)
class AuditTrail:
"""Append-only audit trail for agent governance events."""
def __init__(self):
self._entries: list[AuditEntry] = []
def log(self, agent_id: str, tool_name: str, action: str,
policy_name: str, **details):
self._entries.append(AuditEntry(
timestamp=time.time(),
agent_id=agent_id,
tool_name=tool_name,
action=action,
policy_name=policy_name,
details=details
))
def denied(self) -> list[AuditEntry]:
"""Get all denied actions — useful for security review."""
return [e for e in self._entries if e.action == "denied"]
def by_agent(self, agent_id: str) -> list[AuditEntry]:
return [e for e in self._entries if e.agent_id == agent_id]
def export_jsonl(self, path: str):
"""Export as JSON Lines for log aggregation systems."""
with open(path, "w") as f:
for entry in self._entries:
f.write(json.dumps({
"timestamp": entry.timestamp,
"agent_id": entry.agent_id,
"tool": entry.tool_name,
"action": entry.action,
"policy": entry.policy_name,
**entry.details
}) + "\n")
Pattern 6: Framework Integration
These examples show where to attach the governance wrapper in common agent frameworks. Treat them as adaptation sketches and verify the exact decorator signatures, tool registration APIs, and version constraints against the framework version used in your project.
PydanticAI
from pydantic_ai import Agent
policy = GovernancePolicy(
name="support-bot",
allowed_tools=["search_docs", "create_ticket"],
blocked_patterns=[r"(?i)(ssn|social\s+security|credit\s+card)"],
max_calls_per_request=20
)
agent = Agent("openai:gpt-4o", system_prompt="You are a support assistant.")
@agent.tool
@govern(policy)
async def search_docs(ctx, query: str) -> str:
"""Search knowledge base — governed."""
return await kb.search(query)
@agent.tool
@govern(policy)
async def create_ticket(ctx, title: str, body: str) -> str:
"""Create support ticket — governed."""
return await tickets.create(title=title, body=body)
CrewAI
from crewai import Agent, Task, Crew
policy = GovernancePolicy(
name="research-crew",
allowed_tools=["search", "analyze"],
max_calls_per_request=30
)
def governed_crew_run(crew: Crew, policy: GovernancePolicy):
"""Wrap crew execution with governance checks."""
audit = AuditTrail()
for agent in crew.agents:
for tool in agent.tools:
original = tool.func
tool.func = govern(policy, audit_trail=audit)(original)
result = crew.kickoff()
return result, audit
OpenAI Agents SDK
from agents import Agent, function_tool
policy = GovernancePolicy(
name="coding-agent",
allowed_tools=["read_file", "write_file", "run_tests"],
blocked_tools=["shell_exec"],
max_calls_per_request=50
)
@function_tool
@govern(policy)
async def read_file(path: str) -> str:
"""Read file contents — governed."""
from pathlib import Path
root = Path(".").resolve()
safe_path = Path(path).resolve()
if root not in safe_path.parents and safe_path != root:
raise ValueError("Path traversal blocked by governance")
with safe_path.open() as f:
return f.read()
Governance Levels
Match governance strictness to risk level:
| Level | Controls | Use Case |
|---|
| Open | Audit only, no restrictions | Internal dev/testing |
| Standard | Tool allowlist + content filters | General production agents |
| Strict | All controls + human approval for sensitive ops | Financial, healthcare, legal |
| Locked | Allowlist only, no dynamic tools, full audit | Compliance-critical systems |
Best Practices
| Practice | Rationale |
|---|
| Policy as configuration | Store policies in YAML/JSON, not hardcoded — enables change without deploys |
| Most-restrictive-wins | When composing policies, deny always overrides allow |
| Deterministic decisions | Use explicit policy checks for allow/deny/review rather than asking an LLM to decide permissions |
| Pre-flight intent check | Classify intent before tool execution, not after |
| Trust decay | Trust scores should decay over time — require ongoing good behavior |
| Append-only audit | Never modify or delete audit entries — immutability enables compliance |
| Audit redaction | Store enough detail to investigate, but redact secrets, credentials, and unnecessary personal data |
| Structured denials | Return consistent denial reasons so users, operators, and tests can distinguish policy failures |
| Fail closed | If governance check errors, deny the action rather than allowing it |
| Human approval for high-impact actions | Require explicit review for irreversible, external, financial, legal, or privileged operations |
| Separate policy from logic | Governance enforcement should be independent of agent business logic |
Quick Start Checklist
This skill includes copyable local resources:
## Agent Governance Implementation Checklist
### Setup
- [ ] Define governance policy (allowed tools, blocked patterns, rate limits)
- [ ] Choose governance level (open/standard/strict/locked)
- [ ] Set up audit trail storage
### Implementation
- [ ] Add @govern decorator to all tool functions
- [ ] Add intent classification to user input processing
- [ ] Implement trust scoring for multi-agent interactions
- [ ] Wire up audit trail export
### Validation
- [ ] Test that blocked tools are properly denied
- [ ] Test that content filters catch sensitive patterns
- [ ] Test rate limiting behavior
- [ ] Verify audit trail captures all events
- [ ] Test policy composition (most-restrictive-wins)
Related Resources