audit-trail
Structured audit event logging for compliance, forensics, and activity tracking — 7-year retention per Privacy Act 1988
| name | audit-trail |
| type | skill |
| version | 1.0.0 |
| priority | 2 |
| domain | security |
| description | Structured audit event logging for compliance, forensics, and activity tracking — 7-year retention per Privacy Act 1988 |
Structured audit event logging for compliance, forensics, and activity tracking in NodeJS-Starter-V1.
| Field | Value |
|---|---|
| Skill ID | audit-trail |
| Category | Authentication & Security |
| Complexity | Medium |
| Complements | error-taxonomy, structured-logging, health-check |
| Version | 1.0.0 |
| Locale | en-AU |
Codifies structured audit trail patterns for NodeJS-Starter-V1: immutable audit event logging with Pydantic models, PostgreSQL-backed audit log table, FastAPI middleware for request/response capture, authentication event tracking, agent activity correlation, retention policies with time-based archival, and query APIs for compliance reporting.
- […] AgentEventPublisher
- […] structured-logging skill instead)
- […] AgentEventPublisher in src/state/events.py)
- […] EvidenceCollector in apps/web/lib/audit/)
- […] metrics-collector skill instead)
- […] correlation_id that links it to the originating request, user session, and (if applicable) agent run.

from datetime import datetime, timezone
from enum import Enum
from typing import Any
from uuid import uuid4

from pydantic import BaseModel, Field


class AuditAction(str, Enum):
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    LOGIN = "login"
    LOGOUT = "logout"
    LOGIN_FAILED = "login_failed"
    PERMISSION_DENIED = "permission_denied"
    EXPORT = "export"
    ESCALATE = "escalate"


class AuditEvent(BaseModel):
    """Immutable audit event record."""

    id: str = Field(default_factory=lambda: f"audit_{uuid4().hex[:12]}")
    # Timezone-aware UTC to match the TIMESTAMPTZ column;
    # datetime.utcnow() is deprecated and returns a naive value.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    action: AuditAction
    resource_type: str  # e.g., "user", "document", "agent_run"
    resource_id: str | None = None
    actor_id: str | None = None  # user ID or agent ID
    actor_type: str = "user"  # "user" | "agent" | "system"
    correlation_id: str | None = None
    ip_address: str | None = None
    user_agent: str | None = None
    details: dict[str, Any] = Field(default_factory=dict)
    outcome: str = "success"  # "success" | "failure" | "denied"
    metadata: dict[str, Any] = Field(default_factory=dict)
Project Reference: apps/backend/src/state/events.py — the existing AgentEventPublisher tracks agent run lifecycle events. The AuditEvent model complements this by capturing user-initiated and system-level actions that fall outside agent runs.
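The docstring calls the record immutable, but the model as written does not enforce that. The following is a minimal sketch of enforced immutability, using a frozen stdlib dataclass as a stand-in so it runs without dependencies. `FrozenAuditEvent` and `try_tamper` are hypothetical names, not part of the project; in Pydantic v2 the comparable switch is `model_config = ConfigDict(frozen=True)`.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone
from uuid import uuid4


@dataclass(frozen=True)
class FrozenAuditEvent:
    """Hypothetical stand-in for AuditEvent; fields cannot be reassigned."""

    action: str
    resource_type: str
    outcome: str = "success"
    id: str = field(default_factory=lambda: f"audit_{uuid4().hex[:12]}")
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def try_tamper(event: FrozenAuditEvent) -> bool:
    """Return True if the event rejected an attempted modification."""
    try:
        event.outcome = "failure"  # type: ignore[misc]
    except FrozenInstanceError:
        return True
    return False
```

Freezing only protects the in-process object; the database row still needs append-only permissions to be tamper-resistant.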
| Action | When to Emit | Outcome Values |
|---|---|---|
| create | New record inserted | success, failure |
| read | Sensitive data accessed | success, denied |
| update | Record modified | success, failure |
| delete | Record removed | success, failure |
| login | Successful authentication | success |
| login_failed | Failed authentication | failure |
| logout | User session ended | success |
| permission_denied | Unauthorised access attempt | denied |
| export | Data exported or downloaded | success, failure |
| escalate | Agent escalated to human | success |
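The table can also be enforced in code, so that an event with an outcome the table does not list is caught before it is written. This is a sketch only: `ALLOWED_OUTCOMES` and `is_valid_outcome` are hypothetical names, and the mapping is transcribed from the table.

```python
# Allowed outcomes per action, transcribed from the action table.
ALLOWED_OUTCOMES: dict[str, frozenset[str]] = {
    "create": frozenset({"success", "failure"}),
    "read": frozenset({"success", "denied"}),
    "update": frozenset({"success", "failure"}),
    "delete": frozenset({"success", "failure"}),
    "login": frozenset({"success"}),
    "login_failed": frozenset({"failure"}),
    "logout": frozenset({"success"}),
    "permission_denied": frozenset({"denied"}),
    "export": frozenset({"success", "failure"}),
    "escalate": frozenset({"success"}),
}


def is_valid_outcome(action: str, outcome: str) -> bool:
    """Return True when the outcome is listed for the action in the table."""
    return outcome in ALLOWED_OUTCOMES.get(action, frozenset())
```

A check like this could run inside a model validator or just before the insert; unknown actions are treated as invalid here.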
CREATE TABLE IF NOT EXISTS audit_log (
    id VARCHAR(50) PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    action VARCHAR(30) NOT NULL,
    resource_type VARCHAR(50) NOT NULL,
    resource_id VARCHAR(100),
    actor_id VARCHAR(100),
    actor_type VARCHAR(20) NOT NULL DEFAULT 'user',
    correlation_id VARCHAR(100),
    ip_address INET,
    user_agent TEXT,
    details JSONB DEFAULT '{}'::JSONB,
    outcome VARCHAR(20) NOT NULL DEFAULT 'success',
    metadata JSONB DEFAULT '{}'::JSONB
);

-- Indexes for common query patterns.
-- IF NOT EXISTS keeps the script re-runnable, matching the table statement.
CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_audit_actor ON audit_log(actor_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_audit_resource ON audit_log(resource_type, resource_id);
CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_log(action);
CREATE INDEX IF NOT EXISTS idx_audit_correlation ON audit_log(correlation_id);
CREATE INDEX IF NOT EXISTS idx_audit_outcome ON audit_log(outcome)
    WHERE outcome != 'success';

-- Partition by month for retention management (optional).
-- Requires declaring audit_log with PARTITION BY RANGE (timestamp)
-- and including timestamp in the primary key.
-- CREATE TABLE audit_log_2026_02 PARTITION OF audit_log
--     FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
Project Reference: scripts/init-db.sql — add this table after SECTION 6 (Utility Views). No existing audit tables in the schema. The audit_evidence table referenced in apps/web/lib/audit/evidence-collector.ts is a Supabase-only table for frontend evidence, not backend audit events.
| Index | Query Pattern |
|---|---|
| idx_audit_timestamp | Recent events: ORDER BY timestamp DESC LIMIT 100 |
| idx_audit_actor | User activity: WHERE actor_id = ? ORDER BY timestamp DESC |
| idx_audit_resource | Resource history: WHERE resource_type = ? AND resource_id = ? |
| idx_audit_correlation | Request tracing: WHERE correlation_id = ? |
| idx_audit_outcome | Security review: WHERE outcome = 'denied' (partial index) |
from src.utils import get_logger

logger = get_logger(__name__)


class AuditTrail:
    """Append-only audit event emitter."""

    def __init__(self, store) -> None:
        self.store = store

    async def emit(self, event: AuditEvent) -> None:
        """Write audit event to database."""
        try:
            self.store.client.table("audit_log").insert(
                event.model_dump(mode="json")
            ).execute()
        except Exception as exc:
            # Audit failures must never crash the application.
            # Log and continue; investigate separately.
            logger.error(
                "audit_write_failed",
                event_id=event.id,
                action=event.action,
                error=str(exc),
            )

    async def emit_batch(self, events: list[AuditEvent]) -> None:
        """Write multiple audit events in a single transaction."""
        rows = [e.model_dump(mode="json") for e in events]
        try:
            self.store.client.table("audit_log").insert(rows).execute()
        except Exception as exc:
            logger.error(
                "audit_batch_write_failed",
                count=len(events),
                error=str(exc),
            )
Rule: Audit writes must never raise exceptions to the caller. A failed audit write is logged and investigated separately — it must not block the business operation that triggered it.
Complements: structured-logging skill — audit failures are logged via structlog with structured fields for monitoring and alerting.
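The never-raise rule can be sketched in isolation. The example below runs a synchronous write in a worker thread via `asyncio.to_thread` so the event loop is not blocked, and converts any failure into a logged error plus a `False` return. `SafeAuditTrail`, `InMemoryStore` and its `insert` method are hypothetical stand-ins, not the project's Supabase store, and stdlib `logging` replaces structlog here.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger("audit")


class InMemoryStore:
    """Hypothetical store; set fail=True to simulate a database outage."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.rows: list[dict[str, Any]] = []

    def insert(self, row: dict[str, Any]) -> None:
        if self.fail:
            raise ConnectionError("database unavailable")
        self.rows.append(row)


class SafeAuditTrail:
    """Append-only emitter whose emit() never raises to the caller."""

    def __init__(self, store: InMemoryStore) -> None:
        self.store = store

    async def emit(self, event: dict[str, Any]) -> bool:
        """Write one event; return False instead of raising on failure."""
        try:
            # Run the blocking write in a worker thread.
            await asyncio.to_thread(self.store.insert, event)
            return True
        except Exception as exc:
            logger.error(
                "audit_write_failed action=%s error=%s", event.get("action"), exc
            )
            return False
```

Returning a boolean is a design choice for this sketch: callers may ignore it, while tests and monitoring can still observe that a write was lost.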
import time
from uuid import uuid4

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

from src.auth.jwt import extract_user_email
from src.state.supabase import SupabaseStateStore


class AuditMiddleware(BaseHTTPMiddleware):
    """Capture audit events for state-changing API requests."""

    AUDITABLE_METHODS = {"POST", "PUT", "PATCH", "DELETE"}

    async def dispatch(self, request: Request, call_next):
        # Skip non-auditable methods
        if request.method not in self.AUDITABLE_METHODS:
            return await call_next(request)

        correlation_id = request.headers.get(
            "x-correlation-id", f"req_{uuid4().hex[:12]}"
        )
        start_time = time.monotonic()

        # Extract actor from JWT
        token = request.cookies.get("access_token")
        actor_id = extract_user_email(token) if token else None

        response: Response = await call_next(request)
        duration_ms = (time.monotonic() - start_time) * 1000

        # Emit audit event
        trail = AuditTrail(SupabaseStateStore())
        await trail.emit(AuditEvent(
            action=self._method_to_action(request.method),
            resource_type=self._extract_resource(request.url.path),
            actor_id=actor_id,
            correlation_id=correlation_id,
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("user-agent"),
            outcome="success" if response.status_code < 400 else "failure",
            details={
                "method": request.method,
                "path": request.url.path,
                "status_code": response.status_code,
                "duration_ms": round(duration_ms, 2),
            },
        ))

        # Propagate correlation ID in response
        response.headers["x-correlation-id"] = correlation_id
        return response

    @staticmethod
    def _method_to_action(method: str) -> AuditAction:
        return {
            "POST": AuditAction.CREATE,
            "PUT": AuditAction.UPDATE,
            "PATCH": AuditAction.UPDATE,
            "DELETE": AuditAction.DELETE,
        }.get(method, AuditAction.UPDATE)

    @staticmethod
    def _extract_resource(path: str) -> str:
        # "/api/users/42" -> "users"; paths with one segment map to "unknown".
        parts = path.strip("/").split("/")
        return parts[1] if len(parts) > 1 else "unknown"
Project Reference: apps/backend/src/api/main.py — add app.add_middleware(AuditMiddleware) after existing middleware.
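The middleware records every status of 400 or higher as failure, while the action table reserves denied for unauthorised access. A small sketch of a finer mapping follows; `outcome_for_status` is a hypothetical helper, and treating 401 and 403 as denied is an assumption rather than something the project specifies.

```python
def outcome_for_status(status_code: int) -> str:
    """Map an HTTP status code to an audit outcome string."""
    if status_code in (401, 403):
        # Authentication or authorisation rejected the request.
        return "denied"
    if status_code >= 400:
        return "failure"
    return "success"
```

With this helper, the `outcome=` argument in `dispatch` could become `outcome_for_status(response.status_code)`, which keeps the partial index on non-success outcomes useful for security review.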
For fine-grained control beyond middleware, use an @audited(resource_type, action) decorator that attaches _audit_resource and _audit_action attributes to endpoint functions. The middleware reads these attributes to override auto-detected values.
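A minimal sketch of such a decorator, assuming plain string values for the two attributes. `audit_overrides` is a hypothetical helper showing how the middleware might prefer the attached values over its auto-detected ones; how the middleware obtains the endpoint function is left out here.

```python
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def audited(resource_type: str, action: str) -> Callable[[F], F]:
    """Tag an endpoint function with audit overrides for the middleware."""

    def decorator(func: F) -> F:
        # Attribute names follow the text; the function itself is unchanged.
        func._audit_resource = resource_type  # type: ignore[attr-defined]
        func._audit_action = action  # type: ignore[attr-defined]
        return func

    return decorator


def audit_overrides(
    endpoint: Callable[..., Any], default_resource: str, default_action: str
) -> tuple[str, str]:
    """Return (resource, action), preferring values set by @audited."""
    return (
        getattr(endpoint, "_audit_resource", default_resource),
        getattr(endpoint, "_audit_action", default_action),
    )


@audited("document", "export")
async def download_report() -> dict[str, str]:
    return {"status": "ok"}


async def plain_endpoint() -> dict[str, str]:
    return {"status": "ok"}
```

Because the decorator returns the original function rather than a wrapper, it adds no per-request overhead and does not interfere with FastAPI's signature inspection.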
from src.auth.jwt import verify_password, create_access_token


async def login_with_audit(
    email: str,
    password: str,
    ip_address: str | None,
    trail: AuditTrail,
) -> str | None:
    """Authenticate user and emit audit event."""
    user = await get_user_by_email(email)
    if not user or not verify_password(password, user["password_hash"]):
        await trail.emit(AuditEvent(
            action=AuditAction.LOGIN_FAILED,
            resource_type="auth",
            actor_id=email,
            ip_address=ip_address,
            outcome="failure",
            details={"reason": "invalid_credentials"},
        ))
        return None

    token = create_access_token({"sub": email})
    await trail.emit(AuditEvent(
        action=AuditAction.LOGIN,
        resource_type="auth",
        resource_id=str(user["id"]),
        actor_id=email,
        ip_address=ip_address,
        outcome="success",
    ))
    return token
Project Reference: apps/backend/src/auth/jwt.py — current auth module has no audit logging. The login_with_audit wrapper adds audit events without modifying the existing JWT functions.
| Event | Action | Priority |
|---|---|---|
| Successful login | login | Always |
| Failed login | login_failed | Always |
| Logout | logout | Always |
| Password change | update | Always |
| Permission denied (403) | permission_denied | Always |
| Admin action | create/update/delete | Always |
| Data export | export | Always |
| Token refresh | update | Optional |
| Profile view | read | Optional |
from src.state.events import AgentEventPublisher


class AuditedEventPublisher(AgentEventPublisher):
    """Extends AgentEventPublisher with audit trail correlation."""

    def __init__(self, trail: AuditTrail) -> None:
        super().__init__()
        self.trail = trail

    async def start_run(self, **kwargs) -> str:
        run_id = await super().start_run(**kwargs)
        await self.trail.emit(AuditEvent(
            action=AuditAction.CREATE,
            resource_type="agent_run",
            resource_id=run_id,
            actor_id=kwargs.get("user_id"),
            actor_type="system",
            correlation_id=kwargs.get("task_id"),
            details={"agent_name": kwargs.get("agent_name")},
        ))
        return run_id

    async def escalate_run(self, run_id: str, reason: str, **kwargs):
        await super().escalate_run(run_id, reason, **kwargs)
        await self.trail.emit(AuditEvent(
            action=AuditAction.ESCALATE,
            resource_type="agent_run",
            resource_id=run_id,
            actor_type="agent",
            outcome="success",
            details={"reason": reason},
        ))
Project Reference: apps/backend/src/state/events.py:39-313 — AgentEventPublisher handles real-time status updates for the frontend. AuditedEventPublisher extends it to write immutable audit records for compliance without duplicating the real-time functionality.
| Data Classification | Retention Period | Archival |
|---|---|---|
| Security events (login, permission_denied) | 365 days | Archive to cold storage |
| State-changing operations (create, update, delete) | 90 days | Archive to cold storage |
| Read access events | 30 days | Delete after expiry |
| Agent activity events | 90 days | Archive to cold storage |
Implement cleanup_audit_log(store, retention_days=90, security_retention_days=365) as an async function that:
- Deletes non-security events older than retention_days using .delete().lt("timestamp", cutoff).not_.in_("action", security_actions)
- Deletes security events older than security_retention_days using .delete().lt("timestamp", security_cutoff).in_("action", security_actions)
- Returns {"non_security_deleted": int, "security_deleted": int}

Complements: cron-scheduler skill (schedule cleanup_audit_log as a daily cron job with CRON_SECRET authentication) and health-check skill (add audit log table size to the /ready endpoint).
Project Reference: apps/web/lib/audit/evidence-collector.ts:116-137 — the frontend EvidenceCollector already implements retention policies with category-based durations. Follow the same pattern for backend audit events.
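The two-tier cutoff logic behind cleanup_audit_log can be checked without a database. The sketch below is an in-memory dry run, not the Supabase implementation: `is_expired` and `plan_cleanup` are hypothetical helpers, and the membership of `SECURITY_ACTIONS` is an assumption (the retention table names login and permission_denied; login_failed and logout are added here).

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable

# Assumption: which actions count as security events.
SECURITY_ACTIONS = frozenset({"login", "login_failed", "logout", "permission_denied"})


def is_expired(
    action: str,
    timestamp: datetime,
    now: datetime,
    retention_days: int = 90,
    security_retention_days: int = 365,
) -> bool:
    """Return True when an event is older than its retention window."""
    days = security_retention_days if action in SECURITY_ACTIONS else retention_days
    return timestamp < now - timedelta(days=days)


def plan_cleanup(
    events: Iterable[tuple[str, datetime]],
    now: datetime,
    retention_days: int = 90,
    security_retention_days: int = 365,
) -> dict[str, int]:
    """Count what a cleanup run would delete, using the result keys from the text."""
    result = {"non_security_deleted": 0, "security_deleted": 0}
    for action, timestamp in events:
        if is_expired(action, timestamp, now, retention_days, security_retention_days):
            key = "security_deleted" if action in SECURITY_ACTIONS else "non_security_deleted"
            result[key] += 1
    return result
```

Passing `now` explicitly keeps the function deterministic in tests; a real job would pass `datetime.now(timezone.utc)`.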
Create a GET /api/audit/events endpoint on APIRouter(prefix="/api/audit", tags=["audit"]) with these query parameters:
| Parameter | Type | Description |
|---|---|---|
| actor_id | str \| None | Filter by actor |
| resource_type | str \| None | Filter by resource type |
| resource_id | str \| None | Filter by specific resource |
| action | str \| None | Filter by action enum |
| outcome | str \| None | Filter by outcome |
| from_date / to_date | datetime \| None | Date range |
| correlation_id | str \| None | Trace a request chain |
| limit | int (default 50, max 200) | Pagination |
| offset | int (default 0) | Pagination offset |
Chain Supabase .eq() / .gte() / .lte() filters, order by timestamp DESC, apply .range(offset, offset + limit - 1), return {"events": data, "count": len(data)}.
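The inclusive bounds passed to .range() are easy to get off by one. A sketch of the arithmetic, using the default and maximum from the parameter table; `page_range` is a hypothetical helper, and clamping out-of-range values (rather than rejecting them with a validation error) is an assumption.

```python
def page_range(limit: int = 50, offset: int = 0, max_limit: int = 200) -> tuple[int, int]:
    """Return the inclusive (start, end) row range for one page."""
    limit = max(1, min(limit, max_limit))  # clamp to 1..max_limit
    offset = max(0, offset)  # negative offsets start from the first row
    return offset, offset + limit - 1
```

For example, the defaults select rows 0 to 49, and a request for 500 rows at offset 100 is capped to rows 100 to 299.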
Rule: The audit query endpoint must be restricted to admin users. Never expose audit logs to non-admin users — they may contain IP addresses, user agents, and other sensitive metadata.
| Pattern | Problem | Correct Approach |
|---|---|---|
| Mutable audit records (UPDATE/DELETE) | Destroys forensic evidence | Append-only writes; cleanup via privileged process |
| Audit writes blocking business logic | Application errors on audit failure | Fire-and-forget with error logging |
| No correlation ID | Cannot trace events across services | Propagate x-correlation-id header |
| Logging everything as read | Noise drowns out security signals | Only audit sensitive reads (PII, credentials) |
| Storing raw request bodies | PII exposure, storage bloat | Store action + resource + outcome only |
| Same retention for all events | Security events deleted too early | Category-based retention (security = 365d) |
| Audit endpoint without auth | Audit data leaks to unauthorised users | Admin-only access with JWT verification |
| Synchronous audit in hot path | Latency on every request | Async writes; consider background queue |
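The background-queue option from the last row can be sketched with `asyncio.Queue`: request handlers enqueue without waiting, and a single worker task performs the writes. `QueuedAuditWriter` and `demo` are hypothetical names, and a plain list stands in for the database.

```python
import asyncio
from typing import Any


class QueuedAuditWriter:
    """Hypothetical background writer: callers enqueue, one worker writes."""

    def __init__(self, sink: list[dict[str, Any]], maxsize: int = 1000) -> None:
        self.sink = sink  # stand-in for the database
        self.queue = asyncio.Queue(maxsize=maxsize)

    def submit(self, event: dict[str, Any]) -> bool:
        """Enqueue without waiting; return False when the queue is full."""
        try:
            self.queue.put_nowait(event)
            return True
        except asyncio.QueueFull:
            return False

    async def run(self) -> None:
        """Worker loop: write queued events until the task is cancelled."""
        while True:
            event = await self.queue.get()
            try:
                self.sink.append(event)  # replace with the real insert
            finally:
                self.queue.task_done()


async def demo() -> list[dict[str, Any]]:
    sink: list[dict[str, Any]] = []
    writer = QueuedAuditWriter(sink)  # created inside the running loop
    worker = asyncio.create_task(writer.run())
    writer.submit({"action": "create"})
    writer.submit({"action": "delete"})
    await writer.queue.join()  # wait until the worker has drained the queue
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return sink
```

The trade-off is durability: events still in the queue are lost if the process exits, and a full queue drops events, so the `False` return from `submit` should at least be logged.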
Before merging audit-trail changes:
- audit_log table created with correct indexes
- AuditEvent Pydantic model validates all required fields
- AuditTrail.emit() never raises exceptions to callers
- correlation_id propagated via x-correlation-id header
- […] AgentEventPublisher to audit trail

When applying this skill, structure implementation as:
### Audit Trail Implementation
**Storage**: [PostgreSQL / Supabase]
**Event Model**: AuditEvent (Pydantic)
**Capture Method**: [middleware / decorator / explicit]
**Auth Events**: [login, logout, login_failed, permission_denied]
**Agent Correlation**: [AuditedEventPublisher / direct emit]
**Retention**: security=[days], operations=[days], reads=[days]
**Query API**: /api/audit/events (admin-only)
**Cleanup**: [cron / manual / scheduled]