// API security best practices and common vulnerability prevention. Enforces security checks for authentication, input validation, SQL injection, XSS, and OWASP Top 10 vulnerabilities. Use when building or modifying APIs.
| name | api-security |
| description | API security best practices and common vulnerability prevention. Enforces security checks for authentication, input validation, SQL injection, XSS, and OWASP Top 10 vulnerabilities. Use when building or modifying APIs. |
This guardrail skill enforces critical security practices when building APIs. It helps prevent common vulnerabilities including OWASP Top 10 threats, ensuring your API is secure by design.
Auto-activates when:
Every API endpoint must have explicit authentication:
# Good - Authentication required
@app.post("/api/users")
@require_auth # Explicit authentication decorator
async def create_user(request: Request):
user = get_current_user(request)
# Implementation
// Good - Authentication middleware
router.post('/api/users', authenticate, async (req, res) => {
const user = req.user; // Set by authenticate middleware
// Implementation
});
Never skip authentication:
# BAD - No authentication!
@app.post("/api/users")
async def create_user(request: Request):
# Anyone can call this!
pass
Authentication (who you are) is not enough - check authorization (what you can do):
@app.delete("/api/users/{user_id}")
@require_auth
async def delete_user(user_id: str, request: Request):
current_user = get_current_user(request)
# Authorization check
if not current_user.is_admin and current_user.id != user_id:
raise HTTPException(status_code=403, detail="Forbidden")
# Proceed with deletion
await delete_user_by_id(user_id)
Use industry-standard tokens:
# Good - JWT with expiration
import jwt
from datetime import datetime, timedelta
def create_access_token(user_id: str) -> str:
payload = {
"sub": user_id,
"exp": datetime.utcnow() + timedelta(hours=1),
"iat": datetime.utcnow(),
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
# Validate tokens properly
def verify_token(token: str) -> dict:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
Never trust user input - always validate:
from pydantic import BaseModel, Field, validator
class CreateUserRequest(BaseModel):
"""Validated user creation request."""
username: str = Field(..., min_length=3, max_length=50, regex="^[a-zA-Z0-9_]+$")
email: str = Field(..., regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
age: int = Field(..., ge=0, le=150)
@validator("username")
def username_no_admin(cls, v):
if "admin" in v.lower():
raise ValueError("Username cannot contain 'admin'")
return v
@app.post("/api/users")
async def create_user(data: CreateUserRequest): # Automatic validation
# data is guaranteed valid here
pass
Prevent XSS by escaping output:
import html
@app.get("/api/users/{user_id}")
async def get_user(user_id: str):
user = await get_user_by_id(user_id)
# Sanitize output for web display
return {
"username": html.escape(user.username),
"bio": html.escape(user.bio),
}
Prevent abuse with rate limiting:
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/login")
@limiter.limit("5/minute") # Max 5 attempts per minute
async def login(request: Request, credentials: LoginRequest):
# Implementation
pass
NEVER concatenate user input into SQL:
# CRITICAL VULNERABILITY - SQL Injection!
user_id = request.query_params.get("id")
query = f"SELECT * FROM users WHERE id = {user_id}" # NEVER DO THIS!
result = db.execute(query)
# Good - Parameterized query
user_id = request.query_params.get("id")
query = "SELECT * FROM users WHERE id = ?"
result = db.execute(query, (user_id,))
# Better - Use ORM
user = await User.filter(id=user_id).first()
Use ORMs correctly to prevent injection:
from sqlalchemy import select
# Good - ORM with parameters
async def get_users_by_role(role: str):
query = select(User).where(User.role == role) # Parameterized
result = await session.execute(query)
return result.scalars().all()
# BAD - Raw SQL with concatenation
async def get_users_by_role_bad(role: str):
query = f"SELECT * FROM users WHERE role = '{role}'" # Vulnerable!
result = await session.execute(query)
return result.all()
Set CSP headers to prevent XSS:
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:;"
)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
return response
Always escape user-generated content:
import html
import json
# Escape for HTML
safe_html = html.escape(user_input)
# Escape for JavaScript
safe_js = json.dumps(user_input)
# Use templating engines with auto-escaping
# Jinja2 auto-escapes by default
return templates.TemplateResponse("page.html", {"content": user_input})
Redirect HTTP to HTTPS:
@app.middleware("http")
async def https_redirect(request: Request, call_next):
if request.url.scheme != "https" and not request.url.hostname == "localhost":
url = request.url.replace(scheme="https")
return RedirectResponse(url, status_code=301)
return await call_next(request)
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
Don't use wildcard origins in production:
from fastapi.middleware.cors import CORSMiddleware
# BAD - Too permissive
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Anyone can call your API!
allow_credentials=True,
)
# Good - Specific origins
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://myapp.com",
"https://www.myapp.com",
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
import logging
logger = logging.getLogger(__name__)
# BAD - Logs password!
logger.info(f"User {username} logging in with password {password}")
# Good - No sensitive data
logger.info(f"User {username} attempting login")
# Redact sensitive fields
def redact_sensitive(data: dict) -> dict:
sensitive_fields = {"password", "ssn", "credit_card", "token"}
return {
k: "***REDACTED***" if k in sensitive_fields else v
for k, v in data.items()
}
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Hash password
hashed = pwd_context.hash(plain_password)
# Verify password
is_valid = pwd_context.verify(plain_password, hashed)
# NEVER store passwords in plain text!
from cryptography.fernet import Fernet
# Generate key (store securely, not in code!)
key = Fernet.generate_key()
cipher = Fernet(key)
# Encrypt
encrypted = cipher.encrypt(sensitive_data.encode())
# Decrypt
decrypted = cipher.decrypt(encrypted).decode()
# BAD - Reveals internal details
@app.get("/api/users/{user_id}")
async def get_user(user_id: str):
try:
user = await db.query(f"SELECT * FROM users WHERE id = {user_id}")
return user
except Exception as e:
# Leaks SQL structure and database details!
raise HTTPException(status_code=500, detail=str(e))
# Good - Generic error messages
@app.get("/api/users/{user_id}")
async def get_user(user_id: str):
try:
user = await User.get(id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
except Exception as e:
# Log detailed error internally
logger.error(f"Error fetching user {user_id}: {e}")
# Return generic message to client
raise HTTPException(status_code=500, detail="Internal server error")
Before deploying any API endpoint, verify: