| name | esaa-security-audit |
| description | Execute deterministic, event-sourced security audits using ESAA-Security's LLM-based agent architecture with 95 checks across 16 security domains |
| triggers | ["run a security audit using ESAA","audit this codebase with ESAA-Security","execute security checks with event sourcing","generate deterministic security report","verify security findings with ESAA","audit AI-generated code for vulnerabilities","run PARCER security playbooks","trace security audit with event store"] |
ESAA-Security Audit Skill
Skill by ara.so — Security Skills collection.
Overview
ESAA-Security applies the Event Sourcing for Autonomous Agents (ESAA) architecture to automated security auditing. It executes structured security audits across 16 security domains with 95 executable checks, governed by an immutable append-only event log. Every finding, classification, and remediation decision is recorded as a verifiable fact.
Key differentiators:
- Deterministic audits — same repository state produces same findings via event replay
- Hallucination prevention — schema-validated outputs with evidence requirements
- Complete audit trail —
.roadmap/activity.jsonl records every check execution
- Governed agents — PARCER contracts enforce decision hygiene and token budgets
- Verifiable reports — SHA-256 hash verification from events to final output
Installation
git clone https://github.com/elzobrito/ESAA-Security.git
cd ESAA-Security
pip install -r requirements.txt
export OPENAI_API_KEY=$YOUR_OPENAI_KEY
export ANTHROPIC_API_KEY=$YOUR_ANTHROPIC_KEY
export AUDIT_TARGET_REPO="/path/to/repo"
Requirements:
- Python 3.9+
- LLM API access (OpenAI GPT-4, Anthropic Claude, or compatible)
- Target repository must be readable by the audit agent
Repository Structure
.roadmap/ # Event sourcing core
├── activity.jsonl # Immutable event store
├── roadmap.json # Derived audit progress
├── issues.json # Structured findings
├── AGENT_CONTRACT.yaml # Agent boundaries
├── ORCHESTRATOR_CONTRACT.yaml # State mutation rules
└── PROJECTION_SPEC.md # Event → state mapping
playbooks/
├── playbooks.security.json # 95 security checks
└── global_input_contract.json # Input requirements
reports/
├── phase1/ # Reconnaissance
├── phase2/ # Domain audits
├── phase3/ # Risk classification
├── phase4/ # Recommendations
└── final/ # Compiled report
Core Concepts
Event Store (activity.jsonl)
Every audit action is an immutable event:
{
"event_id": "evt_001",
"timestamp": "2026-05-14T10:30:00Z",
"event_type": "task.started",
"task_id": "SEC-010",
"phase": "phase2",
"domain": "authentication",
"agent": "agent-impl"
}
{
"event_id": "evt_002",
"timestamp": "2026-05-14T10:32:15Z",
"event_type": "check.completed",
"task_id": "SEC-010",
"check_id": "AU-002",
"status": "fail",
"severity": "high",
"finding": "Password stored without bcrypt/argon2",
"evidence": {
"file": "auth/user.py",
"line": 45,
"code_snippet": "user.password = request.form['password']"
},
"hash": "a3f8b2..."
}
Read Model (roadmap.json)
Projection of audit progress (derived from events):
{
"version": "0.4.0",
"phases": {
"phase1": {
"status": "done",
"tasks": {
"SEC-001": {"status": "done", "output": "reports/phase1/tech-stack.md"}
}
},
"phase2": {
"status": "in_progress",
"domains": {
"authentication": {
"checks_passed": 5,
"checks_failed": 3,
"tasks": ["SEC-010", "SEC-011"]
}
}
}
}
}
Running an Audit
Phase 1: Reconnaissance
from esaa_security import Orchestrator, Agent
orchestrator = Orchestrator(
event_store=".roadmap/activity.jsonl",
roadmap_path=".roadmap/roadmap.json",
target_repo=os.getenv("AUDIT_TARGET_REPO")
)
agent_spec = Agent(
role="agent-spec",
contract_path=".roadmap/AGENT_CONTRACT.yaml",
parcer_profile="PARCER_PROFILE.agent-spec.yaml"
)
recon_tasks = ["SEC-001", "SEC-002", "SEC-003", "SEC-004"]
for task_id in recon_tasks:
result = agent_spec.execute_task(task_id, orchestrator.get_context())
orchestrator.validate_and_append(task_id, result)
Task outputs:
SEC-001: Tech stack inventory (languages, frameworks, dependencies)
SEC-002: Architecture map (components, trust boundaries)
SEC-003: Data flow diagram (inputs, storage, outputs)
SEC-004: Attack surface enumeration (endpoints, file uploads, APIs)
Phase 2: Domain Audit Execution
with open("playbooks/playbooks.security.json") as f:
playbooks = json.load(f)
agent_impl = Agent(
role="agent-impl",
contract_path=".roadmap/AGENT_CONTRACT.yaml",
parcer_profile="PARCER_PROFILE.agent-impl.yaml"
)
auth_checks = ["AU-001", "AU-002", "AU-003", "AU-004", "AU-005", "AU-006", "AU-007", "AU-008"]
for check_id in auth_checks:
playbook = playbooks["checks"][check_id]
result = agent_impl.execute_check(
check_id=check_id,
playbook=playbook,
context=orchestrator.get_context()
)
orchestrator.validate_and_append(
task_id=f"SEC-{check_id}",
result=result
)
Example check result:
{
"check_id": "AU-002",
"status": "fail",
"severity": "critical",
"title": "Weak Password Hashing",
"description": "Passwords stored using SHA-256 instead of bcrypt/argon2",
"evidence": {
"files": ["auth/models.py"],
"lines": [67],
"code": "hashlib.sha256(password.encode()).hexdigest()"
},
"cwe": "CWE-916",
"owasp": "A02:2021 Cryptographic Failures",
"recommendation": "Replace SHA-256 with bcrypt (cost factor 12+)",
"references": [
"https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html"
]
}
Phase 3: Risk Classification
agent_qa = Agent(
role="agent-qa",
contract_path=".roadmap/AGENT_CONTRACT.yaml",
parcer_profile="PARCER_PROFILE.agent-qa.yaml"
)
findings = orchestrator.get_all_findings()
risk_matrix = agent_qa.classify_risks(findings)
with open("reports/phase3/risk-matrix.json", "w") as f:
json.dump(risk_matrix, f, indent=2)
Risk matrix output:
{
"critical": [
{"id": "AU-002", "cvss": 9.1, "exploitability": "high"}
],
"high": [
{"id": "IV-003", "cvss": 7.5, "exploitability": "medium"}
],
"medium": [
{"id": "SH-001", "cvss": 5.3, "exploitability": "low"}
],
"remediation_priority": ["AU-002", "AZ-001", "IV-003"]
}
Phase 4: Report Generation
report = agent_qa.generate_report(
findings=orchestrator.get_all_findings(),
risk_matrix=risk_matrix,
context=orchestrator.get_context()
)
with open("reports/final/security-audit-report.md", "w") as f:
f.write(report["markdown"])
with open("reports/final/security-audit-report.json", "w") as f:
json.dump(report["structured"], f, indent=2)
Event Replay and Verification
from esaa_security import EventReplay, HashVerifier
replayer = EventReplay(event_store=".roadmap/activity.jsonl")
replayed_roadmap = replayer.project_roadmap()
original_hash = HashVerifier.compute_hash(".roadmap/roadmap.json")
replayed_hash = HashVerifier.compute_hash(replayed_roadmap)
assert original_hash == replayed_hash, "Non-deterministic projection detected"
Security Domain Coverage
Critical Domains (8 total)
Secrets & Configuration (SC-001 to SC-008):
playbook = {
"check_id": "SC-001",
"title": "Hardcoded Secrets Detection",
"patterns": [
r'password\s*=\s*["\'][^"\']+["\']',
r'api_key\s*=\s*["\'][^"\']+["\']',
r'AWS_SECRET_ACCESS_KEY'
],
"severity": "critical"
}
Authentication (AU-001 to AU-008):
- Password hashing strength
- MFA enforcement
- Session token generation
- Credential transmission (HTTPS)
Authorization (AZ-001 to AZ-006):
- RBAC implementation
- Privilege escalation checks
- IDOR vulnerabilities
- API authorization
Input Validation (IV-001 to IV-007):
- SQL injection (ORM usage, parameterized queries)
- XSS (output encoding)
- Command injection
- Path traversal
Data Security (DA-001 to DA-005):
- Encryption at rest
- PII handling
- Data retention policies
High Priority Domains (7 total)
AI/LLM Security (AI-001 to AI-005):
playbook = {
"check_id": "AI-001",
"title": "Prompt Injection Defense",
"checks": [
"user_input_sanitization",
"system_prompt_isolation",
"output_validation",
"context_length_limits"
],
"severity": "high"
}
Configuration
Agent Contract (.roadmap/AGENT_CONTRACT.yaml)
agent_impl:
can:
- read: ["**/*.py", "**/*.js", "**/*.java", "config/**"]
- write: ["reports/phase2/**"]
- execute_checks: true
cannot:
- write: [".roadmap/activity.jsonl", ".roadmap/roadmap.json"]
- modify_state: true
- append_events: true
output_schema: "agent_result.schema.json"
token_budget: 8000
PARCER Profile (Token Budgets)
budget:
max_tokens: 8000
per_check: 500
context_window: 4000
fallback:
strategy: "map_reduce"
chunk_size: 2000
validation:
require_evidence: true
require_cwe_mapping: true
schema: "agent_result.schema.json"
Programmatic Usage
Custom Audit Pipeline
from esaa_security import AuditPipeline, SecurityDomain
domains = [
SecurityDomain.AUTHENTICATION,
SecurityDomain.AUTHORIZATION,
SecurityDomain.INPUT_VALIDATION,
SecurityDomain.AI_LLM_SECURITY
]
pipeline = AuditPipeline(
target_repo=os.getenv("AUDIT_TARGET_REPO"),
domains=domains,
event_store=".roadmap/activity.jsonl"
)
for event in pipeline.execute_streaming():
if event["event_type"] == "check.completed":
print(f"✓ {event['check_id']}: {event['status']}")
elif event["event_type"] == "finding.detected":
print(f"⚠ {event['severity']}: {event['title']}")
report = pipeline.get_report()
Query Event Store
from esaa_security import EventQuery
query = EventQuery(".roadmap/activity.jsonl")
critical = query.filter(
event_type="check.completed",
status="fail",
severity="critical"
).to_list()
coverage = query.aggregate_by("domain")
timeline = query.timeline(group_by="1h")
Common Patterns
Incremental Audit (Skip Completed)
roadmap = orchestrator.load_roadmap()
for task_id in all_tasks:
if roadmap.get_task_status(task_id) == "done":
print(f"Skip {task_id} (already completed)")
continue
result = agent.execute_task(task_id)
orchestrator.validate_and_append(task_id, result)
Parallel Domain Execution
from concurrent.futures import ThreadPoolExecutor
def audit_domain(domain_name, checks):
agent = Agent(role="agent-impl")
results = []
for check_id in checks:
result = agent.execute_check(check_id)
results.append(result)
return domain_name, results
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(audit_domain, domain, checks): domain
for domain, checks in domain_map.items()
}
for future in as_completed(futures):
domain, results = future.result()
for result in results:
orchestrator.validate_and_append(result)
Export to SARIF
from esaa_security import SARIFExporter
exporter = SARIFExporter(event_store=".roadmap/activity.jsonl")
sarif = exporter.to_sarif()
with open("security-audit.sarif", "w") as f:
json.dump(sarif, f, indent=2)
Troubleshooting
Issue: Schema Validation Failure
from jsonschema import validate
import json
with open(".roadmap/agent_result.schema.json") as f:
schema = json.load(f)
with open("reports/phase2/results/SEC-010.json") as f:
result = json.load(f)
validate(instance=result, schema=schema)
Issue: Event Store Corruption
from esaa_security import EventStoreValidator
validator = EventStoreValidator(".roadmap/activity.jsonl")
errors = validator.validate()
if errors:
print("Corrupt events:")
for err in errors:
print(f"Line {err['line']}: {err['message']}")
else:
print("✓ Event store valid")
Issue: Non-Deterministic Replay
from esaa_security import ReplayDebugger
debugger = ReplayDebugger(
event_store=".roadmap/activity.jsonl",
expected_roadmap=".roadmap/roadmap.json"
)
divergent_event = debugger.find_divergence()
print(f"Divergence at event: {divergent_event['event_id']}")
print(f"Expected: {divergent_event['expected_state']}")
print(f"Actual: {divergent_event['actual_state']}")
Issue: Agent Exceeds Token Budget
fallback:
strategy: "map_reduce"
chunk_size: 2000
max_chunks: 10
budget:
context_window: 3000
Issue: Missing Evidence in Findings
{
"check_id": "IV-001",
"status": "fail",
"evidence": {
"file": "api/routes.py",
"line": 23,
"code_snippet": "query = f\"SELECT * FROM users WHERE id={user_id}\""
}
}
Integration with CI/CD
GitHub Actions
name: ESAA Security Audit
on: [push, pull_request]
jobs:
audit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run ESAA-Security Audit
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
AUDIT_TARGET_REPO: ${{ github.workspace }}
run: |
pip install -r requirements.txt
python orchestrator.py --full-audit
- name: Upload SARIF
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: security-audit.sarif
- name: Fail on Critical Findings
run: |
python -c "import json; \
report = json.load(open('reports/final/security-audit-report.json')); \
exit(1 if report['critical_count'] > 0 else 0)"
Best Practices
- Always verify event store integrity before generating reports
- Use deterministic replay to validate audit reproducibility
- Configure token budgets per agent role to prevent runaway costs
- Enable Map-Reduce fallback for large repositories (>10k LOC)
- Review PARCER profiles to adjust validation strictness
- Export to SARIF for GitHub Security tab integration
- Archive
.roadmap/ directory for audit forensics
References