daft-distributed-scaling
Scale Daft workflows to distributed Ray clusters. Invoke when optimizing performance or handling large data.
| name | daft-distributed-scaling |
|---|---|
| description | Scale Daft workflows to distributed Ray clusters. Invoke when optimizing performance or handling large data. |
Scale single-node workflows to distributed execution.
| Strategy | API | Use Case | Pros/Cons |
|---|---|---|---|
| Shuffle | repartition(N) | Light data (e.g. file paths), Joins | Global balance. High memory usage (materializes data). |
| Streaming | into_batches(N) | Heavy data (images, tensors) | Low memory (streaming). High scheduling overhead if batches too small. |
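Shuffle (repartition(N)) is best for distributing file paths before heavy reads: paths are light, so materializing them in a shuffle is cheap, and the heavy reads that follow are spread evenly across workers.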
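```python
import daft

# Create enough partitions to saturate workers
df = daft.read_parquet("s3://metadata").repartition(100)
# read_heavy_data: user-defined UDF (not shown) that loads the file behind each path
df = df.with_column("data", read_heavy_data(df["path"]))
```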
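A plain-Python model of why a shuffle gives global balance: every row, wherever it starts, is read and reassigned to one of N output partitions. This is a sketch under stated assumptions, not Daft's implementation; `round_robin_repartition` is a hypothetical helper, round-robin assignment is chosen only for simplicity, and the S3 paths are made up.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def round_robin_repartition(partitions: Sequence[Sequence[T]], n: int) -> List[List[T]]:
    """Hypothetical shuffle model: reassign every row to one of n output partitions."""
    if n <= 0:
        raise ValueError("n must be positive")
    out: List[List[T]] = [[] for _ in range(n)]
    i = 0
    for part in partitions:      # every input partition is read in full...
        for row in part:         # ...and every row is moved (materialized)
            out[i % n].append(row)
            i += 1
    return out


# Skewed input (made-up paths): one partition holds almost all the rows.
skewed = [
    [f"s3://bucket/img_{k}.jpg" for k in range(97)],
    ["s3://bucket/a.jpg"],
    ["s3://bucket/b.jpg", "s3://bucket/c.jpg"],
]
balanced = round_robin_repartition(skewed, 10)
print([len(p) for p in balanced])  # [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
```

The cost side of the trade-off is visible in the loops: the whole input passes through the shuffle, which is fine for paths but expensive for images or tensors.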
Streaming (into_batches(N)) is best for processing large partitions without running out of memory (OOM).
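```python
import daft

# Stream each large (~1GB) partition in 64-row chunks to control memory
df = daft.read_parquet("heavy_data").into_batches(64)
# model: user-defined class UDF (not shown) that computes embeddings
df = df.with_column("embed", model.predict(df["img"]))
```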
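A plain-Python model of streaming: a generator hands the model one slice of a partition at a time, so the number of rows the model sees at once is capped by the batch size rather than the partition size. `stream_batches` and `process_streaming` are hypothetical names, `x * 2` stands in for `model.predict`, and here the partition is already an in-memory list (only the model-facing batch is bounded); this is not how Daft schedules batches internally.

```python
from typing import Iterator, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def stream_batches(partition: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Hypothetical streaming model: yield consecutive slices of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(partition), batch_size):
        yield list(partition[start:start + batch_size])


def process_streaming(partition: Sequence[int], batch_size: int) -> Tuple[List[int], int]:
    """Apply a stand-in model batch by batch; return outputs and the largest batch seen."""
    outputs: List[int] = []
    largest_batch = 0
    for batch in stream_batches(partition, batch_size):
        largest_batch = max(largest_batch, len(batch))
        outputs.extend(x * 2 for x in batch)  # placeholder for model.predict
    return outputs, largest_batch


rows = list(range(1000))  # stands in for one large partition
preds, largest = process_streaming(rows, 64)
print(len(preds), largest)  # 1000 64
```

Smaller batches lower the per-call footprint but mean more calls per partition, which is the scheduling-overhead side of the trade-off in the table.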
Target: Keep all actors busy without OOM or scheduling bottlenecks.
Calculate the Max Partition Count to ensure each task has enough data to feed local actors.
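$$
\text{Min Rows Per Partition} = \text{Batch Size} \times \frac{\text{Total Concurrency}}{\text{Nodes}}
$$

$$
\text{Max Partitions} = \frac{\text{Total Rows}}{\text{Min Rows Per Partition}}
$$

Example: $64 \times (16 / 4) = 256$ rows per partition, so $1{,}000{,}000 / 256 \approx 3906$ partitions at most. A count below that bound keeps each partition large enough:

```python
df = df.repartition(1000)  # Balanced fan-out
```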
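The partition-count calculation above, as two small Python helpers. The function names are hypothetical, and rounding actors per node up and the partition count down (so every partition stays at or above the minimum) is an assumption the text does not specify.

```python
import math


def min_rows_per_partition(batch_size: int, total_concurrency: int, nodes: int) -> int:
    """Rows a partition needs so each actor on its node gets at least one full batch."""
    if min(batch_size, total_concurrency, nodes) <= 0:
        raise ValueError("all arguments must be positive")
    actors_per_node = math.ceil(total_concurrency / nodes)  # rounded up: conservative
    return batch_size * actors_per_node


def max_partition_count(total_rows: int, batch_size: int, total_concurrency: int, nodes: int) -> int:
    """Largest partition count that keeps every partition at or above the minimum."""
    return total_rows // min_rows_per_partition(batch_size, total_concurrency, nodes)


print(min_rows_per_partition(64, 16, 4))          # 256
print(max_partition_count(1_000_000, 64, 16, 4))  # 3906
```

With repartition(1000), each partition holds about 1,000,000 / 1000 = 1,000 rows, roughly four times the 256-row minimum.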
Avoid creating tiny partitions. Use into_batches to stream data within larger partitions.
Strategy: keep partitions large (e.g. 1GB+) and use into_batches(Batch Size) to control memory.
```python
# Stream batches to control memory usage per actor
# model: user-defined class UDF (not shown), run with up to 16 concurrent instances
df = df.into_batches(64).with_column("preds", model(max_concurrency=16).predict(df["img"]))
```
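Back-of-the-envelope memory arithmetic for the large-partition strategy, assuming (hypothetically) a 1 GiB partition of rows averaging 1 MiB each. The data handed to an actor per batch scales with batch size × row size, not partition size; model weights, intermediate buffers, and serialization overhead are ignored. `streaming_plan` is a hypothetical helper.

```python
GiB = 1024 ** 3
MiB = 1024 ** 2


def streaming_plan(partition_bytes: int, bytes_per_row: int, batch_size: int) -> dict:
    """Hypothetical estimate: rows per partition, batch count, and bytes per batch."""
    rows = partition_bytes // bytes_per_row
    num_batches = -(-rows // batch_size)  # ceiling division
    return {
        "rows": rows,
        "num_batches": num_batches,
        "batch_bytes": batch_size * bytes_per_row,  # data handed to an actor at once
    }


plan = streaming_plan(partition_bytes=1 * GiB, bytes_per_row=1 * MiB, batch_size=64)
print(plan["rows"], plan["num_batches"], plan["batch_bytes"] // MiB)  # 1024 16 64
```

Here that is 1,024 rows split into 16 batches of about 64 MiB each. Reaching the same 64-row granularity with repartition instead would mean 16 separate 64-row partitions per GiB, the tiny-partition pattern the text advises against.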