// Universal database operations skill for modern applications. Expert in SQLModel/SQLAlchemy patterns, async database operations, connection pooling, migrations, performance optimization, and multi-database support (PostgreSQL, MySQL, SQLite). Provides production-ready patterns for any database-driven application.
| name | database |
| description | Universal database operations skill for modern applications. Expert in SQLModel/SQLAlchemy patterns, async database operations, connection pooling, migrations, performance optimization, and multi-database support (PostgreSQL, MySQL, SQLite). Provides production-ready patterns for any database-driven application. |
| license | MIT |
This skill provides comprehensive database patterns and best practices for 2025, focusing on async operations, performance optimization, and production-ready configurations that work across different database systems.
Use this skill when you need to:
# Database connection setup with connection pooling
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.pool import NullPool
from contextlib import asynccontextmanager
import os
# Environment-based configuration
# Engine settings per environment name; each entry is unpacked as keyword
# arguments into create_async_engine by DatabaseManager.
DATABASE_CONFIG = dict(
    development=dict(
        url="sqlite+aiosqlite:///./app.db",
        poolclass=NullPool,
        echo=True,
    ),
    production=dict(
        # Falls back to a local PostgreSQL DSN when DATABASE_URL is not set.
        url=os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/db"),
        pool_size=30,
        max_overflow=40,
        pool_pre_ping=True,
        pool_recycle=3600,
        echo=False,
    ),
)
class DatabaseManager:
    """Universal database manager for async operations.

    Builds one async engine and one session factory from the
    ``DATABASE_CONFIG`` entry selected by ``env``.
    """

    def __init__(self, env: str = "development"):
        """Create the engine and session factory for ``env``.

        Raises:
            KeyError: if ``env`` is not a key of ``DATABASE_CONFIG``.
        """
        try:
            config = DATABASE_CONFIG[env]
        except KeyError:
            known = ", ".join(sorted(DATABASE_CONFIG))
            # Same exception type as a plain lookup, but with a message that
            # tells the operator which environment names are valid.
            raise KeyError(
                f"Unknown database environment {env!r}; expected one of: {known}"
            ) from None
        self.engine = create_async_engine(**config)
        self.async_session = async_sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

    @asynccontextmanager
    async def get_session(self):
        """Yield a session that commits on success and rolls back on error.

        The surrounding ``async with`` closes the session on exit, so no
        explicit ``close()`` call is needed here.
        """
        async with self.async_session() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

    async def create_tables(self, metadata):
        """Create all tables described by ``metadata`` inside one transaction."""
        async with self.engine.begin() as conn:
            await conn.run_sync(metadata.create_all)

    async def close(self):
        """Dispose of the engine and its pooled connections."""
        await self.engine.dispose()
# Shared module-level manager; the environment name is read from ENV and
# defaults to "development".
db_manager = DatabaseManager(env=os.environ.get("ENV", "development"))
# models/base.py
from sqlmodel import SQLModel, Field, DateTime, func
from typing import Optional
from datetime import datetime
from sqlalchemy import Column, DateTime as SQLDateTime, Index
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    from datetime import timezone

    return datetime.now(timezone.utc)


class TimestampMixin(SQLModel):
    """Mixin adding ``created_at`` / ``updated_at`` timestamp columns.

    The columns are described with ``sa_type`` and ``sa_column_kwargs`` rather
    than a ready-made ``sa_column``: a single ``Column`` object can belong to
    only one table, so a shared instance on a mixin breaks as soon as a second
    table model inherits it.
    """

    # Timezone-aware default on the Python side, matching the timezone-aware
    # column type; the server default covers rows inserted outside the ORM.
    created_at: datetime = Field(
        default_factory=_utc_now,
        sa_type=SQLDateTime(timezone=True),
        sa_column_kwargs={"server_default": func.now()},
        nullable=False,
    )
    # Left NULL on insert; set to the database clock on every UPDATE.
    updated_at: Optional[datetime] = Field(
        default=None,
        sa_type=SQLDateTime(timezone=True),
        sa_column_kwargs={"onupdate": func.now()},
        nullable=True,
    )
class SoftDeleteMixin(SQLModel):
    """Mixin contributing the two soft-delete columns.

    This class only declares the fields; nothing here sets them.
    """

    # Optional timestamp, unset by default.
    deleted_at: Optional[datetime] = Field(default=None)
    # Boolean flag, False by default.
    is_deleted: bool = False
class BaseModel(TimestampMixin):
    """Base model with an integer primary key and the timestamp columns.

    ``TimestampMixin`` already derives from ``SQLModel``; listing ``SQLModel``
    ahead of it as a second base makes Python fail with "Cannot create a
    consistent method resolution order", so the mixin is the only base.
    """

    # None until the database assigns a value on insert.
    id: Optional[int] = Field(default=None, primary_key=True)

    class Config:
        # Enable Pydantic's strict mode
        strict = True
        # Re-validate values on attribute assignment
        validate_assignment = True
        # Store enum values rather than enum members
        use_enum_values = True
# Indexes for common queries
# NOTE(review): BaseModel is declared without table=True, so it may not expose
# a mapped `created_at` column for this Index to attach to — confirm this line
# runs at import time. The concrete tables declare their own created_at
# indexes through __table_args__ (e.g. idx_user_created_at on users).
Index("idx_base_created_at", BaseModel.created_at)
# models/examples.py
from enum import Enum
from typing import Optional, List

from sqlalchemy import Column, String, Text, Boolean, Integer, Index, ForeignKey, UniqueConstraint, JSON
from sqlalchemy.orm import relationship
from sqlmodel import SQLModel, Field, Relationship, Session, select, update, delete
# Enums for type safety
class Status(str, Enum):
    """Closed set of status values; members are also plain ``str`` instances."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    PENDING = "pending"
class Priority(str, Enum):
    """Closed set of priority values; members are also plain ``str`` instances."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class User(BaseModel, table=True):
    """Account record stored in the ``users`` table."""

    __tablename__ = "users"

    # Composite and single-column indexes declared next to the table name.
    __table_args__ = (
        Index("idx_user_active_email", "is_active", "email"),
        Index("idx_user_created_at", "created_at"),
    )

    # Both identifiers are required, unique and indexed.
    email: str = Field(
        sa_column=Column(String(255), unique=True, nullable=False, index=True),
    )
    username: str = Field(
        sa_column=Column(String(100), unique=True, nullable=False, index=True),
    )

    full_name: Optional[str] = Field(default=None, max_length=200)
    is_active: bool = Field(default=True, sa_column=Column(Boolean, default=True))

    # Text column for free-form content with no declared length.
    bio: Optional[str] = Field(
        default=None,
        sa_column=Column(Text),
    )

    # Paired with Task.owner through back_populates.
    tasks: List["Task"] = Relationship(back_populates="owner")
class Task(BaseModel, SoftDeleteMixin, table=True):
    """Task owned by a user, stored in the ``tasks`` table.

    The free-form JSON payload is exposed as ``task_metadata`` because
    ``metadata`` is a reserved attribute name on declarative models; the
    database column is still called ``metadata``.
    """

    __tablename__ = "tasks"

    # The foreign key lives on the explicit Column; Field() does not accept
    # ``foreign_key`` together with ``sa_column``.
    owner_id: int = Field(
        sa_column=Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
    )

    title: str = Field(
        sa_column=Column(String(200), nullable=False)
    )
    description: Optional[str] = Field(
        default=None,
        sa_column=Column(Text)
    )

    # Enum fields
    status: Status = Field(default=Status.PENDING)
    priority: Priority = Field(default=Priority.MEDIUM)

    # JSON field for flexible metadata (column name "metadata").
    task_metadata: dict = Field(
        default_factory=dict,
        sa_column=Column("metadata", JSON)
    )

    # Relationships
    owner: User = Relationship(back_populates="tasks")
    tags: List["TaskTag"] = Relationship(back_populates="task")

    # Indexes for the owner/status, priority/created_at and soft-delete lookups.
    __table_args__ = (
        Index("idx_task_owner_status", "owner_id", "status"),
        Index("idx_task_priority_created", "priority", "created_at"),
        Index("idx_task_deleted", "is_deleted"),
    )
class Tag(BaseModel, table=True):
    """Label stored in the ``tags`` table."""

    __tablename__ = "tags"

    # Required, unique and indexed.
    name: str = Field(
        sa_column=Column(String(50), unique=True, nullable=False, index=True),
    )
    # Hex color code; 7 characters at most.
    color: Optional[str] = Field(default=None, max_length=7)
class TaskTag(SQLModel, table=True):
    """Association table linking tasks and tags.

    Derives from ``SQLModel`` directly so the primary key is exactly
    ``(task_id, tag_id)``; inheriting the surrogate ``id`` from ``BaseModel``
    would add a third, never-populated primary-key column.
    """

    __tablename__ = "task_tags"

    task_id: int = Field(foreign_key="tasks.id", primary_key=True)
    tag_id: int = Field(foreign_key="tags.id", primary_key=True)

    # Mirrors Task.tags, which declares back_populates="task".
    task: Task = Relationship(back_populates="tags")
    tag: Tag = Relationship()
# repositories/base.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, List, Optional, Dict, Any
from sqlmodel import SQLModel, Session, select, update, delete, func
ModelType = TypeVar("ModelType", bound="SQLModel")


class BaseRepository(Generic[ModelType], ABC):
    """Base repository with common CRUD operations.

    Every method is a coroutine that expects an async session: ``add`` is
    called synchronously, while ``execute``, ``commit``, ``refresh`` and
    ``delete`` are awaited.
    """

    def __init__(self, model: type[ModelType]):
        """Remember the model class this repository operates on."""
        self.model = model

    def _apply_filters(self, statement, filters: Optional[Dict[str, Any]]):
        """Add one equality condition per filter key that names a model attribute."""
        for key, value in (filters or {}).items():
            # Keys that do not match a model attribute are ignored.
            if hasattr(self.model, key):
                statement = statement.where(getattr(self.model, key) == value)
        return statement

    async def create(self, db: "AsyncSession", *, obj_in: Dict[str, Any]) -> ModelType:
        """Create, commit and refresh a new record built from ``obj_in``."""
        db_obj = self.model(**obj_in)
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj

    async def get(self, db: "AsyncSession", id: Any) -> Optional[ModelType]:
        """Return the record whose ``id`` matches, or None."""
        statement = select(self.model).where(self.model.id == id)
        result = await db.execute(statement)
        return result.scalars().first()

    async def get_multi(
        self,
        db: "AsyncSession",
        *,
        skip: int = 0,
        limit: int = 100,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[ModelType]:
        """Return up to ``limit`` records after skipping ``skip``, with optional equality filters."""
        statement = self._apply_filters(select(self.model), filters)
        statement = statement.offset(skip).limit(limit)
        result = await db.execute(statement)
        return list(result.scalars().all())

    async def update(
        self,
        db: "AsyncSession",
        *,
        db_obj: ModelType,
        obj_in: Dict[str, Any]
    ) -> ModelType:
        """Copy matching keys of ``obj_in`` onto ``db_obj``, then commit and refresh it."""
        for field, value in obj_in.items():
            # Only attributes that already exist on the object are updated.
            if hasattr(db_obj, field):
                setattr(db_obj, field, value)
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj

    async def remove(self, db: "AsyncSession", *, id: int) -> Optional[ModelType]:
        """Delete the record with ``id`` if it exists; return it, or None when absent."""
        obj = await self.get(db, id=id)
        if obj is not None:
            await db.delete(obj)
            await db.commit()
        return obj

    async def count(self, db: "AsyncSession", filters: Optional[Dict[str, Any]] = None) -> int:
        """Count records, optionally restricted by equality filters."""
        statement = self._apply_filters(select(func.count(self.model.id)), filters)
        result = await db.execute(statement)
        return result.scalar_one()
# repositories/user.py
class UserRepository(BaseRepository[User]):
    """User repository with user-specific queries.

    Like the base repository, every method expects an async session and
    awaits ``execute``.
    """

    async def get_by_email(self, db: "AsyncSession", *, email: str) -> Optional[User]:
        """Return the user with this exact email, or None."""
        statement = select(User).where(User.email == email)
        result = await db.execute(statement)
        return result.scalars().first()

    async def get_active_users(
        self,
        db: "AsyncSession",
        skip: int = 0,
        limit: int = 100
    ) -> List[User]:
        """Return up to ``limit`` active users after skipping ``skip``."""
        # .is_(True) expresses the boolean test without a `== True` comparison.
        statement = select(User).where(User.is_active.is_(True))
        statement = statement.offset(skip).limit(limit)
        result = await db.execute(statement)
        return list(result.scalars().all())
# services/query_builder.py
from sqlalchemy import and_, or_, func, asc, desc, extract
from typing import List, Optional, Dict, Any, Union
from datetime import datetime, date
from models.base import db_manager
class QueryBuilder:
    """Fluent builder that collects query parts and runs them as one SELECT.

    Each builder method appends to an internal list and returns the builder
    itself, so calls can be chained; ``execute`` assembles and runs the
    statement.
    """

    def __init__(self, model):
        """Start an empty query against ``model``."""
        self.model = model
        self._filters = []
        self._order_by = []
        self._joins = []
        self._group_by = []

    def filter(self, *conditions) -> "QueryBuilder":
        """Add any number of filter conditions (AND-ed together on execute)."""
        self._filters.extend(conditions)
        return self

    def where(self, condition) -> "QueryBuilder":
        """Add a single filter condition."""
        self._filters.append(condition)
        return self

    def order_by(self, *columns) -> "QueryBuilder":
        """Add order-by clauses."""
        self._order_by.extend(columns)
        return self

    def join(self, model, condition=None) -> "QueryBuilder":
        """Add a join to ``model``, optionally with an explicit ON condition."""
        self._joins.append((model, condition))
        return self

    def group_by(self, *columns) -> "QueryBuilder":
        """Add group-by clauses."""
        self._group_by.extend(columns)
        return self

    async def execute(self, db: "AsyncSession"):
        """Build the SELECT, run it on the async session and return the rows as a list."""
        statement = select(self.model)

        # Apply filters
        if self._filters:
            statement = statement.where(and_(*self._filters))

        # Apply joins. The condition is tested against None because SQL
        # expressions are not meant to be evaluated for truthiness.
        for model, condition in self._joins:
            if condition is not None:
                statement = statement.join(model, condition)
            else:
                statement = statement.join(model)

        # Apply group by
        if self._group_by:
            statement = statement.group_by(*self._group_by)

        # Apply order by
        if self._order_by:
            statement = statement.order_by(*self._order_by)

        result = await db.execute(statement)
        return list(result.scalars().all())
# Usage examples
async def search_tasks_advanced(
    owner_id: int,
    keywords: Optional[str] = None,
    status: Optional[Status] = None,
    priority: Optional[Priority] = None,
    date_range: Optional[tuple] = None
) -> List[Task]:
    """Search one owner's non-deleted tasks with optional filters.

    Args:
        owner_id: Only tasks with this owner are returned.
        keywords: Literal text matched case-insensitively inside the title or
            the description.
        status: Restrict to this status when given.
        priority: Restrict to this priority when given.
        date_range: ``(start, end)`` pair; both bounds are inclusive and
            compared against ``created_at``.

    Returns:
        The matching tasks ordered by priority, then creation date, both
        descending.
    """
    async with db_manager.get_session() as db:
        query = QueryBuilder(Task)

        # Base filters
        query = query.filter(Task.owner_id == owner_id, Task.is_deleted == False)

        # Text search: escape LIKE wildcards so user text such as "50%" or
        # "a_b" is matched literally instead of acting as a pattern.
        if keywords:
            escaped = (
                keywords.replace("\\", "\\\\")
                .replace("%", "\\%")
                .replace("_", "\\_")
            )
            search_pattern = f"%{escaped}%"
            query = query.where(
                or_(
                    Task.title.ilike(search_pattern, escape="\\"),
                    Task.description.ilike(search_pattern, escape="\\")
                )
            )

        # Status filter
        if status is not None:
            query = query.filter(Task.status == status)

        # Priority filter
        if priority is not None:
            query = query.filter(Task.priority == priority)

        # Date range filter
        if date_range:
            start_date, end_date = date_range
            query = query.filter(
                and_(
                    Task.created_at >= start_date,
                    Task.created_at <= end_date
                )
            )

        # NOTE(review): how desc(Task.priority) sorts depends on how the
        # backend stores the enum (declaration order vs. text) — confirm the
        # order is the intended one on every supported database.
        query = query.order_by(
            desc(Task.priority),
            desc(Task.created_at)
        )

        return await query.execute(db)
# migrations/versions/001_initial_tables.py
"""Initial tables migration
Revision ID: 001_initial
Revises:
Create Date: 2025-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# Alembic revision identifiers: this revision has no parent, no branch labels
# and no dependencies.
revision = "001_initial"
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
    """Create the users, tasks, tags and task_tags tables with their indexes.

    Defaults are emitted as ``server_default`` so they become part of the
    table definition (a Python-side ``default=`` is not rendered into DDL),
    and timestamps use ``sa.func.now()`` so each dialect renders its own
    current-timestamp expression.
    """
    # Users table
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(length=255), nullable=False),
        sa.Column('username', sa.String(length=100), nullable=False),
        sa.Column('full_name', sa.String(length=200), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=False, server_default=sa.true()),
        sa.Column('bio', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email'),
        sa.UniqueConstraint('username')
    )
    # Create indexes
    op.create_index('idx_user_email', 'users', ['email'])
    op.create_index('idx_user_username', 'users', ['username'])
    op.create_index('idx_user_active_email', 'users', ['is_active', 'email'])
    # Declared on the User model but previously missing from the migration.
    op.create_index('idx_user_created_at', 'users', ['created_at'])

    # Tasks table
    op.create_table(
        'tasks',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('owner_id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=200), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column(
            'status',
            sa.Enum('ACTIVE', 'INACTIVE', 'PENDING', name='status'),
            nullable=False,
            server_default='PENDING',
        ),
        sa.Column(
            'priority',
            sa.Enum('LOW', 'MEDIUM', 'HIGH', 'CRITICAL', name='priority'),
            nullable=False,
            server_default='MEDIUM',
        ),
        sa.Column('metadata', sa.JSON(), nullable=True),
        sa.Column('is_deleted', sa.Boolean(), nullable=False, server_default=sa.false()),
        sa.Column('deleted_at', sa.DateTime(timezone=True), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.ForeignKeyConstraint(['owner_id'], ['users.id'], ),
        sa.PrimaryKeyConstraint('id')
    )
    # Create indexes
    op.create_index('idx_task_owner', 'tasks', ['owner_id'])
    op.create_index('idx_task_owner_status', 'tasks', ['owner_id', 'status'])
    op.create_index('idx_task_priority_created', 'tasks', ['priority', 'created_at'])
    op.create_index('idx_task_deleted', 'tasks', ['is_deleted'])

    # Tags table (includes the timestamp columns the Tag model inherits)
    op.create_table(
        'tags',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=50), nullable=False),
        sa.Column('color', sa.String(length=7), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name')
    )
    op.create_index('idx_tag_name', 'tags', ['name'])

    # Task-Tag relationship table
    op.create_table(
        'task_tags',
        sa.Column('task_id', sa.Integer(), nullable=False),
        sa.Column('tag_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['task_id'], ['tasks.id'], ),
        sa.ForeignKeyConstraint(['tag_id'], ['tags.id'], ),
        sa.PrimaryKeyConstraint('task_id', 'tag_id')
    )
def downgrade():
    """Drop all tables, then the enum types created for the tasks table.

    Tables are dropped in reverse dependency order. On PostgreSQL the
    ``status`` and ``priority`` enum types outlive ``DROP TABLE``; removing
    them here lets ``upgrade()`` run again afterwards.
    """
    op.drop_table('task_tags')
    op.drop_table('tags')
    op.drop_table('tasks')
    op.drop_table('users')

    bind = op.get_bind()
    for enum_name in ('status', 'priority'):
        # checkfirst avoids an error when the type does not exist.
        sa.Enum(name=enum_name).drop(bind, checkfirst=True)
# utils/performance.py
import time
from functools import wraps
from typing import Callable, Any
from sqlmodel import Session
def measure_query_time(func: Optional[Callable] = None, *, threshold: float = 1.0) -> Callable:
    """Decorator that reports coroutine calls slower than ``threshold`` seconds.

    Usable bare (``@measure_query_time``) or configured
    (``@measure_query_time(threshold=0.25)``).

    Args:
        func: The coroutine function to wrap; omitted in the configured form.
        threshold: Duration in seconds above which a call is reported.

    Returns:
        The wrapped coroutine function, or a decorator when ``func`` is
        omitted.
    """

    def decorate(target: Callable) -> Callable:
        @wraps(target)
        async def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so the measured duration is not
            # affected by system clock adjustments the way time.time() is.
            start_time = time.perf_counter()
            result = await target(*args, **kwargs)
            execution_time = time.perf_counter() - start_time
            # Log slow queries
            if execution_time > threshold:
                print(f"Slow query detected: {target.__name__} took {execution_time:.2f}s")
            return result

        return wrapper

    if func is None:
        return decorate
    return decorate(func)
class DatabaseOptimizer:
    """Database performance inspection utilities (PostgreSQL only).

    Both helpers check the dialect of the session's bind and return an empty
    result for anything other than PostgreSQL.
    """

    @staticmethod
    async def analyze_slow_queries(db: "AsyncSession", threshold: float = 1.0) -> List[Dict]:
        """Return the ten costliest statements whose mean time exceeds ``threshold``.

        Reads ``pg_stat_statements`` through its ``total_exec_time`` /
        ``mean_exec_time`` columns, aliased to the ``total_time`` /
        ``mean_time`` keys in the result.

        NOTE(review): pg_stat_statements reports times in milliseconds, so
        ``threshold`` is compared against milliseconds — confirm callers
        expect that unit.
        """
        if db.bind.dialect.name != 'postgresql':
            return []

        from sqlalchemy import text

        # Bound parameter instead of a positional "%s" placeholder.
        query = text(
            """
            SELECT query,
                   calls,
                   total_exec_time AS total_time,
                   mean_exec_time AS mean_time,
                   rows
            FROM pg_stat_statements
            WHERE mean_exec_time > :threshold
            ORDER BY total_exec_time DESC
            LIMIT 10
            """
        )
        result = await db.execute(query, {"threshold": threshold})
        # Row objects expose their column mapping through ._mapping.
        return [dict(row._mapping) for row in result]

    @staticmethod
    async def get_table_statistics(db: "AsyncSession") -> Dict[str, Any]:
        """Return per-table insert/update/delete and live/dead row counters.

        The result maps table name to a dict with the keys ``inserts``,
        ``updates``, ``deletes``, ``live_rows`` and ``dead_rows``.
        """
        stats = {}
        if db.bind.dialect.name == 'postgresql':
            from sqlalchemy import text

            # pg_stat_user_tables names the table column "relname".
            tables_query = text(
                """
                SELECT schemaname, relname AS tablename, n_tup_ins, n_tup_upd,
                       n_tup_del, n_live_tup, n_dead_tup
                FROM pg_stat_user_tables
                """
            )
            for row in await db.execute(tables_query):
                stats[row.tablename] = {
                    'inserts': row.n_tup_ins,
                    'updates': row.n_tup_upd,
                    'deletes': row.n_tup_del,
                    'live_rows': row.n_live_tup,
                    'dead_rows': row.n_dead_tup
                }
        return stats
# tests/conftest.py
import pytest
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from httpx import AsyncClient
from sqlmodel import SQLModel
from models.base import BaseModel
from repositories.base import BaseRepository
@pytest.fixture(scope="session")
def event_loop():
    """Provide one fresh event loop for the whole test session and close it afterwards."""
    session_loop = asyncio.new_event_loop()
    yield session_loop
    session_loop.close()
@pytest.fixture(scope="function")
async def test_db_session():
    """Yield a session bound to a fresh in-memory SQLite database.

    The schema is created before the session is handed out, and the engine is
    disposed once the test finishes so its connections are released.
    """
    # In-memory SQLite for testing
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=False
    )
    try:
        # Create tables
        async with engine.begin() as conn:
            await conn.run_sync(BaseModel.metadata.create_all)

        # Create session
        async_session = async_sessionmaker(
            engine, class_=AsyncSession, expire_on_commit=False
        )
        async with async_session() as session:
            yield session
    finally:
        await engine.dispose()
@pytest.fixture
def user_repository():
    """Provide a UserRepository bound to the User model."""
    repository = UserRepository(User)
    return repository
# tests/test_repositories.py
@pytest.mark.asyncio
async def test_create_user(test_db_session: AsyncSession, user_repository):
    """Creating a user stores the given fields and leaves is_active at its default."""
    user_data = {
        "email": "test@example.com",
        "username": "testuser",
        "full_name": "Test User"
    }
    user = await user_repository.create(test_db_session, obj_in=user_data)

    assert user.email == user_data["email"]
    assert user.username == user_data["username"]
    # Identity check: the default must be the boolean True, not merely truthy.
    assert user.is_active is True
# config/production_checklist.yaml
database:
production_ready:
connection_pooling: true
ssl_enabled: true
max_connections: 100
min_connections: 10
connection_timeout: 30
idle_timeout: 300
monitoring:
slow_query_log: true
query_threshold: 1.0
connection_pool_metrics: true
backup:
automated_backups: true
backup_retention: 30_days
point_in_time_recovery: true
security:
encryption_at_rest: true
encryption_in_transit: true
least_privilege_access: true
regular_rotation: true
This database skill provides production-ready patterns for 2025 — async operations, performance optimization, proper indexing, and testing strategies — that work across different database systems.