| name | flowerpower |
| description | Create and manage data pipelines using the FlowerPower framework with Hamilton DAGs and uv. Use when users request creating flowerpower projects, pipelines, Hamilton dataflows, or ask about flowerpower configuration, execution, or CLI commands. |
Create and manage data processing pipelines using FlowerPower with Hamilton DAGs.
# Install flowerpower
uv pip install flowerpower
# Initialize project
flowerpower init --name my-project
# Create pipeline
flowerpower pipeline new my_pipeline
# Run pipeline
flowerpower pipeline run my_pipeline
Use scripts/init_project.py or the CLI:
# CLI
flowerpower init --name <project-name>
# Python
from flowerpower import FlowerPowerProject
project = FlowerPowerProject.init(name='my-project')
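scripts/init_project.py can be a thin wrapper around the same call; a minimal sketch (hypothetical contents, using only the FlowerPowerProject.init call shown above; the --name argument belongs to this script, not to flowerpower):
# Hypothetical scripts/init_project.py - wraps FlowerPowerProject.init
import argparse
from flowerpower import FlowerPowerProject

def main() -> None:
    parser = argparse.ArgumentParser(description="Initialize a FlowerPower project")
    parser.add_argument("--name", required=True, help="Project name")
    args = parser.parse_args()
    FlowerPowerProject.init(name=args.name)

if __name__ == "__main__":
    main()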
Creates structure:
my-project/
├── conf/
│   ├── project.yml
│   └── pipelines/
├── pipelines/
└── hooks/
Use scripts/create_pipeline.py or the CLI:
flowerpower pipeline new <name>
Creates:
pipelines/<name>.py - Hamilton functions
conf/pipelines/<name>.yml - Configuration
Example pipeline module (pipelines/my_pipeline.py):
from pathlib import Path

from flowerpower.cfg import Config
from hamilton.function_modifiers import parameterize

PARAMS = Config.load(
    Path(__file__).parents[1], pipeline_name="my_pipeline"
).pipeline.h_params

@parameterize(**PARAMS.input_config)
def load_data(source: str) -> dict:
    """Load data from source."""
    return {"source": source}

def process_data(load_data: dict) -> dict:
    """Process loaded data."""
    return {"processed": load_data}

def final_result(process_data: dict) -> str:
    """Return final result."""
    return str(process_data)
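Hamilton builds the DAG from function signatures: each parameter name refers to the output of the function with that name, so load_data feeds process_data, which feeds final_result. A self-contained sketch of the same chain using plain Hamilton (no flowerpower config, so the parameterize decorator is dropped and source is passed as a runtime input):
# Standalone Hamilton example mirroring the pipeline above; illustrative only.
from hamilton import driver
from hamilton.ad_hoc_utils import create_temporary_module

def load_data(source: str) -> dict:
    return {"source": source}

def process_data(load_data: dict) -> dict:
    return {"processed": load_data}

def final_result(process_data: dict) -> str:
    return str(process_data)

mod = create_temporary_module(load_data, process_data, final_result)
dr = driver.Builder().with_modules(mod).build()
print(dr.execute(["final_result"], inputs={"source": "data.csv"}))
# {'final_result': "{'processed': {'source': 'data.csv'}}"}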
Example configuration (conf/pipelines/my_pipeline.yml):
params:
  input_config:
    source: "data.csv"
run:
  final_vars:
    - final_result
  executor:
    type: threadpool
    max_workers: 4
  retry:
    max_retries: 3
    retry_delay: 1.0
CLI:
# Basic run
flowerpower pipeline run my_pipeline
# With inputs
flowerpower pipeline run my_pipeline --inputs '{"key": "value"}'
# With executor
flowerpower pipeline run my_pipeline --executor threadpool --executor-max-workers 8
# With retries
flowerpower pipeline run my_pipeline --max-retries 3 --retry-delay 2.0
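The flags can be combined in a single invocation; a sketch reusing only the flags and values shown above:
flowerpower pipeline run my_pipeline \
  --inputs '{"key": "value"}' \
  --executor threadpool --executor-max-workers 8 \
  --max-retries 3 --retry-delay 2.0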
Python API:
from flowerpower import FlowerPowerProject
project = FlowerPowerProject.load('.')
result = project.run('my_pipeline')
# With RunConfig
from flowerpower.cfg.pipeline.run import RunConfig
config = RunConfig(inputs={"key": "value"}, final_vars=["output"])
result = project.run('my_pipeline', run_config=config)
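Because the project loads once and pipelines run by name, chaining several runs in a script is straightforward; a sketch (script name and argument handling are illustrative, API calls as documented above):
# Illustrative batch runner built only from FlowerPowerProject.load and project.run.
import argparse
from flowerpower import FlowerPowerProject

def main() -> None:
    parser = argparse.ArgumentParser(description="Run FlowerPower pipelines in order")
    parser.add_argument("pipelines", nargs="+", help="Pipeline names, e.g. my_pipeline")
    args = parser.parse_args()
    project = FlowerPowerProject.load(".")
    for name in args.pipelines:
        result = project.run(name)
        print(f"{name}: {result}")

if __name__ == "__main__":
    main()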
| Command | Description |
|---|---|
| flowerpower init --name <name> | Initialize project |
| flowerpower pipeline new <name> | Create pipeline |
| flowerpower pipeline run <name> | Run pipeline |
| flowerpower pipeline show-pipelines | List pipelines |
| flowerpower pipeline show-dag <name> | Visualize DAG |
| flowerpower pipeline delete <name> | Delete pipeline |
| Type | Use Case | Config |
|---|---|---|
| synchronous | Default, sequential | - |
| threadpool | I/O-bound tasks | max_workers: N |
| processpool | CPU-bound tasks | max_workers: N |
| ray | Distributed computing | num_cpus: N |
| dask | Distributed computing | num_cpus: N |
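These options map into the pipeline config shown earlier; for example, a distributed run might be configured like this (a sketch assuming the same run.executor layout as the sample config; verify the exact keys against your flowerpower version):
run:
  executor:
    type: ray
    num_cpus: 4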
Optional extras:
uv pip install flowerpower[io] # I/O plugins
uv pip install flowerpower[ui] # Hamilton UI
uv pip install flowerpower[all] # All extras