| name | dask |
| description | A flexible library for parallel computing in Python. It scales Python libraries like NumPy, pandas, and scikit-learn to multi-core systems or distributed clusters. Features lazy evaluation and task scheduling for data that exceeds RAM capacity. Use for out-of-core computing, parallel processing, distributed computing, large-scale data analysis, dask.array, dask.dataframe, dask.delayed, dask.bag, task scheduling, lazy evaluation, and scaling beyond memory limits. |
| version | 2024.1 |
| license | BSD-3-Clause |
Dask - Scalable Parallel Computing
Dask provides high-level collections (Arrays, DataFrames, Bags) that mimic the APIs of NumPy and pandas but operate in parallel on data sets that are larger than memory.
When to Use
- Processing datasets that don't fit in RAM (Out-of-core computing).
- Speeding up computations by using all available CPU cores.
- Parallelizing custom Python functions or complex workflows (dask.delayed).
- Scaling machine learning pipelines to large clusters.
- Handling large-scale arrays in physics, climate science, or imaging.
- Analyzing massive log files or unstructured data (dask.bag).
Reference Documentation
Official docs: https://docs.dask.org/
Dask Examples: https://examples.dask.org/
Search patterns: dask.dataframe, dask.array, dask.delayed, client.compute, dask.distributed
Core Principles
Lazy Evaluation
Dask doesn't compute results immediately. Instead, it builds a Task Graph. Actual computation only happens when you explicitly call .compute() or .persist().
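As a minimal sketch of this behavior, the snippet below builds a small graph with dask.delayed and runs it only at the end. The functions inc and add are hypothetical helpers made up for illustration.

```python
from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(a, b):
    return a + b

# Nothing runs here: each call only records a node in the task graph.
total = add(inc(1), inc(2))

# The graph executes only when .compute() is called.
result = total.compute()
print(result)  # 5
```

Until .compute() is called, total is a Delayed object describing the work rather than the number itself.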
Chunks and Partitions
- Dask Array: Composed of many small NumPy arrays called chunks.
- Dask DataFrame: Composed of many small pandas DataFrames called partitions.
Use Dask For
| Collection | Analogy | Use Case |
|---|---|---|
| dask.array | NumPy | Large-scale multidimensional math. |
| dask.dataframe | pandas | Large CSV/Parquet/SQL tables. |
| dask.bag | Lists/Toolz | Unstructured data (JSON, logs). |
| dask.delayed | Functions | Custom parallel logic. |
Do NOT Use For
- Data that fits easily in RAM (pandas/NumPy are faster due to lower overhead).
- Simple tasks where multiprocessing or concurrent.futures suffice.
- Situations where low-latency response is required (Dask adds scheduling overhead).
Quick Reference
Installation
pip install "dask[complete]"
Standard Imports
import dask.array as da
import dask.dataframe as dd
from dask import delayed, compute
from dask.distributed import Client
Basic Pattern - Initializing a Local Cluster
from dask.distributed import Client
client = Client()  # with no arguments, starts a local cluster on this machine
print(client.dashboard_link)  # URL of the diagnostic dashboard
Critical Rules
✅ DO
- Use the Dashboard - Monitor the Dask dashboard to find bottlenecks (in the task stream, red blocks mark time spent transferring data between workers, so a lot of red is a warning sign).
- Chunk thoughtfully - Aim for chunk sizes of 100MB to 250MB. Too small = high overhead; too large = memory errors.
- Prefer Parquet - Use Parquet instead of CSV for DataFrames; it supports efficient metadata and partitioning.
- Call .persist() on reused data - If you use the same intermediate result multiple times, persist it in memory.
- Let Dask handle the graph - Avoid calling .compute() too early; keep calculations lazy as long as possible.
- Use map_partitions - For custom logic on DataFrames, use this to apply pandas functions directly to each partition.
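The chunk-size guideline above comes down to simple arithmetic: elements per chunk times bytes per element. The sketch below uses only the standard library; chunk_nbytes and in_target_range are hypothetical helpers, and the 8-byte item size assumes float64 data.

```python
import math

def chunk_nbytes(chunk_shape, itemsize=8):
    """Bytes held by one chunk; itemsize=8 assumes float64 elements."""
    return math.prod(chunk_shape) * itemsize

def in_target_range(nbytes, low=100e6, high=250e6):
    """True if a chunk falls inside the 100MB to 250MB guideline."""
    return low <= nbytes <= high

for shape in [(1000, 1000), (4000, 4000), (10000, 10000)]:
    size = chunk_nbytes(shape)
    print(shape, f"{size / 1e6:.0f} MB", in_target_range(size))
```

Under these assumptions a 1000 x 1000 chunk is only 8 MB (overhead-heavy), 4000 x 4000 is 128 MB (within the guideline), and 10000 x 10000 is 800 MB (likely too large for modest workers).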
❌ DON'T
- Compute too often - Every .compute() call re-executes the graph behind that result and pulls the output into local memory.
- Pass large objects as task arguments - Use client.scatter for large objects needed by all workers instead of passing them as arguments.
- Iterate over rows - Looping with ddf.iterrows() is extremely slow; use vectorized operations.
- Use Dask if pandas is enough - Dask is slower for small data due to scheduling time.
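When several results depend on the same data, one way to avoid repeated graph execution is to pass them all to a single dask.compute call, which lets the scheduler share common tasks. A minimal sketch, with an arbitrary example array:

```python
import dask
import dask.array as da

x = da.arange(1_000_000, chunks=100_000)

# Two separate .compute() calls would each rebuild x from scratch.
# A single dask.compute call evaluates both reductions in one pass.
lo, hi = dask.compute(x.min(), x.max())
print(lo, hi)  # 0 999999
```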
Anti-Patterns (NEVER)
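import dask.dataframe as dd
# BAD: dd_df (an existing large Dask DataFrame) is pulled entirely into local memory.
result = dd_df.compute()
# BETTER: reduce first, then compute only the small result.
mean_val = dd_df['column'].mean().compute()
import dask.array as da
# BAD: 100 chunks of about 80 kB each (int64), so scheduling overhead dominates.
x = da.arange(1000000, chunks=10000)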
Dask Array (dask.array)
Scaling NumPy
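import dask.array as da
# 100,000 x 100,000 float64 array (~80 GB) split into 200 MB chunks of 5,000 x 5,000
x = da.random.random((100000, 100000), chunks=(5000, 5000))
y = x + x.T                      # lazy: only extends the task graph
z = y[::2, :5000].mean(axis=0)   # still lazy
result = z.compute()             # runs the graph and returns a NumPy array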
Dask DataFrame (dask.dataframe)
Scaling pandas
import dask.dataframe as dd
df = dd.read_csv('data/*.csv')
result = (df[df['value'] > 0]
          .groupby('category')
          .amount.sum())
final_amounts = result.compute()
import pandas as pd
pdf = pd.DataFrame(...)
ddf = dd.from_pandas(pdf, npartitions=10)
Dask Delayed (dask.delayed)
Parallelizing Custom Code
from dask import delayed
@delayed
def load(filename):
    ...
    return data

@delayed
def process(data):
    ...
    return result

@delayed
def summarize(results):
    return sum(results)
filenames = ['file1.csv', 'file2.csv', 'file3.csv']
outputs = [process(load(f)) for f in filenames]
total = summarize(outputs)
final_sum = total.compute()
Machine Learning with Dask (dask-ml)
from dask_ml.preprocessing import StandardScaler
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
# X_dask and y_dask are assumed to be existing Dask collections.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_dask)
model = LogisticRegression()
model.fit(X_scaled, y_dask)  # train on the scaled features
from sklearn.ensemble import RandomForestClassifier
from dask_ml.model_selection import GridSearchCV
clf = RandomForestClassifier()
grid = GridSearchCV(clf, param_grid, cv=3)
grid.fit(X, y)
Practical Workflows
1. Massive Log Processing (Bag)
import dask.bag as db
import json
def analyze_logs(pattern):
    # Read every file matching the glob pattern, e.g. 'logs/2023-*.log'
    b = db.read_text(pattern)
    records = b.map(json.loads).filter(lambda x: x['level'] == 'ERROR')
    counts = records.pluck('message').frequencies()
    return counts.compute()
2. Large Scale Imaging (Array)
def process_satellite_images(da_stack):
    """Calculate NDVI anomaly across time on 1TB of data."""
    climatology = da_stack.mean(axis=0)
    anomaly = da_stack - climatology
    anomaly.to_zarr('anomalies.zarr')
3. Cleaning Data with Method Chaining
import dask.dataframe as dd

def clean_dataset(ddf):
    return (ddf
            .dropna(subset=['id'])
            .fillna({'status': 'unknown'})
            # Use a callable so the conversion runs on the filtered frame
            .assign(timestamp=lambda d: dd.to_datetime(d['time_str']))
            .groupby('user_id')
            .last()
            .persist())
Performance Optimization
The Dask Dashboard Guide
- Progress Bar: Shows how many tasks are finished.
- Task Stream: Shows which worker is doing what. White space = idle workers (bad).
- Memory Plot: Shows RAM usage. If it turns orange/red, workers are hitting limits.
- Worker Table: Check for skewed data distribution.
Optimizing Data Storage
- Zarr: Best for N-dimensional arrays.
- Parquet: Best for tabular DataFrames.
- Compression: Use snappy or lz4 for a balance between speed and size.
Common Pitfalls and Solutions
The "Worker Lost" Error
Problem: Workers crash because they ran out of RAM.
Solution: Decrease chunk size or use a machine with more memory. Check for data skew.
Serialization Errors (Pickle)
Problem: Dask can't send your custom object to workers.
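Solution: Define custom classes in an importable module that every worker can access (for example, ship the file with Client.upload_file), or register a worker plugin with dask.distributed.Client.register_plugin to set up the environment on each worker.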
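A quick local check can catch many of these errors before anything is submitted. The sketch below uses only the standard library; is_picklable is a hypothetical helper. It is a conservative approximation, since Dask can fall back to cloudpickle for objects such as lambdas that plain pickle rejects.

```python
import pickle
import threading

def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

plain_config = {"path": "data.csv", "retries": 3}
holds_lock = {"lock": threading.Lock()}  # OS-level locks cannot be pickled

print(is_picklable(plain_config))  # True
print(is_picklable(holds_lock))    # False
```

Objects that wrap locks, open files, or live connections usually need to be created on the worker inside the task rather than sent to it.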
"Too Many Tasks" Warning
Problem: You created 1,000,000+ tiny tasks.
Solution: Re-chunk your data into larger pieces. Use dask_array.rechunk() or dask_df.repartition().
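The effect of re-chunking on the number of pieces can be seen on a small array. This is a minimal sketch with arbitrary sizes chosen for illustration:

```python
import dask.array as da

# 1,000 tiny chunks of 1,000 elements each.
x = da.ones(1_000_000, chunks=1_000)

# Re-chunk into 10 pieces of 100,000 elements each.
y = x.rechunk(100_000)

print(x.npartitions, y.npartitions)  # 1000 10
total = float(y.sum().compute())
```

Re-chunking is itself a graph operation, so where possible it is cheaper to choose sensible chunks when the data is first loaded.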
Best Practices
- Always monitor the Dask dashboard during development to identify bottlenecks.
- Choose chunk sizes carefully - aim for 100-250MB per chunk for optimal performance.
- Use Parquet format for DataFrames instead of CSV for better performance and metadata support.
- Persist intermediate results that are reused multiple times to avoid recomputation.
- Keep computations lazy as long as possible - only call .compute() when you need the final result.
- Use map_partitions for custom pandas operations on Dask DataFrames.
- Avoid iterating over rows in Dask DataFrames - use vectorized operations instead.
- Use shared storage (S3, HDFS, NFS) when working with distributed clusters.
- Batch small tasks together to avoid task overhead.
- Don't use Dask for data that fits in RAM - pandas/NumPy are faster for small datasets.
Dask lets the same Python code scale from a single laptop to a high-performance compute cluster, using familiar NumPy and pandas APIs along the way.