| name | database-workflow |
| description | Language-agnostic database best practices covering migrations, schema design, ORM patterns, query optimization, and testing strategies. Activate when working with database files, migrations, schema changes, SQL, ORM code, database tests, or when user mentions migrations, schema design, SQL optimization, NoSQL, database patterns, or connection pooling. |
Language-agnostic guidelines for database design, migrations, schema management, ORM patterns, and query optimization.
Default to simplicity. Optimize only when you have measured evidence of a problem.
Treat migrations as code: keep them in version control, review them like any other change, and never edit a migration that has already been applied.
Naming convention:
migrations/
├── 001_create_users_table.sql
├── 002_add_email_index.sql
├── 003_create_orders_table.sql
└── 004_add_user_fk_to_orders.sql
Every migration must be reversible:
-- UP: Create table
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
email VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- DOWN: Drop table
DROP TABLE users;
Complex reversals require care:
-- UP: Add constraint
ALTER TABLE orders ADD CONSTRAINT fk_user
FOREIGN KEY (user_id) REFERENCES users(id);
-- DOWN: Remove constraint (PostgreSQL)
ALTER TABLE orders DROP CONSTRAINT fk_user;
Migrations must be safely re-runnable:
-- ✅ Good: Idempotent (safe to run multiple times)
CREATE TABLE IF NOT EXISTS users (...);
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
-- ❌ Bad: Fails if run twice
CREATE TABLE users (...);
CREATE INDEX idx_users_email ON users(email);
Two approaches:
Down migrations (reversible):
Snapshot migrations (not reversible):
Best practice: Design migrations to be reversible when possible, but be prepared to roll forward in production, fixing mistakes with a new migration rather than reverting.
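For example, a hedged sketch of rolling forward: if an earlier migration created a column with the wrong type, ship a corrective migration instead of running its DOWN step against production data (the file name and column are illustrative):
-- 005_fix_orders_total_amount_type.sql (hypothetical roll-forward fix)
ALTER TABLE orders
ALTER COLUMN total_amount TYPE DECIMAL(10,2) USING total_amount::DECIMAL(10,2);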
Use descriptive, action-oriented names:
✅ Good:
- 001_create_users_table
- 002_add_email_unique_constraint
- 003_create_index_users_email
- 004_rename_column_user_id_to_author_id
- 005_add_soft_delete_columns
❌ Bad:
- 001_update
- 002_fix
- 003_schema_change
- 004_v2
Include timestamp + sequence:
2024_11_17_001_create_users_table.sql
2024_11_17_002_add_email_index.sql
First Normal Form (1NF):
-- ❌ NOT 1NF: phone_numbers is a repeating group
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(255),
phone_numbers VARCHAR(255) -- "555-1234, 555-5678"
);
-- ✅ 1NF: Separate table for phone numbers
CREATE TABLE user_phones (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT REFERENCES users(id),
phone_number VARCHAR(20)
);
Second Normal Form (2NF):
-- ❌ NOT 2NF: course_name depends only on course_id, not on (student_id, course_id)
CREATE TABLE enrollments (
student_id BIGINT,
course_id BIGINT,
course_name VARCHAR(255), -- Should be in courses table
grade CHAR(1),
PRIMARY KEY (student_id, course_id)
);
-- ✅ 2NF: Move course_name to separate table
CREATE TABLE courses (
id BIGINT PRIMARY KEY,
name VARCHAR(255)
);
CREATE TABLE enrollments (
student_id BIGINT,
course_id BIGINT REFERENCES courses(id),
grade CHAR(1),
PRIMARY KEY (student_id, course_id)
);
Third Normal Form (3NF):
-- ❌ NOT 3NF: city and state depend on zip_code, not on user_id
CREATE TABLE users (
id BIGINT PRIMARY KEY,
name VARCHAR(255),
zip_code VARCHAR(5),
city VARCHAR(100),
state CHAR(2)
);
-- ✅ 3NF: Move location info to separate table
CREATE TABLE zip_codes (
code VARCHAR(5) PRIMARY KEY,
city VARCHAR(100),
state CHAR(2)
);
CREATE TABLE users (
id BIGINT PRIMARY KEY,
name VARCHAR(255),
zip_code VARCHAR(5) REFERENCES zip_codes(code)
);
Denormalize only when reads far outnumber writes and you have measured evidence that JOINs or aggregations are the bottleneck.
Common denormalization patterns:
-- Track order count on user without JOIN
CREATE TABLE users (
id BIGINT PRIMARY KEY,
name VARCHAR(255),
order_count INT DEFAULT 0
);
-- Keep in sync with trigger or application code
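A minimal sketch of the trigger approach, assuming the users and orders tables above (PostgreSQL):
-- Keep users.order_count in sync as orders are inserted and deleted
CREATE FUNCTION sync_order_count() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
UPDATE users SET order_count = order_count + 1 WHERE id = NEW.user_id;
ELSIF TG_OP = 'DELETE' THEN
UPDATE users SET order_count = order_count - 1 WHERE id = OLD.user_id;
END IF;
RETURN NULL; -- AFTER trigger: return value is ignored
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER orders_sync_order_count AFTER INSERT OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION sync_order_count();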
-- Store user email on order to avoid JOIN at read time
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
user_id BIGINT REFERENCES users(id),
user_email VARCHAR(255), -- Denormalized for reporting
total_amount DECIMAL(10,2)
);
-- Materialized view for dashboards
CREATE MATERIALIZED VIEW monthly_sales AS
SELECT
DATE_TRUNC('month', created_at) as month,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue
FROM orders
GROUP BY DATE_TRUNC('month', created_at);
-- Refresh periodically, not on every insert
REFRESH MATERIALIZED VIEW monthly_sales;
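If dashboards must stay readable while the view refreshes, PostgreSQL also offers a non-blocking variant; a minimal sketch, noting that it requires a unique index on the view:
CREATE UNIQUE INDEX idx_monthly_sales_month ON monthly_sales(month);
REFRESH MATERIALIZED VIEW CONCURRENTLY monthly_sales;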
Index only when needed. Measure first.
-- ✅ Create index for frequently searched columns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_created_at ON orders(created_at DESC);
-- ❌ Avoid: Index on every column
-- ❌ Avoid: Index on low-cardinality columns (boolean flags)
-- ❌ Avoid: Duplicate indexes
Index types:
-- Single-column (B-tree) index
CREATE INDEX idx_users_email ON users(email);
-- Composite index: good for WHERE user_id = X AND created_at > Y
CREATE INDEX idx_orders_user_created ON orders(user_id, created_at DESC);
-- Partial index: index only active users (avoid indexing soft-deleted rows)
CREATE INDEX idx_active_users_email ON users(email)
WHERE deleted_at IS NULL;
-- Unique index: enforces uniqueness and speeds up lookups
CREATE UNIQUE INDEX idx_users_email_unique ON users(email);
Index maintenance: review index usage periodically and drop indexes that are never scanned; every extra index slows down writes.
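As one hedged example, PostgreSQL's statistics views can surface indexes that have never been used (counters reset whenever statistics are reset):
-- Indexes with zero scans are candidates for removal (PostgreSQL)
SELECT relname AS table_name, indexrelname AS index_name, idx_scan
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY relname;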
Use surrogate keys (auto-incrementing ID) by default:
-- ✅ Recommended: Surrogate key
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255)
);
-- ✅ Good for high-volume tables
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
event_type VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Use natural keys only when the value is guaranteed unique, stable, and never changes:
-- ✅ Natural key (country code is stable, never changes)
CREATE TABLE countries (
code CHAR(2) PRIMARY KEY,
name VARCHAR(255)
);
-- ❌ Natural key (email changes, should use surrogate)
CREATE TABLE users (
email VARCHAR(255) PRIMARY KEY
);
Always define foreign key relationships:
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL
);
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id),
total_amount DECIMAL(10,2) NOT NULL
);
-- With explicit constraint name for cleaner error messages
CREATE TABLE order_items (
id BIGSERIAL PRIMARY KEY,
order_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
quantity INT NOT NULL CHECK (quantity > 0),
CONSTRAINT fk_order FOREIGN KEY (order_id) REFERENCES orders(id),
CONSTRAINT fk_product FOREIGN KEY (product_id) REFERENCES products(id)
);
Cascade behaviors:
-- DELETE CASCADE: Delete orders when user is deleted (use with caution)
CONSTRAINT fk_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
-- RESTRICT: Prevent user deletion if orders exist (safer default)
CONSTRAINT fk_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE RESTRICT
-- SET NULL: Set user_id to NULL if user deleted (for optional relationships)
CONSTRAINT fk_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL
Use constraints to enforce data integrity at database level:
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
email VARCHAR(255) NOT NULL UNIQUE,
age INT CHECK (age >= 18),
role VARCHAR(50) DEFAULT 'user' CHECK (role IN ('user', 'admin', 'moderator')),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id),
status VARCHAR(50) NOT NULL DEFAULT 'pending',
total_amount DECIMAL(10,2) NOT NULL CHECK (total_amount > 0),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- Composite constraint
CONSTRAINT valid_status CHECK (status IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled'))
);
-- Uniqueness across an expression: UNIQUE constraints cannot contain expressions,
-- so use a unique index instead (PostgreSQL)
CREATE UNIQUE INDEX unique_user_order_date ON orders (user_id, (created_at::date));
Active Record:
# Active Record pattern
class User(Model):
    name = CharField()
    email = CharField()

    def save(self):
        # Object knows how to save itself
        db.insert('users', {...})

    @staticmethod
    def find_by_email(email):
        return db.query('SELECT * FROM users WHERE email = ?', email)
# Usage
user = User(name='John', email='john@example.com')
user.save()
found_user = User.find_by_email('john@example.com')
Data Mapper:
# Data Mapper pattern
class User:
    def __init__(self, name, email):
        self.name = name
        self.email = email

class UserRepository:
    def save(self, user):
        # Repository handles persistence
        db.insert('users', {'name': user.name, 'email': user.email})

    def find_by_email(self, email):
        row = db.query('SELECT * FROM users WHERE email = ?', email)
        return User(row['name'], row['email']) if row else None
# Usage
user = User('John', 'john@example.com')
repo = UserRepository()
repo.save(user)
found_user = repo.find_by_email('john@example.com')
Choose based on project scale: Active Record keeps simple CRUD apps lean, while Data Mapper separates domain logic from persistence and scales better for complex domains.
Use query builders to avoid string concatenation and SQL injection:
# ❌ Vulnerable to SQL injection
query = f"SELECT * FROM users WHERE email = '{email}'"
result = db.execute(query)
# ✅ Safe: Parameterized query
result = db.query('SELECT * FROM users WHERE email = ?', [email])
# ✅ Better: Query builder
result = (
    db.select(User)
    .where(User.email == email)
    .where(User.active == True)
    .order_by(User.created_at.desc())
    .limit(10)
    .execute()
)
Benefits of query builders: values are parameterized by default, conditions compose cleanly, and queries are easier to read and refactor than hand-concatenated SQL strings.
Lazy Loading (default, but can cause N+1):
# Each user fetch triggers a separate query for posts
users = db.query(User).limit(10).all()
for user in users:
    print(user.posts)  # Additional query per user = 10+ queries!
Eager Loading (prevent N+1):
# Single query with JOIN (tell the ORM to map the joined rows onto user.posts)
users = db.query(User).join(Post).options(contains_eager(User.posts)).limit(10).all()
for user in users:
    print(user.posts)  # No additional queries
# Or use explicit eager loading
users = db.query(User).options(joinedload(User.posts)).limit(10).all()
Recognize and fix N+1 problems:
# ❌ N+1 Problem: 1 query for users + N queries for posts
users = db.query(User).all() # 1 query
for user in users:
    posts = db.query(Post).filter(Post.user_id == user.id).all()  # N more queries
# ✅ Fix 1: Eager loading with JOIN (contains_eager maps the joined rows onto user.posts)
users = db.query(User).join(Post).options(contains_eager(User.posts)).distinct().all()
# ✅ Fix 2: Use IN clause for batch loading
user_ids = [u.id for u in users]
posts = db.query(Post).filter(Post.user_id.in_(user_ids)).all()
# Now merge posts back to users in memory
# ✅ Fix 3: Use ORM eager loading
users = db.query(User).options(selectinload(User.posts)).all()
Write queries that use indexes:
-- ✅ Uses index on email
SELECT * FROM users WHERE email = 'john@example.com';
-- ✅ Uses index on user_id, created_at
SELECT * FROM orders
WHERE user_id = 123 AND created_at > '2024-01-01'
ORDER BY created_at DESC;
-- ❌ Can't use index (function on column)
SELECT * FROM users WHERE LOWER(email) = 'john@example.com';
-- Fix: CREATE INDEX idx_users_email_lower ON users(LOWER(email));
-- ❌ Can't use index (leading wildcard)
SELECT * FROM users WHERE email LIKE '%@example.com';
-- Fix: Use full-text search or store domain separately
-- ✅ Can use index (trailing wildcard)
SELECT * FROM users WHERE email LIKE 'john%';
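For the leading-wildcard case above, one hedged PostgreSQL option is a trigram index, which can serve LIKE patterns that start with a wildcard:
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE INDEX idx_users_email_trgm ON users USING GIN (email gin_trgm_ops);
-- This leading-wildcard LIKE can now use the trigram index
SELECT * FROM users WHERE email LIKE '%@example.com';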
Always analyze slow queries before optimizing:
-- PostgreSQL
EXPLAIN ANALYZE
SELECT * FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.email = 'john@example.com'
ORDER BY o.created_at DESC
LIMIT 10;
-- MySQL
EXPLAIN
SELECT * FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.email = 'john@example.com'
ORDER BY o.created_at DESC
LIMIT 10;
Look for: sequential scans on large tables, indexes that are not being used, large gaps between estimated and actual row counts, and expensive sorts or nested-loop joins.
Fetch only columns you need:
-- ❌ Fetches all columns (slower, more bandwidth)
SELECT * FROM orders WHERE user_id = 123;
-- ✅ Fetch only needed columns
SELECT id, user_id, total_amount, created_at
FROM orders
WHERE user_id = 123;
-- ✅ Reduces memory/bandwidth especially for large text columns
SELECT id, user_id, total_amount
FROM orders
WHERE user_id = 123;
Use connection pooling in production:
# ❌ Anti-pattern: New connection per query
def get_user(user_id):
    conn = connect()  # New connection!
    user = conn.query('SELECT * FROM users WHERE id = ?', user_id)
    conn.close()
    return user
# ✅ Use connection pool
pool = ConnectionPool(
    host='localhost',
    database='myapp',
    min_size=5,
    max_size=20,
    timeout=30
)
def get_user(user_id):
    with pool.get_connection() as conn:  # Reuses from pool
        return conn.query('SELECT * FROM users WHERE id = ?', user_id)
Pool configuration (tune for your workload):
- min_size: Minimum idle connections (default 5-10)
- max_size: Maximum concurrent connections (default 20-50)
- timeout: Connection acquisition timeout
- idle_timeout: Close idle connections after N seconds
Use SQL when:
- Data is relational and queried with JOINs across entities
- You need ACID transactions and strong consistency
- The schema is well-defined and stable
Example scenarios: orders and payments, user accounts, financial records.
Use NoSQL when:
- The schema is flexible or evolves frequently
- Data is naturally document- or key-shaped
- Simple access patterns need to scale horizontally
Example scenarios: user profiles with variable attributes, caching and sessions, activity feeds.
Document database (MongoDB, Firebase):
✅ Good: User profiles with flexible attributes
✅ Good: Product catalog with variable specs
❌ Bad: Complex multi-entity queries and JOINs
Document structure:
{
  _id: 1,
  name: "John",
  email: "john@example.com",
  preferences: {
    language: "en",
    theme: "dark",
    notifications: true
  }
}
Key-value store (Redis, Memcached):
✅ Good: Caching, sessions, rate limiting
✅ Good: Real-time leaderboards
❌ Bad: Complex queries, relationships
Structure:
user:123 → { name, email, created_at }
session:abc123 → { user_id, expires_at }
Graph database (Neo4j):
✅ Good: Social networks, recommendations
✅ Good: Complex relationship queries
❌ Bad: Simple CRUD operations
Relationships:
User -[:FOLLOWS]-> User
User -[:COMMENTED_ON]-> Post
Use separate test database:
# config.py
if os.getenv('ENV') == 'test':
    DATABASE_URL = 'postgresql://test_user:test_pass@localhost/test_db'
else:
    DATABASE_URL = os.getenv('DATABASE_URL')
# conftest.py (pytest)
@pytest.fixture(autouse=True)
def setup_test_db():
    """Create test database and tables before each test."""
    # Create tables
    db.create_all()
    yield
    # Cleanup
    db.drop_all()
Use fixtures for test data:
# conftest.py
import pytest
from app.models import User, Order
@pytest.fixture
def sample_user(db):
"""Create a test user."""
user = User(name='John Doe', email='john@example.com')
db.add(user)
db.commit()
return user
@pytest.fixture
def sample_orders(db, sample_user):
"""Create orders for test user."""
orders = [
Order(user_id=sample_user.id, total_amount=100.00),
Order(user_id=sample_user.id, total_amount=200.00),
]
db.add_all(orders)
db.commit()
return orders
# test_orders.py
def test_get_user_orders(sample_user, sample_orders):
"""Test fetching user orders."""
orders = Order.query.filter_by(user_id=sample_user.id).all()
assert len(orders) == 2
assert sum(o.total_amount for o in orders) == 300.00
Rollback transactions to isolate tests:
# conftest.py - Automatic rollback after each test
@pytest.fixture(autouse=True)
def db_transaction(db):
"""Wrap each test in a transaction that rolls back."""
transaction = db.begin_nested()
yield
transaction.rollback() # Undo all changes from this test
# Or use explicit rollback
def test_create_user():
db.begin()
user = User(name='John', email='john@example.com')
db.add(user)
db.commit()
assert user.id is not None
db.rollback()
# Changes are undone, database is clean for next test
Mark records as deleted instead of removing:
-- Add deleted_at column
ALTER TABLE users ADD COLUMN deleted_at TIMESTAMP NULL;
-- Soft delete (update)
UPDATE users SET deleted_at = CURRENT_TIMESTAMP WHERE id = 123;
-- Query only active records
SELECT * FROM users WHERE deleted_at IS NULL;
-- Create index on deleted_at for efficient queries
CREATE INDEX idx_users_active ON users(deleted_at) WHERE deleted_at IS NULL;
Pros: Recoverable, audit trail, can restore data.
Cons: Need to remember to filter deleted records everywhere.
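One hedged way to reduce the filtering burden is a view over active rows, with application queries reading the view instead of the base table:
CREATE VIEW active_users AS
SELECT * FROM users WHERE deleted_at IS NULL;
-- Callers no longer need to repeat the deleted_at filter
SELECT * FROM active_users WHERE email = 'john@example.com';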
Track all data changes:
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
entity_type VARCHAR(100),
entity_id BIGINT,
action VARCHAR(20), -- CREATE, UPDATE, DELETE
old_values JSONB,
new_values JSONB,
changed_by BIGINT REFERENCES users(id),
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- PostgreSQL trigger to auto-log changes
CREATE FUNCTION audit_trigger() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO audit_log (entity_type, entity_id, action, old_values, new_values, changed_at)
VALUES ('user', NEW.id, TG_OP, to_jsonb(OLD), to_jsonb(NEW), NOW());
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER users_audit AFTER UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger();
Track record creation and modification:
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(255),
email VARCHAR(255),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Auto-update updated_at on changes (PostgreSQL)
CREATE FUNCTION update_timestamp() RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER users_update_timestamp BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION update_timestamp();
Store tree structures in relational database:
-- Option 1: Adjacency List (simple, slow to query)
CREATE TABLE categories (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(255),
parent_id BIGINT REFERENCES categories(id)
);
-- Query children
SELECT * FROM categories WHERE parent_id = 5;
-- Query ancestors (recursive, expensive)
WITH RECURSIVE ancestors AS (
SELECT id, name, parent_id FROM categories WHERE id = 5
UNION ALL
SELECT c.id, c.name, c.parent_id
FROM categories c
JOIN ancestors a ON c.id = a.parent_id
)
SELECT * FROM ancestors;
-- Option 2: Closure Table (trade space for query speed)
CREATE TABLE categories (id BIGSERIAL PRIMARY KEY, name VARCHAR(255));
CREATE TABLE category_closure (
ancestor_id BIGINT REFERENCES categories(id),
descendant_id BIGINT REFERENCES categories(id),
depth INT,
PRIMARY KEY (ancestor_id, descendant_id)
);
-- Query all descendants
SELECT c.* FROM categories c
JOIN category_closure cc ON c.id = cc.descendant_id
WHERE cc.ancestor_id = 5 AND cc.depth > 0;
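The closure table has to be maintained on insert; a minimal sketch for adding a new category under an existing parent (:new_id and :parent_id are placeholder parameters):
-- One row per ancestor of the parent, plus the node's self-reference at depth 0
INSERT INTO category_closure (ancestor_id, descendant_id, depth)
SELECT ancestor_id, :new_id, depth + 1
FROM category_closure
WHERE descendant_id = :parent_id
UNION ALL
SELECT :new_id, :new_id, 0;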
Implement efficient pagination:
-- ❌ OFFSET is slow for large offsets
SELECT * FROM orders ORDER BY id LIMIT 10 OFFSET 100000;
-- ✅ Better: Keyset pagination (cursor-based)
SELECT * FROM orders
WHERE id > 12345 -- Last ID from previous page
ORDER BY id
LIMIT 10;
-- ✅ With composite key
SELECT * FROM orders
WHERE (user_id, created_at) > (123, '2024-11-17')
ORDER BY user_id, created_at
LIMIT 10;
Use transactions for data consistency:
# ❌ No transaction - inconsistent state if error occurs
user = db.query(User).get(123)
user.balance -= 50
db.commit()
account = db.query(Account).get(456)
account.balance += 50
db.commit() # If this fails, money disappears!
# ✅ Transaction - all-or-nothing
try:
    with db.transaction():
        user = db.query(User).get(123)
        user.balance -= 50
        account = db.query(Account).get(456)
        account.balance += 50
        db.flush()
except Exception:
    # Everything rolls back automatically
    raise
Problem: One query per item instead of batching.
Fix: Use eager loading or batch queries.
Problem: Slow queries on large tables.
Fix: Analyze slow queries with EXPLAIN, add indexes on WHERE/JOIN columns.
Problem: Inconsistent data if an error occurs mid-operation.
Fix: Wrap multi-step operations in transactions.
Problem: Too many JOINs make queries slow and complex.
Fix: Denormalize strategically where proven necessary.
Problem: SELECT * without LIMIT causes memory exhaustion.
Fix: Always LIMIT and paginate large result sets.
Problem: Accidental data loss or orphaned records.
Fix: Choose CASCADE, RESTRICT, or SET NULL deliberately.
Problem: Security breach if the database is compromised.
Fix: Store bcrypt hashes, never plain passwords.
Problem: VARCHAR(255) used for numbers that can't be queried properly.
Fix: Use correct data types (INTEGER, BIGINT, DECIMAL, not VARCHAR).
Problem: Migration fails in production, causing downtime.
Fix: Test migrations on production-like data before deploying.
Problem: Old application code breaks with the new schema.
Fix: Support both old and new columns temporarily and add a deprecation period.
Problem: A slow query locks the database, causing cascading failures.
Fix: Set statement timeouts and query timeouts in production.
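For example, in PostgreSQL a statement timeout can be set per role or per session (the role name and values here are illustrative):
-- Apply to every connection made by the application role
ALTER ROLE app_user SET statement_timeout = '5s';
-- Or per session
SET statement_timeout = '5s';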
See project-specific database configuration in .claude/CLAUDE.md if present.