| name | api-authentication |
| description | API authentication patterns including JWT, OAuth 2.0, API keys, and session-based auth. Covers token generation, validation, refresh strategies, security best practices, and when to use each pattern. Use when implementing API authentication, choosing auth strategy, securing endpoints, or debugging auth issues. Prevents common vulnerabilities like token theft, replay attacks, and insecure storage. |
API Authentication Patterns
Comprehensive guide to implementing secure API authentication including JWT, OAuth 2.0, API keys, and session-based patterns. Covers when to use each approach, security best practices, and common vulnerabilities to avoid.
Quick Reference
When to use this skill:
- Implementing API authentication
- Choosing between auth strategies (JWT vs OAuth vs sessions)
- Securing API endpoints
- Implementing token refresh logic
- Debugging authentication issues
- Preventing auth vulnerabilities
Common triggers:
- "How should I implement authentication"
- "JWT vs OAuth vs API keys"
- "How to secure this API"
- "Implement refresh tokens"
- "Store authentication tokens securely"
- "Fix authentication vulnerability"
Prevents vulnerabilities:
- Token theft and replay attacks
- Insecure token storage
- Missing token expiration
- Weak password hashing
- CSRF attacks
Part 1: Authentication Strategy Decision Matrix
When to Use Each Pattern
| Pattern | Best For | Pros | Cons |
|---|
| JWT | Stateless APIs, microservices, mobile apps | Stateless, scalable, works across domains | Tokens can't be revoked easily, larger payload |
| OAuth 2.0 | Third-party access, social login, delegation | Industry standard, fine-grained permissions | Complex to implement, requires authorization server |
| API Keys | Server-to-server, public APIs, rate limiting | Simple, great for service accounts | Not for users, can't be scoped easily |
| Sessions | Traditional web apps, SSR, same-domain | Revocable, server-controlled, secure | Requires server state, doesn't scale horizontally easily |
Decision Tree
START: What type of client?
├─ Mobile app or SPA?
│ └─ Use JWT (stateless, works across domains)
│
├─ Third-party integration?
│ └─ Use OAuth 2.0 (delegation, scoped permissions)
│
├─ Service-to-service?
│ └─ Use API Keys (simple, rate-limitable)
│
└─ Traditional web app (same domain)?
└─ Use Sessions (revocable, server-controlled)
Part 2: JWT (JSON Web Tokens)
JWT Structure
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
[HEADER].[PAYLOAD].[SIGNATURE]
Header (algorithm and type):
{
"alg": "HS256",
"typ": "JWT"
}
Payload (claims):
{
"sub": "1234567890",
"name": "John Doe",
"iat": 1516239022,
"exp": 1516242622
}
Signature (verification):
HMACSHA256(
base64UrlEncode(header) + "." + base64UrlEncode(payload),
secret
)
JWT Implementation (Python)
import jwt
import datetime
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
SECRET_KEY = "your-256-bit-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
security = HTTPBearer()
def create_access_token(user_id: int) -> str:
"""Create short-lived access token."""
expires = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": str(user_id),
"exp": expires,
"iat": datetime.datetime.utcnow(),
"type": "access"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: int) -> str:
"""Create long-lived refresh token."""
expires = datetime.datetime.utcnow() + datetime.timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
payload = {
"sub": str(user_id),
"exp": expires,
"iat": datetime.datetime.utcnow(),
"type": "refresh"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
"""Verify and decode JWT token."""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "access":
raise HTTPException(status_code=401, detail="Invalid token type")
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/login")
async def login(username: str, password: str):
user = authenticate_user(username, password)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(user.id)
refresh_token = create_refresh_token(user.id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
@app.get("/protected")
async def protected_route(payload: dict = Depends(verify_token)):
user_id = payload["sub"]
return {"message": f"Hello user {user_id}"}
@app.post("/refresh")
async def refresh(refresh_token: str):
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid token type")
user_id = int(payload["sub"])
new_access_token = create_access_token(user_id)
return {"access_token": new_access_token, "token_type": "bearer"}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Refresh token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid refresh token")
JWT Security Best Practices
✅ Do:
- Use strong secret keys (256-bit minimum)
- Always set expiration (
exp claim)
- Use short-lived access tokens (15 minutes)
- Use separate refresh tokens (7 days)
- Store secret in environment variables
- Use HTTPS only
- Validate signature on every request
- Check token type (
access vs refresh)
❌ Don't:
- Store sensitive data in payload (it's base64, not encrypted!)
- Use symmetric signing (HS256) for public APIs (use RS256)
- Store tokens in localStorage (XSS vulnerability)
- Skip expiration validation
- Use same token for access and refresh
- Hard-code secrets
Token Storage (Client-Side)
❌ Bad (localStorage - vulnerable to XSS):
localStorage.setItem('token', token);
✅ Good (httpOnly cookie):
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=True,
samesite="lax",
max_age=900
)
✅ Also Good (memory only for SPAs):
let accessToken = null;
async function login(username, password) {
const response = await fetch('/login', {
method: 'POST',
body: JSON.stringify({ username, password })
});
const data = await response.json();
accessToken = data.access_token;
}
Part 3: OAuth 2.0
OAuth 2.0 Flows
Authorization Code Flow (most common, for web apps):
1. Client → Authorization Server: "User wants to log in"
2. Authorization Server → User: Login page
3. User → Authorization Server: Credentials
4. Authorization Server → Client: Authorization code
5. Client → Authorization Server: Exchange code for access token
6. Authorization Server → Client: Access token + refresh token
Client Credentials Flow (for service-to-service):
1. Service → Authorization Server: Client ID + Secret
2. Authorization Server → Service: Access token
OAuth 2.0 Implementation (Authorization Code Flow)
from fastapi import FastAPI, HTTPException
from authlib.integrations.starlette_client import OAuth
import os
app = FastAPI()
oauth = OAuth()
oauth.register(
name='google',
client_id=os.getenv('GOOGLE_CLIENT_ID'),
client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'}
)
@app.get('/login/google')
async def login_google(request: Request):
redirect_uri = request.url_for('auth_google')
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get('/auth/google')
async def auth_google(request: Request):
try:
token = await oauth.google.authorize_access_token(request)
user_info = token.get('userinfo')
user = get_or_create_user(
email=user_info['email'],
name=user_info['name']
)
access_token = create_access_token(user.id)
return {"access_token": access_token, "token_type": "bearer"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
OAuth 2.0 Scopes
SCOPES = {
"read:posts": "Read posts",
"write:posts": "Create and edit posts",
"delete:posts": "Delete posts",
"read:profile": "Read user profile",
"write:profile": "Update user profile"
}
def create_access_token(user_id: int, scopes: list[str]) -> str:
payload = {
"sub": str(user_id),
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=15),
"scopes": scopes
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def require_scopes(required_scopes: list[str]):
def decorator(func):
async def wrapper(payload: dict = Depends(verify_token)):
token_scopes = payload.get("scopes", [])
if not all(scope in token_scopes for scope in required_scopes):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(payload)
return wrapper
return decorator
@app.delete("/posts/{post_id}")
@require_scopes(["delete:posts"])
async def delete_post(post_id: int, payload: dict = Depends(verify_token)):
pass
Part 4: API Keys
API Key Implementation
import secrets
import hashlib
from datetime import datetime
def generate_api_key() -> tuple[str, str]:
"""Generate API key and return (key, hashed_key)."""
api_key = secrets.token_urlsafe(32)
hashed_key = hashlib.sha256(api_key.encode()).hexdigest()
return api_key, hashed_key
def create_api_key(user_id: int, name: str) -> str:
api_key, hashed_key = generate_api_key()
db.execute("""
INSERT INTO api_keys (user_id, name, key_hash, created_at)
VALUES (?, ?, ?, ?)
""", user_id, name, hashed_key, datetime.utcnow())
return api_key
def verify_api_key(api_key: str) -> dict:
"""Verify API key and return user info."""
hashed_key = hashlib.sha256(api_key.encode()).hexdigest()
result = db.execute("""
SELECT user_id, name, created_at, last_used_at
FROM api_keys
WHERE key_hash = ? AND revoked_at IS NULL
""", hashed_key).fetchone()
if not result:
raise HTTPException(status_code=401, detail="Invalid API key")
db.execute("""
UPDATE api_keys
SET last_used_at = ?
WHERE key_hash = ?
""", datetime.utcnow(), hashed_key)
return {"user_id": result[0], "key_name": result[1]}
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
@app.get("/api/data")
async def get_data(api_key: str = Security(api_key_header)):
user_info = verify_api_key(api_key)
return {"data": "protected data", "user_id": user_info["user_id"]}
API Key Best Practices
✅ Do:
- Hash keys before storing (use SHA-256 minimum)
- Generate cryptographically secure keys (
secrets module)
- Allow users to name keys ("Production Server", "CI/CD")
- Track last used timestamp
- Allow key revocation
- Rate limit by API key
- Log API key usage
❌ Don't:
- Store plain text keys
- Use predictable key generation
- Expose keys in URLs (use headers)
- Share keys across environments
API Key Revocation
@app.delete("/api-keys/{key_id}")
async def revoke_api_key(key_id: int, current_user: dict = Depends(get_current_user)):
db.execute("""
UPDATE api_keys
SET revoked_at = ?
WHERE id = ? AND user_id = ?
""", datetime.utcnow(), key_id, current_user["id"])
return {"message": "API key revoked"}
Part 5: Session-Based Authentication
Session Implementation
from fastapi import FastAPI, Cookie, Response
import redis
import secrets
import json
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
def create_session(user_id: int) -> str:
"""Create session and return session ID."""
session_id = secrets.token_urlsafe(32)
session_data = {
"user_id": user_id,
"created_at": datetime.utcnow().isoformat()
}
redis_client.setex(
f"session:{session_id}",
86400,
json.dumps(session_data)
)
return session_id
def verify_session(session_id: str) -> dict:
"""Verify session and return user data."""
session_data = redis_client.get(f"session:{session_id}")
if not session_data:
raise HTTPException(status_code=401, detail="Session expired")
return json.loads(session_data)
@app.post("/login")
async def login(username: str, password: str, response: Response):
user = authenticate_user(username, password)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
session_id = create_session(user.id)
response.set_cookie(
key="session_id",
value=session_id,
httponly=True,
secure=True,
samesite="lax",
max_age=86400
)
return {"message": "Logged in successfully"}
@app.get("/protected")
async def protected_route(session_id: str = Cookie(None)):
if not session_id:
raise HTTPException(status_code=401, detail="Not authenticated")
session_data = verify_session(session_id)
user_id = session_data["user_id"]
return {"message": f"Hello user {user_id}"}
@app.post("/logout")
async def logout(session_id: str = Cookie(None), response: Response):
if session_id:
redis_client.delete(f"session:{session_id}")
response.delete_cookie("session_id")
return {"message": "Logged out successfully"}
Part 6: Password Security
Password Hashing (Never Store Plain Text!)
import bcrypt
def hash_password(password: str) -> str:
"""Hash password with bcrypt."""
salt = bcrypt.gensalt(rounds=12)
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""Verify password against hash."""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
@app.post("/register")
async def register(username: str, password: str):
if len(password) < 12:
raise HTTPException(status_code=400, detail="Password must be at least 12 characters")
hashed_password = hash_password(password)
db.execute("""
INSERT INTO users (username, password_hash)
VALUES (?, ?)
""", username, hashed_password)
return {"message": "User created"}
@app.post("/login")
async def login(username: str, password: str):
user = db.execute("SELECT id, password_hash FROM users WHERE username = ?", username).fetchone()
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
if not verify_password(password, user[1]):
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(user[0])
return {"access_token": access_token}
Password Requirements
Minimum requirements:
- At least 12 characters (NIST recommendation)
- Mix of uppercase, lowercase, numbers, symbols
- Not in common password list
- Not similar to username
Implementation:
import re
def validate_password(password: str, username: str) -> tuple[bool, str]:
"""Validate password strength."""
if len(password) < 12:
return False, "Password must be at least 12 characters"
if not re.search(r"[a-z]", password):
return False, "Password must contain lowercase letter"
if not re.search(r"[A-Z]", password):
return False, "Password must contain uppercase letter"
if not re.search(r"\d", password):
return False, "Password must contain number"
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False, "Password must contain special character"
if username.lower() in password.lower():
return False, "Password cannot contain username"
if is_common_password(password):
return False, "Password is too common"
return True, "Password is strong"
Part 7: Common Vulnerabilities
Vulnerability 1: Token Replay Attacks
Problem: Attacker intercepts token and reuses it
✅ Solution: Short expiration + refresh tokens
def refresh_tokens(refresh_token: str):
payload = jwt.decode(refresh_token, SECRET_KEY)
if redis_client.get(f"used:{refresh_token}"):
raise HTTPException(status_code=401, detail="Token already used")
redis_client.setex(f"used:{refresh_token}", 604800, "1")
user_id = int(payload["sub"])
new_access = create_access_token(user_id)
new_refresh = create_refresh_token(user_id)
return {"access_token": new_access, "refresh_token": new_refresh}
Vulnerability 2: CSRF Attacks
Problem: Attacker tricks user into making authenticated request
✅ Solution: CSRF tokens + SameSite cookies
from fastapi import Cookie, Header
def verify_csrf(
csrf_token: str = Header(None, alias="X-CSRF-Token"),
session_id: str = Cookie(None)
):
"""Verify CSRF token matches session."""
if not csrf_token:
raise HTTPException(status_code=403, detail="CSRF token missing")
session_data = verify_session(session_id)
stored_csrf = session_data.get("csrf_token")
if csrf_token != stored_csrf:
raise HTTPException(status_code=403, detail="Invalid CSRF token")
@app.post("/sensitive-action")
async def sensitive_action(
csrf_check: None = Depends(verify_csrf)
):
pass
Vulnerability 3: Timing Attacks
Problem: Attacker uses response timing to guess credentials
✅ Solution: Constant-time comparison
import hmac
def constant_time_compare(a: str, b: str) -> bool:
"""Compare strings in constant time (prevents timing attacks)."""
return hmac.compare_digest(a, b)
if not constant_time_compare(provided_hash, stored_hash):
raise HTTPException(status_code=401)
Part 8: Rate Limiting
from fastapi import Request
import time
rate_limits = {}
def rate_limit(max_requests: int, window_seconds: int):
"""Rate limit decorator."""
def decorator(func):
async def wrapper(request: Request, *args, **kwargs):
client_ip = request.client.host
key = f"{client_ip}:{func.__name__}"
now = time.time()
if key not in rate_limits:
rate_limits[key] = []
rate_limits[key] = [
req_time for req_time in rate_limits[key]
if now - req_time < window_seconds
]
if len(rate_limits[key]) >= max_requests:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {window_seconds} seconds."
)
rate_limits[key].append(now)
return await func(request, *args, **kwargs)
return wrapper
return decorator
@app.post("/login")
@rate_limit(max_requests=5, window_seconds=60)
async def login(request: Request, username: str, password: str):
pass
Quick Security Checklist
Token Security:
Password Security:
API Security:
Resources
JWT:
OAuth 2.0:
Security:
Libraries: