| name | sql-tables |
| description | Read and write SQL tables from Python code (pipelines or notebooks). Use when needing to query database tables, write DataFrames to database, or manage table schemas. Provides best practices for SQLAlchemy, type mapping, and safe table operations. |
SQL Tables
Best practices for reading and writing SQL tables from Python code in OpenHEXA pipelines and notebooks.
Database Connection
Using Environment Variable
import os
import pandas as pd
from sqlalchemy import create_engine
db_url = os.environ.get("WORKSPACE_DATABASE_URL")
engine = create_engine(db_url)
Using OpenHEXA SDK (Pipelines)
from openhexa.sdk import workspace
db_url = workspace.database_url
engine = create_engine(db_url)
Reading Tables
Basic Query
df = pd.read_sql("SELECT * FROM my_table LIMIT 1000", engine)
df = pd.read_sql("SELECT id, name, value FROM my_table", engine)
df = pd.read_sql("""
SELECT * FROM my_table
WHERE created_at > '2024-01-01'
AND status = 'active'
""", engine)
Using Parameters (Prevent SQL Injection)
from sqlalchemy import text
query = text("SELECT * FROM my_table WHERE org_unit = :ou AND period = :pe")
df = pd.read_sql(query, engine, params={"ou": org_unit_id, "pe": period})
Check Table Exists
from sqlalchemy import inspect
def table_exists(engine, table_name: str) -> bool:
"""Check if a table exists in the database."""
inspector = inspect(engine)
return table_name in inspector.get_table_names()
if table_exists(engine, "my_table"):
df = pd.read_sql("SELECT * FROM my_table", engine)
Get Table Schema
from sqlalchemy import inspect
def get_table_columns(engine, table_name: str) -> list:
"""Get column names and types for a table."""
inspector = inspect(engine)
columns = inspector.get_columns(table_name)
return [(col["name"], str(col["type"])) for col in columns]
columns = get_table_columns(engine, "my_table")
for name, dtype in columns:
print(f" {name}: {dtype}")
Writing Tables
CRITICAL: Safety Rules
- NEVER overwrite existing tables you didn't create
- Always check if table exists first
- Explicitly specify column types - don't rely on auto-inference
- Ask user to confirm types before writing
Check Before Writing
from sqlalchemy import inspect
def safe_table_name(engine, proposed_name: str) -> str:
"""Return a safe table name, appending suffix if exists."""
inspector = inspect(engine)
existing_tables = inspector.get_table_names()
if proposed_name not in existing_tables:
return proposed_name
suffix = 1
while f"{proposed_name}_{suffix}" in existing_tables:
suffix += 1
return f"{proposed_name}_{suffix}"
table_name = safe_table_name(engine, "analysis_results")
print(f"Will write to: {table_name}")
Type Mapping
Always explicitly define column types:
from sqlalchemy import String, Integer, Float, DateTime, Boolean, Text, Date
dtype_mapping = {
"id": Integer,
"uid": String(11),
"name": String(255),
"description": Text,
"code": String(50),
"value": Float,
"count": Integer,
"amount": Float,
"created_at": DateTime,
"updated_at": DateTime,
"date": Date,
"period": String(10),
"is_active": Boolean,
"is_deleted": Boolean,
"org_unit_id": String(11),
"data_element_id": String(11),
"indicator_id": String(11),
}
Write with Explicit Types
dtype_mapping = {
"org_unit": String(11),
"period": String(10),
"indicator": String(11),
"value": Float,
"created_at": DateTime,
}
df.to_sql(
name="my_analysis_results",
con=engine,
if_exists="replace",
index=False,
dtype=dtype_mapping
)
Confirm Types Before Writing (Best Practice)
def propose_column_types(df: pd.DataFrame) -> dict:
"""Propose SQLAlchemy types based on DataFrame dtypes."""
from sqlalchemy import String, Integer, Float, DateTime, Boolean, Text
type_map = {
"int64": Integer,
"int32": Integer,
"float64": Float,
"float32": Float,
"bool": Boolean,
"datetime64[ns]": DateTime,
"object": String(255),
}
proposed = {}
for col in df.columns:
dtype_str = str(df[col].dtype)
proposed[col] = type_map.get(dtype_str, String(255))
if dtype_str == "object":
max_len = df[col].astype(str).str.len().max()
if max_len > 255:
proposed[col] = Text
elif max_len <= 50:
proposed[col] = String(50)
elif max_len <= 100:
proposed[col] = String(100)
return proposed
proposed_types = propose_column_types(df)
print("Proposed column types:")
for col, dtype in proposed_types.items():
print(f" - {col}: {dtype}")
print("\nPlease confirm these types are correct before writing.")
Common Patterns
Upsert (Insert or Update)
from sqlalchemy.dialects.postgresql import insert
def upsert_dataframe(df: pd.DataFrame, table_name: str, engine, key_columns: list):
"""Insert or update rows based on key columns."""
from sqlalchemy import Table, MetaData
metadata = MetaData()
table = Table(table_name, metadata, autoload_with=engine)
for _, row in df.iterrows():
stmt = insert(table).values(**row.to_dict())
stmt = stmt.on_conflict_do_update(
index_elements=key_columns,
set_={c: stmt.excluded[c] for c in df.columns if c not in key_columns}
)
engine.execute(stmt)
Chunked Writing (Large DataFrames)
def write_chunked(df: pd.DataFrame, table_name: str, engine, chunk_size: int = 10000):
"""Write large DataFrame in chunks."""
total_rows = len(df)
for i in range(0, total_rows, chunk_size):
chunk = df.iloc[i:i + chunk_size]
if_exists = "replace" if i == 0 else "append"
chunk.to_sql(
name=table_name,
con=engine,
if_exists=if_exists,
index=False
)
print(f"Written {min(i + chunk_size, total_rows)}/{total_rows} rows")
Transaction Management
from sqlalchemy.orm import Session
def write_with_transaction(df: pd.DataFrame, table_name: str, engine):
"""Write with explicit transaction for rollback on error."""
with engine.begin() as conn:
df.to_sql(
name=table_name,
con=conn,
if_exists="replace",
index=False
)
Type Reference
| Python/Pandas Type | SQLAlchemy Type | PostgreSQL Type |
|---|
int64 | Integer | INTEGER |
float64 | Float | DOUBLE PRECISION |
bool | Boolean | BOOLEAN |
datetime64 | DateTime | TIMESTAMP |
object (short) | String(n) | VARCHAR(n) |
object (long) | Text | TEXT |
date | Date | DATE |
Checklist Before Writing