| name | cab-workflow |
| description | CAB (Change Advisory Board) workflow patterns for EUCORA including evidence pack generation, risk scoring formulas, approval workflows, and exception management. Use when implementing CAB approvals, generating evidence packs, or calculating risk scores. |
| status | ✅ Working |
| last-validated | "2026-01-30T00:00:00.000Z" |
CAB Workflow Patterns
Change Advisory Board governance patterns for the EUCORA platform.
Quick Reference
| Component | Pattern |
|---|
| Risk Threshold | > 50 requires CAB approval |
| Privileged Tooling | Always requires CAB |
| Evidence Pack | Immutable, WORM storage |
| Approval Record | Append-only event store |
| Exceptions | Expiry date + compensating controls |
Risk Scoring Formula
Formula (v1.0)
RiskScore = clamp(0..100, Σ(weight_i × normalized_factor_i))
Where normalized_factor_i is between 0.0 (no risk) and 1.0 (maximum risk).
Risk Factors and Weights
| Factor | Weight | Examples |
|---|
| Privilege Impact | 20 | Admin required, service install, kernel extensions |
| Supply Chain Trust | 15 | Signature validity, notarization, publisher reputation |
| Exploitability | 10 | Network listeners, exposed services, macros |
| Data Access | 10 | Credential store, wide filesystem access |
| SBOM/Vulnerability | 15 | Critical/High CVEs in dependencies |
| Blast Radius | 10 | Scope size, ring level, BU/site count |
| Operational Complexity | 10 | Offline import, reboots, sequencing |
| History | 10 | Prior incidents, failure rate, rollback difficulty |
Implementation
from dataclasses import dataclass
from typing import Dict
RISK_WEIGHTS_V1 = {
"privilege_impact": 20,
"supply_chain_trust": 15,
"exploitability": 10,
"data_access": 10,
"sbom_vulnerability": 15,
"blast_radius": 10,
"operational_complexity": 10,
"history": 10,
}
@dataclass
class RiskAssessment:
score: float
factors: Dict[str, float]
model_version: str = "v1.0"
requires_cab: bool = False
@classmethod
def calculate(cls, factors: Dict[str, float]) -> "RiskAssessment":
"""Calculate risk score from normalized factors."""
score = sum(
RISK_WEIGHTS_V1.get(k, 0) * v
for k, v in factors.items()
)
score = max(0, min(100, score))
return cls(
score=score,
factors=factors,
requires_cab=score > 50,
)
def assess_deployment_risk(deployment) -> RiskAssessment:
"""Assess risk for a deployment."""
factors = {}
if deployment.requires_admin:
factors["privilege_impact"] = 1.0
elif deployment.requires_service_install:
factors["privilege_impact"] = 0.7
else:
factors["privilege_impact"] = 0.2
if deployment.artifact.is_signed and deployment.artifact.is_notarized:
factors["supply_chain_trust"] = 0.1
elif deployment.artifact.is_signed:
factors["supply_chain_trust"] = 0.3
else:
factors["supply_chain_trust"] = 0.9
if deployment.artifact.has_network_listeners:
factors["exploitability"] = 1.0
elif deployment.artifact.has_exposed_services:
factors["exploitability"] = 0.7
elif deployment.artifact.has_macros:
factors["exploitability"] = 0.5
else:
factors["exploitability"] = 0.1
if deployment.artifact.accesses_credential_store:
factors["data_access"] = 1.0
elif deployment.artifact.has_wide_filesystem_access:
factors["data_access"] = 0.7
else:
factors["data_access"] = 0.1
if deployment.artifact.has_critical_vulns:
factors["sbom_vulnerability"] = 1.0
elif deployment.artifact.has_high_vulns:
factors["sbom_vulnerability"] = 0.7
else:
factors["sbom_vulnerability"] = 0.0
target_count = deployment.target_scope_size
if target_count > 10000:
factors["blast_radius"] = 1.0
elif target_count > 1000:
factors["blast_radius"] = 0.6
else:
factors["blast_radius"] = 0.2
if deployment.requires_offline_import:
factors["operational_complexity"] = 1.0
elif deployment.requires_reboot:
factors["operational_complexity"] = 0.7
elif deployment.requires_sequencing:
factors["operational_complexity"] = 0.5
else:
factors["operational_complexity"] = 0.1
if deployment.artifact.has_prior_incidents:
factors["history"] = 1.0
elif deployment.artifact.failure_rate > 0.1:
factors["history"] = 0.7
elif deployment.artifact.rollback_difficulty == "hard":
factors["history"] = 0.5
else:
factors["history"] = 0.1
return RiskAssessment.calculate(factors)
Evidence Pack Schema
Required Fields
class EvidencePack(TimeStampedModel, CorrelationIdModel):
"""Immutable evidence pack for CAB submission."""
class Status(models.TextChoices):
DRAFT = "draft"
SUBMITTED = "submitted"
APPROVED = "approved"
REJECTED = "rejected"
deployment = models.ForeignKey("deployments.DeploymentIntent", on_delete=models.PROTECT)
version = models.CharField(max_length=16, default="1.0")
artifact_hash_sha256 = models.CharField(max_length=64)
artifact_signature = models.TextField()
artifact_signature_valid = models.BooleanField()
sbom_format = models.CharField(max_length=16)
sbom_content = models.JSONField()
scan_tool = models.CharField(max_length=32)
scan_results = models.JSONField()
scan_passed = models.BooleanField()
critical_count = models.IntegerField(default=0)
high_count = models.IntegerField(default=0)
policy_decision = models.CharField(max_length=16)
policy_exceptions = models.JSONField(default=list)
risk_score = models.DecimalField(max_digits=5, decimal_places=2)
risk_factors = models.JSONField()
risk_model_version = models.CharField(max_length=16, default="v1.0")
rollout_rings = models.JSONField()
rollout_schedule = models.JSONField()
rollback_strategy = models.CharField(max_length=32)
rollback_procedure = models.TextField()
rollback_validated = models.BooleanField(default=False)
test_lab_passed = models.BooleanField()
test_ring0_passed = models.BooleanField()
test_evidence_urls = models.JSONField(default=list)
exceptions = models.JSONField(default=list)
status = models.CharField(max_length=16, choices=Status.choices, default=Status.DRAFT)
submitted_at = models.DateTimeField(null=True, blank=True)
submitted_by = models.ForeignKey("core.User", on_delete=models.PROTECT, null=True)
class Meta:
ordering = ["-created_at"]
Evidence Pack Generation
class EvidencePackGenerator:
"""Generate complete evidence pack for CAB submission."""
async def generate(self, deployment) -> EvidencePack:
"""Generate evidence pack from deployment."""
artifact = deployment.artifact
evidence = EvidencePack(
deployment=deployment,
artifact_hash_sha256=artifact.sha256_hash,
artifact_signature=artifact.signature,
artifact_signature_valid=await self._verify_signature(artifact),
sbom_format=artifact.sbom.format,
sbom_content=artifact.sbom.content,
scan_tool=settings.VULN_SCANNER,
scan_results=await self._run_scan(artifact),
scan_passed=await self._evaluate_scan_policy(artifact),
critical_count=artifact.critical_vuln_count,
high_count=artifact.high_vuln_count,
policy_decision=await self._evaluate_policy(artifact),
policy_exceptions=await self._get_exceptions(artifact),
risk_score=deployment.risk_score,
risk_factors=deployment.risk_factors,
risk_model_version="v1.0",
rollout_rings=deployment.rollout_plan.rings,
rollout_schedule=deployment.rollout_plan.schedule,
rollback_strategy=deployment.rollback_strategy,
rollback_procedure=deployment.rollback_procedure,
rollback_validated=await self._check_rollback_validated(deployment),
test_lab_passed=await self._check_lab_tests(deployment),
test_ring0_passed=await self._check_ring0_tests(deployment),
test_evidence_urls=await self._collect_test_urls(deployment),
)
await evidence.asave()
return evidence
CAB Approval Workflow
States
┌─────────┐ submit ┌───────────┐
│ DRAFT │────────────▶│ SUBMITTED │
└─────────┘ └─────┬─────┘
│
┌──────────┼──────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌────────┐ ┌────────────┐
│ APPROVED │ │REJECTED│ │ CONDITIONS │
└──────────┘ └────────┘ └────────────┘
│
▼
┌──────────┐
│ APPROVED │
└──────────┘
Approval Service
class CABApprovalService:
"""CAB approval workflow service."""
async def submit_for_approval(
self,
evidence_pack: EvidencePack,
submitter: User,
) -> CABSubmission:
"""Submit evidence pack for CAB approval."""
await self._validate_evidence_complete(evidence_pack)
if evidence_pack.risk_score <= 50:
return await self._auto_approve(evidence_pack, "Low risk - auto-approved")
submission = await CABSubmission.objects.acreate(
evidence_pack=evidence_pack,
submitted_by=submitter,
status=CABSubmission.Status.PENDING,
correlation_id=evidence_pack.correlation_id,
)
await self._notify_approvers(submission)
return submission
async def approve(
self,
submission: CABSubmission,
approver: User,
conditions: list[str] = None,
) -> CABApproval:
"""Approve a CAB submission."""
if not approver.has_role("cab_approver"):
raise PermissionError("User is not a CAB approver")
if submission.submitted_by == approver:
raise PermissionError("Cannot approve own submission")
approval = await CABApproval.objects.acreate(
submission=submission,
approver=approver,
decision=CABApproval.Decision.APPROVED,
conditions=conditions or [],
correlation_id=submission.correlation_id,
)
submission.status = CABSubmission.Status.APPROVED
submission.approved_at = timezone.now()
await submission.asave()
await EventStore.objects.acreate(
event_type="cab.approved",
correlation_id=submission.correlation_id,
data={
"submission_id": str(submission.id),
"evidence_pack_id": str(submission.evidence_pack_id),
"approver_id": str(approver.id),
"conditions": conditions,
},
)
return approval
async def reject(
self,
submission: CABSubmission,
approver: User,
reason: str,
) -> CABApproval:
"""Reject a CAB submission."""
approval = await CABApproval.objects.acreate(
submission=submission,
approver=approver,
decision=CABApproval.Decision.REJECTED,
rejection_reason=reason,
correlation_id=submission.correlation_id,
)
submission.status = CABSubmission.Status.REJECTED
await submission.asave()
return approval
Exception Management
Exception Record
class VulnerabilityException(TimeStampedModel, CorrelationIdModel):
"""Exception for vulnerability findings."""
vulnerability_id = models.CharField(max_length=64)
artifact = models.ForeignKey("packaging.Artifact", on_delete=models.PROTECT)
justification = models.TextField()
compensating_controls = models.TextField()
expires_at = models.DateTimeField()
approved_by = models.ForeignKey("core.User", on_delete=models.PROTECT)
approved_at = models.DateTimeField(auto_now_add=True)
is_active = models.BooleanField(default=True)
revoked_at = models.DateTimeField(null=True, blank=True)
revoked_by = models.ForeignKey("core.User", null=True, on_delete=models.SET_NULL)
revoked_reason = models.TextField(blank=True)
class Meta:
constraints = [
models.CheckConstraint(
check=models.Q(expires_at__gt=models.F("created_at")),
name="exception_must_have_future_expiry",
),
]
Exception Workflow
class ExceptionService:
"""Manage vulnerability exceptions."""
async def create_exception(
self,
vulnerability_id: str,
artifact,
justification: str,
compensating_controls: str,
expires_at: datetime,
requestor: User,
approver: User,
) -> VulnerabilityException:
"""Create a vulnerability exception."""
if not approver.has_role("security_reviewer"):
raise PermissionError("Security Reviewer approval required")
if expires_at <= timezone.now():
raise ValueError("Expiry must be in the future")
max_expiry = timezone.now() + timedelta(days=90)
if expires_at > max_expiry:
raise ValueError("Exception cannot exceed 90 days")
exception = await VulnerabilityException.objects.acreate(
vulnerability_id=vulnerability_id,
artifact=artifact,
justification=justification,
compensating_controls=compensating_controls,
expires_at=expires_at,
approved_by=approver,
correlation_id=generate_correlation_id("exc"),
)
logger.info(
f"Exception created for {vulnerability_id}",
extra={
"exception_id": str(exception.id),
"artifact_id": str(artifact.id),
"expires_at": expires_at.isoformat(),
}
)
return exception
Ring Gates
Promotion Gates
| Ring | Success Rate | Time-to-Compliance | Additional Requirements |
|---|
| Ring 1 (Canary) | ≥ 98% | ≤ 24h | Rollback validated |
| Ring 2 (Pilot) | ≥ 97% | ≤ 24h | CAB approved if Risk > 50 |
| Ring 3 (Department) | ≥ 99% | ≤ 24h | All previous gates passed |
| Ring 4 (Global) | ≥ 99% | ≤ 24h | Stakeholder sign-off |
Gate Evaluation
class PromotionGateService:
"""Evaluate ring promotion gates."""
THRESHOLDS = {
1: {"success_rate": 0.98, "time_to_compliance_hours": 24},
2: {"success_rate": 0.97, "time_to_compliance_hours": 24},
3: {"success_rate": 0.99, "time_to_compliance_hours": 24},
4: {"success_rate": 0.99, "time_to_compliance_hours": 24},
}
async def can_promote(
self,
deployment,
target_ring: int,
) -> tuple[bool, list[str]]:
"""Check if deployment can promote to target ring."""
issues = []
thresholds = self.THRESHOLDS[target_ring]
success_rate = await self._get_success_rate(deployment)
if success_rate < thresholds["success_rate"]:
issues.append(
f"Success rate {success_rate:.1%} below threshold {thresholds['success_rate']:.0%}"
)
avg_compliance_time = await self._get_avg_compliance_time(deployment)
if avg_compliance_time > thresholds["time_to_compliance_hours"]:
issues.append(
f"Compliance time {avg_compliance_time}h exceeds {thresholds['time_to_compliance_hours']}h"
)
if not deployment.rollback_validated:
issues.append("Rollback not validated")
if target_ring >= 2 and deployment.risk_score > 50:
if not await self._has_cab_approval(deployment):
issues.append("CAB approval required for Risk > 50 when targeting Ring 2+")
if await self._has_active_incidents(deployment):
issues.append("Active incidents block promotion")
return len(issues) == 0, issues
API Endpoints
POST /api/v1/cab/submit/
GET /api/v1/cab/pending/
POST /api/v1/cab/{id}/approve/
POST /api/v1/cab/{id}/reject/
GET /api/v1/cab/{id}/evidence/
POST /api/v1/exceptions/
GET /api/v1/exceptions/
DELETE /api/v1/exceptions/{id}/
POST /api/v1/risk/assess/
GET /api/v1/risk/factors/
Checklist
Evidence Pack Generation
☐ Artifact hash (SHA-256) computed
☐ Signature verified
☐ SBOM generated (SPDX or CycloneDX)
☐ Vulnerability scan completed
☐ Risk score calculated
☐ Rollout plan documented
☐ Rollback plan documented and validated
☐ Test evidence collected
CAB Submission
☐ Evidence pack complete
☐ Risk score > 50? → CAB required
☐ Privileged tooling? → CAB required
☐ Exceptions documented with expiry
☐ Submitted by authorized user
☐ Approval recorded in event store
Anti-Patterns
| ❌ FORBIDDEN | ✅ CORRECT |
|---|
| Publishing without CAB (Risk > 50) | CAB approval mandatory |
| Exception without expiry | All exceptions expire ≤90 days |
| Modifying evidence pack | Immutable - version and resubmit |
| Same person submit + approve | Separate submitter and approver |
| Skipping rollback validation | Rollback tested before Ring 2 |