// Generic AIOps (AI for IT Operations) patterns and best practices for 2025. Provides comprehensive implementation strategies for intelligent monitoring, automation, incident response, and observability across any infrastructure. Framework-agnostic approach supporting multiple monitoring platforms, cloud providers, and automation tools.
| name | aiops |
| description | Generic AIOps (AI for IT Operations) patterns and best practices for 2025. Provides comprehensive implementation strategies for intelligent monitoring, automation, incident response, and observability across any infrastructure. Framework-agnostic approach supporting multiple monitoring platforms, cloud providers, and automation tools. |
| license | MIT |
This skill provides comprehensive patterns for implementing AIOps strategies in 2025, including intelligent monitoring, automated incident response, predictive analytics, and observability best practices. The patterns are designed to be framework-agnostic and applicable across different infrastructure platforms.
Use this skill when you need to implement intelligent monitoring, automated incident response, predictive analytics, or observability practices across your infrastructure.
# aiops/core/architecture.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import json
import logging
logger = logging.getLogger(__name__)
class AlertSeverity(str, Enum):
    """Alert severity levels, declared from most to least urgent.

    Subclasses ``str`` so members serialize/compare as plain strings
    (e.g. when dumped into JSON response plans).
    """
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
class IncidentStatus(str, Enum):
    """Incident lifecycle status, from open through closed.

    Subclasses ``str`` so members serialize/compare as plain strings.
    """
    OPEN = "open"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    MONITORING = "monitoring"
    RESOLVED = "resolved"
    CLOSED = "closed"
@dataclass
class Metric:
    """A single time-series sample normalized from a monitoring backend.

    Produced by the ``DataSource`` implementations (Prometheus, Datadog, ...).
    """
    name: str                    # metric name (often the query string that produced it)
    value: float                 # sampled value
    timestamp: datetime          # sample time
    labels: Dict[str, str] = field(default_factory=dict)  # backend labels/tags
    source: str = ""             # originating system, e.g. "prometheus", "datadog"
    unit: str = ""               # unit string when the backend reports one
@dataclass
class LogEntry:
    """Structured log entry normalized from any log backend."""
    timestamp: datetime
    level: str                            # log level string, e.g. "info", "error"
    message: str                          # raw log line / message body
    service: str                          # emitting service name ("unknown" when absent)
    trace_id: Optional[str] = None        # distributed-trace correlation id, if present
    span_id: Optional[str] = None         # span id within the trace, if present
    labels: Dict[str, str] = field(default_factory=dict)  # backend stream labels/tags
    stack_trace: Optional[str] = None     # captured stack trace, if any
@dataclass
class Trace:
    """Distributed trace summary aggregated from a tracing backend."""
    trace_id: str
    spans: List[Dict[str, Any]]           # raw span dicts as returned by the backend
    duration: timedelta                   # total trace duration
    service_map: Dict[str, List[str]]     # service name -> span ids belonging to it
    error_count: int = 0                  # number of spans flagged with an error tag
@dataclass
class Alert:
    """Alert flowing through the AIOps pipeline."""
    id: str
    name: str
    severity: AlertSeverity
    status: IncidentStatus
    description: str
    source: str                           # service/system that raised the alert
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)
    # Enrichment context (recent metrics/errors) is stored here as JSON strings
    # by AIOpsEngine._enrich_alert.
    annotations: Dict[str, str] = field(default_factory=dict)
    fingerprint: str = ""                 # dedup key; empty when not computed
    related_entities: List[str] = field(default_factory=list)
class DataSource(ABC):
    """Abstract observability data-source interface.

    Concrete adapters wrap one backend (Prometheus, Loki, Jaeger,
    Datadog, ...). Backends that lack a capability return an empty
    list / ``None`` from the corresponding method.
    """
    @abstractmethod
    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Collect metrics matching *query* over [start_time, end_time]."""
        pass

    @abstractmethod
    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Query up to *limit* log entries matching *query* over the window."""
        pass

    @abstractmethod
    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Get trace data for *trace_id*, or None when unavailable."""
        pass
class AIOpsEngine:
    """Core AIOps processing engine.

    Orchestrates the alert pipeline:
      1. enrich the alert with recent metrics/logs from registered sources,
      2. analyze it with optional ML models ("classification", "severity",
         "impact") plus the knowledge base,
      3. generate a response plan,
      4. auto-execute safe actions via the automation engine,
      5. feed the outcome back into the knowledge base.
    """

    def __init__(self):
        self.data_sources: List[DataSource] = []
        # Forward reference quoted: AlertProcessor is declared elsewhere and
        # PEP 526 attribute annotations are evaluated at runtime, so the bare
        # name raised NameError here.
        self.alert_processors: List["AlertProcessor"] = []
        # Optional models keyed by task name; see _analyze_alert.
        self.ml_models: Dict[str, Any] = {}
        self.knowledge_base: "KnowledgeBase" = KnowledgeBase()
        self.automation_engine = AutomationEngine()

    def register_data_source(self, data_source: DataSource) -> None:
        """Register a data source used for alert enrichment."""
        self.data_sources.append(data_source)
        logger.info(f"Registered data source: {type(data_source).__name__}")

    def register_alert_processor(self, processor: 'AlertProcessor') -> None:
        """Register an alert processor."""
        self.alert_processors.append(processor)
        logger.info(f"Registered alert processor: {type(processor).__name__}")

    async def process_alert(self, alert: Alert) -> Dict[str, Any]:
        """Process an incoming alert through the AIOps pipeline.

        Returns a summary dict containing the analysis and the generated
        response plan.
        """
        logger.info(f"Processing alert: {alert.id} - {alert.name}")

        # 1. Enrich alert with context
        enriched_alert = await self._enrich_alert(alert)
        # 2. Run through ML models for classification and prediction
        analysis = await self._analyze_alert(enriched_alert)
        # 3. Determine appropriate response
        response_plan = await self._generate_response_plan(enriched_alert, analysis)
        # 4. Execute automated actions if applicable
        if response_plan.get("auto_execute", False):
            await self._execute_response(enriched_alert, response_plan)
        # 5. Update knowledge base
        await self.knowledge_base.update_from_alert(enriched_alert, analysis, response_plan)

        return {
            "alert_id": alert.id,
            "status": "processed",
            "analysis": analysis,
            "response_plan": response_plan
        }

    async def _enrich_alert(self, alert: Alert) -> Alert:
        """Enrich the alert in place with recent metrics and error logs.

        Each registered source is queried once for both metrics and logs
        over the five minutes preceding the alert (the original iterated
        the sources twice). Per-source failures are logged and skipped so
        one bad source cannot block enrichment.
        """
        window_start = alert.timestamp - timedelta(minutes=5)
        for source in self.data_sources:
            source_name = type(source).__name__
            try:
                metrics = await source.collect_metrics(
                    query=f"{alert.name}[5m]",
                    start_time=window_start,
                    end_time=alert.timestamp
                )
                # Stored as a JSON string because annotations are str -> str.
                alert.annotations[f"metrics_{source_name}"] = json.dumps([
                    {"name": m.name, "value": m.value} for m in metrics
                ])
            except Exception as e:
                logger.error(f"Failed to enrich alert with metrics: {e}")
            try:
                logs = await source.query_logs(
                    query=f"service:{alert.source} level:error",
                    start_time=window_start,
                    end_time=alert.timestamp,
                    limit=10
                )
                # NOTE: with multiple sources the last source wins this key.
                alert.annotations["recent_errors"] = json.dumps([
                    {"timestamp": entry.timestamp.isoformat(), "message": entry.message}
                    for entry in logs
                ])
            except Exception as e:
                logger.error(f"Failed to enrich alert with logs: {e}")
        return alert

    async def _analyze_alert(self, alert: Alert) -> Dict[str, Any]:
        """Analyze the alert with whichever ML models are registered.

        Missing models simply leave the corresponding defaults in place.
        """
        analysis: Dict[str, Any] = {
            "classification": None,
            "severity_score": 0.0,
            "predicted_impact": "low",
            "recommendations": [],
            "related_incidents": []
        }
        # Extract the feature vector once (the original recomputed it for
        # every model).
        features = self._extract_features(alert)
        if "classification" in self.ml_models:
            analysis["classification"] = self.ml_models["classification"].predict([features])[0]
        if "severity" in self.ml_models:
            # Probability of the positive ("severe") class.
            analysis["severity_score"] = self.ml_models["severity"].predict_proba([features])[0][1]
        if "impact" in self.ml_models:
            analysis["predicted_impact"] = self.ml_models["impact"].predict([features])[0]
        # Find related incidents from knowledge base
        analysis["related_incidents"] = await self.knowledge_base.find_similar_incidents(alert)
        return analysis

    async def _generate_response_plan(
        self,
        alert: Alert,
        analysis: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Generate an automated response plan from the alert analysis.

        Auto-execution is only enabled for low/medium-severity alerts
        classified as a known issue or false positive.
        """
        plan: Dict[str, Any] = {
            "priority": alert.severity,
            "actions": [],
            "escalation_rules": [],
            "auto_execute": False,
            "estimated_resolution_time": None
        }
        # Read the classification once with .get (the original mixed []
        # access and .get for the same key).
        classification = analysis.get("classification")
        if (classification in ["known_issue", "false_positive"] and
                alert.severity in [AlertSeverity.LOW, AlertSeverity.MEDIUM]):
            plan["auto_execute"] = True
        # Generate actions based on classification
        if classification == "resource_exhaustion":
            plan["actions"].append({
                "type": "scale",
                "target": alert.source,
                "params": {"replicas": "+20%"}
            })
            plan["actions"].append({
                "type": "notify",
                "channel": "devops",
                "message": f"Resource exhaustion detected in {alert.source}"
            })
        elif classification == "known_issue":
            # Apply known fix from the knowledge base, when one exists.
            known_fix = await self.knowledge_base.get_known_fix(alert.name)
            if known_fix:
                plan["actions"].append(known_fix)
        elif classification == "false_positive":
            plan["actions"].append({
                "type": "suppress",
                "duration": "1h",
                "reason": "False positive detected"
            })
        # Escalate critical alerts that get no response within 5 minutes.
        if alert.severity == AlertSeverity.CRITICAL:
            plan["escalation_rules"].append({
                "condition": "no_response_5m",
                "action": "escalate_to_oncall",
                "timeout": 300
            })
        return plan

    def _extract_features(self, alert: Alert) -> List[float]:
        """Extract a numeric feature vector for the ML models.

        Simple hand-crafted features: one-hot severity (critical/high/
        medium), description and label sizes, and time-of-day/day-of-week
        fractions. Replace with richer features in production.
        """
        return [
            float(alert.severity == AlertSeverity.CRITICAL),
            float(alert.severity == AlertSeverity.HIGH),
            float(alert.severity == AlertSeverity.MEDIUM),
            len(alert.description),
            len(alert.labels),
            alert.timestamp.hour / 24.0,
            alert.timestamp.weekday() / 7.0
        ]

    async def _execute_response(
        self,
        alert: Alert,
        response_plan: Dict[str, Any]
    ) -> None:
        """Execute the plan's actions; failures are logged, not raised.

        NOTE(review): actions here are plain dicts, while
        AutomationEngine.execute reads attribute-style fields
        (action.type, action.target) — confirm a conversion happens
        before/inside execute.
        """
        for action in response_plan.get("actions", []):
            try:
                await self.automation_engine.execute(action)
                logger.info(f"Executed action for alert {alert.id}: {action['type']}")
            except Exception as e:
                logger.error(f"Failed to execute action: {e}")
# aiops/integrations/monitoring.py
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
from datetime import datetime
from ..core.architecture import DataSource, Metric, LogEntry, Trace, Alert
class PrometheusDataSource(DataSource):
    """Prometheus metrics adapter.

    Queries the Prometheus HTTP API (``/api/v1/query_range``). Prometheus
    holds no logs or traces, so those lookups return empty results.
    """

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Create the HTTP session on first use and cache it."""
        if self.session is None:
            self.session = aiohttp.ClientSession()
        return self.session

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Run a range query and flatten the result matrix into Metric samples."""
        session = await self._get_session()
        endpoint = f"{self.url}/api/v1/query_range"
        request_params = {
            "query": query,
            "start": start_time.timestamp(),
            "end": end_time.timestamp(),
            "step": "15s"
        }
        async with session.get(endpoint, params=request_params) as response:
            payload = await response.json()

        collected: List[Metric] = []
        if payload["status"] != "success":
            return collected
        body = payload["data"]
        if body["resultType"] != "matrix":
            return collected
        for series in body["result"]:
            series_labels = series["metric"]
            for ts, raw_value in series["values"]:
                collected.append(Metric(
                    name=query,
                    value=float(raw_value),
                    timestamp=datetime.fromtimestamp(float(ts)),
                    labels=series_labels,
                    source="prometheus"
                ))
        return collected

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Not supported: Prometheus stores no logs (use the Loki adapter)."""
        return []

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Not supported: Prometheus stores no traces (use Jaeger/Tempo)."""
        return None
class LokiDataSource(DataSource):
    """Loki log-aggregation adapter.

    Queries the Loki HTTP API (``/loki/api/v1/query_range``). Loki holds
    no metrics or traces, so those lookups return empty results.
    """

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Create the HTTP session on first use and cache it."""
        if self.session is None:
            self.session = aiohttp.ClientSession()
        return self.session

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Not supported: Loki stores no metrics."""
        return []

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Run a Loki range query and flatten each stream into LogEntry objects."""
        session = await self._get_session()
        endpoint = f"{self.url}/loki/api/v1/query_range"
        request_params = {
            "query": query,
            "start": start_time.isoformat(),
            "end": end_time.isoformat(),
            "limit": str(limit)
        }
        async with session.get(endpoint, params=request_params) as response:
            payload = await response.json()

        entries: List[LogEntry] = []
        if payload["status"] == "success":
            for stream_result in payload["data"]["result"]:
                stream_labels = stream_result["stream"]
                for ts_ns, line in stream_result["values"]:
                    entries.append(LogEntry(
                        # Loki timestamps are epoch nanoseconds.
                        timestamp=datetime.fromtimestamp(float(ts_ns) / 1e9),
                        level=stream_labels.get("level", "info"),
                        message=line,
                        service=stream_labels.get("service", "unknown"),
                        labels=stream_labels
                    ))
        return entries

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Not supported: Loki stores no traces."""
        return None
class JaegerDataSource(DataSource):
    """Jaeger distributed-tracing adapter (traces only)."""

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Create the HTTP session on first use and cache it."""
        if not self.session:
            self.session = aiohttp.ClientSession()
        return self.session

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Jaeger doesn't store metrics"""
        return []

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Jaeger doesn't store logs"""
        return []

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Fetch a trace from the Jaeger query API and summarize it.

        Builds a service -> span-id map and counts spans carrying a
        truthy "error" tag. Returns None when the trace has no spans.
        """
        # Local import: this module's top-level imports only bring in
        # `datetime`, so the bare `timedelta` below raised NameError.
        from datetime import timedelta

        session = await self._get_session()
        url = f"{self.url}/api/traces/{trace_id}"
        async with session.get(url) as response:
            data = await response.json()
        if data["data"] and data["data"][0]["spans"]:
            spans = data["data"][0]["spans"]
            service_map: Dict[str, List[str]] = {}
            error_count = 0
            for span in spans:
                service = span["process"]["serviceName"]
                service_map.setdefault(service, []).append(span["spanID"])
                for tag in span.get("tags") or []:
                    if tag.get("key") == "error" and tag.get("value"):
                        error_count += 1
            return Trace(
                trace_id=trace_id,
                spans=spans,
                # Jaeger durations are reported in microseconds.
                duration=timedelta(microseconds=data["data"][0]["duration"]),
                service_map=service_map,
                error_count=error_count
            )
        return None
class DatadogDataSource(DataSource):
    """Datadog APM adapter.

    Talks to the Datadog HTTP API with API/application keys attached as
    headers on a lazily created session.
    """

    def __init__(self, api_key: str, app_key: str, site: str = "datadoghq.com"):
        self.api_key = api_key
        self.app_key = app_key
        self.site = site
        self.base_url = f"https://api.{site}"
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Create the authenticated session on first use and cache it."""
        if not self.session:
            headers = {
                "DD-API-KEY": self.api_key,
                "DD-APPLICATION-KEY": self.app_key
            }
            self.session = aiohttp.ClientSession(headers=headers)
        return self.session

    @staticmethod
    def _tags_to_labels(tags) -> Dict[str, str]:
        """Normalize Datadog tags into a flat {key: value} dict.

        The API returns tags either as a mapping or (commonly) as a list
        of "key:value" strings; Metric/LogEntry labels expect Dict[str, str].
        """
        if isinstance(tags, dict):
            return {k: str(v) for k, v in tags.items()}
        labels: Dict[str, str] = {}
        for tag in tags or []:
            key, _sep, value = str(tag).partition(":")
            labels[key] = value
        return labels

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Query Datadog timeseries metrics (v1 query API)."""
        session = await self._get_session()
        params = {
            "query": query,
            "from": int(start_time.timestamp()),
            "to": int(end_time.timestamp())
        }
        url = f"{self.base_url}/api/v1/query"
        async with session.get(url, params=params) as response:
            data = await response.json()
        metrics = []
        for series in data.get("series", []):
            labels = self._tags_to_labels(series.get("tag_set", []))
            for point in series["pointlist"]:
                if point[1] is None:
                    # Datadog emits null datapoints for gaps; skip them.
                    continue
                metrics.append(Metric(
                    name=series["metric"],
                    value=point[1],
                    # Datadog pointlist timestamps are epoch milliseconds;
                    # the original treated them as seconds.
                    timestamp=datetime.fromtimestamp(point[0] / 1000.0),
                    labels=labels,
                    source="datadog",
                    unit=series.get("unit", "")
                ))
        return metrics

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Search Datadog logs (v2 log events API)."""
        session = await self._get_session()
        payload = {
            "query": query,
            "time": {
                "from": start_time.isoformat(),
                "to": end_time.isoformat()
            },
            "limit": limit
        }
        url = f"{self.base_url}/api/v2/logs/events/search"
        async with session.post(url, json=payload) as response:
            data = await response.json()
        logs = []
        for log in data.get("data", []):
            attributes = log.get("attributes", {})
            logs.append(LogEntry(
                timestamp=datetime.fromisoformat(attributes["timestamp"]),
                level=attributes.get("status", "info"),
                message=attributes.get("message", ""),
                service=attributes.get("service", "unknown"),
                trace_id=attributes.get("trace_id"),
                # The original passed the raw tag list straight through,
                # but LogEntry.labels is declared Dict[str, str].
                labels=self._tags_to_labels(attributes.get("tags", []))
            ))
        return logs

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Get trace from Datadog.

        Fetches the raw payload but parsing into a Trace is not yet
        implemented, so this always returns None.
        """
        session = await self._get_session()
        url = f"{self.base_url}/api/v1/trace/{trace_id}"
        async with session.get(url) as response:
            data = await response.json()
        # TODO: parse the Datadog trace payload into a Trace object.
        return None
# aiops/ml/anomaly_detection.py
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Optional
from datetime import datetime, timedelta
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import joblib
import asyncio
from collections import deque
class AnomalyDetector:
    """Generic anomaly detection for metrics and logs.

    Maintains one IsolationForest (plus feature scaler) per metric name,
    a shared model for log patterns, and z-score checks for cross-metric
    performance anomalies.

    Feature layout shared by training and inference (must stay in sync):
    [window mean, window std, current value, z-score, window range].
    """

    def __init__(self, model_type: str = "isolation_forest"):
        self.model_type = model_type
        # "Any" quoted: this module does not import typing.Any at top
        # level, and PEP 526 attribute annotations are evaluated at runtime.
        self.models: Dict[str, "Any"] = {}
        self.scalers: Dict[str, StandardScaler] = {}
        self.is_trained: Dict[str, bool] = {}
        self.feature_history: Dict[str, deque] = {}

    async def detect_metric_anomaly(
        self,
        metric_name: str,
        current_value: float,
        historical_values: List[float],
        window_size: int = 100
    ) -> Tuple[bool, float]:
        """Detect an anomaly in a time-series metric.

        Trains a per-metric model on first use (needs >= 30 historical
        points). Returns (is_anomaly, anomaly_score); when there is not
        enough history to train, returns (False, 0.0) instead of failing.
        """
        if metric_name not in self.models:
            await self._train_metric_model(metric_name, historical_values)
        if metric_name not in self.models:
            # Training was skipped (insufficient history); the original
            # raised KeyError here.
            return False, 0.0

        features = self._extract_time_series_features(
            current_value,
            historical_values,
            window_size
        )

        # transform() already returns a 2D (1, n_features) array; the
        # original re-wrapped it in another list, producing 3D input
        # that sklearn rejects.
        if metric_name in self.scalers:
            sample = self.scalers[metric_name].transform([features])
        else:
            sample = [features]

        model = self.models[metric_name]
        anomaly_score = model.decision_function(sample)[0]
        is_anomaly = model.predict(sample)[0] == -1
        return is_anomaly, float(anomaly_score)

    async def detect_log_anomaly(
        self,
        log_entries: List[Dict[str, "Any"]]
    ) -> List[Dict[str, "Any"]]:
        """Detect anomalies in log patterns.

        Expects pre-featurized log dicts (message, level_numeric,
        hour_of_day, day_of_week). Fits a shared IsolationForest on the
        first call and reuses it afterwards.
        """
        features = []
        for entry in log_entries:
            features.append([
                len(entry.get("message", "")),
                entry.get("level_numeric", 0),
                entry.get("hour_of_day", 0),
                entry.get("day_of_week", 0)
            ])
        features = np.array(features)

        model_name = "log_patterns"
        if model_name not in self.models:
            # contamination=0.1: assume roughly 10% of logs are anomalous.
            self.models[model_name] = IsolationForest(
                contamination=0.1,
                random_state=42
            )
            self.models[model_name].fit(features)

        predictions = self.models[model_name].predict(features)
        scores = self.models[model_name].decision_function(features)

        anomalies = []
        for i, (pred, score) in enumerate(zip(predictions, scores)):
            if pred == -1:  # IsolationForest labels anomalies as -1
                anomalies.append({
                    "log_index": i,
                    "log_entry": log_entries[i],
                    "anomaly_score": float(score),
                    "reason": "Unusual log pattern detected"
                })
        return anomalies

    async def detect_performance_anomaly(
        self,
        metrics: Dict[str, List[float]],
        threshold: float = 0.1
    ) -> Dict[str, Dict[str, "Any"]]:
        """Detect statistical outliers across multiple metrics.

        Flags any of the 5 most recent values whose z-score against the
        whole series exceeds 3. Series shorter than 10 points are skipped.
        (The `threshold` parameter is currently unused; kept for interface
        compatibility.)
        """
        results = {}
        for metric_name, values in metrics.items():
            if len(values) < 10:
                continue
            mean_val = np.mean(values)
            std_val = np.std(values)
            recent_values = values[-5:]
            for i, value in enumerate(recent_values):
                # +1e-8 avoids division by zero for constant series.
                z_score = abs(value - mean_val) / (std_val + 1e-8)
                if z_score > 3:  # Statistical outlier
                    if metric_name not in results:
                        results[metric_name] = {
                            "anomalies": [],
                            "severity": "medium"
                        }
                    results[metric_name]["anomalies"].append({
                        "value": value,
                        "z_score": z_score,
                        "index": i,  # index within the last-5 window
                        "type": "statistical"
                    })
        return results

    async def _train_metric_model(
        self,
        metric_name: str,
        values: List[float]
    ) -> None:
        """Train the per-metric anomaly model.

        Builds one training sample per point from index 10 onward, using
        a 10-point trailing window: [mean, std, current, z-score, range].
        Silently skips training when fewer than 30 points are available.
        """
        if len(values) < 30:
            # Not enough data for training
            return

        training_features = []
        for i in range(10, len(values)):
            window = values[i-10:i]
            window_mean = np.mean(window)
            window_std = np.std(window)
            training_features.append([
                window_mean,
                window_std,
                values[i],  # Current value
                (values[i] - window_mean) / (window_std + 1e-8),  # Z-score
                max(window) - min(window),  # Range
            ])
        training_features = np.array(training_features)

        scaler = StandardScaler()
        scaled_features = scaler.fit_transform(training_features)
        self.scalers[metric_name] = scaler

        # Only IsolationForest is implemented; other model_type values fall
        # back to it as well (the original duplicated this branch).
        model = IsolationForest(contamination=0.05, random_state=42)
        model.fit(scaled_features)

        self.models[metric_name] = model
        self.is_trained[metric_name] = True

    def _extract_time_series_features(
        self,
        current_value: float,
        historical_values: List[float],
        window_size: int
    ) -> List[float]:
        """Extract the inference feature vector for one observation.

        Must stay consistent with _train_metric_model's layout:
        [mean, std, current, z-score, range]. The original returned 7
        features here vs 5 at training time, so the fitted scaler/model
        rejected every inference sample.
        """
        if len(historical_values) < window_size:
            window = historical_values
        else:
            window = historical_values[-window_size:]
        if not window:
            # No history at all: neutral features around the current value.
            return [0.0, 0.0, float(current_value), 0.0, 0.0]
        mean = np.mean(window)
        std = np.std(window)
        return [
            float(mean),
            float(std),
            float(current_value),
            float((current_value - mean) / (std + 1e-8)),
            float(max(window) - min(window)),
        ]
class PredictiveAnalyzer:
    """Predictive analytics for capacity planning and incident prediction."""

    def __init__(self):
        # "Any" quoted: this module does not import typing.Any at top
        # level, and PEP 526 attribute annotations are evaluated at runtime.
        self.models: Dict[str, "Any"] = {}

    async def predict_capacity_needs(
        self,
        metric_history: Dict[str, List[float]],
        forecast_horizon: int = 7
    ) -> Dict[str, Dict[str, "Any"]]:
        """Predict future resource needs per metric.

        Fits a linear trend to each series (needs >= 30 points) and
        extrapolates forecast_horizon days assuming hourly samples
        (horizon * 24 future points).
        """
        predictions = {}
        for metric_name, values in metric_history.items():
            if len(values) < 30:
                continue  # not enough history for a meaningful trend
            # Simple linear trend (use more sophisticated models in production).
            x = np.arange(len(values))
            coeffs = np.polyfit(x, values, 1)
            future_x = np.arange(len(values), len(values) + forecast_horizon * 24)
            future_values = np.polyval(coeffs, future_x)
            # Percentage growth vs. the latest observation; guard against a
            # current value of zero (the original raised ZeroDivisionError).
            last_value = values[-1]
            if last_value != 0:
                growth_rate = float((future_values[-1] - last_value) / last_value * 100)
            else:
                growth_rate = 0.0
            predictions[metric_name] = {
                "current": values[-1],
                "predicted_peak": float(max(future_values)),
                "predicted_average": float(np.mean(future_values)),
                "growth_rate": growth_rate,
                "recommendation": self._get_capacity_recommendation(growth_rate),
                "confidence": 0.75  # Should be calculated based on model accuracy
            }
        return predictions

    async def predict_incidents(
        self,
        alert_history: List[Dict[str, "Any"]],
        system_metrics: Dict[str, List[float]]
    ) -> Dict[str, "Any"]:
        """Predict the likelihood of future incidents.

        Combines a few weighted heuristics into a 0-100 risk score; this
        is not a trained model. (The original also computed an unused
        feature vector here; that dead call was removed.)
        """
        risk_factors = {
            # Slope of the error-rate series (positive = worsening).
            "error_rate_trend": self._calculate_trend(
                system_metrics.get("error_rate", [])
            ),
            "cpu_utilization_avg": np.mean(
                system_metrics.get("cpu_utilization", [0])
            ),
            "memory_utilization_avg": np.mean(
                system_metrics.get("memory_utilization", [0])
            ),
            # Critical alerts among the 24 most recent history entries.
            "recent_incident_count": len([
                a for a in alert_history[-24:]
                if a.get("severity") == "critical"
            ]),
        }
        # Weighted sum clamped to [0, 100]. NOTE(review): the trend term is
        # a raw slope while the utilization terms look like percentages —
        # confirm the intended scales of these inputs.
        risk_score = min(100, max(0, (
            risk_factors["error_rate_trend"] * 30 +
            risk_factors["cpu_utilization_avg"] * 25 +
            risk_factors["memory_utilization_avg"] * 25 +
            risk_factors["recent_incident_count"] * 20
        )))
        return {
            "risk_score": risk_score,
            "risk_level": self._get_risk_level(risk_score),
            "contributing_factors": risk_factors,
            "recommendations": self._get_mitigation_recommendations(risk_factors)
        }

    def _extract_incident_features(
        self,
        alert_history: List[Dict[str, "Any"]],
        system_metrics: Dict[str, List[float]]
    ) -> List[float]:
        """Extract a feature vector for incident prediction.

        Currently unused by predict_incidents; kept for future model-based
        prediction.
        """
        features = []
        # Alert patterns over the most recent 24 entries.
        recent_alerts = alert_history[-24:] if len(alert_history) >= 24 else alert_history
        features.append(len(recent_alerts))
        features.append(len([a for a in recent_alerts if a.get("severity") == "critical"]))
        # Mean/std of the last 12 samples of each utilization metric.
        for metric_name in ["cpu", "memory", "disk", "network"]:
            values = system_metrics.get(f"{metric_name}_utilization", [0])
            features.append(np.mean(values[-12:]) if values else 0)
            features.append(np.std(values[-12:]) if values else 0)
        return features

    def _calculate_trend(self, values: List[float]) -> float:
        """Return the linear-fit slope (positive = increasing); 0.0 if < 2 points."""
        if len(values) < 2:
            return 0.0
        x = np.arange(len(values))
        coeffs = np.polyfit(x, values, 1)
        return coeffs[0]

    def _get_capacity_recommendation(self, growth_rate: float) -> str:
        """Map a growth-rate percentage to a capacity-planning recommendation."""
        if growth_rate > 50:
            return "Immediate scaling required - consider 2x current capacity"
        elif growth_rate > 20:
            return "Plan for scaling within next week - recommend 1.5x capacity"
        elif growth_rate > 0:
            return "Monitor growth - consider 25% buffer"
        else:
            return "Capacity adequate - no immediate action needed"

    def _get_risk_level(self, risk_score: float) -> str:
        """Bucket a 0-100 risk score into critical/high/medium/low."""
        if risk_score >= 80:
            return "critical"
        elif risk_score >= 60:
            return "high"
        elif risk_score >= 40:
            return "medium"
        else:
            return "low"

    def _get_mitigation_recommendations(
        self,
        risk_factors: Dict[str, float]
    ) -> List[str]:
        """Get mitigation recommendations for the elevated risk factors."""
        recommendations = []
        if risk_factors["cpu_utilization_avg"] > 80:
            recommendations.append("Scale CPU resources or optimize CPU usage")
        if risk_factors["memory_utilization_avg"] > 80:
            recommendations.append("Increase memory allocation or check for memory leaks")
        if risk_factors["error_rate_trend"] > 0:
            recommendations.append("Investigate and fix increasing error rate")
        if risk_factors["recent_incident_count"] > 3:
            recommendations.append("Review recent incidents for root cause analysis")
        return recommendations
class AILogAnalyzer:
    """AI-powered log analysis and pattern detection.

    LogEntry annotations are quoted forward references: this module does
    not import LogEntry at top level, and signature annotations are
    evaluated at class-definition time. Duck-typed objects with
    .timestamp/.level/.message/.service attributes also work.
    """

    def __init__(self):
        self.patterns: Dict[str, List[str]] = {}
        self.embeddings: Dict[str, np.ndarray] = {}

    async def analyze_logs(
        self,
        logs: List["LogEntry"]
    ) -> Dict[str, "Any"]:
        """Analyze logs for patterns, anomalies, and trending topics.

        Returns a summary dict; an empty log list yields zeroed results
        (the original raised ZeroDivisionError computing the error rate).
        """
        analysis = {
            "total_logs": len(logs),
            "error_rate": 0.0,
            "patterns": [],
            "anomalies": [],
            "trending_topics": [],
            "recommendations": []
        }
        # Error rate in percent; guard the empty-input case.
        if logs:
            error_count = len([l for l in logs if l.level.lower() in ["error", "critical"]])
            analysis["error_rate"] = error_count / len(logs) * 100
        # Extract patterns using regex and ML
        analysis["patterns"] = await self._extract_patterns(logs)
        # Find anomalies
        analysis["anomalies"] = await self._detect_log_anomalies(logs)
        # Identify trending topics/errors
        analysis["trending_topics"] = await self._identify_trending_topics(logs)
        # Generate recommendations
        analysis["recommendations"] = self._generate_log_recommendations(analysis)
        return analysis

    async def _extract_patterns(self, logs: List["LogEntry"]) -> List[Dict[str, "Any"]]:
        """Extract the most common error keyword per service.

        Simplified word-frequency heuristic; use real NLP/pattern mining
        in production.
        """
        patterns = []
        # Group logs by service
        service_logs = {}
        for log in logs:
            if log.service not in service_logs:
                service_logs[log.service] = []
            service_logs[log.service].append(log)
        # Find error patterns
        for service, service_log_list in service_logs.items():
            error_logs = [l for l in service_log_list if l.level.lower() == "error"]
            if len(error_logs) > 0:
                # Count occurrences of longer words across error messages.
                common_errors = {}
                for log in error_logs:
                    for word in log.message.split():
                        if len(word) > 5:  # Only consider longer words
                            common_errors[word] = common_errors.get(word, 0) + 1
                if common_errors:
                    most_common = max(common_errors, key=common_errors.get)
                    patterns.append({
                        "service": service,
                        "type": "error_pattern",
                        "pattern": most_common,
                        "count": common_errors[most_common],
                        "percentage": common_errors[most_common] / len(error_logs) * 100
                    })
        return patterns

    async def _detect_log_anomalies(self, logs: List["LogEntry"]) -> List[Dict[str, "Any"]]:
        """Flag minutes whose error count exceeds 3x the per-minute average."""
        anomalies = []
        # Bucket error/critical entries per minute.
        error_by_minute = {}
        for log in logs:
            if log.level.lower() in ["error", "critical"]:
                minute = log.timestamp.replace(second=0, microsecond=0)
                error_by_minute[minute] = error_by_minute.get(minute, 0) + 1
        if error_by_minute:
            avg_errors = np.mean(list(error_by_minute.values()))
            for minute, count in error_by_minute.items():
                if count > avg_errors * 3:  # 3x spike
                    anomalies.append({
                        "type": "error_spike",
                        "timestamp": minute.isoformat(),
                        "value": count,
                        "baseline": avg_errors,
                        "severity": "high"
                    })
        return anomalies

    async def _identify_trending_topics(self, logs: List["LogEntry"]) -> List[Dict[str, "Any"]]:
        """Return the top-10 alphabetic words (> 5 chars) in the last 100 logs."""
        word_count = {}
        for log in logs[-100:]:  # Last 100 logs
            for word in log.message.lower().split():
                if len(word) > 5 and word.isalpha():
                    word_count[word] = word_count.get(word, 0) + 1
        trending = sorted(word_count.items(), key=lambda x: x[1], reverse=True)[:10]
        return [
            {"topic": word, "count": count}
            for word, count in trending
        ]

    def _generate_log_recommendations(self, analysis: Dict[str, "Any"]) -> List[str]:
        """Turn the analysis summary into human-readable recommendations."""
        recommendations = []
        if analysis["error_rate"] > 5:
            recommendations.append(
                f"High error rate ({analysis['error_rate']:.1f}%) detected - investigate immediately"
            )
        if len(analysis["anomalies"]) > 0:
            recommendations.append(
                f"Found {len(analysis['anomalies'])} anomalies - review and address"
            )
        # Surface up to three dominant (>30%) per-service error patterns.
        for pattern in analysis["patterns"][:3]:
            if pattern["percentage"] > 30:
                recommendations.append(
                    f"Common error pattern in {pattern['service']}: {pattern['pattern']}"
                )
        return recommendations
# aiops/automation/engine.py
import asyncio
import subprocess
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import logging
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class ActionType(str, Enum):
    """Types of automation actions dispatched by AutomationEngine.

    Subclasses ``str`` so members serialize/compare as plain strings.
    """
    SCALE = "scale"          # scale a deployment (see _handle_scale)
    RESTART = "restart"      # rolling restart of a deployment
    PATCH = "patch"          # apply a patch to a resource
    NOTIFY = "notify"        # send a notification to a channel
    SUPPRESS = "suppress"    # suppress an alert for a duration
    ISOLATE = "isolate"      # isolate a component (handler not shown in this chunk)
    FAILBACK = "failback"    # fail back after failover (handler not shown in this chunk)
@dataclass
class Action:
    """Automation action definition consumed by AutomationEngine."""
    type: ActionType
    target: str                           # e.g. the deployment name the handlers act on
    params: Dict[str, Any]                # handler-specific parameters
    # Guard conditions checked before execution; annotation corrected from
    # List[str] to Optional[List[str]] to match the None default.
    conditions: Optional[List[str]] = None
    timeout: int = 300                    # per-attempt timeout in seconds (asyncio.wait_for)
    retries: int = 3                      # retries after the first attempt
class AutomationEngine:
"""Automation engine for executing remediation actions"""
    def __init__(self):
        """Initialize registries and the action-type -> handler dispatch table."""
        # Registered actions keyed by target; register_action overwrites any
        # previous action for the same target.
        self.actions: Dict[str, Action] = {}
        # Execution records appended by execute() (including skipped/failed runs).
        self.execution_history: List[Dict[str, Any]] = []
        # Dispatch table mapping each ActionType to its handler coroutine.
        # (Some handlers are defined further down in this class.)
        self.action_handlers: Dict[ActionType, Callable] = {
            ActionType.SCALE: self._handle_scale,
            ActionType.RESTART: self._handle_restart,
            ActionType.PATCH: self._handle_patch,
            ActionType.NOTIFY: self._handle_notify,
            ActionType.SUPPRESS: self._handle_suppress,
            ActionType.ISOLATE: self._handle_isolate,
            ActionType.FAILBACK: self._handle_failback
        }
def register_action(self, action: Action) -> None:
"""Register an automation action"""
self.actions[action.target] = action
logger.info(f"Registered action: {action.type} for {action.target}")
async def execute(self, action: Action) -> Dict[str, Any]:
"""Execute an automation action"""
execution_id = f"{action.type}_{action.target}_{int(datetime.utcnow().timestamp())}"
logger.info(f"Executing action {execution_id}: {action.type} on {action.target}")
execution = {
"id": execution_id,
"action": action,
"status": "running",
"started_at": datetime.utcnow(),
"result": None,
"error": None
}
try:
# Check conditions
if action.conditions and not await self._check_conditions(action.conditions):
execution["status"] = "skipped"
execution["error"] = "Conditions not met"
return execution
# Execute action with retries
result = await self._execute_with_retries(action)
execution["status"] = "success"
execution["result"] = result
logger.info(f"Action {execution_id} completed successfully")
except Exception as e:
execution["status"] = "failed"
execution["error"] = str(e)
logger.error(f"Action {execution_id} failed: {e}")
finally:
execution["completed_at"] = datetime.utcnow()
execution["duration"] = (execution["completed_at"] - execution["started_at"]).total_seconds()
self.execution_history.append(execution)
return execution
async def _execute_with_retries(self, action: Action) -> Any:
"""Execute action with retry logic"""
last_exception = None
for attempt in range(action.retries + 1):
try:
handler = self.action_handlers.get(action.type)
if not handler:
raise ValueError(f"No handler for action type: {action.type}")
# Execute with timeout
return await asyncio.wait_for(
handler(action),
timeout=action.timeout
)
except asyncio.TimeoutError:
last_exception = TimeoutError(f"Action timed out after {action.timeout}s")
if attempt < action.retries:
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue
raise last_exception
except Exception as e:
last_exception = e
if attempt < action.retries:
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue
raise last_exception
async def _check_conditions(self, conditions: List[str]) -> bool:
    """Check if action conditions are met"""
    # Implement condition checking logic
    # For example, check cluster health, time windows, etc.
    # NOTE(review): placeholder — always approves, so every gated action
    # currently runs unconditionally until real checks are implemented.
    return True
async def _handle_scale(self, action: Action) -> Dict[str, Any]:
    """Scale a Kubernetes deployment to the replica count in ``action.params``."""
    opts = action.params
    # NOTE(review): the "+1" default reads as "scale up by one", but kubectl
    # has no relative scaling; "+1" would be parsed as the absolute count 1
    # — confirm intent with the policy authors.
    replicas = opts.get("replicas", "+1")
    command = ["kubectl", "scale", f"--replicas={replicas}", f"deployment/{action.target}"]
    if "namespace" in opts:
        # kubectl accepts the namespace flag in any position; it goes
        # right after "kubectl" here.
        command.insert(1, f"-n={opts['namespace']}")
    output = await self._run_command(command)
    return {"scaled": True, "result": output}
async def _handle_restart(self, action: Action) -> Dict[str, Any]:
    """Trigger a rolling restart of the target deployment via kubectl."""
    opts = action.params
    command = ["kubectl", "rollout", "restart", f"deployment/{action.target}"]
    if "namespace" in opts:
        # Flag is spliced in between "rollout" and "restart".
        command.insert(2, f"-n={opts['namespace']}")
    output = await self._run_command(command)
    return {"restarted": True, "result": output}
async def _handle_patch(self, action: Action) -> Dict[str, Any]:
    """Apply a kubectl patch to the target resource.

    Recognized ``action.params`` keys: ``namespace``, ``type`` (resource
    kind, default "deployment"), ``patch_type`` (default "strategic"),
    and ``patch`` (dict payload, serialized to JSON).
    """
    opts = action.params
    command = ["kubectl", "patch"]
    if "namespace" in opts:
        command += ["-n", opts["namespace"]]
    command.append(f"{opts.get('type', 'deployment')}/{action.target}")
    command += ["--type", opts.get("patch_type", "strategic")]
    command += ["-p", json.dumps(opts.get("patch", {}))]
    output = await self._run_command(command)
    return {"patched": True, "result": output}
async def _handle_notify(self, action: Action) -> Dict[str, Any]:
"""Handle notification action"""
params = action.params
channel = params.get("channel", "default")
message = params.get("message", f"AAction executed: {action.type} on {action.target}")
# Implement notification logic (Slack, Teams, Email, etc.)
# This is a placeholder for actual notification implementation
notification = {
"channel": channel,
"message": message,
"timestamp": datetime.utcnow().isoformat(),
"sent": True
}
# Example: Send to Slack
if channel == "slack":
webhook_url = params.get("webhook_url")
if webhook_url:
slack_payload = {
"text": message,
"attachments": [{
"color": "warning",
"fields": [{
"title": "Action",
"value": f"{action.type} on {action.target}",
"short": True
}, {
"title": "Timestamp",
"value": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
"short": True
}]
}]
}
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=slack_payload) as response:
if response.status != 200:
raise Exception(f"Slack notification failed: {response.status}")
return notification
async def _handle_suppress(self, action: Action) -> Dict[str, Any]:
    """Record an alert-suppression request for ``action.target``.

    Placeholder: builds the suppression record but does not yet talk to
    an alerting backend (Prometheus, AlertManager, etc.).
    """
    opts = action.params
    # Implement alert suppression logic here; the concrete call depends
    # on the alerting system in use.
    suppression = {
        "target": action.target,
        "duration": opts.get("duration", "1h"),
        "reason": opts.get("reason", "Automated suppression"),
        "timestamp": datetime.utcnow().isoformat(),
    }
    return {"suppressed": True, "suppression": suppression}
async def _handle_isolate(self, action: Action) -> Dict[str, Any]:
    """Divert the target service's traffic to the maintenance app.

    Patches the service selector to ``app: maintenance`` so the real
    workload stops receiving traffic.
    """
    namespace = action.params.get("namespace", "default")
    patch_payload = '{"spec": {"selector": {"app": "maintenance"}}}'
    command = [
        "kubectl", "patch", "service", action.target,
        "-n", namespace,
        "-p", patch_payload,
    ]
    output = await self._run_command(command)
    return {"isolated": True, "result": output}
async def _handle_failback(self, action: Action) -> Dict[str, Any]:
"""Handle failback to primary"""
params = action.params
namespace = params.get("namespace", "default")
# Implement failback logic
# Restore traffic to primary service
command = [
"kubectl", "patch", "service", action.target,
"-n", namespace,
"-p", '{"spec": {"selector": {"app": "' + action.target + '"}}}'
]
result = await self._run_command(command)
return {"failed_back": True, "result": result}
async def _run_command(self, command: List[str]) -> str:
"""Run shell command"""
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise Exception(f"Command failed: {stderr.decode()}")
return stdout.decode()
class SelfHealingManager:
    """Manages self-healing policies and execution"""

    def __init__(self, automation_engine: AutomationEngine):
        # Engine that actually runs the remediation actions.
        self.automation_engine = automation_engine
        # service name -> healing policy definition
        self.healing_policies: Dict[str, Dict[str, Any]] = {}
        # healing id -> healing record (kept after completion too)
        self.active_healings: Dict[str, Dict[str, Any]] = {}

    def register_healing_policy(
        self,
        service_name: str,
        policy: Dict[str, Any]
    ) -> None:
        """Register a self-healing policy for a service"""
        self.healing_policies[service_name] = policy
        logger.info(f"Registered healing policy for {service_name}")

    async def trigger_healing(
        self,
        service_name: str,
        incident: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Run the registered healing policy for *service_name*.

        Executes the policy's actions in order via the automation engine,
        stopping at the first non-success result; returns a record of
        everything that ran, with timing and final status.
        """
        if service_name not in self.healing_policies:
            return {"error": "No healing policy registered"}
        policy = self.healing_policies[service_name]
        healing_id = f"healing_{service_name}_{int(datetime.utcnow().timestamp())}"
        logger.info(f"Triggering self-healing {healing_id} for {service_name}")
        record: Dict[str, Any] = {
            "id": healing_id,
            "service": service_name,
            "incident": incident,
            "policy": policy,
            "status": "running",
            "actions": [],
            "started_at": datetime.utcnow()
        }
        self.active_healings[healing_id] = record
        try:
            for action_def in policy.get("actions", []):
                action = Action(
                    type=ActionType(action_def["type"]),
                    target=action_def.get("target", service_name),
                    params=action_def.get("params", {}),
                    timeout=action_def.get("timeout", 300)
                )
                outcome = await self.automation_engine.execute(action)
                record["actions"].append({
                    "action": action_def,
                    "result": outcome
                })
                if outcome["status"] != "success":
                    # Abort the remaining steps once any action fails.
                    record["status"] = "failed"
                    break
            else:
                # for/else: no break means every configured action succeeded.
                record["status"] = "success"
        except Exception as exc:
            record["status"] = "error"
            record["error"] = str(exc)
            logger.error(f"Self-healing failed: {exc}")
        finally:
            record["completed_at"] = datetime.utcnow()
            record["duration"] = (
                record["completed_at"] - record["started_at"]
            ).total_seconds()
        return record

    async def check_healing_status(self, healing_id: str) -> Optional[Dict[str, Any]]:
        """Get status of an active healing process"""
        return self.active_healings.get(healing_id)
# aiops/observability/platform.py
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class ServiceHealth:
    """Service health status"""
    # Identifier of the service this snapshot describes.
    service_name: str
    status: str  # healthy, degraded, unhealthy
    # 99th-percentile latency; unit is set by the collector — presumably ms, confirm.
    latency_p99: float
    # Error rate; fraction vs percentage is not established here — TODO confirm units.
    error_rate: float
    # Request throughput per unit time (unit set by the collector).
    throughput: float
    # When this snapshot was taken.
    last_check: datetime
    # Downstream services this one depends on.
    dependencies: List[str] = field(default_factory=list)
    # SLA compliance marker; defaults to compliant.
    sla_status: str = "within_sla"
class ObservabilityPlatform:
    """Unified observability platform for AIOps.

    Aggregates metrics from a set of pluggable data sources and exposes
    service overviews, SLO compliance checks, incident reports, and
    simple dashboard storage/rendering.
    """

    def __init__(self, data_sources: List["DataSource"]):
        """``data_sources``: metric backends exposing an async
        ``collect_metrics(query, start_time, end_time)`` method."""
        self.data_sources = data_sources
        # service name -> latest ServiceHealth snapshot (populated elsewhere)
        self.service_health: Dict[str, ServiceHealth] = {}
        # "<service>_<slo_name>" -> last computed compliance record
        self.slo_compliance: Dict[str, Dict[str, Any]] = {}
        # dashboard name -> dashboard definition
        self.dashboard_configs: Dict[str, Dict[str, Any]] = {}

    async def get_service_overview(self) -> Dict[str, Any]:
        """Summarize per-service health plus aggregate healthy/degraded/unhealthy counts."""
        overview = {
            "total_services": len(self.service_health),
            "healthy_services": 0,
            "degraded_services": 0,
            "unhealthy_services": 0,
            "services": {}
        }
        for service_name, health in self.service_health.items():
            overview["services"][service_name] = {
                "status": health.status,
                "latency_p99": health.latency_p99,
                "error_rate": health.error_rate,
                "throughput": health.throughput,
                "sla_status": health.sla_status
            }
            if health.status == "healthy":
                overview["healthy_services"] += 1
            elif health.status == "degraded":
                overview["degraded_services"] += 1
            else:
                # Anything that is neither healthy nor degraded counts as unhealthy.
                overview["unhealthy_services"] += 1
        return overview

    async def check_slo_compliance(
        self,
        service_name: str,
        slo_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Check SLO compliance for a service.

        Collects ``slo_config["metric_query"]`` from every data source over a
        fixed 30-day rolling window, treats metrics ending in ``_total`` as the
        request count and ``_success`` as the good-request count, and compares
        the resulting availability against ``slo_config["target"]``.

        Returns the compliance record (also cached in ``self.slo_compliance``),
        or an ``{"error": ...}`` dict when no metrics / no requests are found.
        """
        now = datetime.utcnow()
        window = timedelta(days=30)  # 30-day rolling window
        # Collect metrics for SLO calculation; a failing source is logged
        # and skipped rather than failing the whole check.
        metrics = []
        for source in self.data_sources:
            try:
                source_metrics = await source.collect_metrics(
                    query=slo_config["metric_query"],
                    start_time=now - window,
                    end_time=now
                )
                metrics.extend(source_metrics)
            except Exception as e:
                logger.error(f"Failed to collect metrics from {type(source).__name__}: {e}")
        if not metrics:
            return {"error": "No metrics available"}
        # Calculate SLO compliance from the naming convention above.
        total_requests = sum(m.value for m in metrics if m.name.endswith("_total"))
        good_requests = sum(m.value for m in metrics if m.name.endswith("_success"))
        if total_requests == 0:
            return {"error": "No requests in time window"}
        actual_availability = (good_requests / total_requests) * 100
        target_availability = slo_config["target"]
        compliance = {
            "service": service_name,
            "slo_name": slo_config["name"],
            "target_availability": target_availability,
            "actual_availability": actual_availability,
            "compliance": actual_availability >= target_availability,
            "error_budget": target_availability - actual_availability,
            "time_window": window.days,
            "total_requests": total_requests,
            "good_requests": good_requests,
            "calculated_at": now.isoformat()
        }
        self.slo_compliance[f"{service_name}_{slo_config['name']}"] = compliance
        return compliance

    async def generate_incident_report(
        self,
        incident_id: str,
        start_time: datetime,
        end_time: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Generate comprehensive incident report.

        Services whose last health check falls inside the incident window are
        treated as affected, and request-rate metrics are collected for each
        of them. ``end_time`` defaults to now.
        """
        if not end_time:
            end_time = datetime.utcnow()
        report = {
            "incident_id": incident_id,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration_hours": (end_time - start_time).total_seconds() / 3600,
            "affected_services": [],
            "metrics_impact": {},
            "error_rate_impact": {},
            "user_impact": {},
            "timeline": [],
            "root_cause_analysis": {},
            "action_items": []
        }
        # Collect affected services and, for each, its metrics impact.
        # (Bug fix: the metrics query previously ran once, outside this loop,
        # against the leaked loop variable — recording metrics under only the
        # last service and raising NameError when no services were registered.)
        for service_name, health in self.service_health.items():
            if start_time <= health.last_check <= end_time:
                report["affected_services"].append(service_name)
                for source in self.data_sources:
                    try:
                        metrics = await source.collect_metrics(
                            query="rate(http_requests_total[5m])",
                            start_time=start_time,
                            end_time=end_time
                        )
                        if metrics:
                            report["metrics_impact"][service_name] = [
                                {"timestamp": m.timestamp.isoformat(), "value": m.value}
                                for m in metrics
                            ]
                    except Exception as e:
                        logger.error(f"Failed to collect metrics for {service_name}: {e}")
        return report

    async def create_dashboard(
        self,
        dashboard_name: str,
        config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create a new observability dashboard and store its definition."""
        dashboard = {
            "name": dashboard_name,
            "description": config.get("description", ""),
            "panels": config.get("panels", []),
            "time_range": config.get("time_range", "1h"),
            "refresh_interval": config.get("refresh_interval", "5m"),
            "created_at": datetime.utcnow().isoformat()
        }
        self.dashboard_configs[dashboard_name] = dashboard
        return dashboard

    async def get_dashboard_data(
        self,
        dashboard_name: str,
        time_range: Optional[str] = None
    ) -> Dict[str, Any]:
        """Get data for a specific dashboard.

        ``time_range`` accepts ``"<N>h"`` or ``"<N>d"``; anything else
        (including omission) falls back to the last hour. Per-panel
        collection errors are reported on the panel rather than failing
        the whole dashboard.
        """
        if dashboard_name not in self.dashboard_configs:
            return {"error": "Dashboard not found"}
        dashboard = self.dashboard_configs[dashboard_name]
        now = datetime.utcnow()
        # Parse time range
        end_time = now
        if time_range and time_range.endswith("h"):
            start_time = now - timedelta(hours=int(time_range[:-1]))
        elif time_range and time_range.endswith("d"):
            start_time = now - timedelta(days=int(time_range[:-1]))
        else:
            # Default / unrecognized suffix: last hour.
            start_time = now - timedelta(hours=1)
        dashboard_data = {
            "name": dashboard_name,
            "time_range": {"start": start_time.isoformat(), "end": end_time.isoformat()},
            "panels": []
        }
        # Collect data for each panel
        for panel in dashboard["panels"]:
            panel_data = {
                "title": panel["title"],
                "type": panel["type"],
                "data": []
            }
            try:
                # Collect metrics based on panel query
                for source in self.data_sources:
                    metrics = await source.collect_metrics(
                        query=panel["query"],
                        start_time=start_time,
                        end_time=end_time
                    )
                    panel_data["data"].extend([
                        {"timestamp": m.timestamp.isoformat(), "value": m.value}
                        for m in metrics
                    ])
            except Exception as e:
                panel_data["error"] = str(e)
            dashboard_data["panels"].append(panel_data)
        return dashboard_data
# SLO and SLA Management
class SLOManager:
    """Manages Service Level Objectives and Service Level Agreements"""

    def __init__(self):
        # "<service>_<slo_name>" -> SLO definition
        self.slos: Dict[str, Dict[str, Any]] = {}
        # "<service>_<sla_name>" -> SLA definition
        self.slas: Dict[str, Dict[str, Any]] = {}

    def define_slo(
        self,
        service_name: str,
        slo_name: str,
        target: float,
        time_window: str,
        alerting_burn_rate: float = 2.0
    ) -> Dict[str, Any]:
        """Define a Service Level Objective.

        Args:
            target: target availability as a percentage (e.g. 99.9).
            time_window: evaluation window label (e.g. "30d").
            alerting_burn_rate: burn-rate multiple used for alerting.

        Returns:
            The stored SLO record (also kept in ``self.slos``).
        """
        slo = {
            "service_name": service_name,
            "slo_name": slo_name,
            "target": target,
            "time_window": time_window,
            "alerting_burn_rate": alerting_burn_rate,
            "created_at": datetime.utcnow().isoformat(),
            "status": "active"
        }
        self.slos[f"{service_name}_{slo_name}"] = slo
        return slo

    def define_sla(
        self,
        service_name: str,
        sla_name: str,
        availability_target: float,
        performance_targets: Dict[str, float],
        # Annotation corrected to Optional: the previous ``Dict[...] = None``
        # mislabeled the accepted None default.
        penalties: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Define a Service Level Agreement.

        ``penalties`` defaults to an empty dict when not supplied.

        Returns:
            The stored SLA record (also kept in ``self.slas``).
        """
        sla = {
            "service_name": service_name,
            "sla_name": sla_name,
            "availability_target": availability_target,
            "performance_targets": performance_targets,
            "penalties": penalties or {},
            "created_at": datetime.utcnow().isoformat(),
            "status": "active"
        }
        self.slas[f"{service_name}_{sla_name}"] = sla
        return sla

    def calculate_error_budget(
        self,
        service_name: str,
        slo_name: str,
        period_days: int = 30
    ) -> Dict[str, Any]:
        """Calculate the error budget for a previously-defined SLO.

        Converts the SLO's availability target into allowed downtime hours
        over ``period_days``. ``remaining_error_budget`` and ``burn_rate``
        are placeholders until wired to real metrics.

        Returns:
            The budget record, or ``{"error": "SLO not found"}``.
        """
        slo_key = f"{service_name}_{slo_name}"
        if slo_key not in self.slos:
            return {"error": "SLO not found"}
        slo = self.slos[slo_key]
        target_availability = slo["target"]
        # Allowed downtime = (100% - target%) of the period, in hours.
        period_hours = period_days * 24
        error_budget = (100 - target_availability) / 100 * period_hours
        return {
            "service": service_name,
            "slo": slo_name,
            "period_days": period_days,
            "target_availability": target_availability,
            "error_budget_hours": error_budget,
            "error_budget_percentage": 100 - target_availability,
            "remaining_error_budget": error_budget,  # Would be calculated from actual metrics
            "burn_rate": 0.0  # Would be calculated from actual metrics
        }
This comprehensive AIOps skill provides production-ready patterns for implementing AI-powered operations in 2025, including intelligent monitoring, anomaly detection, predictive analytics, automation, and self-healing capabilities.
★ Insight ─────────────────────────────────────
The AIOps patterns shown here emphasize several key 2025 best practices, including framework-agnostic integrations, guarded automated remediation, and SLO-driven observability.
These patterns ensure your AIOps implementation is intelligent, automated, and aligned with business objectives.
─────────────────────────────────────────────────