| Field | Value |
|---|---|
| name | ray-data |
| description | Scalable data processing for ML workloads. Streaming execution across CPU/GPU, supports Parquet/CSV/JSON/images. Integrates with Ray Train, PyTorch, TensorFlow. Scales from single machine to 100s of nodes. Use for batch inference, data preprocessing, multi-modal data loading, or distributed ETL pipelines. |
| version | 1.0.0 |
| author | Orchestra Research |
| license | MIT |
| tags | ["Data Processing","Ray Data","Distributed Computing","ML Pipelines","Batch Inference","ETL","Scalable","Ray","PyTorch","TensorFlow"] |
| dependencies | ["ray[data]","pyarrow","pandas"] |
Ray Data - Scalable ML Data Processing
Distributed data processing library for ML and AI workloads.
When to use Ray Data
Use Ray Data when:
- Processing large datasets (>100 GB) for ML training
- Distributing data preprocessing across a cluster
- Building batch inference pipelines
- Loading multi-modal data (images, audio, video)
- Scaling data processing from a laptop to a cluster
Key features:
- Streaming execution: Process data larger than memory
- GPU support: Accelerate transforms with GPUs
- Framework integration: PyTorch, TensorFlow, HuggingFace
- Multi-modal: Images, Parquet, CSV, JSON, audio, video
Use alternatives instead:
- Pandas: Small data (<1 GB) on a single machine
- Dask: Tabular data, SQL-like operations
- Spark: Enterprise ETL, SQL queries
Quick start
Installation
```bash
pip install -U 'ray[data]'
```
Load and transform data
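```python
import ray

ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# Batches are dicts of NumPy arrays by default; request pandas batches to use .str.
ds = ds.map_batches(
    lambda df: df["text"].str.lower().to_frame("processed"),
    batch_format="pandas",
)
for batch in ds.iter_batches(batch_size=100):
    print(batch)
```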
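When `map_batches` runs with `batch_format="pandas"`, each batch is a plain `pandas.DataFrame`, so the transform can be unit-tested locally before it touches a cluster. A minimal sketch, assuming a `text` column; `lowercase_text` is a hypothetical name for the lambda.

```python
import pandas as pd


def lowercase_text(df: pd.DataFrame) -> pd.DataFrame:
    """Batch UDF intended for map_batches(..., batch_format="pandas")."""
    # Keep only the transformed column, named "processed".
    return df["text"].str.lower().to_frame("processed")


sample = pd.DataFrame({"text": ["Hello World", "RAY Data"]})
result = lowercase_text(sample)
print(result)
```

The same function object can then be passed to `ds.map_batches(lowercase_text, batch_format="pandas")`.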
Integration with Ray Train
```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Each worker gets its own shard of the dataset registered as "train".
    train_shard = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in train_shard.iter_torch_batches(batch_size=32):
            pass  # forward/backward pass goes here

trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
```
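Conceptually, with `num_workers=4` each training worker iterates over its own disjoint portion of `train_ds`. The sketch below only illustrates that idea with a round-robin split in plain Python; it is not Ray's actual splitting logic, and `shard_rows` is a hypothetical helper.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def shard_rows(rows: Sequence[T], num_workers: int) -> List[List[T]]:
    """Round-robin split: worker i gets rows i, i + num_workers, i + 2 * num_workers, ..."""
    return [list(rows[i::num_workers]) for i in range(num_workers)]


shards = shard_rows(list(range(10)), num_workers=4)
print(shards)  # [[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]
```

Every row lands in exactly one shard, which is the property that keeps workers from training on duplicate data within an epoch.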
Reading data
From cloud storage
```python
import ray

ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
ds = ray.data.read_csv("s3://bucket/data/*.csv")
ds = ray.data.read_json("gs://bucket/data/*.json")
ds = ray.data.read_images("s3://bucket/images/")
```
From Python objects
```python
import pandas as pd
import ray

ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])
ds = ray.data.range(1000000)  # a single "id" column

df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
```
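`from_items` takes a list of row dicts, while `map_batches` UDFs see the same data column-wise, by default as a dict of NumPy arrays. The sketch below shows that reshaping in plain NumPy for intuition; `rows_to_batch` is a hypothetical helper, not a Ray API.

```python
from typing import Dict, List

import numpy as np


def rows_to_batch(rows: List[dict]) -> Dict[str, np.ndarray]:
    """Turn a list of row dicts into one NumPy array per column."""
    columns = rows[0].keys()
    return {col: np.array([row[col] for row in rows]) for col in columns}


rows = [{"id": i, "value": i * 2} for i in range(5)]
batch = rows_to_batch(rows)
print(batch["value"])  # [0 2 4 6 8]
```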
Transformations
Map batches (vectorized)
```python
def process_batch(batch):
    # batch is a dict mapping column names to NumPy arrays
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)
```
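`batch_size` controls how many rows each call to the UDF receives; some batches (for example the last one) may contain fewer rows. The single-process emulation below is a simplified sketch of that slicing (hypothetical `local_map_batches`), useful for unit-testing `process_batch` without a cluster. Ray itself also splits work by blocks and runs batches in parallel.

```python
from typing import Callable, Dict

import numpy as np

Batch = Dict[str, np.ndarray]


def process_batch(batch: Batch) -> Batch:
    batch["doubled"] = batch["value"] * 2
    return batch


def local_map_batches(data: Batch, fn: Callable[[Batch], Batch], batch_size: int) -> Batch:
    """Apply fn to consecutive slices of batch_size rows, then concatenate the outputs."""
    num_rows = len(next(iter(data.values())))
    outputs = []
    for start in range(0, num_rows, batch_size):
        chunk = {k: v[start:start + batch_size] for k, v in data.items()}
        outputs.append(fn(chunk))
    return {k: np.concatenate([out[k] for out in outputs]) for k in outputs[0]}


data = {"value": np.arange(2500)}
# Produces three calls: 1000, 1000 and 500 rows.
result = local_map_batches(data, process_batch, batch_size=1000)
print(result["doubled"][:3])  # [0 2 4]
```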
Row transformations
```python
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)
```
Filter
```python
ds = ds.filter(lambda row: row["value"] > 100)
```
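`map` and `filter` call a Python function once per row, which is convenient but usually slower than batch operations for vectorizable work. The NumPy sketch below, assuming a numeric `value` column, checks that the row-wise `process_row` plus the filter give the same result as a vectorized batch function (hypothetical `square_and_filter_batch`) that could be passed to `map_batches` instead.

```python
import numpy as np


def process_row(row: dict) -> dict:
    row["squared"] = row["value"] ** 2
    return row


def square_and_filter_batch(batch: dict) -> dict:
    """Vectorized equivalent of map(process_row) followed by filter(value > 100)."""
    batch["squared"] = batch["value"] ** 2
    keep = batch["value"] > 100
    return {k: v[keep] for k, v in batch.items()}


values = np.arange(95, 106)

# Row-at-a-time path
row_result = [r for r in (process_row({"value": int(v)}) for v in values) if r["value"] > 100]

# Batch path
batch_result = square_and_filter_batch({"value": values})
print(batch_result["value"])  # [101 102 103 104 105]
```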
Group by and aggregate
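```python
counts = ds.groupby("category").count()
# map_groups receives each group as a batch and must return a batch (list/array values).
sums = ds.groupby("category").map_groups(
    lambda g: {"category": g["category"][:1], "sum": [g["value"].sum()]})
```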
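For intuition, the per-group sum that the `map_groups` call computes can be reproduced on a single batch with NumPy. A minimal sketch, assuming `category` and `value` columns; `group_sums` is a hypothetical helper. For this particular aggregation, Ray's built-in `ds.groupby("category").sum("value")` avoids custom code.

```python
import numpy as np


def group_sums(batch: dict) -> dict:
    """Sum `value` per distinct `category`, returning one row per group."""
    categories, inverse = np.unique(batch["category"], return_inverse=True)
    sums = np.zeros(len(categories), dtype=batch["value"].dtype)
    np.add.at(sums, inverse, batch["value"])  # unbuffered scatter-add per group index
    return {"category": categories, "sum": sums}


batch = {
    "category": np.array(["a", "b", "a", "c", "b"]),
    "value": np.array([1, 2, 3, 4, 5]),
}
out = group_sums(batch)
print(dict(zip(out["category"].tolist(), out["sum"].tolist())))
```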
GPU-accelerated transforms
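```python
def preprocess_images_gpu(batch):
    import torch
    # Move the uint8 image batch to the GPU and scale pixels to [0, 1] as floats.
    images = torch.as_tensor(batch["image"]).cuda()
    processed = images.float() / 255.0
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1,  # reserve one GPU per task
)
```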
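A common form of this step scales `uint8` pixel values into `[0, 1]` as floats. Converting to float before the arithmetic matters, because arithmetic that stays in `uint8` wraps around modulo 256. The NumPy sketch below is a CPU stand-in for the GPU function (hypothetical `normalize_images_cpu`), handy for checking shapes and value ranges before requesting GPUs.

```python
import numpy as np


def normalize_images_cpu(batch: dict) -> dict:
    """Scale uint8 images shaped (N, H, W, C) to float32 values in [0, 1]."""
    images = batch["image"].astype(np.float32)  # convert first to avoid uint8 overflow
    return {"processed": images / 255.0}


batch = {"image": np.array([[[[0, 128, 255]]]], dtype=np.uint8)}  # shape (1, 1, 1, 3)
out = normalize_images_cpu(batch)
print(out["processed"].dtype, out["processed"].shape)  # float32 (1, 1, 1, 3)
```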
Writing data
```python
ds.write_parquet("s3://bucket/output/")
ds.write_csv("output/")
ds.write_json("output/")
```
Performance optimization
Repartition
```python
ds = ds.repartition(100)
```
Batch size tuning
```python
# Larger batches cut per-call overhead but need more memory per worker.
ds = ds.map_batches(process_fn, batch_size=10000)
```
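Every batch has to fit in the worker's memory (or GPU memory), so a quick size estimate helps when picking `batch_size`. A back-of-the-envelope sketch, assuming fixed-size 224 x 224 x 3 `uint8` images (an illustrative shape, not taken from the text); `batch_bytes` is a hypothetical helper.

```python
def batch_bytes(batch_size: int, bytes_per_row: int) -> int:
    """Approximate in-memory size of one batch, ignoring per-object overhead."""
    return batch_size * bytes_per_row


image_bytes = 224 * 224 * 3  # one uint8 RGB image = 150,528 bytes
for size in (64, 1_000, 10_000):
    print(f"batch_size={size:>6}: {batch_bytes(size, image_bytes) / 1e9:.2f} GB")
```

Under that assumption, `batch_size=10000` means roughly 1.5 GB per batch before any float conversion, which would quadruple it for `float32`.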
Streaming execution
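```python
# Execution is lazy: blocks are streamed through the pipeline as batches are consumed.
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # your per-batch logic
```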
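Iterating this way keeps only a bounded number of batches in memory rather than the whole dataset. The generator-based sketch below illustrates the pattern in plain Python, with a hypothetical `read_batches` standing in for a reader; Ray Data's executor is more elaborate (parallel block production, prefetching).

```python
from typing import Iterator, List


def read_batches(num_rows: int, batch_size: int) -> Iterator[List[int]]:
    """Yield rows lazily in batches; nothing is produced until the consumer asks."""
    batch: List[int] = []
    for row in range(num_rows):
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch


total = 0
max_batch_len = 0
for batch in read_batches(num_rows=10_500, batch_size=1_000):
    total += sum(batch)  # stand-in for process(batch)
    max_batch_len = max(max_batch_len, len(batch))
print(total, max_batch_len)  # 55119750 1000
```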
Common patterns
Batch inference
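```python
import ray

def load_model():
    return MyModel()  # placeholder for your own model class

class BatchInference:
    def __init__(self):
        # Runs once per actor, so the model is loaded once and reused.
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(
    BatchInference,
    batch_size=32,
    num_gpus=1,
    concurrency=2,  # number of model replicas (actors); tune to available GPUs
)
predictions.write_parquet("s3://output/")
```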
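Passing a class rather than a function is what lets the expensive `load_model()` run once per actor instead of once per batch. The plain-Python sketch below simulates that lifecycle without Ray, using a hypothetical stand-in model whose loads are counted.

```python
import numpy as np

LOAD_COUNT = 0


class FakeModel:
    """Hypothetical stand-in model: predicts 2 * input."""

    def __call__(self, x: np.ndarray) -> np.ndarray:
        return x * 2


def load_model() -> FakeModel:
    global LOAD_COUNT
    LOAD_COUNT += 1  # count expensive loads
    return FakeModel()


class BatchInference:
    def __init__(self):
        self.model = load_model()  # runs once per instance (one actor)

    def __call__(self, batch: dict) -> dict:
        return {"prediction": self.model(batch["input"])}


# One instance handling three batches reuses the same model.
worker = BatchInference()
outputs = [worker({"input": np.arange(i, i + 3)}) for i in range(0, 9, 3)]
print(LOAD_COUNT)  # 1
```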
Data preprocessing pipeline
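```python
# clean_data, tokenize and augment are your own batch functions.
# write_parquet() is a terminal call that returns None, so its result is not assigned.
(
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)
```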
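Each `.map_batches()` stage takes a batch and returns a batch, so the chain behaves like function composition applied batch by batch. A local sketch with hypothetical `clean_data`, `tokenize` and `augment` implementations (the real ones are not defined in this document), using lists instead of NumPy arrays for brevity:

```python
from functools import reduce
from typing import Callable, Dict, List

Batch = Dict[str, List[str]]


def clean_data(batch: Batch) -> Batch:
    # Hypothetical: strip whitespace and lowercase
    return {"text": [t.strip().lower() for t in batch["text"]]}


def tokenize(batch: Batch) -> Batch:
    # Hypothetical: whitespace tokenization, tokens joined back with "|"
    return {"text": ["|".join(t.split()) for t in batch["text"]]}


def augment(batch: Batch) -> Batch:
    # Hypothetical: append a reversed copy of every example (row count may change)
    return {"text": batch["text"] + [t[::-1] for t in batch["text"]]}


def run_pipeline(batch: Batch, stages: List[Callable[[Batch], Batch]]) -> Batch:
    """Apply the stages in order, like chained map_batches calls."""
    return reduce(lambda acc, stage: stage(acc), stages, batch)


out = run_pipeline({"text": ["  Hello Ray  ", "Data Pipelines"]}, [clean_data, tokenize, augment])
print(out["text"])
```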
Integration with ML frameworks
PyTorch
```python
# Each batch is a dict of torch.Tensor keyed by column name.
for batch in ds.iter_torch_batches(batch_size=32):
    inputs, labels = batch["features"], batch["label"]
```
TensorFlow
```python
tf_ds = ds.to_tf(feature_columns=["image"], label_columns="label", batch_size=32)
for features, labels in tf_ds:
    pass  # training step goes here
```
Supported data formats
| Format | Read | Write | Use Case |
|---|---|---|---|
| Parquet | ✅ | ✅ | ML data (recommended) |
| CSV | ✅ | ✅ | Tabular data |
| JSON | ✅ | ✅ | Semi-structured |
| Images | ✅ | ❌ | Computer vision |
| NumPy | ✅ | ✅ | Arrays |
| Pandas | ✅ | ❌ | DataFrames |
Performance benchmarks
Scaling (processing 100 GB of data):
- 1 node (16 cores): ~30 minutes
- 4 nodes (64 cores): ~8 minutes
- 16 nodes (256 cores): ~2 minutes
GPU acceleration (image preprocessing):
- CPU only: 1,000 images/sec
- 1 GPU: 5,000 images/sec
- 4 GPUs: 18,000 images/sec
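These figures can be read as speedup over the baseline and as parallel efficiency (speedup divided by the resource multiplier). A small calculation using only the numbers listed; `speedup_and_efficiency` is a hypothetical helper.

```python
def speedup_and_efficiency(base: float, scaled: float, factor: int, higher_is_better: bool):
    """Return (speedup, efficiency) when using `factor` times the baseline resources."""
    speedup = scaled / base if higher_is_better else base / scaled
    return speedup, speedup / factor


# Wall-clock minutes for 100 GB (lower is better), relative to 1 node
for nodes, minutes in [(4, 8), (16, 2)]:
    s, e = speedup_and_efficiency(30, minutes, nodes, higher_is_better=False)
    print(f"{nodes} nodes: {s:.2f}x speedup, {e:.0%} efficiency")

# Images/sec (higher is better), 4 GPUs relative to 1 GPU
s, e = speedup_and_efficiency(5_000, 18_000, 4, higher_is_better=True)
print(f"4 GPUs: {s:.2f}x speedup, {e:.0%} efficiency")
```

By this reading, both multi-node configurations scale at about 94% efficiency, and 4 GPUs reach 90% of linear scaling over a single GPU.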
Use cases
Production deployments:
- Pinterest: Last-mile data processing for model training
- ByteDance: Scaling offline inference with multi-modal LLMs
- Spotify: ML platform for batch inference