en un clic
ai-pipeline-orchestration
Orchestrate AI/ML pipelines for data ingestion, model training, batch inference, and RAG indexing using Prefect, Airflow, or Dagster. Build reliable, observable, and retriable workflows for production AI systems.
Menu
Orchestrate AI/ML pipelines for data ingestion, model training, batch inference, and RAG indexing using Prefect, Airflow, or Dagster. Build reliable, observable, and retriable workflows for production AI systems.
Implement multi-layer LLM caching with exact match, semantic similarity, and provider-side prompt caching. Reduce API costs by 30–70%, cut latency, and improve throughput using Redis, GPTCache, and provider caching APIs.
Reduce LLM API and infrastructure costs through model selection, prompt caching, batching, caching, quantization, and self-hosting strategies. Track spend by team and model, set budgets, and implement cost-aware routing.
Design and operationalize SRE dashboards that surface reliability, latency, error, saturation, and capacity signals across services. Use when building observability views for SLOs, incident response, and executive reliability reporting.
Deploy ML models on Kubernetes with KServe (formerly KFServing) and NVIDIA Triton Inference Server. Includes canary deployments, autoscaling, model versioning, A/B testing, and GPU resource management for production model serving.
Deploy, manage, and optimize vector databases for AI applications. Covers Qdrant, Weaviate, pgvector, and Pinecone — collection management, indexing strategies, backup, and performance tuning for production RAG and semantic search workloads.
Harden OpenClaw self-hosted environments with baseline host controls, auth tightening, secret handling, network segmentation, and safe update/rollback workflows. Use when deploying OpenClaw in home labs, startups, or production-like local AI infrastructure.
| name | ai-pipeline-orchestration |
| description | Orchestrate AI/ML pipelines for data ingestion, model training, batch inference, and RAG indexing using Prefect, Airflow, or Dagster. Build reliable, observable, and retriable workflows for production AI systems. |
| license | MIT |
| metadata | {"author":"devops-skills","version":"1.0"} |
Build reliable, observable AI workflows — from document ingestion to batch inference to model training pipelines.
Use this skill when:
| Tool | Best For | Complexity | GPU Jobs |
|---|---|---|---|
| Prefect | Modern Python-first; easy to adopt | Low | Good |
| Airflow | Complex DAGs; large teams; existing usage | High | Good |
| Dagster | Asset-centric; strong data lineage | Medium | Excellent |
| Temporal | Long-running workflows; reliability-first | Medium | Good |
pip install prefect prefect-kubernetes
# Start Prefect server (or use Prefect Cloud)
prefect server start
# In another terminal
prefect worker start --pool default-agent-pool
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import hashlib
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def fetch_documents(source_url: str) -> list[dict]:
"""Fetch documents from source; cached to avoid re-fetching."""
logger = get_run_logger()
logger.info(f"Fetching from {source_url}")
# ... fetch logic
return documents
@task(retries=3, retry_delay_seconds=30)
def chunk_and_embed(documents: list[dict]) -> list[dict]:
"""Chunk documents and generate embeddings with retry on failure."""
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("BAAI/bge-large-en-v1.5")
chunks = []
for doc in documents:
doc_chunks = chunk_text(doc["content"])
embeddings = model.encode(doc_chunks, batch_size=64)
for chunk, emb in zip(doc_chunks, embeddings):
chunks.append({"text": chunk, "embedding": emb.tolist(),
"source": doc["url"], "doc_hash": doc["hash"]})
return chunks
@task(retries=2)
def upsert_to_vector_store(chunks: list[dict]) -> int:
"""Upsert embeddings to Qdrant, skip unchanged documents."""
from qdrant_client import QdrantClient
client = QdrantClient("http://qdrant:6333")
client.upsert(collection_name="knowledge-base", points=[...])
return len(chunks)
@flow(name="rag-ingestion", log_prints=True)
def rag_ingestion_pipeline(sources: list[str]):
"""Full RAG ingestion flow — runs daily."""
logger = get_run_logger()
total = 0
for source in sources:
docs = fetch_documents(source)
chunks = chunk_and_embed(docs)
count = upsert_to_vector_store(chunks)
total += count
logger.info(f"Ingested {count} chunks from {source}")
logger.info(f"Pipeline complete: {total} total chunks indexed")
if __name__ == "__main__":
rag_ingestion_pipeline.serve(
name="daily-rag-ingestion",
cron="0 2 * * *", # 2 AM daily
parameters={"sources": ["https://docs.myapp.com", "https://api.myapp.com/kb"]},
)
from prefect import flow, task
from prefect.concurrency.sync import concurrency
import asyncio
from openai import AsyncOpenAI
@task(retries=3, retry_delay_seconds=60)
async def process_batch(items: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
"""Process a batch of items through LLM with rate limit protection."""
client = AsyncOpenAI()
async with concurrency("openai-api", occupy=len(items)): # rate limit
tasks = [
client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": item["prompt"]}],
max_tokens=256,
)
for item in items
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for item, response in zip(items, responses):
if isinstance(response, Exception):
results.append({**item, "error": str(response), "output": None})
else:
results.append({**item, "output": response.choices[0].message.content})
return results
@flow(name="batch-llm-inference")
async def batch_inference_flow(input_file: str, output_file: str, batch_size: int = 50):
import json
items = [json.loads(line) for line in open(input_file)]
batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
all_results = []
for batch in batches:
results = await process_batch(batch)
all_results.extend(results)
with open(output_file, "w") as f:
for result in all_results:
f.write(json.dumps(result) + "\n")
return len(all_results)
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime
from kubernetes.client import models as k8s
@dag(
dag_id="llm_fine_tuning",
schedule="@weekly",
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["ai", "training"],
)
def llm_fine_tuning_dag():
@task
def prepare_dataset() -> str:
"""Download and preprocess training data."""
# ... data prep logic
return "s3://my-bucket/training-data/2025-03-01/"
train = KubernetesPodOperator(
task_id="train_model",
name="llm-training-job",
namespace="ml",
image="nvcr.io/nvidia/pytorch:24.05-py3",
cmds=["accelerate", "launch", "-m", "axolotl.cli.train", "/config/config.yaml"],
resources=k8s.V1ResourceRequirements(
limits={"nvidia.com/gpu": "4", "memory": "320Gi"},
requests={"nvidia.com/gpu": "4"},
),
node_selector={"nvidia.com/gpu.product": "A100-SXM4-80GB"},
volumes=[...],
volume_mounts=[...],
get_logs=True,
is_delete_operator_pod=True,
)
@task
def evaluate_model(dataset_path: str) -> dict:
"""Run evals; fail pipeline if quality drops."""
metrics = run_evals()
if metrics["accuracy"] < 0.85:
raise ValueError(f"Model quality too low: {metrics}")
return metrics
@task
def deploy_model(metrics: dict):
"""Push merged model to HF Hub and update vLLM config."""
update_serving_config(new_model="org/fine-tuned-v2")
dataset = prepare_dataset()
train.set_upstream(dataset)
eval_result = evaluate_model(dataset)
eval_result.set_upstream(train)
deploy_model(eval_result)
llm_fine_tuning_dag()
from dagster import asset, AssetExecutionContext, define_asset_job, ScheduleDefinition
@asset(description="Raw documents fetched from knowledge sources")
def raw_documents(context: AssetExecutionContext) -> list[dict]:
context.log.info("Fetching documents...")
return fetch_all_documents()
@asset(
deps=[raw_documents],
description="Chunked and embedded document vectors",
)
def document_embeddings(context: AssetExecutionContext, raw_documents) -> int:
chunks = process_and_embed(raw_documents)
context.log.info(f"Generated {len(chunks)} embeddings")
upsert_to_qdrant(chunks)
return len(chunks)
@asset(
deps=[document_embeddings],
description="RAG system quality metrics",
)
def rag_quality_metrics(context: AssetExecutionContext) -> dict:
metrics = evaluate_rag_system()
context.add_output_metadata({"ragas_score": metrics["ragas_score"]})
return metrics
# Schedule: refresh embeddings nightly
nightly_refresh = ScheduleDefinition(
job=define_asset_job("rag_refresh_job", [raw_documents, document_embeddings]),
cron_schedule="0 1 * * *",
)
concurrency limits in Prefect or pool slots in Airflow to respect external rate limits.