hamilton-core
Core Hamilton patterns for creating DAGs, applying decorators, testing, and debugging dataflows. Use for basic Hamilton development tasks.
| Field | Value |
|---|---|
| name | hamilton-core |
| description | Core Hamilton patterns for creating DAGs, applying decorators, testing, and debugging dataflows. Use for basic Hamilton development tasks. |
| allowed-tools | Read, Grep, Glob, Bash(python:*), Bash(pytest:*) |
| user-invocable | true |
| disable-model-invocation | false |
Apache Hamilton is a lightweight Python framework for building Directed Acyclic Graphs (DAGs) of data transformations using declarative, function-based definitions.
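Function-Based DAG Definition
Each function defines one node. The function name is the node's name, and its parameter names declare which nodes (or driver inputs) it depends on.
Key Architecture Components
- Driver: `.execute()` runs the DAG
Separation of Concerns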
Basic Module Structure:
```python
"""
Module docstring explaining the DAG's purpose.
"""
import pandas as pd


def raw_data(data_path: str) -> pd.DataFrame:
    """Load raw data from source.

    :param data_path: Path to data file (passed as input)
    :return: Raw DataFrame
    """
    return pd.read_csv(data_path)


def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Remove null values and duplicates.

    :param raw_data: Raw data from previous node
    :return: Cleaned DataFrame
    """
    return raw_data.dropna().drop_duplicates()


def feature_a(cleaned_data: pd.DataFrame) -> pd.Series:
    """Calculate feature A.

    :param cleaned_data: Cleaned data
    :return: Feature A values
    """
    return cleaned_data['column_a'] * 2
```
Driver Setup:
```python
from hamilton import driver

import my_functions

dr = driver.Driver({}, my_functions)
results = dr.execute(
    ['feature_a', 'cleaned_data'],
    inputs={'data_path': 'data.csv'},
)
```
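The wiring rule the Driver applies can be made concrete with a toy resolver. The sketch below is a hypothetical, stdlib-only `toy_execute`, not Hamilton's implementation. It looks up each parameter name as another function's name or as a provided input, and it computes every node at most once per call. It does no type checking and no cycle detection. The stand-in functions use plain lists instead of pandas.

```python
import inspect
from typing import Any, Callable, Dict, List


def toy_execute(
    funcs: List[Callable[..., Any]],
    final_vars: List[str],
    inputs: Dict[str, Any],
) -> Dict[str, Any]:
    """Compute final_vars by resolving parameter names against function names."""
    nodes = {f.__name__: f for f in funcs}
    computed: Dict[str, Any] = dict(inputs)  # user inputs act as leaf values

    def compute(name: str) -> Any:
        if name in computed:
            return computed[name]  # already computed (or provided) in this call
        if name not in nodes:
            raise KeyError(f"'{name}' is neither a node nor a provided input")
        fn = nodes[name]
        # Each parameter name refers to another node or to an input.
        kwargs = {p: compute(p) for p in inspect.signature(fn).parameters}
        computed[name] = fn(**kwargs)
        return computed[name]

    return {name: compute(name) for name in final_vars}


# Hypothetical stand-ins for the module above (lists instead of DataFrames).
def raw_data(data_path: str) -> list:
    return [1, 2, None, 2]  # pretend this was read from data_path


def cleaned_data(raw_data: list) -> list:
    return [x for x in raw_data if x is not None]  # drops None values only


def feature_a(cleaned_data: list) -> list:
    return [x * 2 for x in cleaned_data]


result = toy_execute(
    [raw_data, cleaned_data, feature_a], ["feature_a"], {"data_path": "data.csv"}
)
# result == {'feature_a': [2, 4, 4]}
```

Requesting `feature_a` pulls in `cleaned_data` and `raw_data` automatically. Nothing downstream of the requested outputs is computed.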
Best Practices:
Configuration & Polymorphism:
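```python
import pandas as pd
from hamilton.function_modifiers import config


# `target` is an assumed upstream node or input holding the training labels.
@config.when(model_type='linear')
def predictions__linear(features: pd.DataFrame, target: pd.Series) -> pd.Series:
    """Linear model predictions."""
    from sklearn.linear_model import LinearRegression
    model = LinearRegression().fit(features, target)
    return pd.Series(model.predict(features), index=features.index)


@config.when(model_type='tree')
def predictions__tree(features: pd.DataFrame, target: pd.Series) -> pd.Series:
    """Tree model predictions."""
    from sklearn.tree import DecisionTreeRegressor
    model = DecisionTreeRegressor().fit(features, target)
    return pd.Series(model.predict(features), index=features.index)


# The __linear / __tree suffixes keep both definitions in the module;
# Hamilton strips them, so the resulting node is named `predictions`.
# Use: driver.Driver({'model_type': 'linear'}, module)
```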
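Variant selection works in two steps. The `__suffix` is dropped from the function name, and then the variant whose conditions match the config is kept. In a real module the variants need distinct Python names, because a second `def predictions` would simply replace the first. The toy resolver below illustrates that selection. It is a sketch under those assumptions: `resolve_variants` and the placeholder "models" are hypothetical, and Hamilton's own resolution logic is more general.

```python
from typing import Any, Callable, Dict, List, Tuple

# Each candidate pairs the required config (key -> value) with a function.
Candidate = Tuple[Dict[str, Any], Callable[..., Any]]


def resolve_variants(
    candidates: List[Candidate], config: Dict[str, Any]
) -> Dict[str, Callable[..., Any]]:
    """For each node name, keep the single variant whose conditions match config."""
    chosen: Dict[str, Callable[..., Any]] = {}
    for required, fn in candidates:
        if all(config.get(k) == v for k, v in required.items()):
            # Node name = function name with the text after the last '__' removed.
            node_name = fn.__name__.rsplit("__", 1)[0]
            if node_name in chosen:
                raise ValueError(f"more than one variant of '{node_name}' matches {config}")
            chosen[node_name] = fn
    return chosen


def predictions__linear(x: float) -> float:
    return 2 * x  # placeholder "model"


def predictions__tree(x: float) -> float:
    return x + 1  # placeholder "model"


candidates = [
    ({"model_type": "linear"}, predictions__linear),
    ({"model_type": "tree"}, predictions__tree),
]
nodes = resolve_variants(candidates, {"model_type": "tree"})
# nodes == {'predictions': predictions__tree}
```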
Parameterization - Creating Multiple Nodes:
```python
import pandas as pd
from hamilton.function_modifiers import parameterize, value


# value(...) binds a literal; source('node_name') would bind another node instead.
@parameterize(
    rolling_7d={'window': value(7)},
    rolling_30d={'window': value(30)},
    rolling_90d={'window': value(90)},
)
def rolling_average(spend: pd.Series, window: int) -> pd.Series:
    """Calculate rolling average for different windows."""
    return spend.rolling(window).mean()
# Creates 3 nodes: rolling_7d, rolling_30d, rolling_90d
```
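Conceptually, parameterization stamps out one named callable per parameter set, with the fixed arguments already bound. The stdlib sketch below shows that expansion using `functools.partial`. `expand` is a hypothetical helper, not Hamilton's mechanism, and lists stand in for pandas Series. Incomplete windows yield `None`, much as pandas yields NaN.

```python
import functools
from typing import Callable, Dict, List, Optional


def rolling_average(spend: List[float], window: int) -> List[Optional[float]]:
    """Trailing mean over `window` points; None until the window is full."""
    out: List[Optional[float]] = []
    for i in range(len(spend)):
        if i + 1 < window:
            out.append(None)  # not enough history yet
        else:
            chunk = spend[i + 1 - window : i + 1]
            out.append(sum(chunk) / window)
    return out


def expand(fn: Callable, **params: Dict[str, int]) -> Dict[str, Callable]:
    """Create one named callable per parameter set by binding the fixed values."""
    return {name: functools.partial(fn, **bound) for name, bound in params.items()}


nodes = expand(rolling_average, rolling_2d={"window": 2}, rolling_3d={"window": 3})
spend = [1.0, 2.0, 3.0, 4.0]
# nodes["rolling_2d"](spend) == [None, 1.5, 2.5, 3.5]
# nodes["rolling_3d"](spend) == [None, None, 2.0, 3.0]
```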
Column Extraction - DataFrames to Series:
```python
import pandas as pd
from hamilton.function_modifiers import extract_columns


@extract_columns('feature_1', 'feature_2', 'feature_3')
def features(cleaned_data: pd.DataFrame) -> pd.DataFrame:
    """Generate multiple features."""
    return pd.DataFrame({
        'feature_1': cleaned_data['a'] * 2,
        'feature_2': cleaned_data['b'] ** 2,
        'feature_3': cleaned_data['a'] + cleaned_data['b'],
    })
# Creates 3 nodes: feature_1, feature_2, feature_3 (each a Series)
```
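The idea behind column extraction is simple. One function produces a table, and each named column becomes an independently addressable value. In the toy version below, a dict of lists stands in for a DataFrame. `extract_columns_toy` is a hypothetical helper, not Hamilton's decorator. It raises if a requested column is missing.

```python
from typing import Dict, List

Frame = Dict[str, List[float]]  # column name -> values, a stand-in for a DataFrame


def extract_columns_toy(frame: Frame, *columns: str) -> Dict[str, List[float]]:
    """Expose each requested column as its own value; fail if one is missing."""
    missing = [c for c in columns if c not in frame]
    if missing:
        raise KeyError(f"columns not produced by the function: {missing}")
    return {c: frame[c] for c in columns}


def features(a: List[float], b: List[float]) -> Frame:
    return {
        "feature_1": [x * 2 for x in a],
        "feature_2": [y ** 2 for y in b],
        "feature_3": [x + y for x, y in zip(a, b)],
    }


nodes = extract_columns_toy(features([1.0, 2.0], [3.0, 4.0]), "feature_1", "feature_3")
# nodes == {'feature_1': [2.0, 4.0], 'feature_3': [4.0, 6.0]}
```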
Data Quality Validation:
```python
import pandas as pd
import pandera as pa
from hamilton.function_modifiers import check_output


@check_output(
    data_type=float,
    range=(0, 100),
    importance="fail",
)
def revenue_percentage(revenue: float, total: float) -> float:
    """Calculate revenue as percentage."""
    return (revenue / total) * 100


# With Pandera schemas
@check_output(
    schema=pa.SeriesSchema(float, pa.Check.greater_than(0)),
    importance="fail",
)
def positive_values(data: pd.Series) -> pd.Series:
    """Pass data through; the schema check ensures all values are positive."""
    return data
```
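The `importance` level decides what happens when a check fails. With `"fail"`, execution stops. With `"warn"`, the problem is reported and execution continues. The stdlib sketch below mimics that behavior for a type-and-range check on a scalar output. `check_output_toy` is hypothetical, not Hamilton's implementation, and it takes `value_range` to avoid shadowing Python's built-in `range`.

```python
import functools
import logging
from typing import Any, Callable, Tuple

logger = logging.getLogger(__name__)


def check_output_toy(data_type: type, value_range: Tuple[float, float], importance: str = "warn"):
    """Validate a function's return value; raise on "fail", log on "warn"."""
    low, high = value_range

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = fn(*args, **kwargs)
            ok = isinstance(result, data_type) and low <= result <= high
            if not ok:
                message = (
                    f"{fn.__name__} returned {result!r}, "
                    f"expected {data_type.__name__} in [{low}, {high}]"
                )
                if importance == "fail":
                    raise ValueError(message)
                logger.warning(message)  # "warn": report and keep the value
            return result

        return wrapper

    return decorator


@check_output_toy(data_type=float, value_range=(0, 100), importance="fail")
def revenue_percentage(revenue: float, total: float) -> float:
    return (revenue / total) * 100


in_range = revenue_percentage(25.0, 100.0)  # 25.0 passes the check
```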
I/O Materialization:
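```python
import pandas as pd
from hamilton.function_modifiers import load_from, save_to, value


# Adds a separate saver node named "final_results_to_csv";
# include it in execute()'s requested outputs to write the file.
@save_to.csv(path=value("output.csv"), output_name_="final_results_to_csv")
def final_results(aggregated_data: pd.DataFrame) -> pd.DataFrame:
    """Final results, also saved to CSV."""
    return aggregated_data


# The loaded DataFrame is injected into the `df` parameter; the body still runs.
@load_from.parquet(path=value("data.parquet"))
def input_data(df: pd.DataFrame) -> pd.DataFrame:
    """Load data from parquet."""
    return df
```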
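Materialization separates computing a value from persisting it. Hamilton's `@save_to` does this by adding a separate saver node. The stdlib sketch below is simpler: it writes inline as a side effect and then returns the value unchanged, so downstream logic is unaffected. `save_csv_toy` is hypothetical, and lists of dicts stand in for a DataFrame.

```python
import csv
import functools
import os
import tempfile
from typing import Callable, Dict, List

Rows = List[Dict[str, object]]  # stand-in for a DataFrame: one dict per row


def save_csv_toy(path: str) -> Callable:
    """Hypothetical decorator: write the function's rows to `path`, then return them."""

    def decorator(fn: Callable[..., Rows]) -> Callable[..., Rows]:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> Rows:
            rows = fn(*args, **kwargs)
            fieldnames = list(rows[0].keys()) if rows else []
            with open(path, "w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(rows)
            return rows  # the computed value is unchanged by saving

        return wrapper

    return decorator


out_path = os.path.join(tempfile.mkdtemp(), "output.csv")


@save_csv_toy(out_path)
def final_results(aggregated: Rows) -> Rows:
    return aggregated


returned = final_results([{"category": "a", "total": 3}])
```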
Before (Script):
```python
import pandas as pd

df = pd.read_csv('data.csv')
df = df.dropna()
df['feature'] = df['col_a'] * 2
result = df.groupby('category')['feature'].mean()
print(result)
```
After (Hamilton Module):
```python
"""Data processing DAG."""
import pandas as pd


def raw_data(data_path: str) -> pd.DataFrame:
    """Load raw data."""
    return pd.read_csv(data_path)


def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Remove nulls."""
    return raw_data.dropna()


def feature(cleaned_data: pd.DataFrame) -> pd.Series:
    """Calculate feature."""
    return cleaned_data['col_a'] * 2


def data_with_feature(cleaned_data: pd.DataFrame, feature: pd.Series) -> pd.DataFrame:
    """Add feature to dataset."""
    df = cleaned_data.copy()
    df['feature'] = feature
    return df


def result(data_with_feature: pd.DataFrame) -> pd.Series:
    """Aggregate by category."""
    return data_with_feature.groupby('category')['feature'].mean()
```
Conversion Guidelines:
Generate Visualization:
```python
from hamilton import driver

import my_functions

dr = driver.Driver({}, my_functions)

# Create visualization
dr.display_all_functions('dag.png')  # All nodes
dr.visualize_execution(
    ['final_output'],
    'execution.png',
    inputs={'input_data': ...},
)  # Execution path only
```
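The picture these methods render follows directly from function signatures: every parameter is an edge into the function that declares it. The stdlib sketch below emits that structure as Graphviz DOT text. `to_dot` is hypothetical, and its output is not Hamilton's rendering. The result could be drawn with the Graphviz CLI, e.g. `dot -Tpng dag.dot -o dag.png`.

```python
import inspect
from typing import Callable, List


def to_dot(funcs: List[Callable]) -> str:
    """Emit a DOT digraph with one edge per (parameter -> function) dependency."""
    lines = ["digraph dag {"]
    for fn in funcs:
        for param in inspect.signature(fn).parameters:
            lines.append(f'  "{param}" -> "{fn.__name__}";')
    lines.append("}")
    return "\n".join(lines)


# Signature-only stand-ins for the basic module (bodies are irrelevant here).
def raw_data(data_path: str) -> list: ...
def cleaned_data(raw_data: list) -> list: ...
def feature_a(cleaned_data: list) -> list: ...


dot = to_dot([raw_data, cleaned_data, feature_a])
```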
Understanding DAG Structure:
Debugging Tips:
- `dr.list_available_variables()` to see all nodes
- `dr.what_is_downstream_of('node_name')` to see every node that depends on `node_name`

Unit Testing Pattern:
```python
import pytest
import pandas as pd

from my_functions import cleaned_data, feature


def test_cleaned_data():
    """Test data cleaning."""
    raw = pd.DataFrame({
        'col_a': [1, 2, None, 4],
        'col_b': ['a', 'b', 'c', 'd']
    })
    result = cleaned_data(raw)
    assert len(result) == 3
    assert result['col_a'].isna().sum() == 0


def test_feature():
    """Test feature calculation."""
    data = pd.DataFrame({'col_a': [1, 2, 3]})
    result = feature(data)
    pd.testing.assert_series_equal(
        result,
        pd.Series([2, 4, 6], name='col_a')
    )
```
Integration Testing with Driver:
```python
def test_full_pipeline():
    """Test complete DAG execution."""
    from hamilton import driver

    import my_functions

    dr = driver.Driver({}, my_functions)
    result = dr.execute(
        ['result'],
        inputs={'data_path': 'test_data.csv'}
    )
    assert 'result' in result
    assert result['result'].sum() > 0
```
Circular Dependencies:
```python
# ❌ Bad - circular dependency
def a(b: int) -> int:
    return b + 1

def b(a: int) -> int:
    return a + 1

# ✅ Good - break the cycle
def a(input_value: int) -> int:
    return input_value + 1

def b(a: int) -> int:
    return a + 1
```
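Cycles are found by following parameter names as edges. The stdlib sketch below does this with a depth-first search and returns the first loop it encounters. `find_cycle` is a hypothetical helper, not Hamilton's validation code. The two factory functions recreate the bad and good examples above without name clashes.

```python
import inspect
from typing import Callable, Dict, List, Optional


def find_cycle(funcs: List[Callable]) -> Optional[List[str]]:
    """Return one dependency cycle as a list of node names, or None if acyclic."""
    deps: Dict[str, List[str]] = {
        fn.__name__: list(inspect.signature(fn).parameters) for fn in funcs
    }
    state: Dict[str, str] = {}  # "visiting" while on the DFS stack, "done" afterwards
    stack: List[str] = []

    def visit(name: str) -> Optional[List[str]]:
        if state.get(name) == "done" or name not in deps:
            return None  # finished node, or an external input (leaf)
        if state.get(name) == "visiting":
            return stack[stack.index(name):] + [name]  # back edge closes a loop
        state[name] = "visiting"
        stack.append(name)
        for dep in deps[name]:
            cycle = visit(dep)
            if cycle:
                return cycle
        stack.pop()
        state[name] = "done"
        return None

    for name in deps:
        cycle = visit(name)
        if cycle:
            return cycle
    return None


def make_bad() -> List[Callable]:
    def a(b: int) -> int:
        return b + 1

    def b(a: int) -> int:
        return a + 1

    return [a, b]


def make_good() -> List[Callable]:
    def a(input_value: int) -> int:
        return input_value + 1

    def b(a: int) -> int:
        return a + 1

    return [a, b]


bad_cycle = find_cycle(make_bad())    # ['a', 'b', 'a']
good_cycle = find_cycle(make_good())  # None
```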
Missing Type Hints:
```python
import pandas as pd

# ❌ Bad - no type hints
def process(data):
    return data * 2

# ✅ Good - clear types
def process(data: pd.Series) -> pd.Series:
    return data * 2
```
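Hamilton reads these type hints when it builds the DAG, so it can check that connected nodes have compatible types. Unannotated functions are easy to catch before that point. The stdlib sketch below reports unannotated parameters and a missing return annotation. `missing_annotations` is a hypothetical lint-style helper, for example for a CI check, and is not part of Hamilton.

```python
import inspect
from typing import Callable, List


def missing_annotations(fn: Callable) -> List[str]:
    """List the parameters (and 'return') of fn that lack a type annotation."""
    sig = inspect.signature(fn)
    missing = [
        name
        for name, param in sig.parameters.items()
        if param.annotation is inspect.Parameter.empty
    ]
    if sig.return_annotation is inspect.Signature.empty:
        missing.append("return")
    return missing


def process_untyped(data):
    return data * 2


def process_typed(data: list) -> list:
    return data * 2


untyped_report = missing_annotations(process_untyped)  # ['data', 'return']
typed_report = missing_annotations(process_typed)      # []
```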
Mutating Inputs:
```python
# ❌ Bad - mutates input
def add_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    df[col_name] = 0  # Modifies original!
    return df

# ✅ Good - returns new object
def add_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    result = df.copy()
    result[col_name] = 0
    return result
```
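Mutation matters because, during in-memory execution, every node that depends on an upstream value receives the same object rather than a copy. A mutating node therefore changes what its sibling nodes see. The stdlib sketch below shows the difference, with a dict of lists standing in for a DataFrame. Both helpers are hypothetical illustrations.

```python
from typing import Dict, List

Table = Dict[str, List[int]]  # stand-in for a DataFrame: column -> values


def add_column_mutating(df: Table, col_name: str) -> Table:
    df[col_name] = [0] * len(next(iter(df.values())))  # changes the caller's object
    return df


def add_column_copying(df: Table, col_name: str) -> Table:
    result = {k: list(v) for k, v in df.items()}  # copy columns; caller's data untouched
    result[col_name] = [0] * len(next(iter(df.values())))
    return result


upstream = {"a": [1, 2]}  # imagine several downstream nodes sharing this object

safe = add_column_copying(upstream, "flag")
keys_after_copy = sorted(upstream)      # ['a']

unsafe = add_column_mutating(upstream, "flag")
keys_after_mutation = sorted(upstream)  # ['a', 'flag']: the shared input changed
```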
- `hamilton/` - Main package code
- `hamilton/driver.py` - Main orchestration class
- `hamilton/function_modifiers/` - Decorators
- `examples/` - Production examples
- `tests/` - Unit and integration tests
- `docs/` - Official documentation

- `docs/` directory in repo
- `examples/` directory for patterns
- `/hamilton-scale` for async/Spark, `/hamilton-llm` for AI workflows

For detailed reference material, see: