// Authentication and authorization including JWT, OAuth2, OIDC, sessions, RBAC, and security analysis. Activate for login, auth flows, security audits, threat modeling, access control, and identity management.
| name | authentication |
| description | Authentication and authorization including JWT, OAuth2, OIDC, sessions, RBAC, and security analysis. Activate for login, auth flows, security audits, threat modeling, access control, and identity management. |
| allowed-tools | ["Bash","Read","Write","Edit","Glob","Grep","Task","WebFetch","WebSearch"] |
| dependencies | ["extended-thinking","deep-analysis","complex-reasoning"] |
| triggers | ["authentication","authorization","auth flow","security audit","threat model","JWT","OAuth","OIDC","session","RBAC","access control"] |
Provides comprehensive authentication and authorization capabilities for the Golden Armada AI Agent Fleet Platform.
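Activate this skill when working with login and authentication flows, security audits, threat modeling, access control, and identity management. Reference implementations follow.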
```python from jose import jwt from datetime import datetime, timedelta from passlib.context import CryptContext
# Signing configuration for the HS256 tokens issued by the helpers below.
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(user_id: str, roles: list[str]) -> str:
    """Return a signed access JWT for ``user_id`` carrying ``roles``.

    The token expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes after creation
    and is tagged with ``"type": "access"``.
    """
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    expires_at = datetime.utcnow() + lifetime
    claims = {
        "sub": user_id,
        "roles": roles,
        "exp": expires_at,
        "iat": datetime.utcnow(),
        "type": "access",
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: str) -> str:
    """Return a signed refresh JWT for ``user_id``.

    The token expires ``REFRESH_TOKEN_EXPIRE_DAYS`` days after creation and is
    tagged with ``"type": "refresh"``; it carries no roles.
    """
    lifetime = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    claims = {
        "sub": user_id,
        "exp": datetime.utcnow() + lifetime,
        "type": "refresh",
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> dict:
    """Decode ``token`` with ``SECRET_KEY`` and return its claims.

    Raises:
        HTTPException: 401 "Token expired" for an expired signature,
            401 "Invalid token" for any other JWT error.

    NOTE(review): the "type" claim is not inspected here, so callers must
    tell access tokens from refresh tokens themselves.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        # Must come first: the generic handler below would also match it.
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    else:
        return claims
```
```python
def hash_password(password: str) -> str:
    """Return the ``pwd_context`` hash of ``password``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
async def authenticate_user(email: str, password: str) -> User | None:
    """Return the user matching ``email``/``password``, or ``None``.

    ``None`` is returned both when no user has that e-mail and when the
    password does not verify, so callers cannot tell the two cases apart.
    """
    user = await get_user_by_email(email)
    if not user:
        # Spend the same hashing time as a real verification so response
        # latency does not reveal whether the e-mail is registered.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
```
```python from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Bearer-token dependency used by the route guards below; "/auth/token" is the declared token URL.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the bearer token to a ``User``.

    Raises:
        HTTPException: 401 when the token is invalid or expired (raised by
            ``verify_token``), is not an access token, has no subject, or
            names a user that no longer exists.
    """
    payload = verify_token(token)
    # Refresh tokens are signed with the same key, so without this check a
    # long-lived refresh token would be accepted wherever an access token is.
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid token type")
    subject = payload.get("sub")
    if not subject:
        # A signed token without "sub" would otherwise surface as a KeyError (500).
        raise HTTPException(status_code=401, detail="Invalid token")
    user = await get_user(subject)
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user
async def get_current_active_user(user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user, rejecting inactive accounts with a 400."""
    if user.is_active:
        return user
    raise HTTPException(status_code=400, detail="Inactive user")
def require_roles(*roles: str):
    """Build a dependency that admits users holding at least one of ``roles``.

    The returned coroutine yields the authenticated user, or raises a 403
    ``HTTPException`` when none of ``roles`` appears in ``user.roles``.
    """
    async def role_checker(user: User = Depends(get_current_user)):
        for role in roles:
            if role in user.roles:
                return user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return role_checker
@app.get("/admin")
async def admin_route(user: User = Depends(require_roles("admin"))):
    """Example route reachable only by users with the "admin" role."""
    body = {"message": "Admin access granted"}
    return body
```
```python from authlib.integrations.starlette_client import OAuth
oauth = OAuth()

# Google sign-in via OpenID Connect discovery; credentials come from the environment.
oauth.register(
    name='google',
    client_id=os.environ['GOOGLE_CLIENT_ID'],
    client_secret=os.environ['GOOGLE_CLIENT_SECRET'],
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid email profile'},
)
@app.get('/auth/google')
async def google_login(request: Request):
    """Redirect the browser to Google, returning to the ``google_callback`` route."""
    callback_url = request.url_for('google_callback')
    redirect_response = await oauth.google.authorize_redirect(request, callback_url)
    return redirect_response
@app.get('/auth/google/callback')
async def google_callback(request: Request):
    """Complete the Google login and issue an application access token.

    Raises:
        HTTPException: 401 when the token response carries no user info or
            no e-mail address.
    """
    token = await oauth.google.authorize_access_token(request)
    user_info = token.get('userinfo')
    # ``userinfo`` can be absent from the token response; fail with a clean
    # 401 instead of a TypeError on the subscript below.
    if not user_info or not user_info.get('email'):
        raise HTTPException(status_code=401, detail="Google account information unavailable")

    # Find or create user
    user = await get_or_create_user(
        email=user_info['email'],
        name=user_info['name'],
        provider='google'
    )

    # Generate JWT
    access_token = create_access_token(user.id, user.roles)
    return {"access_token": access_token, "token_type": "bearer"}
```
```python
# GitHub sign-in with explicit endpoints; credentials come from the environment.
oauth.register(
    name='github',
    client_id=os.environ['GITHUB_CLIENT_ID'],
    client_secret=os.environ['GITHUB_CLIENT_SECRET'],
    authorize_url='https://github.com/login/oauth/authorize',
    access_token_url='https://github.com/login/oauth/access_token',
    api_base_url='https://api.github.com/',
    client_kwargs={'scope': 'user:email'},
)
```
```python from fastapi import Request, Response import secrets
async def create_session(user_id: str, response: Response) -> str:
    """Create a server-side session for ``user_id`` and set its cookie.

    The session hash is stored in Redis under ``session:<id>`` with a
    24-hour expiry, and the id is sent back as an HttpOnly, Secure,
    SameSite=Lax cookie with the same lifetime.

    Returns:
        The newly generated session id.
    """
    session_id = secrets.token_urlsafe(32)
    redis_key = f"session:{session_id}"

    # Store in Redis
    record = {
        "user_id": user_id,
        "created_at": datetime.utcnow().isoformat(),
    }
    await redis.hset(redis_key, mapping=record)
    await redis.expire(redis_key, 86400)  # 24 hours

    # Set cookie
    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=86400,
    )
    return session_id
async def get_session(request: Request) -> dict | None:
    """Return the session hash for the request's cookie, or ``None``.

    A successful lookup also resets the session's Redis expiry to 24 hours.
    """
    session_id = request.cookies.get("session_id")
    if not session_id:
        return None

    redis_key = f"session:{session_id}"
    session = await redis.hgetall(redis_key)
    if session:
        # Refresh TTL
        await redis.expire(redis_key, 86400)
        return session
    return None
async def destroy_session(request: Request, response: Response):
    """Delete the Redis session named by the cookie (if any) and clear the cookie."""
    session_id = request.cookies.get("session_id")
    if not session_id:
        return
    await redis.delete(f"session:{session_id}")
    response.delete_cookie("session_id")
```
```python from enum import Enum from typing import Set
class Permission(str, Enum):
    """Permissions that can be granted through a role."""

    READ_AGENTS = "read:agents"
    WRITE_AGENTS = "write:agents"
    DELETE_AGENTS = "delete:agents"
    ADMIN = "admin"


# Role name -> permissions that role grants.
ROLE_PERMISSIONS: dict[str, Set[Permission]] = {
    "viewer": {Permission.READ_AGENTS},
    "operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS},
    "admin": {
        Permission.READ_AGENTS,
        Permission.WRITE_AGENTS,
        Permission.DELETE_AGENTS,
        Permission.ADMIN,
    },
}


def has_permission(user_roles: list[str], required: Permission) -> bool:
    """Return True when any role in ``user_roles`` grants ``required``.

    Roles missing from ``ROLE_PERMISSIONS`` grant nothing.
    """
    return any(
        required in ROLE_PERMISSIONS.get(role, ())
        for role in user_roles
    )
def require_permission(permission: Permission):
    """Build a dependency that admits only users whose roles grant ``permission``.

    The returned coroutine yields the authenticated user, or raises a 403
    ``HTTPException`` with detail "Permission denied".
    """
    async def permission_checker(user: User = Depends(get_current_user)):
        if has_permission(user.roles, permission):
            return user
        raise HTTPException(status_code=403, detail="Permission denied")

    return permission_checker
@app.delete("/agents/{id}")
async def delete_agent(
    id: str,
    user: User = Depends(require_permission(Permission.DELETE_AGENTS)),
):
    """Delete agent ``id``; the dependency requires Permission.DELETE_AGENTS."""
    await agent_service.delete(id)
    result = {"status": "deleted"}
    return result
```
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
# Register a provider with explicitly listed authorize/token endpoints.
# Client credentials are read from the environment; the optional
# authorize/token parameters and the refresh URL are left unset (None).
oauth.register(
name='custom_provider',
client_id=os.environ['OAUTH_CLIENT_ID'],
client_secret=os.environ['OAUTH_CLIENT_SECRET'],
authorize_url='https://provider.com/oauth/authorize',
authorize_params=None,
access_token_url='https://provider.com/oauth/token',
access_token_params=None,
refresh_token_url=None,
client_kwargs={'scope': 'openid profile email'}
)
@app.get('/auth/login')
async def login(request: Request):
    """Start the authorization-code flow with a fresh CSRF ``state``.

    The state is remembered in Redis for 600 seconds under
    ``oauth_state:<state>`` and sent to the provider with the redirect.
    """
    # Generate state for CSRF protection
    csrf_state = secrets.token_urlsafe(32)
    await redis.set(f"oauth_state:{csrf_state}", "1", ex=600)

    callback_url = request.url_for('auth_callback')
    provider = oauth.custom_provider
    return await provider.authorize_redirect(request, callback_url, state=csrf_state)
@app.get('/auth/callback')
async def auth_callback(request: Request):
    """Finish the authorization-code flow and issue application tokens.

    Raises:
        HTTPException: 400 when the ``state`` parameter is missing, unknown,
            or already used; 401 when the provider returns no user info.
    """
    # Verify state (CSRF protection). DEL reports how many keys it removed,
    # so deleting *is* the check: the state is consumed atomically and two
    # concurrent callbacks cannot both pass, as they could with get-then-delete.
    state = request.query_params.get('state')
    if not state or not await redis.delete(f"oauth_state:{state}"):
        raise HTTPException(status_code=400, detail="Invalid state")

    # Exchange authorization code for tokens
    token = await oauth.custom_provider.authorize_access_token(request)
    user_info = token.get('userinfo')
    if not user_info:
        # Without this guard a token response lacking user info would reach
        # upsert_user as None.
        raise HTTPException(status_code=401, detail="User info unavailable")

    # Create or update user
    user = await upsert_user(user_info)

    # Issue application tokens
    access_token = create_access_token(user.id, user.roles)
    refresh_token = create_refresh_token(user.id)
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }
# Frontend (JavaScript/TypeScript)
import { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce'

// ---- Step 1: start the authorization request ----

// Generate PKCE parameters
const codeVerifier = generateCodeVerifier()
const codeChallenge = await generateCodeChallenge(codeVerifier)

// Generate the CSRF state once so the same value is both sent and remembered
const state = generateState()

// Store verifier and state for the callback
sessionStorage.setItem('code_verifier', codeVerifier)
sessionStorage.setItem('oauth_state', state)

// Redirect to authorization endpoint
const authUrl = new URL('https://provider.com/oauth/authorize')
authUrl.searchParams.set('client_id', CLIENT_ID)
authUrl.searchParams.set('redirect_uri', REDIRECT_URI)
authUrl.searchParams.set('response_type', 'code')
authUrl.searchParams.set('scope', 'openid profile email')
authUrl.searchParams.set('code_challenge', codeChallenge)
authUrl.searchParams.set('code_challenge_method', 'S256')
authUrl.searchParams.set('state', state)
window.location.href = authUrl.toString()

// ---- Step 2: in callback handler (runs after the redirect back) ----

const callbackParams = new URLSearchParams(window.location.search)
const code = callbackParams.get('code')
const returnedState = callbackParams.get('state')
const expectedState = sessionStorage.getItem('oauth_state')
const storedVerifier = sessionStorage.getItem('code_verifier')

// Both values are single-use; clear them before doing anything else
sessionStorage.removeItem('oauth_state')
sessionStorage.removeItem('code_verifier')

// Reject responses that do not carry the state issued in step 1 (CSRF protection)
if (!code || !expectedState || returnedState !== expectedState) {
  throw new Error('Invalid OAuth callback: state mismatch or missing code')
}

const tokenResponse = await fetch('https://provider.com/oauth/token', {
  method: 'POST',
  headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
  body: new URLSearchParams({
    grant_type: 'authorization_code',
    code: code,
    redirect_uri: REDIRECT_URI,
    client_id: CLIENT_ID,
    code_verifier: storedVerifier
  })
})

// fetch() resolves on HTTP errors too, so check the status explicitly
if (!tokenResponse.ok) {
  throw new Error(`Token request failed with status ${tokenResponse.status}`)
}
import httpx
from datetime import datetime, timedelta
class ServiceAuthClient:
    """Fetches and caches a client-credentials access token."""

    def __init__(self):
        # Cached bearer token and the moment it stops being reused.
        self.token = None
        self.expires_at = None

    async def get_token(self) -> str:
        """Return a bearer token, requesting a new one when the cache is stale.

        A fresh token is cached until 60 seconds before the lifetime
        reported in the response's ``expires_in`` field.
        """
        # Return cached token if still valid
        cache_is_fresh = self.token and self.expires_at > datetime.utcnow()
        if cache_is_fresh:
            return self.token

        # Request new token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://provider.com/oauth/token',
                data={
                    'grant_type': 'client_credentials',
                    'client_id': os.environ['SERVICE_CLIENT_ID'],
                    'client_secret': os.environ['SERVICE_CLIENT_SECRET'],
                    'scope': 'api.read api.write',
                },
            )
            response.raise_for_status()
            body = response.json()
            self.token = body['access_token']
            reuse_window = timedelta(seconds=body['expires_in'] - 60)
            self.expires_at = datetime.utcnow() + reuse_window
            return self.token
# Usage
auth_client = ServiceAuthClient()


async def call_protected_api():
    """GET the protected resource with a service token and return its JSON body.

    Raises:
        httpx.HTTPStatusError: when the API answers with a 4xx/5xx status.
    """
    token = await auth_client.get_token()
    async with httpx.AsyncClient() as client:
        response = await client.get(
            'https://api.service.com/resource',
            headers={'Authorization': f'Bearer {token}'}
        )
        # Same handling as the token request: surface HTTP errors instead of
        # returning an error payload as if it were the resource.
        response.raise_for_status()
        return response.json()
from jose import jwt, jwk
from jose.utils import base64url_decode
import httpx
class OIDCValidator:
    """Validates OIDC ID tokens against the issuer's published JWKS."""

    def __init__(self, issuer: str, client_id: str):
        self.issuer = issuer
        self.client_id = client_id
        # Cached key set and the time it was fetched.
        self.jwks = None
        self.jwks_updated_at = None

    def _jwks_is_stale(self) -> bool:
        """Return True when no key set is cached or it is older than 24 hours."""
        if not self.jwks or self.jwks_updated_at is None:
            return True
        age = datetime.utcnow() - self.jwks_updated_at
        # total_seconds() covers the whole interval; ``.seconds`` is only the
        # sub-day remainder (0..86399) and could never exceed 86400.
        return age.total_seconds() > 86400

    async def get_jwks(self) -> dict:
        """Return the issuer's JWKS, re-fetching it when the cache is stale."""
        # Refresh JWKS if stale (cache for 24 hours)
        if self._jwks_is_stale():
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.issuer}/.well-known/jwks.json")
                response.raise_for_status()
                self.jwks = response.json()
                self.jwks_updated_at = datetime.utcnow()
        return self.jwks

    async def validate_id_token(self, id_token: str) -> dict:
        """Verify ``id_token`` and return its claims.

        Raises:
            ValueError: when no JWKS entry matches the token's ``kid``.
            HTTPException: 401 when the token is expired, has invalid claims,
                or fails validation for any other reason.
        """
        # Decode header to get key ID
        header = jwt.get_unverified_header(id_token)
        kid = header['kid']

        # Get JWKS and find matching key
        jwks = await self.get_jwks()
        key = next((k for k in jwks['keys'] if k['kid'] == kid), None)
        if not key:
            raise ValueError("Public key not found in JWKS")

        # Convert JWK to PEM
        public_key = jwk.construct(key)

        # Validate and decode ID token
        try:
            claims = jwt.decode(
                id_token,
                public_key.to_pem().decode('utf-8'),
                algorithms=['RS256'],
                audience=self.client_id,
                issuer=self.issuer,
                options={
                    'verify_exp': True,
                    'verify_iat': True,
                    'verify_aud': True,
                    'verify_iss': True
                }
            )

            # Additional validations
            if claims.get('nonce'):
                # Verify nonce matches what was sent in auth request
                # NOTE(review): not implemented; the nonce is currently accepted as-is.
                pass

            return claims
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="ID token expired")
        except jwt.JWTClaimsError as e:
            raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}")
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}")
# Usage
# Module-level validator for the issuer https://provider.com; the expected
# audience (client ID) is read from the OIDC_CLIENT_ID environment variable.
validator = OIDCValidator(
issuer="https://provider.com",
client_id=os.environ['OIDC_CLIENT_ID']
)
@app.post("/auth/oidc/callback")
async def oidc_callback(id_token: str):
    """Validate ``id_token`` and return the matching user together with its claims."""
    claims = await validator.validate_id_token(id_token)

    # Extract user information
    identity = {
        "email": claims['email'],
        "name": claims['name'],
        "sub": claims['sub'],
    }
    user = await get_or_create_user(**identity)
    return {"user": user, "claims": claims}
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import os
# Use RS256 (asymmetric) instead of HS256 for public verification
PRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH')
PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH')


def load_keys():
    """Load the PEM key pair named by the two path constants.

    Returns:
        A ``(private_key, public_key)`` tuple. The private key is loaded
        without a passphrase.
    """
    with open(PRIVATE_KEY_PATH, 'rb') as private_file:
        private_pem = private_file.read()
    private_key = serialization.load_pem_private_key(
        private_pem,
        password=None,
        backend=default_backend(),
    )

    with open(PUBLIC_KEY_PATH, 'rb') as public_file:
        public_pem = public_file.read()
    public_key = serialization.load_pem_public_key(
        public_pem,
        backend=default_backend(),
    )
    return private_key, public_key


# Loaded once at import time.
PRIVATE_KEY, PUBLIC_KEY = load_keys()
def create_secure_access_token(
    user_id: str,
    roles: list[str],
    tenant_id: str | None = None,
    custom_claims: dict | None = None
) -> str:
    """Return an RS256-signed access token valid for 15 minutes.

    Args:
        user_id: Stored as the ``sub`` claim.
        roles: Stored as the ``roles`` claim.
        tenant_id: Optional tenant identifier, added as ``tenant_id``.
        custom_claims: Optional extra claims. Keys that collide with the
            claims set here (``iss``, ``sub``, ``aud``, ``exp``, ``nbf``,
            ``iat``, ``jti``, ``roles``, ``type``, ``tenant_id``) are ignored.
    """
    now = datetime.utcnow()
    issued_claims = {
        # Standard claims (RFC 7519)
        "iss": "https://api.yourdomain.com",    # Issuer
        "sub": user_id,                         # Subject (user ID)
        "aud": ["https://api.yourdomain.com"],  # Audience
        "exp": now + timedelta(minutes=15),     # Expiration (short-lived)
        "nbf": now,                             # Not before
        "iat": now,                             # Issued at
        "jti": secrets.token_urlsafe(16),       # JWT ID (unique token identifier)
        # Custom claims
        "roles": roles,
        "type": "access",
    }

    # Add tenant context for multi-tenant applications
    if tenant_id:
        issued_claims["tenant_id"] = tenant_id

    # Custom claims go in first so they can add information but never replace
    # the identity, lifetime, role or type claims set above.
    payload = {**(custom_claims or {}), **issued_claims}
    return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")
def verify_secure_token(token: str) -> dict:
    """Verify an RS256 access token and return its claims.

    Signature, issuer, audience and the ``exp``/``nbf``/``iat`` claims are
    checked, and the token must carry ``"type": "access"``.

    Raises:
        HTTPException: 401 for an expired token, any other JWT error, or a
            token of the wrong type.
    """
    try:
        payload = jwt.decode(
            token,
            PUBLIC_KEY,
            algorithms=["RS256"],
            # python-jose expects a single audience string here; the token's
            # own "aud" claim may still be a list containing it.
            audience="https://api.yourdomain.com",
            issuer="https://api.yourdomain.com",
            options={
                'verify_exp': True,
                'verify_nbf': True,
                'verify_iat': True,
                'verify_aud': True,
                'verify_iss': True,
                'require_exp': True,
                'require_iat': True,
                'require_nbf': True
            }
        )
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.JWTError as e:
        # JWTError is python-jose's base error; ``InvalidTokenError`` belongs
        # to PyJWT and is not an attribute of ``jose.jwt``.
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

    # Validate token type
    if payload.get('type') != 'access':
        raise HTTPException(status_code=401, detail="Invalid token type")
    return payload
class TokenBlacklist:
    """Redis-based token blacklist for revoked tokens."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def revoke_token(self, jti: str, exp: int):
        """Blacklist ``jti`` until the token's ``exp`` (Unix seconds).

        Tokens whose expiry has already passed are not stored.
        """
        import time

        # time.time() is the Unix epoch regardless of the server's time zone;
        # a naive utcnow().timestamp() is interpreted as local time and skews
        # the TTL by the UTC offset.
        ttl = int(exp) - int(time.time())
        if ttl > 0:
            await self.redis.set(f"blacklist:{jti}", "1", ex=ttl)

    async def is_revoked(self, jti: str) -> bool:
        """Return True when ``jti`` is currently blacklisted."""
        # EXISTS yields a count; normalise it to the annotated bool.
        return bool(await self.redis.exists(f"blacklist:{jti}"))
# Global blacklist instance, backed by the module's ``redis`` client and
# shared by the revocation-checking dependency and the logout route.
token_blacklist = TokenBlacklist(redis)
async def get_current_user_with_revocation_check(
    token: str = Depends(oauth2_scheme)
) -> User:
    """Resolve the bearer token to a ``User``, rejecting revoked tokens.

    Raises:
        HTTPException: 401 when the token fails verification, its ``jti`` is
            blacklisted, or the user cannot be found.
    """
    claims = verify_secure_token(token)

    # Check if token has been revoked
    revoked = await token_blacklist.is_revoked(claims['jti'])
    if revoked:
        raise HTTPException(status_code=401, detail="Token has been revoked")

    account = await get_user(claims["sub"])
    if account:
        return account
    raise HTTPException(status_code=401, detail="User not found")
@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    """Blacklist the presented access token until its own expiry."""
    claims = verify_secure_token(token)
    token_id, expires_at = claims['jti'], claims['exp']
    await token_blacklist.revoke_token(token_id, expires_at)
    return {"message": "Logged out successfully"}
When reviewing authentication flows, use extended thinking for comprehensive security analysis.
## Authentication Flow Security Review
**Flow**: [Login/OAuth/SSO/API Authentication]
**Date**: [YYYY-MM-DD]
**Reviewer**: [Name/Agent]
### Flow Diagram
[Document the authentication flow step-by-step]
### Security Analysis Checklist
#### Confidentiality
- [ ] Credentials transmitted over HTTPS only
- [ ] Passwords hashed with strong algorithm (bcrypt/argon2)
- [ ] Tokens encrypted in transit
- [ ] Sensitive data not logged
- [ ] Secrets stored securely (env vars, secrets manager)
#### Integrity
- [ ] CSRF protection implemented
- [ ] Request tampering prevented
- [ ] Token signature validation
- [ ] State parameter validated (OAuth)
- [ ] Nonce validated (OIDC)
#### Availability
- [ ] Rate limiting on auth endpoints
- [ ] Account lockout after failed attempts
- [ ] DDoS protection in place
- [ ] Graceful degradation strategy
- [ ] Session timeout configured
#### Authentication
- [ ] MFA available for sensitive accounts
- [ ] Password complexity requirements
- [ ] Credential stuffing protection
- [ ] Brute force mitigation
- [ ] Session fixation prevention
#### Authorization
- [ ] Principle of least privilege
- [ ] Role-based access control
- [ ] Permission checks on every request
- [ ] Token scope validation
- [ ] Tenant isolation (multi-tenant apps)
#### Session Management
- [ ] Secure session tokens
- [ ] HttpOnly, Secure, SameSite cookies
- [ ] Session timeout implemented
- [ ] Logout functionality
- [ ] Concurrent session handling
### Extended Thinking Analysis
Use Claude with extended thinking for deep security review:
```python
import anthropic
client = anthropic.Anthropic()

# Prompt template; replace the bracketed placeholder with the code or flow under review.
security_review_prompt = """
Perform a comprehensive security analysis of this authentication flow:
[Paste authentication code/flow description]
Analyze for:
1. OWASP Top 10 vulnerabilities
2. Authentication bypass possibilities
3. Token security weaknesses
4. Session management issues
5. Input validation gaps
6. Race conditions
7. Logic flaws
Provide specific findings with:
- Severity (Critical/High/Medium/Low)
- Location (file:line)
- Attack vector
- Remediation steps
"""

# NOTE(review): with max_tokens this large the SDK may require a streaming
# request; confirm against the installed anthropic version.
response = client.messages.create(
    # Published Opus 4.5 snapshot; "claude-opus-4-5-20250514" mixed the 4.5
    # name with an older release date and is not a valid identifier.
    model="claude-opus-4-5-20251101",
    max_tokens=32000,
    thinking={
        "type": "enabled",
        "budget_tokens": 20000  # High budget for security analysis
    },
    messages=[{
        "role": "user",
        "content": security_review_prompt
    }]
)

# Extract thinking and analysis
for block in response.content:
    if block.type == "thinking":
        print(f"Deep Analysis:\n{block.thinking}\n")
    elif block.type == "text":
        print(f"Findings:\n{block.text}")
Adapted from [[deep-analysis]] skill threat modeling template:
## Authentication Threat Model
### System: [Auth System Name]
**Version**: 1.0
**Last Updated**: [Date]
### Trust Boundaries
```
┌─────────────────────────────────────────┐
│ External (Untrusted)                    │
│ [End Users] [Credential Stuffers]       │
│ [MITM Attackers]                        │
└──────────────────┬──────────────────────┘
                   │ TLS/HTTPS
┌──────────────────┴──────────────────────┐
│ Public API (Semi-trusted)               │
│ [API Gateway] [Auth Endpoints]          │
│ [OAuth Providers] [OIDC IdP]            │
└──────────────────┬──────────────────────┘
                   │ Internal Auth
┌──────────────────┴──────────────────────┐
│ Application Layer (Trusted)             │
│ [Business Logic] [User Management]      │
└──────────────────┬──────────────────────┘
                   │ DB Protocol
┌──────────────────┴──────────────────────┐
│ Data Layer (Highly Trusted)             │
│ [User DB] [Session Store] [Secrets]     │
└─────────────────────────────────────────┘
```
### STRIDE Analysis
#### Spoofing Identity
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Credential theft via phishing | High | Critical | MFA, user education | ✅ |
| Session hijacking | Medium | High | Secure cookies, HTTPS | ✅ |
| Token replay attacks | Medium | High | Short token lifetime, JTI tracking | ✅ |
| OAuth state manipulation | Low | Medium | Cryptographic state validation | ✅ |
| Impersonation via stolen refresh token | Medium | Critical | Refresh token rotation, device binding | ⚠️ |
#### Tampering with Data
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| JWT payload manipulation | Low | Critical | Signature verification, RS256 | ✅ |
| Cookie tampering | Low | High | Signed cookies, HMAC validation | ✅ |
| OAuth callback manipulation | Medium | High | Redirect URI validation | ✅ |
| Password reset token tampering | Low | High | Cryptographic tokens, time limits | ✅ |
#### Repudiation
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Deny unauthorized access | Medium | Medium | Comprehensive audit logging | ✅ |
| Dispute authentication events | Low | Low | Immutable audit trail, timestamps | ✅ |
#### Information Disclosure
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Credentials in logs | Medium | Critical | Sanitize logs, secret detection | ✅ |
| User enumeration via login | High | Medium | Generic error messages | ⚠️ |
| Token leakage in URLs | Low | High | Tokens in headers only | ✅ |
| Timing attacks on password check | Medium | Medium | Constant-time comparison | ✅ |
| JWKS endpoint information leak | Low | Low | Rate limiting, monitoring | ✅ |
#### Denial of Service
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Brute force attacks | High | High | Rate limiting, CAPTCHA, account lockout | ✅ |
| Resource exhaustion (bcrypt) | Medium | Medium | Request throttling, async processing | ✅ |
| Session store exhaustion | Low | High | Session limits per user, TTL | ✅ |
| OAuth callback flooding | Medium | Medium | State validation, rate limiting | ⚠️ |
#### Elevation of Privilege
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Role manipulation in JWT | Low | Critical | Server-side role verification | ✅ |
| Privilege escalation via API | Medium | Critical | Permission checks on every request | ✅ |
| Admin impersonation | Low | Critical | Additional auth for admin actions | ⚠️ |
| OAuth scope escalation | Low | High | Strict scope validation | ✅ |
### Risk Matrix
| Threat ID | Threat | Likelihood | Impact | Risk Score | Priority |
|-----------|--------|------------|--------|------------|----------|
| T1 | Credential stuffing attack | High | Critical | 9 | P0 |
| T2 | Refresh token theft | Medium | Critical | 8 | P1 |
| T3 | User enumeration | High | Medium | 6 | P2 |
| T4 | OAuth callback flooding | Medium | Medium | 4 | P2 |
| T5 | Admin impersonation | Low | Critical | 7 | P1 |
### Attack Scenarios
#### Scenario 1: Credential Stuffing Attack
**Attacker Goal**: Gain unauthorized access using leaked credentials
**Attack Steps**:
1. Obtain credential database from breach
2. Automate login attempts across accounts
3. Bypass rate limiting with distributed IPs
4. Identify valid credentials
5. Access user accounts
**Defenses**:
- Rate limiting per IP and per account
- CAPTCHA after N failed attempts
- Anomaly detection (impossible travel, new device)
- Breach database monitoring (HaveIBeenPwned)
- Mandatory MFA for sensitive accounts
#### Scenario 2: Token Theft via XSS
**Attacker Goal**: Steal access token to impersonate user
**Attack Steps**:
1. Inject malicious script via vulnerable input
2. Script reads token from localStorage
3. Exfiltrate token to attacker server
4. Use token to access API as victim
**Defenses**:
- Store tokens in HttpOnly cookies (not accessible to JS)
- Content Security Policy (CSP)
- Input sanitization and validation
- Regular security audits
- Short token lifetimes
### Recommendations by Priority
#### P0 (Critical - Immediate Action)
1. Implement credential stuffing protection
2. Add device fingerprinting for anomaly detection
3. Enable MFA for all admin accounts
#### P1 (High - Within Sprint)
1. Implement refresh token rotation
2. Add additional auth step for admin impersonation
3. Strengthen OAuth callback validation
#### P2 (Medium - Next Quarter)
1. Improve user enumeration protection
2. Implement risk-based authentication
3. Add behavioral biometrics
# VULNERABLE: Client-side role check only
@app.get("/admin/users")
async def get_users(user: User = Depends(get_current_user)):
    # No server-side permission check: any authenticated user reaches the query.
    return await db.users.find_all()


# SECURE: Server-side permission enforcement
@app.get("/admin/users")
async def get_users(user: User = Depends(require_permission(Permission.ADMIN))):
    # The dependency has already required Permission.ADMIN by the time this runs.
    all_users = await db.users.find_all()
    return all_users
# VULNERABLE: Weak hashing
# A single MD5 digest of the raw password, with no salt and no work factor.
hashed = hashlib.md5(password.encode()).hexdigest()
# SECURE: Strong adaptive hashing
from passlib.context import CryptContext
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12 # Adjust based on security/performance needs
)
# The context produces the stored hash.
hashed = pwd_context.hash(password)
# VULNERABLE: SQL injection in auth query
query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'"
# SECURE: Parameterized queries
query = "SELECT * FROM users WHERE email = $1"
user = await db.fetch_one(query, email)
if user and pwd_context.verify(password, user.hashed_password):
return user
# VULNERABLE: Weak session management
# The id is derived only from the current time, so it can be guessed.
session_id = hashlib.md5(str(time.time()).encode()).hexdigest()
# SECURE: Cryptographically secure session tokens
import secrets
# Random URL-safe token generated from 32 random bytes.
session_id = secrets.token_urlsafe(32)
# VULNERABLE: No rate limiting
@app.post("/auth/login")
async def login(credentials: LoginRequest):
    result = await authenticate(credentials)
    return result


# SECURE: Rate limiting with slowdown
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

# Requests are keyed by the client's remote address.
# NOTE(review): _rate_limit_exceeded_handler is imported but never registered
# in this snippet; confirm the application installs it during setup.
limiter = Limiter(key_func=get_remote_address)


@app.post("/auth/login")
@limiter.limit("5/minute")  # 5 attempts per minute
async def login(request: Request, credentials: LoginRequest):
    result = await authenticate(credentials)
    return result
For enterprise-grade auth, integrate with Keycloak (see [[keycloak]] skill):
from keycloak import KeycloakOpenID
# Configure Keycloak client
# NOTE(review): the plain-http localhost URL and the inline client secret look
# like local-development placeholders; confirm real deployments load them
# from configuration.
keycloak_openid = KeycloakOpenID(
server_url="http://localhost:8080/",
client_id="your-client",
realm_name="your-realm",
client_secret_key="your-secret"
)
# Get token
token = keycloak_openid.token(username, password)
# Validate token
token_info = keycloak_openid.introspect(token['access_token'])
# Get user info
user_info = keycloak_openid.userinfo(token['access_token'])
# Decode and verify token locally
# The realm public key is wrapped in PEM header/footer lines before use.
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + \
keycloak_openid.public_key() + \
"\n-----END PUBLIC KEY-----"
# Signature, audience and expiry checks are all switched on.
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(
token['access_token'],
key=KEYCLOAK_PUBLIC_KEY,
options=options
)
# Debug JWT token
# The signature check is switched off here: this only prints the payload and
# says nothing about whether the token is authentic.
# NOTE(review): `import jwt` is the PyJWT package, while the Python examples in
# this document import python-jose (`from jose import jwt`); confirm PyJWT is installed.
python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))"
# Verify token signature
openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt
# Check token expiration
# Prints the token's `exp` claim as a human-readable date.
date -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
# Log OAuth flow steps
# WARNING: the statements below write the OAuth state, the authorization code
# and the full token response to the log. Use them for local debugging only
# and remove them before deploying.
logger.debug(f"Redirect URI: {redirect_uri}")
logger.debug(f"State: {state}")
logger.debug(f"Code: {code}")
logger.debug(f"Token response: {token_response}")
# Check Redis session data
redis-cli
> KEYS session:*
> HGETALL session:abc123
> TTL session:abc123
Last Updated: 2025-12-12 Version: 2.0.0 Enhanced with security analysis, threat modeling, and OIDC/OAuth 2.0 best practices