| name | foundry-security-spec |
| description | Implement Cisco's Foundry specification for agentic AI security evaluation systems with multi-agent architecture |
| triggers | ["implement foundry security evaluation","set up agentic security testing","create foundry spec system","build security evaluation agents","configure foundry security roles","design ai vulnerability discovery","implement foundry detector agents","create security finding workflow"] |
Foundry Security Spec
Skill by ara.so — Security Skills collection.
Foundry is an open specification from Cisco for building agentic AI security evaluation systems. It defines a multi-agent architecture with 8 core roles and 5 extension roles that coordinate to discover, validate, and report security findings. This is NOT a tool to install—it's a blueprint for building your own security evaluation system.
Core Concepts
Foundry provides:
- Architecture: 8 core agent roles (Orchestrator, Planner, Navigator, Detector, Explorer, Validator, Investigator, Publisher)
- Finding Lifecycle: States, verdicts, evidence gates, fingerprinting
- Coordination Model: Atomic claims, heartbeat liveness, auto-blocking
- Governance: Sandboxing, budgets, yield-gated auto-stop, coverage gates
- Detection-to-Prevention Flywheel: Rules catch known issues, explorers find new ones, gaps become new rules
Foundry works with the CodeGuard rule format, so detection rules stay portable between evaluation and prevention.
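The finding lifecycle above can be pictured as a small state machine with an evidence gate. The sketch below is only an illustration: the status names `pending`, `validated`, and `published` appear in the examples later in this document, while the `rejected` status, the `ALLOWED` table, and the `transition` helper are assumptions, not part of the Foundry spec.

```python
from enum import Enum


class Status(str, Enum):
    PENDING = "pending"
    VALIDATED = "validated"
    REJECTED = "rejected"      # assumed terminal status for rejected findings
    PUBLISHED = "published"


# Hypothetical transition table: each status maps to the statuses it may move to.
ALLOWED = {
    Status.PENDING: {Status.VALIDATED, Status.REJECTED},
    Status.VALIDATED: {Status.PUBLISHED},
    Status.REJECTED: set(),
    Status.PUBLISHED: set(),
}


def transition(current: Status, target: Status, has_evidence: bool) -> Status:
    """Return the new status, or raise ValueError if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    # Evidence gate: a finding cannot be validated without evidence.
    if target is Status.VALIDATED and not has_evidence:
        raise ValueError("evidence gate: cannot validate without evidence")
    return target
```

Keeping the table explicit makes it easy to reject shortcuts such as publishing a finding that was never validated.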
Repository Structure
foundry-security-spec/
├── spec.md # Main specification (~130 functional requirements)
├── constitution.md # 11 inviolable principles
├── GLOSSARY.md # Terminology reference
└── README.md # Implementation guide
Implementation Workflow
Step 1: Read the Constitution
cat constitution.md
Key principles to understand:
- No unsupervised execution: Every finding requires explicit confirmation
- Evidence-gated findings: Claims without evidence don't become findings
- Reproducibility: Every finding must be reproducible from its evidence
- Atomic progress: Claims are indivisible units of work
- Fail-safe defaults: When stuck, escalate or yield—never guess
Step 2: Install spec-kit
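uv tool install specify-cli --from git+https://github.com/github/spec-kit.git
cd your-security-eval-project
specify init --here
This creates the .specify/ directory for spec-driven development. The /speckit.* commands in the following steps are slash commands that you run inside your AI coding agent, not shell commands.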
Step 3: Register the Constitution
cp path/to/foundry-security-spec/constitution.md .specify/memory/constitution.md
/speckit.constitution
Step 4: Seed Your Specification
mkdir -p specs/001-foundry
cp path/to/foundry-security-spec/spec.md specs/001-foundry/spec.md
Step 5: Clarify for Your Environment
/speckit.clarify
Answer questions in these categories:
Identity & Scope:
Q: What is your system name?
A: acme-security-eval
Q: Does "authorized evaluation with source access" hold?
A: yes
Q: Merge, split, or keep the 8 core roles as-is?
A: keep as-is for first implementation
Integration Choices:
Q: Version control system?
A: GitLab self-hosted at https://gitlab.internal
Q: Issue tracker?
A: Jira at https://jira.acme.com
Q: LLM provider?
A: OpenAI via internal gateway at https://llm.internal/v1
Q: Datastore?
A: PostgreSQL
Q: Isolation runtime?
A: Docker containers with network isolation
Q: Deployment target?
A: Kubernetes cluster
Policy Choices:
Q: Severity taxonomy?
A: Critical/High/Medium/Low matching our existing CVE scale
Q: Surface needs-review findings?
A: No, validator rejects inconclusive findings
Q: Label naming convention?
A: foundry:role/name format
Extension Scope (recommend NO for first build):
Q: Include Attack-Mapper role?
A: no
Q: Include Regression-Tracker role?
A: no
Q: Include Compliance-Mapper role?
A: no
Q: Include Impact-Assessor role?
A: no
Q: Include Remediation-Drafter role?
A: no
Step 6: Generate Your Specification
/speckit.specify
/speckit.clarify
Your specs/001-foundry/spec.md now contains YOUR specification with decisions filled in.
Step 7: Implement
/speckit.plan
/speckit.tasks
/speckit.implement
Agent Role Implementation Examples
Orchestrator Pattern
import asyncio
from typing import Dict
from datastore import FindingStore, ClaimStore
from agents import Planner, Detector, Explorer, Validator


class InsufficientCoverageError(Exception):
    """Raised when an evaluation finishes below its required coverage."""


class Orchestrator:
    def __init__(self,
                 llm_client,
                 finding_store: FindingStore,
                 claim_store: ClaimStore,
                 budget_manager,
                 rules_corpus_path: str = "/etc/foundry/rules/codeguard",
                 sandbox_runtime=None):
        self.llm = llm_client
        self.findings = finding_store
        self.claims = claim_store
        self.budget = budget_manager
        self.planner = Planner(llm_client)
        self.detector = Detector(llm_client, rules_corpus_path)
        # Explorer and Validator both execute code, so they share the sandbox runtime.
        self.explorer = Explorer(llm_client, sandbox_runtime)
        self.validator = Validator(llm_client, sandbox_runtime)

    async def evaluate(self, target_repo: str) -> Dict:
        """Run a complete evaluation, coordinating all agents."""
        # run_detection, run_exploration, and calculate_coverage are not shown here.
        eval_id = await self.findings.create_evaluation(
            target=target_repo,
            status="running"
        )
        try:
            plan = await self.planner.create_plan(target_repo)
            await self.claims.store_plan(eval_id, plan)
            detection_task = asyncio.create_task(
                self.run_detection(eval_id, plan)
            )
            exploration_task = asyncio.create_task(
                self.run_exploration(eval_id, plan)
            )
            monitor_task = asyncio.create_task(
                self.monitor_health(eval_id)
            )
            try:
                # Wait only for the workers; the monitor loops until it is cancelled.
                await asyncio.gather(detection_task, exploration_task)
            finally:
                monitor_task.cancel()
            coverage = await self.calculate_coverage(eval_id)
            if coverage < plan.required_coverage:
                raise InsufficientCoverageError(
                    f"Coverage {coverage}% < required {plan.required_coverage}%"
                )
            await self.findings.update_evaluation(
                eval_id,
                status="complete",
                coverage=coverage
            )
            return {
                "eval_id": eval_id,
                "status": "complete",
                "findings": await self.findings.count(eval_id),
                "coverage": coverage
            }
        except Exception as e:
            await self.findings.update_evaluation(
                eval_id,
                status="failed",
                error=str(e)
            )
            raise

    async def monitor_health(self, eval_id: str):
        """Monitor agent heartbeats and budgets."""
        while True:
            await asyncio.sleep(30)
            # Auto-block claims whose owner has stopped sending heartbeats.
            stalled = await self.claims.find_stalled_claims(
                eval_id,
                heartbeat_threshold=300
            )
            for claim in stalled:
                await self.claims.block_claim(
                    claim.id,
                    reason="heartbeat_timeout"
                )
            if await self.budget.is_exhausted(eval_id):
                await self.findings.update_evaluation(
                    eval_id,
                    status="budget_exhausted"
                )
                break
Detector with CodeGuard Rules
from typing import List
from codeguard import RuleEngine, Finding as CodeGuardFinding
from models import Claim, Finding


class Detector:
    def __init__(self, llm_client, rules_corpus_path: str):
        self.llm = llm_client
        self.rule_engine = RuleEngine.load(rules_corpus_path)

    async def process_claim(self, claim: Claim) -> List[Finding]:
        """Apply detection rules to a code claim."""
        code_units = await self.extract_code_units(claim)
        findings = []
        for unit in code_units:
            rule_hits = await self.rule_engine.evaluate(
                code=unit.content,
                context=unit.context,
                language=unit.language
            )
            for hit in rule_hits:
                # Rule hits are marked confirmed here; route them through the
                # Validator first if every finding needs explicit confirmation.
                finding = Finding(
                    claim_id=claim.id,
                    rule_id=hit.rule_id,
                    severity=hit.severity,
                    weakness_id=hit.cwe_id,
                    location=hit.location,
                    evidence={
                        "rule_match": hit.matched_pattern,
                        "code_snippet": unit.content,
                        "line_range": hit.line_range
                    },
                    verdict="confirmed",
                    status="validated"
                )
                findings.append(finding)
            # Record coverage per unit, whether or not a rule matched.
            await self.record_coverage(claim.id, unit.path)
        return findings

    async def extract_code_units(self, claim: Claim):
        """Use the LLM to identify relevant code units in the claim scope."""
        prompt = f"""
        Claim: {claim.description}
        Scope: {claim.scope}
        Identify all code units (functions, methods, classes) that should be
        evaluated for security issues related to this claim.
        Return as JSON array with: path, name, start_line, end_line
        """
        response = await self.llm.complete(prompt)
        # parse_code_units must also load each unit's content, context, and language.
        return parse_code_units(response)
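The `parse_code_units` helper is referenced above but not defined. A minimal sketch follows, assuming the model replies with a JSON array that may be wrapped in extra prose. The `CodeUnit` dataclass is hypothetical and carries only the four fields the prompt asks for; loading `content`, `context`, and `language` for each unit would be a separate step.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class CodeUnit:
    """Hypothetical record for one unit named in the LLM reply."""
    path: str
    name: str
    start_line: int
    end_line: int


def parse_code_units(response: str) -> List[CodeUnit]:
    """Parse the LLM reply into CodeUnit records, skipping malformed entries."""
    # Models often wrap JSON in prose or code fences, so isolate the outermost array.
    start, end = response.find("["), response.rfind("]")
    if start == -1 or end <= start:
        return []
    try:
        items = json.loads(response[start:end + 1])
    except json.JSONDecodeError:
        return []
    units = []
    for item in items:
        try:
            units.append(CodeUnit(
                path=str(item["path"]),
                name=str(item["name"]),
                start_line=int(item["start_line"]),
                end_line=int(item["end_line"]),
            ))
        except (KeyError, TypeError, ValueError):
            continue  # drop entries with a missing field or non-numeric line numbers
    return units
```

Returning an empty list on unparseable output follows the fail-safe principle: the agent yields nothing rather than guessing at what the model meant.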
Explorer for Novel Issues
import asyncio
from typing import Dict, List
from models import Claim, Finding, RuleGap


class Explorer:
    def __init__(self, llm_client, sandbox_runtime, rule_gap_store=None):
        self.llm = llm_client
        self.sandbox = sandbox_runtime
        # Store for rule gaps; record_rule_gap needs it to be set.
        self.rule_gaps = rule_gap_store

    async def investigate_claim(self, claim: Claim) -> List[Finding]:
        """Creative exploration beyond static rules."""
        findings = []
        hypotheses = await self.generate_hypotheses(claim)
        for hypothesis in hypotheses:
            # Each hypothesis is tested in its own isolated sandbox session.
            async with self.sandbox.session() as session:
                result = await self.test_hypothesis(
                    session,
                    hypothesis,
                    claim
                )
            if result.is_vulnerability:
                # Evidence gate: no reproduction, no finding.
                if not result.has_reproduction:
                    continue
                finding = Finding(
                    claim_id=claim.id,
                    severity=result.severity,
                    weakness_id=result.weakness_id,
                    description=result.description,
                    evidence=result.evidence,
                    verdict="needs-validation",
                    status="pending"
                )
                findings.append(finding)
                if await self.should_have_detected(finding):
                    await self.record_rule_gap(finding)
        return findings

    async def generate_hypotheses(self, claim: Claim) -> List[Dict]:
        """Use the LLM to generate creative test hypotheses."""
        prompt = f"""
        You are exploring code for security issues that static rules may miss.
        Claim: {claim.description}
        Code scope: {claim.scope}
        Generate 3-5 security hypotheses to test:
        - Focus on logic bugs, state confusion, race conditions
        - Consider what rules can't express (context-dependent issues)
        - Prioritize high-impact scenarios
        For each hypothesis provide:
        - What to test
        - Why it might be vulnerable
        - How to reproduce if vulnerable
        Return as JSON array.
        """
        response = await self.llm.complete(
            prompt,
            temperature=0.7
        )
        return parse_hypotheses(response)

    async def record_rule_gap(self, finding: Finding):
        """Record that the rules failed to detect this issue."""
        gap = RuleGap(
            finding_id=finding.id,
            weakness_id=finding.weakness_id,
            pattern=finding.evidence.get("vulnerable_pattern"),
            reason="explorer_found_missed_by_detector",
            suggested_rule=await self.draft_rule(finding)
        )
        await self.rule_gaps.store(gap)
Validator for Finding Confirmation
import hashlib
from models import Finding, ValidationResult


class Validator:
    def __init__(self, llm_client, sandbox_runtime):
        self.llm = llm_client
        self.sandbox = sandbox_runtime

    async def validate_finding(self, finding: Finding) -> ValidationResult:
        """Reproduce and confirm a finding from its evidence."""
        if not self.has_sufficient_evidence(finding):
            return ValidationResult(
                verdict="rejected",
                reason="insufficient_evidence"
            )
        async with self.sandbox.session() as session:
            reproduced = await self.reproduce_issue(
                session,
                finding.evidence
            )
            if not reproduced:
                return ValidationResult(
                    verdict="rejected",
                    reason="not_reproducible"
                )
            actual_severity = await self.assess_severity(
                session,
                finding
            )
            # Capture the transcript while the sandbox session is still open.
            transcript = session.get_transcript()
        if actual_severity != finding.severity:
            # Record the original severity before overwriting it.
            finding.evidence["severity_adjustment"] = {
                "original": finding.severity,
                "validated": actual_severity
            }
            finding.severity = actual_severity
        fingerprint = await self.generate_fingerprint(finding)
        return ValidationResult(
            verdict="confirmed",
            fingerprint=fingerprint,
            severity=actual_severity,
            reproduction_evidence=transcript
        )

    def has_sufficient_evidence(self, finding: Finding) -> bool:
        """Check whether the finding has the required evidence."""
        required = ["location", "description"]
        if finding.severity in ["critical", "high"]:
            required.extend(["reproduction_steps", "impact"])
        return all(k in finding.evidence for k in required)

    async def generate_fingerprint(self, finding: Finding) -> str:
        """Create a stable fingerprint for deduplication."""
        components = [
            finding.weakness_id,
            finding.location.get("file_path"),
            finding.location.get("function_name"),
            finding.evidence.get("root_cause_pattern")
        ]
        fingerprint_input = "|".join(str(c) for c in components if c)
        return hashlib.sha256(fingerprint_input.encode()).hexdigest()[:16]
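One property of the fingerprint above is that dropping empty components shifts the remaining ones, so a missing function name and a missing root-cause pattern can hash to the same input. The sketch below is one possible variant, not the spec's algorithm: `stable_fingerprint` is a hypothetical helper that keeps a fixed number of fields, uses `-` as an assumed placeholder, and normalizes paths and whitespace. Line numbers are deliberately left out so that unrelated edits to the file do not change the fingerprint.

```python
import hashlib
import posixpath
from typing import Optional


def stable_fingerprint(weakness_id: str,
                       file_path: str,
                       function_name: Optional[str],
                       root_cause_pattern: Optional[str]) -> str:
    """Hash weakness, location, and root cause into a 16-character hex ID."""
    components = [
        weakness_id.strip().upper(),
        # Normalize separators and redundant segments such as "./" or "//".
        posixpath.normpath(file_path.replace("\\", "/")),
        # Keep every position filled so fields never shift into each other.
        function_name or "-",
        " ".join((root_cause_pattern or "-").split()),
    ]
    digest = hashlib.sha256("|".join(components).encode("utf-8")).hexdigest()
    return digest[:16]
```

With this variant, `./src/app.py` and `src/app.py` produce the same fingerprint, while a finding with only a function name and one with only a pattern stay distinct.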
Publisher for Issue Tracker Integration
import aiohttp
from typing import Dict
from models import Finding


class Publisher:
    def __init__(self, issue_tracker_config: Dict):
        self.tracker_url = issue_tracker_config["url"]
        self.project_key = issue_tracker_config["project"]
        self.api_token = issue_tracker_config["token"]

    async def publish_finding(self, finding: Finding) -> str:
        """Create an issue in the tracker for a confirmed finding."""
        if finding.verdict != "confirmed":
            raise ValueError(f"Cannot publish {finding.verdict} finding")
        # Deduplicate by fingerprint before creating a new issue.
        existing = await self.find_existing_issue(finding.fingerprint)
        if existing:
            return existing.issue_id
        issue_body = self.format_issue(finding)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.tracker_url}/rest/api/2/issue",
                headers={
                    "Authorization": f"Bearer {self.api_token}",
                    "Content-Type": "application/json"
                },
                json=issue_body
            ) as resp:
                resp.raise_for_status()
                result = await resp.json()
                issue_id = result["key"]
        finding.issue_id = issue_id
        finding.status = "published"
        await finding.save()
        return issue_id

    def format_issue(self, finding: Finding) -> Dict:
        """Format the finding as an issue tracker ticket."""
        # The body stays at column 0 because Jira wiki markup such as "h3." must start a line.
        description = f"""
*Security Finding from Foundry Evaluation*
*Severity:* {finding.severity.upper()}
*Weakness:* {finding.weakness_id}
*Location:* {finding.location.get('file_path')}:{finding.location.get('line_number')}
h3. Description
{finding.description}
h3. Evidence
{self.format_evidence(finding.evidence)}
h3. Reproduction
{finding.evidence.get('reproduction_steps', 'See evidence above')}
---
Fingerprint: {finding.fingerprint}
Evaluation ID: {finding.eval_id}
"""
        return {
            "fields": {
                "project": {"key": self.project_key},
                "summary": f"[{finding.severity.upper()}] {finding.weakness_id}: {finding.get_short_description()}",
                "description": description,
                "issuetype": {"name": "Security Vulnerability"},
                "priority": {"name": self.map_severity_to_priority(finding.severity)},
                "labels": [
                    f"foundry:eval:{finding.eval_id}",
                    f"foundry:weakness:{finding.weakness_id}",
                    f"foundry:fingerprint:{finding.fingerprint}"
                ]
            }
        }
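`format_issue` calls `map_severity_to_priority`, which is not shown. A minimal sketch of such a helper follows, assuming the four-level severity taxonomy chosen during clarification. The priority names are Jira's usual defaults, but they are configurable per instance, so treat the table as an assumption to check against your tracker.

```python
# Hypothetical mapping from Foundry severity to Jira priority name.
SEVERITY_TO_PRIORITY = {
    "critical": "Highest",
    "high": "High",
    "medium": "Medium",
    "low": "Low",
}


def map_severity_to_priority(severity: str, default: str = "Medium") -> str:
    """Return the tracker priority for a severity, falling back to a default."""
    return SEVERITY_TO_PRIORITY.get(severity.strip().lower(), default)
```

Falling back to a default keeps publishing from failing on an unexpected severity label; raising an error instead would be the stricter choice.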
Configuration Examples
Evaluation Configuration
evaluation:
  name: "acme-security-eval"
orchestrator:
  max_concurrent_claims: 10
  heartbeat_interval: 60
  stall_threshold: 300
budget:
  max_tokens: 10000000
  max_duration_hours: 48
  per_agent_token_limit: 1000000
coverage:
  required_percentage: 80
  scope:
    - src/**/*.py
    - lib/**/*.js
  exclude:
    - tests/**
    - docs/**
agents:
  core:
    - orchestrator
    - planner
    - navigator
    - detector
    - explorer
    - validator
    - investigator
    - publisher
  extensions: []
integrations:
  llm:
    provider: "openai"
    endpoint: "${LLM_GATEWAY_URL}"
    model: "gpt-4"
    api_key: "${LLM_API_KEY}"
  vcs:
    type: "gitlab"
    url: "${GITLAB_URL}"
    token: "${GITLAB_TOKEN}"
  issue_tracker:
    type: "jira"
    url: "${JIRA_URL}"
    project: "SEC"
    token: "${JIRA_TOKEN}"
  datastore:
    type: "postgresql"
    connection_string: "${DATABASE_URL}"
sandbox:
  runtime: "docker"
  network_isolation: true
  timeout_seconds: 300
  resource_limits:
    cpu: "1"
    memory: "2Gi"
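The configuration above uses `${NAME}` placeholders so that secrets stay out of the file. YAML parsers do not expand these on their own, so a loader has to substitute them after parsing. The following is a minimal sketch of that step; `expand_env` is a hypothetical helper, and it assumes the YAML has already been parsed into plain dicts, lists, and strings.

```python
import os
import re
from typing import Any, Mapping

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Recursively replace ${NAME} in strings; raise KeyError if NAME is unset."""
    if isinstance(value, str):
        def substitute(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(substitute, value)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    return value  # numbers, booleans, and None pass through unchanged
```

Raising on an unset variable, rather than substituting an empty string, surfaces a missing token at startup instead of as an authentication failure mid-evaluation.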
Detection Rules Configuration
detection:
  rules_corpus: "/etc/foundry/rules/codeguard"
  enabled_categories:
    - injection
    - authentication
    - authorization
    - cryptography
    - data-exposure
    - configuration
  severity_mapping:
    critical: ["CWE-89", "CWE-78", "CWE-79"]
    high: ["CWE-306", "CWE-862"]
    medium: ["CWE-327", "CWE-338"]
    low: ["CWE-209", "CWE-532"]
rule_gaps:
  enabled: true
  auto_draft_rules: true
  review_queue: "rule-improvements"
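The `severity_mapping` block lists CWE IDs per severity, but a detector usually needs the reverse lookup: given a CWE, which severity applies? The sketch below inverts the mapping once at startup. `build_severity_index` and `severity_for` are hypothetical helper names, and the default of `medium` for unmapped CWEs is an assumption.

```python
from typing import Dict, List

# Mirrors the severity_mapping block from the configuration above.
SEVERITY_MAPPING: Dict[str, List[str]] = {
    "critical": ["CWE-89", "CWE-78", "CWE-79"],
    "high": ["CWE-306", "CWE-862"],
    "medium": ["CWE-327", "CWE-338"],
    "low": ["CWE-209", "CWE-532"],
}


def build_severity_index(mapping: Dict[str, List[str]]) -> Dict[str, str]:
    """Invert severity -> [CWE] into CWE -> severity, rejecting duplicates."""
    index: Dict[str, str] = {}
    for severity, cwes in mapping.items():
        for cwe in cwes:
            if cwe in index:
                raise ValueError(f"{cwe} is mapped to both {index[cwe]} and {severity}")
            index[cwe] = severity
    return index


def severity_for(cwe: str, index: Dict[str, str], default: str = "medium") -> str:
    """Look up the configured severity for a CWE ID."""
    return index.get(cwe, default)
```

Rejecting a CWE that appears under two severities catches a configuration mistake that would otherwise make the severity depend on dictionary order.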
Running an Evaluation
import asyncio
from orchestrator import Orchestrator
from config import load_config
from datastore import FindingStore, ClaimStore
# Assumed module locations for the remaining helpers; adjust to your project layout.
from budget import BudgetManager
from llm import create_llm_client


async def main():
    config = load_config("config/evaluation.yaml")
    orchestrator = Orchestrator(
        llm_client=create_llm_client(config.integrations.llm),
        finding_store=FindingStore(config.integrations.datastore),
        claim_store=ClaimStore(config.integrations.datastore),
        budget_manager=BudgetManager(config.budget)
    )
    result = await orchestrator.evaluate(
        target_repo="https://gitlab.internal/acme/webapp"
    )
    print(f"Evaluation {result['eval_id']} complete")
    print(f"Findings: {result['findings']}")
    print(f"Coverage: {result['coverage']}%")


if __name__ == "__main__":
    asyncio.run(main())
Common Patterns
Atomic Claim Processing
async def process_claim(self, claim: Claim):
    # try_claim must be atomic: only one agent can win a given claim.
    if not await self.claims.try_claim(claim.id, self.agent_id):
        return
    # Start heartbeats before the try block so the finally clause can always cancel them.
    heartbeat_task = asyncio.create_task(
        self.send_heartbeats(claim.id)
    )
    try:
        result = await self.do_work(claim)
        await self.claims.complete(claim.id, result)
    except Exception as e:
        await self.claims.fail(claim.id, str(e))
    finally:
        heartbeat_task.cancel()
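The pattern above depends on `try_claim` being atomic and on heartbeats being recorded somewhere the orchestrator can read. As an illustration only, here is an in-memory claim store built on an `asyncio.Lock`. `InMemoryClaimStore` and its method names are assumptions; a production store would typically get the same guarantee from the database, for example a conditional update.

```python
import asyncio
import time
from typing import Callable, Dict, List


class InMemoryClaimStore:
    """Illustrative single-process claim store; not suitable for multiple processes."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._lock = asyncio.Lock()
        self._owner: Dict[str, str] = {}
        self._last_beat: Dict[str, float] = {}
        self._clock = clock

    async def try_claim(self, claim_id: str, agent_id: str) -> bool:
        """Atomically take ownership; return False if another agent holds the claim."""
        async with self._lock:
            if claim_id in self._owner:
                return False
            self._owner[claim_id] = agent_id
            self._last_beat[claim_id] = self._clock()
            return True

    async def heartbeat(self, claim_id: str, agent_id: str) -> bool:
        """Refresh liveness; only the current owner may send heartbeats."""
        async with self._lock:
            if self._owner.get(claim_id) != agent_id:
                return False
            self._last_beat[claim_id] = self._clock()
            return True

    async def find_stalled(self, threshold_seconds: float) -> List[str]:
        """Return claims whose last heartbeat is older than the threshold."""
        async with self._lock:
            now = self._clock()
            return [cid for cid, beat in self._last_beat.items()
                    if now - beat > threshold_seconds]
```

The injectable `clock` exists so that stall detection can be tested without waiting five real minutes.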
Evidence-Gated Finding Creation
import logging
from typing import Dict, Optional

logger = logging.getLogger(__name__)

def create_finding(self, claim: Claim, issue: Dict) -> Optional[Finding]:
    # Evidence gates: without a location and a reproduction, no finding is created.
    if not issue.get("location"):
        logger.warning(f"No location for issue in {claim.id}, skipping")
        return None
    if not issue.get("reproduction"):
        logger.warning(f"No reproduction for issue in {claim.id}, skipping")
        return None
    return Finding(
        claim_id=claim.id,
        location=issue["location"],
        evidence={
            "reproduction": issue["reproduction"],
            # Optional fields: use .get so a missing key does not raise KeyError.
            "impact": issue.get("impact"),
            "code_snippet": issue.get("code")
        },
        verdict="needs-validation"
    )
Budget Enforcement
class BudgetManager:
    def __init__(self, config, db=None):
        self.config = config  # budget section of the evaluation config
        self.db = db          # async database handle used by record_usage

    async def check_budget(self, eval_id: str, tokens_requested: int) -> bool:
        """Check whether the budget allows the operation."""
        used = await self.get_tokens_used(eval_id)
        limit = self.config.max_tokens
        if used + tokens_requested > limit:
            await self.notify_budget_exhausted(eval_id)
            return False
        return True

    async def record_usage(self, eval_id: str, tokens: int):
        """Record token usage."""
        await self.db.execute(
            "INSERT INTO token_usage (eval_id, tokens, timestamp) VALUES ($1, $2, NOW())",
            eval_id, tokens
        )
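In the class above, checking the budget and recording usage are separate awaited steps, so two agents could both pass the check before either records its usage. One way to close that gap is to reserve tokens in a single step. The sketch below is an in-memory illustration under that assumption; `TokenBudget` and `try_reserve` are hypothetical names, and it also enforces the `per_agent_token_limit` from the configuration.

```python
from typing import Dict


class TokenBudget:
    """Illustrative in-process budget with a global and a per-agent cap."""

    def __init__(self, max_tokens: int, per_agent_limit: int):
        self.max_tokens = max_tokens
        self.per_agent_limit = per_agent_limit
        self.used_total = 0
        self.used_by_agent: Dict[str, int] = {}

    def try_reserve(self, agent_id: str, tokens: int) -> bool:
        """Check and record in one step; no await occurs between the two."""
        if tokens < 0:
            raise ValueError("tokens must be non-negative")
        agent_used = self.used_by_agent.get(agent_id, 0)
        if self.used_total + tokens > self.max_tokens:
            return False
        if agent_used + tokens > self.per_agent_limit:
            return False
        self.used_total += tokens
        self.used_by_agent[agent_id] = agent_used + tokens
        return True

    @property
    def remaining(self) -> int:
        return self.max_tokens - self.used_total
```

Because `try_reserve` is synchronous, it cannot be interleaved with another coroutine on the same event loop. A database-backed version would need a transaction or a conditional update to get the same effect.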
Troubleshooting
Agents Not Finding Issues
Check rule corpus:
foundry-ctl list-rules --corpus /etc/foundry/rules/codeguard
foundry-ctl test-rule CWE-89 --file sample.py
Check explorer creativity:
hypotheses = await self.llm.complete(
    prompt,
    temperature=0.8
)
Claims Stalling
Check heartbeat configuration:
HEARTBEAT_INTERVAL = 60
STALL_THRESHOLD = 300
assert HEARTBEAT_INTERVAL < STALL_THRESHOLD / 2
Check for deadlocks:
SELECT claim_id, agent_id, last_heartbeat, status
FROM claims
WHERE status = 'in_progress'
AND last_heartbeat < NOW() - INTERVAL '5 minutes';
Findings Not Publishing
Check verdict state:
if finding.verdict != "confirmed":
    logger.error(f"Cannot publish {finding.id}, verdict={finding.verdict}")
Check deduplication:
f1 = generate_fingerprint(finding)
f2 = generate_fingerprint(finding)
assert f1 == f2, "Fingerprints must be deterministic"
Low Coverage
Check scope configuration:
coverage:
  scope:
    - src/**/*.py
    - lib/**/*.{js,ts}
  exclude:
    - tests/**
    - vendor/**
Check claim distribution:
plan = await planner.create_plan(target)
print(f"Claims created: {len(plan.claims)}")
print(f"Surface area: {plan.surface_area}")
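When coverage looks low, it helps to compute it by hand from the scope patterns. The sketch below is one way to do that and is not the spec's definition of coverage. It assumes coverage means "covered in-scope files divided by all in-scope files". It also uses a small hand-written glob translator, because Python's `fnmatch` treats `**` like `*` and so does not match `src/app.py` against `src/**/*.py`. Brace patterns such as `*.{js,ts}` are not handled here and would need to be listed separately.

```python
import re
from typing import Iterable, List, Set


def glob_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a path glob: '**/' spans directories, '*' stays within one segment."""
    out = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**/", i):
            out.append("(?:.*/)?")   # zero or more directories
            i += 3
        elif pattern.startswith("**", i):
            out.append(".*")         # anything, including slashes
            i += 2
        elif pattern[i] == "*":
            out.append("[^/]*")
            i += 1
        elif pattern[i] == "?":
            out.append("[^/]")
            i += 1
        else:
            out.append(re.escape(pattern[i]))
            i += 1
    return re.compile("".join(out) + r"\Z")


def in_scope(path: str, scope: List[str], exclude: List[str]) -> bool:
    """A path is in scope if it matches a scope glob and no exclude glob."""
    included = any(glob_to_regex(p).match(path) for p in scope)
    excluded = any(glob_to_regex(p).match(path) for p in exclude)
    return included and not excluded


def coverage_percent(all_files: Iterable[str], covered: Set[str],
                     scope: List[str], exclude: List[str]) -> float:
    """Percentage of in-scope files that were covered; 0.0 if nothing is in scope."""
    targets = {p for p in all_files if in_scope(p, scope, exclude)}
    if not targets:
        return 0.0  # an empty scope should never pass a coverage gate
    return 100.0 * len(targets & covered) / len(targets)
```

If this number differs sharply from what the evaluation reports, the usual suspects are scope patterns that match fewer files than expected or claims that never recorded coverage.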
Integration with CodeGuard
from codeguard import RuleEngine
from models import RuleGap


class Detector:
    def __init__(self, rules_path: str):
        self.rules = RuleEngine.load(rules_path)

    async def scan(self, code: str, language: str):
        # Pass keyword arguments so language is not bound to another positional parameter.
        return await self.rules.evaluate(code=code, language=language)


async def export_rule_gap(gap: RuleGap):
    """Convert a discovered gap into a CodeGuard rule."""
    rule = {
        # weakness_id is assumed to already carry the CWE prefix, for example "CWE-89".
        "id": f"{gap.weakness_id}-{gap.pattern_hash}",
        "name": gap.suggested_name,
        "description": gap.description,
        "pattern": gap.pattern,
        "severity": gap.severity,
        "languages": gap.applicable_languages
    }
    await write_codeguard_rule(rule, "rules/corpus/custom/")
Best Practices
- Start with 8 core roles only — Get foundational pipeline working before adding extensions
- Constitution is non-negotiable — Each principle prevents a real production failure
- Evidence gates everything — No evidence = no finding, regardless of confidence
- Fingerprints must be stable — Same issue in same place = same fingerprint always
- Budgets prevent runaway — Set token limits, enforce them, auto-stop when exhausted
- Coverage before completion — Don't mark evaluation complete until coverage gate passes
- Rule gaps feed corpus — When explorer finds something detector missed, create a rule
- Sandbox everything — Never execute in evaluation environment, always isolate