
Ray Data
Wire Ray Data parquet loads into Ray Train and PyTorch or TensorFlow loops for distributed ML without bespoke shard glue.
Overview
ray-data is an agent skill, most often used in the Build phase (and in Operate, when iterating), that connects Ray Data loaders to Ray Train and framework batch iterators.
Install
npx skills add https://github.com/orchestra-research/ai-research-skills --skill ray-data
What is this skill?
- TorchTrainer example with named train/val dataset shards and multi-worker GPU ScalingConfig
- to_torch, iter_torch_batches, and TensorFlow to_tf batch iteration paths
- S3 parquet read patterns for train and validation splits
- Ray Train get_dataset_shard usage inside train_func
- Example ScalingConfig uses 4 workers with GPU enabled
Adoption & trust: 1 install on skills.sh; 9.4k GitHub stars; 1 of 3 security scanners passed (skills.sh audits).
What problem does it solve?
Distributed training jobs duplicate or break data loading because shards and framework batch APIs are wired inconsistently across workers.
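To make the failure mode concrete, here is a minimal, framework-free sketch of what consistent sharding has to guarantee: each worker gets a disjoint slice, and the slices together cover the data exactly once. This is not how Ray Train implements sharding; `shard_for_worker` and the file names are hypothetical and exist only for illustration.

```python
def shard_for_worker(items, rank, world_size):
    """Return the disjoint slice of `items` owned by worker `rank` (round-robin)."""
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    return items[rank::world_size]


# Hypothetical file list; a real job would list parquet files from storage.
files = [f"part-{i:03d}.parquet" for i in range(10)]
world_size = 4

shards = [shard_for_worker(files, rank, world_size) for rank in range(world_size)]

# Every file is assigned exactly once across all workers: no duplicates, no gaps.
seen = [name for shard in shards for name in shard]
assert sorted(seen) == sorted(files)
assert len(seen) == len(set(seen))
```

If two workers computed their slice with different `world_size` values, or ignored `rank` entirely, the final two assertions would fail. That is the kind of inconsistency the skill's dataset-shard wiring is meant to avoid.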
Who is it for?
Indie teams already on Ray who need parquet-to-trainer integration without writing custom shard managers.
Skip if: Single-GPU notebook training with local CSV files and no Ray cluster.
When should I use this skill?
You are implementing Ray Train jobs that must read Ray Data datasets and feed PyTorch or TensorFlow training loops.
What do I get? / Deliverables
You have working Ray Train dataset wiring and PyTorch or TensorFlow iteration patterns ready to drop into scalable training jobs.
- TorchTrainer setup with named datasets
- Framework-specific batch iteration snippet
- ScalingConfig template for multi-worker runs
Journey fit
Spans multiple journey phases - primary shelf plus alternate fits below.
Ray Data integration is primary Build work for training pipelines, but the same patterns extend when you operate scaled retraining jobs. Dataset shards, iter_batches, and ScalingConfig are backend distributed-compute concerns, not mobile UI or launch SEO.
Where it fits
Stand up TorchTrainer with train and val parquet on S3 before a fine-tuning sprint.
Bridge an existing PyTorch loop to iter_torch_batches without rewriting the model step.
Adjust ScalingConfig and shard iteration when scheduled retraining jobs OOM or stall.
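When a retraining job runs out of memory, one lever worth checking is how the global batch size maps onto each worker. The sketch below is plain arithmetic and not a Ray API; `per_worker_batch_size` is a hypothetical helper, and the numbers only echo the 4-worker, `batch_size=32` example used in the skill.

```python
def per_worker_batch_size(global_batch_size, num_workers):
    """Split a global batch evenly across workers; reject uneven splits."""
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    if global_batch_size % num_workers != 0:
        raise ValueError("global_batch_size must be divisible by num_workers")
    return global_batch_size // num_workers


# With 4 workers, a global batch of 128 means each worker iterates
# its own shard with a local batch size of 32.
local_bs = per_worker_batch_size(128, 4)
```

Rejecting uneven splits is a design choice for this sketch: it surfaces a mismatched worker count early instead of silently changing the effective batch size.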
How it compares
Ray-specific data plane guide, not a generic pandas ETL or Spark skill.
Common Questions / FAQ
Who is ray-data for?
Builders running distributed ML on Ray who need Train, PyTorch, and TensorFlow dataset hooks aligned.
When should I use ray-data?
In Build when launching TorchTrainer with S3 parquet; in Operate when updating retrain pipelines that still use Ray Data shards.
Is ray-data safe to install?
Examples imply cloud data paths and Ray workers; review the Security Audits panel on this Prism page and lock down cluster credentials separately.
SKILL.md
# Ray Data Integration Guide

Integration with Ray Train and ML frameworks.

## Ray Train integration

### Basic training with datasets

```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create datasets
train_ds = ray.data.read_parquet("s3://data/train/")
val_ds = ray.data.read_parquet("s3://data/val/")

def train_func(config):
    # Get this worker's dataset shards (names match the `datasets` keys below)
    train_shard = ray.train.get_dataset_shard("train")
    val_shard = ray.train.get_dataset_shard("val")

    for epoch in range(config["epochs"]):
        # Iterate over batches
        for batch in train_shard.iter_batches(batch_size=32):
            # Train on batch
            pass

# Launch training
trainer = TorchTrainer(
    train_func,
    train_loop_config={"epochs": 10},
    datasets={"train": train_ds, "val": val_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
result = trainer.fit()
```

## PyTorch integration

### Convert to PyTorch Dataset

```python
# `ds` is an existing ray.data.Dataset

# Option 1: to_torch (recommended by this guide; newer Ray releases
# favor iter_torch_batches, so check the docs for your Ray version)
torch_ds = ds.to_torch(
    label_column="label",
    batch_size=32,
    drop_last=True
)

# to_torch yields (features, label) tuples
for inputs, labels in torch_ds:
    # Train model
    pass

# Option 2: iter_torch_batches
for batch in ds.iter_torch_batches(batch_size=32):
    # batch is dict of tensors
    pass
```

## TensorFlow integration

```python
# Ray 2.x names the label parameter `label_columns`
tf_ds = ds.to_tf(
    feature_columns=["image", "text"],
    label_columns="label",
    batch_size=32
)

for features, labels in tf_ds:
    # Train TensorFlow model
    pass
```

## Best practices

1. **Shard datasets in Ray Train** - Automatic with `get_dataset_shard()`
2. **Use streaming** - Don't load the entire dataset into memory
3. **Preprocess in Ray Data** - Distribute preprocessing across the cluster
4. **Cache preprocessed data** - Write to Parquet, read in training

# Ray Data Transformations

Complete guide to data transformations in Ray Data.
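As a rough mental model for the row-wise and batch-wise operations this guide covers, the following plain-Python sketch mimics their semantics on a list of dict rows. It does not use Ray and says nothing about Ray's execution engine; the helper names (`map_rows`, `filter_rows`, `flat_map_rows`, `map_in_batches`) are hypothetical stand-ins for the ideas behind `map`, `filter`, `flat_map`, and `map_batches`.

```python
def map_rows(rows, fn):
    """Apply fn to each row; one output row per input row."""
    return [fn(dict(row)) for row in rows]


def filter_rows(rows, pred):
    """Keep only the rows for which pred(row) is true."""
    return [row for row in rows if pred(row)]


def flat_map_rows(rows, fn):
    """Apply fn to each row; fn returns a list of zero or more output rows."""
    return [out for row in rows for out in fn(row)]


def map_in_batches(rows, fn, batch_size):
    """Apply fn to columnar batches (dict of column -> list of values)."""
    out = []
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        batch = {key: [row[key] for row in chunk] for key in chunk[0]}
        result = fn(batch)
        n = len(next(iter(result.values())))
        out.extend({key: col[i] for key, col in result.items()} for i in range(n))
    return out


def double(batch):
    # Operates on a whole column at once instead of one row at a time.
    batch["doubled"] = [v * 2 for v in batch["value"]]
    return batch


rows = [{"value": v} for v in range(5)]

doubled = map_in_batches(rows, double, batch_size=2)
squared = map_rows(rows, lambda r: {**r, "squared": r["value"] ** 2})
kept = filter_rows(rows, lambda r: r["value"] > 2)
expanded = flat_map_rows(rows[:1], lambda r: [{"value": r["value"] + i} for i in range(3)])
```

The batch variant calls the user function once per chunk rather than once per row, which is the property that lets a real engine hand the function vectorized arrays.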
## Core operations

### Map batches (vectorized)

```python
# Recommended for performance
def process_batch(batch):
    # batch is dict of numpy arrays or pandas Series
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)
```

**Performance**: often 10-100× faster than row-by-row

### Map (row-by-row)

```python
# Use only when vectorization is not possible
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)
```

### Filter

```python
# Remove rows
ds = ds.filter(lambda row: row["score"] > 0.5)
```

### Flat map

```python
# One row → multiple rows
def expand_row(row):
    return [{"value": row["value"] + i} for i in range(3)]

ds = ds.flat_map(expand_row)
```

## GPU-accelerated transforms

```python
def gpu_transform(batch):
    import torch
    data = torch.tensor(batch["data"]).cuda()
    # GPU processing
    result = data * 2
    return {"processed": result.cpu().numpy()}

ds = ds.map_batches(gpu_transform, num_gpus=1, batch_size=64)
```

## Groupby operations

```python
import numpy as np

# Group by column
grouped = ds.groupby("category")

# Aggregate
result = grouped.count()

# Custom aggregation: return one row per group as a dict of arrays
result = grouped.map_groups(lambda group: {
    "sum": np.array([group["value"].sum()]),
    "mean": np.array([group["value"].mean()])
})
```

## Best practices

1. **Use map_batches over map** - often 10-100× faster
2. **Tune batch_size** - Larger is usually faster (balance against memory)
3. **Use GPUs for heavy compute** - Image/audio preprocessing
4. **Stream large datasets** - Use iter_batches for data larger than memory

---
name: ray-data
description: Scalable data processing for ML workloads. Streaming execution across CPU/GPU, supports Parquet/CSV/JSON/images. Integrates with Ray Train, PyTorch, TensorFlow. Scales from single machine to 100s of nodes. Use for batch inference, data preprocessing, multi-modal data loading, or distributed ETL pipelines.
version: 1.0.0
author: Orchestra Research
license: MIT
tags: [Data Processing, Ray Data, Distributed Computing, ML Pipelines, Batch Inference, ETL