// Comprehensive logging and observability patterns for production systems. Use when implementing structured logging, distributed tracing, metrics collection, or alerting. Triggers: logging, logs, observability, tracing, metrics, OpenTelemetry, correlation ID, spans, alerts, monitoring, JSON logs.
Observability enables understanding system behavior through logs, metrics, and traces. This skill covers structured logging, distributed tracing, metrics collection, OpenTelemetry integration, and effective alert design.
import json
import logging
import sys
from datetime import datetime
from contextvars import ContextVar
from typing import Any
# Context variables for request tracking
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')
class StructuredFormatter(logging.Formatter):
"""JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"correlation_id": correlation_id.get(),
"span_id": span_id.get(),
}
# Add exception info if present
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
# Add extra fields
if hasattr(record, 'structured_data'):
log_data.update(record.structured_data)
return json.dumps(log_data)
def setup_logging():
"""Configure structured logging."""
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(StructuredFormatter())
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)
# Usage
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
"structured_data": {
"user_id": "123",
"ip_address": "192.168.1.1",
"action": "login"
}
})
interface LogContext {
correlationId?: string;
spanId?: string;
[key: string]: unknown;
}
interface LogEntry {
timestamp: string;
level: string;
message: string;
context: LogContext;
}
class StructuredLogger {
private context: LogContext = {};
withContext(context: LogContext): StructuredLogger {
const child = new StructuredLogger();
child.context = { ...this.context, ...context };
return child;
}
private log(
level: string,
message: string,
data?: Record<string, unknown>,
): void {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
message,
context: { ...this.context, ...data },
};
console.log(JSON.stringify(entry));
}
debug(message: string, data?: Record<string, unknown>): void {
this.log("DEBUG", message, data);
}
info(message: string, data?: Record<string, unknown>): void {
this.log("INFO", message, data);
}
warn(message: string, data?: Record<string, unknown>): void {
this.log("WARN", message, data);
}
error(message: string, data?: Record<string, unknown>): void {
this.log("ERROR", message, data);
}
}
| Level | Usage | Examples |
|---|---|---|
| TRACE | Fine-grained debugging | Loop iterations, variable values |
| DEBUG | Diagnostic information | Function entry/exit, intermediate states |
| INFO | Normal operations | Request started, job completed, user action |
| WARN | Potential issues | Deprecated API usage, retry attempted, slow query |
| ERROR | Failures requiring attention | Exception caught, operation failed |
| FATAL | Critical failures | System cannot continue, data corruption |
# Log level usage examples
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})
import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class Span:
name: str
trace_id: str
span_id: str = field(default_factory=lambda: str(uuid.uuid4())[:16])
parent_span_id: Optional[str] = None
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
attributes: dict = field(default_factory=dict)
def end(self):
self.end_time = time.time()
@property
def duration_ms(self) -> float:
if self.end_time:
return (self.end_time - self.start_time) * 1000
return 0
current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)
class Tracer:
def __init__(self, service_name: str):
self.service_name = service_name
def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
parent = parent or current_span.get()
trace_id = parent.trace_id if parent else str(uuid.uuid4())[:32]
parent_span_id = parent.span_id if parent else None
span = Span(
name=name,
trace_id=trace_id,
parent_span_id=parent_span_id,
attributes={"service": self.service_name}
)
current_span.set(span)
return span
def end_span(self, span: Span):
span.end()
self._export(span)
# Restore parent span if exists
# In production, use a span stack
def _export(self, span: Span):
"""Export span to tracing backend."""
logger.info(f"Span completed: {span.name}", extra={
"structured_data": {
"trace_id": span.trace_id,
"span_id": span.span_id,
"parent_span_id": span.parent_span_id,
"duration_ms": span.duration_ms,
"attributes": span.attributes
}
})
# Context manager for spans
from contextlib import contextmanager
@contextmanager
def trace_span(tracer: Tracer, name: str):
span = tracer.start_span(name)
try:
yield span
except Exception as e:
span.attributes["error"] = True
span.attributes["error.message"] = str(e)
raise
finally:
tracer.end_span(span)
# Usage
tracer = Tracer("order-service")
async def process_order(order_id: str):
with trace_span(tracer, "process_order") as span:
span.attributes["order_id"] = order_id
with trace_span(tracer, "validate_order"):
await validate(order_id)
with trace_span(tracer, "charge_payment"):
await charge(order_id)
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
import time
import threading
class MetricType(Enum):
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
@dataclass
class Counter:
name: str
labels: Dict[str, str]
value: float = 0
def inc(self, amount: float = 1):
self.value += amount
@dataclass
class Gauge:
name: str
labels: Dict[str, str]
value: float = 0
def set(self, value: float):
self.value = value
def inc(self, amount: float = 1):
self.value += amount
def dec(self, amount: float = 1):
self.value -= amount
@dataclass
class Histogram:
name: str
labels: Dict[str, str]
buckets: List[float]
values: List[float] = None
def __post_init__(self):
self.values = []
self._bucket_counts = {b: 0 for b in self.buckets}
self._bucket_counts[float('inf')] = 0
self._sum = 0
self._count = 0
def observe(self, value: float):
self.values.append(value)
self._sum += value
self._count += 1
for bucket in sorted(self._bucket_counts.keys()):
if value <= bucket:
self._bucket_counts[bucket] += 1
class MetricsRegistry:
def __init__(self):
self._metrics: Dict[str, any] = {}
self._lock = threading.Lock()
def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
key = f"{name}:{labels}"
with self._lock:
if key not in self._metrics:
self._metrics[key] = Counter(name, labels or {})
return self._metrics[key]
def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
key = f"{name}:{labels}"
with self._lock:
if key not in self._metrics:
self._metrics[key] = Gauge(name, labels or {})
return self._metrics[key]
def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
key = f"{name}:{labels}"
with self._lock:
if key not in self._metrics:
self._metrics[key] = Histogram(name, labels or {}, buckets)
return self._metrics[key]
# Usage
metrics = MetricsRegistry()
# Counter for requests
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()
# Gauge for active connections
active_connections = metrics.gauge("active_connections")
active_connections.inc()
# ... handle connection ...
active_connections.dec()
# Histogram for request duration
request_duration = metrics.histogram(
"http_request_duration_seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
start = time.time()
# ... handle request ...
request_duration.observe(time.time() - start)
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
def setup_opentelemetry(service_name: str, otlp_endpoint: str):
"""Initialize OpenTelemetry with OTLP export."""
# Tracing setup
trace_provider = TracerProvider(
resource=Resource.create({"service.name": service_name})
)
trace_provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
)
trace.set_tracer_provider(trace_provider)
# Metrics setup
metric_provider = MeterProvider(
resource=Resource.create({"service.name": service_name})
)
metrics.set_meter_provider(metric_provider)
# Auto-instrumentation
RequestsInstrumentor().instrument()
return trace.get_tracer(service_name), metrics.get_meter(service_name)
# Usage with FastAPI
from fastapi import FastAPI
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")
# Custom spans
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
with tracer.start_as_current_span("fetch_order") as span:
span.set_attribute("order.id", order_id)
order = await order_repository.get(order_id)
span.set_attribute("order.status", order.status)
return order
# Prometheus alerting rules example
groups:
- name: service-alerts
rules:
# High error rate alert
- alert: HighErrorRate
expr: |
sum(rate(http_requests_total{status=~"5.."}[5m]))
/ sum(rate(http_requests_total[5m])) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
# High latency alert
- alert: HighLatency
expr: |
histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
for: 10m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "95th percentile latency is {{ $value }}s"
# Service down alert
- alert: ServiceDown
expr: up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service {{ $labels.instance }} is down"
description: "{{ $labels.job }} has been down for more than 1 minute"
Log at Appropriate Levels: Use DEBUG for development, INFO for normal operations, ERROR for failures.
Include Context: Always include correlation IDs, user IDs, and relevant business identifiers.
Avoid Sensitive Data: Never log passwords, tokens, or PII. Use redaction when necessary.
Use Structured Logging: JSON logs enable easy parsing and querying.
Trace Boundaries: Create spans at service boundaries and significant operations.
Meaningful Metrics: Track the Four Golden Signals - latency, traffic, errors, saturation.
Alert on Symptoms: Alert on user-impacting issues, not every metric fluctuation.
Include Runbooks: Every alert should link to a runbook with investigation steps.
import time
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
class ObservabilityMiddleware(BaseHTTPMiddleware):
def __init__(self, app, tracer, metrics):
super().__init__(app)
self.tracer = tracer
self.request_counter = metrics.counter("http_requests_total")
self.request_duration = metrics.histogram(
"http_request_duration_seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
async def dispatch(self, request: Request, call_next):
# Extract or generate correlation ID
corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
correlation_id.set(corr_id)
start_time = time.time()
with self.tracer.start_as_current_span(
f"{request.method} {request.url.path}"
) as span:
span.set_attribute("http.method", request.method)
span.set_attribute("http.url", str(request.url))
span.set_attribute("correlation_id", corr_id)
try:
response = await call_next(request)
span.set_attribute("http.status_code", response.status_code)
# Record metrics
labels = {
"method": request.method,
"path": request.url.path,
"status": str(response.status_code)
}
self.request_counter.labels(**labels).inc()
self.request_duration.labels(**labels).observe(
time.time() - start_time
)
# Add correlation ID to response
response.headers["X-Correlation-ID"] = corr_id
return response
except Exception as e:
span.set_attribute("error", True)
span.record_exception(e)
raise