| name | workflow-orchestrator |
| description | Module Loop and Iteration Skill for orchestrating multi-phase penetration testing workflows. Use when coordinating sequential tool execution, managing dependencies between reconnaissance and vulnerability scanning modules, implementing adaptive fallback strategies, or managing workflow state across iterations. Triggers on tasks requiring dependency chaining (e.g., Nmap results feeding into Nuclei), adaptive workflow adjustments based on results, or stateful iteration management to avoid redundant scans. |
Workflow Orchestrator Skill
Manages cyclic testing workflows with dependency chaining, adaptive re-evaluation, and intelligent state management.
Quick Start
from scripts.workflow_engine import WorkflowEngine
from scripts.dependency_chain import DependencyChain
from scripts.state_manager import StateManager
state = StateManager(target="example.com")
engine = WorkflowEngine(state)
chain = DependencyChain()
chain.add_step("recon", NmapModule(), required=True)
chain.add_step("vuln_scan", NucleiModule(),
depends_on=["recon"],
input_mapper=lambda r: {"ports": r.open_ports})
chain.add_step("exploit", ExploitModule(),
depends_on=["vuln_scan"],
fallback=EnumModule())
results = await engine.execute(chain, max_iterations=3)
Workflow Architecture
┌─────────────────────────────────────────────────────────────┐
│ WORKFLOW ITERATION │
├─────────────────────────────────────────────────────────────┤
│ 1. CHECK STATE → Skip if already tested │
│ 2. EXECUTE MODULE → Run with dependencies │
│ 3. EVALUATE RESULTS → Success / Empty / Error │
│ 4. ADAPT STRATEGY → Fallback if empty │
│ 5. CHAIN OUTPUTS → Feed into next modules │
│ 6. UPDATE STATE → Mark tested, store findings │
│ 7. ITERATE → Continue until max_iterations or complete │
└─────────────────────────────────────────────────────────────┘
Core Components
1. Dependency Chaining (scripts/dependency_chain.py)
Manages execution order and data flow between modules:
chain = DependencyChain()
chain.add_step("port_scan", NmapScanner(), required=True)
chain.add_step("service_enum", ServiceEnumerator(),
depends_on=["port_scan"])
chain.add_step("web_scan", NucleiScanner(),
depends_on=["port_scan"],
condition=lambda ctx: 80 in ctx.get("port_scan", {}).ports or
443 in ctx.get("port_scan", {}).ports)
chain.add_step("sql_exploit", SQLMapModule(),
depends_on=["web_scan"],
condition=lambda ctx: "sql_injection" in ctx.findings)
See references/dependency_patterns.md for complete patterns.
2. Adaptive Re-evaluation (scripts/adaptive_strategy.py)
Switches strategies based on results:
| Result | Action |
|---|
| Success | Continue to dependent modules |
| Empty | Trigger fallback module |
| Error | Retry with reduced intensity |
| Timeout | Queue for later retry |
fallbacks = {
"web_scan": {
"empty_result": "service_deep_scan",
"error": "reduced_web_scan",
"timeout": "queue_retry"
},
"vuln_scan": {
"empty_result": "full_port_scan",
"blocked": "stealth_scan"
}
}
strategy = AdaptiveStrategy(fallbacks)
new_module = strategy.decide("web_scan", result)
3. State Management (scripts/state_manager.py)
Prevents redundant scans using Smart Memory:
state = StateManager(target="example.com")
if state.is_tested("nmap", ports="80,443"):
return state.get_result("nmap")
result = await nmap.scan("example.com", ports="80,443")
state.store("nmap", result, metadata={"ports": "80,443", "timestamp": now()})
state.add_tested_path("/admin", method="GET", params={"id": "1"})
Workflow Patterns
Pattern 1: Standard Pentest Flow
Nmap (ports) → Nuclei (web vulns) → SQLMap (exploit) → Report
↓ ↓ (if empty) ↓
Service Fallback to: Fallback to:
Enum Dirbusting Manual check
Pattern 2: Deep Dive Iteration
Iteration 1: Quick scan (top 100 ports)
Iteration 2: Full scan (all ports) - only if no services found
Iteration 3: Service-specific tools based on banners
Pattern 3: Parallel Branches
┌→ Web Path (FFuF)
Nmap Ports ─┤
└→ Service Exploit (Metasploit)
See references/workflow_templates/ for YAML definitions.
Integration with Zen-AI-Pentest
from core.orchestrator import ZenOrchestrator
class WorkflowOrchestrator(ZenOrchestrator):
def __init__(self):
super().__init__()
self.workflow_engine = WorkflowEngine()
self.state_manager = StateManager()
async def run_workflow(self, target: str, strategy: str = "standard"):
template = WorkflowTemplate.load(f"assets/workflows/{strategy}.yaml")
state = self.state_manager.for_target(target)
return await self.workflow_engine.execute(
template=template,
state=state,
max_iterations=template.max_iterations
)
State Storage Format
{
"target": "example.com",
"workflow_id": "uuid",
"iterations": [
{
"iteration": 1,
"modules": {
"nmap": {
"status": "completed",
"result_hash": "sha256:abc123",
"timestamp": "2026-03-20T14:46:00Z",
"inputs": {"ports": "top-100"},
"outputs": {"open_ports": [80, 443]}
},
"nuclei": {
"status": "completed",
"depends_on": ["nmap"],
"inputs": {"ports": [80, 443]}
}
}
}
],
"tested_paths": [
{"path": "/admin", "method": "GET", "params": {}},
{"path": "/api/users", "method": "POST", "params": {"id": "1"}}
],
"findings": ["CVE-2021-44228", "sql_injection_login"],
"adaptations": [
{"module": "web_scan", "trigger": "empty", "fallback": "dirbusting"}
]
}
Error Handling
| Error Type | Handler | Action |
|---|
| Module Crash | Restart with reduced threads | Log, retry once |
| Dependency Missing | Skip dependent modules | Mark as blocked |
| State Corruption | Rebuild from last good | Alert operator |
| Max Iterations | Halt workflow | Return partial results |
Performance Optimization
- Caching: Store module results keyed by inputs
- Parallelization: Run independent modules concurrently
- Early Termination: Stop if critical finding detected
- Incremental Scans: Only test new paths in iterations
See references/optimization_guide.md.
References
Scripts
scripts/workflow_engine.py - Core workflow execution
scripts/dependency_chain.py - Module dependency management
scripts/adaptive_strategy.py - Fallback and adaptation logic
scripts/state_manager.py - Smart Memory integration
scripts/parallel_executor.py - Concurrent module execution