Ray is a distributed computing framework that scales Python workloads from a laptop to a multi-node GPU cluster with minimal code changes. Its ML-specific libraries (Ray Train, Ray Serve, and Ray Data) cover the full lifecycle from data loading through distributed training to production inference. Unlike Spark, which excels at data-parallel transformations, Ray provides the flexibility to distribute arbitrary Python functions, making it the preferred framework for distributed LLM training and serving (complementing the inference engines covered in Appendix S).
T.5.1 Ray Core: Distributed Python Primitives
Ray's foundation is a set of simple primitives that turn ordinary Python functions and classes into
distributed objects. The @ray.remote decorator converts a function into a
task that runs asynchronously on any node in the cluster, and converts a class
into an actor that maintains state across calls. Ray handles scheduling,
serialization, and fault recovery transparently.
# Install Ray with ML dependencies
pip install "ray[default,train,serve,data]"
# Start a local Ray cluster (for development)
ray start --head --num-gpus 2
# Or connect to an existing cluster
# ray start --address='192.168.1.100:6379'
import ray

# Initialize Ray (connects to a running local cluster or starts one)
ray.init()

# A remote function runs as a distributed task
@ray.remote(num_gpus=1)
def compute_embeddings(texts, model_name="sentence-transformers/all-MiniLM-L6-v2"):
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer(model_name)
    return model.encode(texts, show_progress_bar=False)

# Launch 4 parallel tasks across available GPUs.
# Assume `texts` is a list of strings to embed.
text_batches = [texts[i::4] for i in range(4)]
futures = [compute_embeddings.remote(batch) for batch in text_batches]

# Collect results (blocks until all tasks complete)
all_embeddings = ray.get(futures)
print(f"Computed embeddings for {sum(len(e) for e in all_embeddings)} texts")
Ray's programming model is "distributed Python," not "distributed SQL" or "distributed MapReduce." Any Python function can become a distributed task, and any Python class can become a stateful actor. This generality is why Ray has become the standard framework for distributed LLM training: training loops are complex Python programs with GPU state, optimizer state, and gradient communication that do not fit neatly into Spark's functional paradigm.
T.5.2 Ray Train: Distributed Model Training
Ray Train provides a unified API for distributed training across multiple backends: PyTorch
DDP, DeepSpeed, HuggingFace Accelerate, and TensorFlow. The key abstraction is the
TorchTrainer, which wraps your training function and handles process spawning, GPU
assignment, distributed data loading, and checkpoint management. You write a standard single-GPU
training loop, and Ray Train scales it to multiple GPUs or nodes.
import ray
import ray.train
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig, RunConfig, CheckpointConfig

def train_fn(config):
    """Standard PyTorch training function, run on each worker."""
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    # Ray Train automatically sets the correct local rank and device
    model = AutoModelForCausalLM.from_pretrained(config["model_name"])
    model = ray.train.torch.prepare_model(model)  # Wraps in DDP
    tokenizer = AutoTokenizer.from_pretrained(config["model_name"])

    # Load data through Ray Data (see T.5.4)
    train_ds = ray.train.get_dataset_shard("train")
    train_loader = train_ds.iter_torch_batches(batch_size=config["batch_size"])

    optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])
    for epoch in range(config["epochs"]):
        total_loss = 0
        for batch in train_loader:
            input_ids = batch["input_ids"].to(ray.train.torch.get_device())
            labels = batch["labels"].to(ray.train.torch.get_device())
            outputs = model(input_ids=input_ids, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            total_loss += loss.item()
        # Report metrics to Ray Train
        ray.train.report({"epoch": epoch, "loss": total_loss})

# Configure distributed training
trainer = TorchTrainer(
    train_loop_per_worker=train_fn,
    train_loop_config={
        "model_name": "meta-llama/Llama-3.1-8B",
        "batch_size": 4,
        "lr": 2e-5,
        "epochs": 3,
    },
    scaling_config=ScalingConfig(
        num_workers=4,  # 4 GPU workers
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 8},
    ),
    run_config=RunConfig(
        name="llama-sft-distributed",
        checkpoint_config=CheckpointConfig(num_to_keep=2),
    ),
    datasets={"train": ray_dataset},  # a Ray Data dataset (see T.5.4)
)
result = trainer.fit()
print(f"Final loss: {result.metrics['loss']:.4f}")
T.5.3 Integrating DeepSpeed with Ray Train
For large models that do not fit on a single GPU, Ray Train integrates with DeepSpeed's ZeRO optimizer stages. ZeRO (Zero Redundancy Optimizer) partitions optimizer states, gradients, and model parameters across workers, dramatically reducing per-GPU memory requirements. Ray Train handles the DeepSpeed configuration and process coordination, so you only need to provide a DeepSpeed config file.
import ray
import ray.train
from ray.train.torch import TorchTrainer, TorchConfig
from ray.train import ScalingConfig

# DeepSpeed ZeRO Stage 3 configuration. The effective train batch size is
# derived as micro_batch * gradient_accumulation_steps * num_workers.
deepspeed_config = {
    "train_micro_batch_size_per_gpu": 2,
    "gradient_accumulation_steps": 4,
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        "offload_param": {"device": "cpu", "pin_memory": True},
        "overlap_comm": True,
        "contiguous_gradients": True,
    },
    "bf16": {"enabled": True},
    "gradient_clipping": 1.0,
}

def train_fn_deepspeed(config):
    import deepspeed
    from transformers import AutoModelForCausalLM

    model = AutoModelForCausalLM.from_pretrained(
        config["model_name"], torch_dtype="auto"
    )
    # With DeepSpeed, initialize the engine directly instead of calling
    # ray.train.torch.prepare_model; Ray Train sets up the process group.
    model_engine, optimizer, _, _ = deepspeed.initialize(
        model=model,
        model_parameters=model.parameters(),
        config=config["deepspeed_config"],
    )
    # ... training loop as in T.5.2, using model_engine.backward(loss)
    # and model_engine.step() in place of loss.backward()/optimizer.step() ...

trainer = TorchTrainer(
    train_loop_per_worker=train_fn_deepspeed,
    train_loop_config={
        "model_name": "meta-llama/Llama-3.1-70B",
        "deepspeed_config": deepspeed_config,  # passed through to each worker
    },
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
    torch_config=TorchConfig(backend="nccl"),
)
result = trainer.fit()
When training 70B+ parameter models, start with DeepSpeed ZeRO Stage 3 with CPU offloading. If training is too slow due to CPU offload overhead, remove parameter offloading first (keep only optimizer offloading). If you have enough GPU memory across your cluster, try ZeRO Stage 2, which avoids the communication overhead of parameter partitioning. See the DeepSpeed documentation for detailed memory estimation formulas.
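This guidance can be made concrete with a back-of-envelope estimate. Under mixed-precision Adam, each parameter costs roughly 16 bytes: 2 for bf16 weights, 2 for bf16 gradients, and 12 for the fp32 master copy plus the two Adam moments. ZeRO Stage 1 shards the optimizer states, Stage 2 additionally shards gradients, and Stage 3 also shards the parameters. A sketch of this standard approximation (it ignores activations and communication buffers, and is not an exact DeepSpeed formula):

```python
def zero_memory_per_gpu_gb(n_params: float, n_gpus: int, stage: int) -> float:
    """Approximate per-GPU training-state memory under mixed-precision Adam."""
    params = 2 * n_params   # bf16 weights (bytes)
    grads = 2 * n_params    # bf16 gradients
    optim = 12 * n_params   # fp32 master weights + Adam momentum + variance
    if stage >= 1:
        optim /= n_gpus     # Stage 1 partitions optimizer states
    if stage >= 2:
        grads /= n_gpus     # Stage 2 also partitions gradients
    if stage >= 3:
        params /= n_gpus    # Stage 3 also partitions the parameters
    return (params + grads + optim) / 1e9

# A 70B-parameter model on 8 GPUs:
print(f"Stage 2: {zero_memory_per_gpu_gb(70e9, 8, 2):.1f} GB/GPU")  # 262.5
print(f"Stage 3: {zero_memory_per_gpu_gb(70e9, 8, 3):.1f} GB/GPU")  # 140.0
```

Even at Stage 3, 140 GB per GPU exceeds an 80 GB A100, which is why CPU offloading is the recommended starting point for 70B models on an 8-GPU node.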
T.5.4 Ray Data: Streaming Data Pipelines
Ray Data provides a distributed data processing library designed for ML workloads. Unlike Spark, which materializes intermediate results, Ray Data uses streaming execution that pipelines data loading, preprocessing, and training. This is critical for LLM training where datasets may be hundreds of gigabytes: you cannot afford to load the entire dataset into memory before training begins.
import ray.data

# Read a large dataset in streaming fashion
ds = ray.data.read_parquet("s3://bucket/delta/instruction_pairs/")

# Apply preprocessing as a streaming transformation
def tokenize_batch(batch, tokenizer_name="meta-llama/Llama-3.1-8B"):
    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
    encodings = tokenizer(
        batch["instruction"].tolist(),
        padding="max_length",
        truncation=True,
        max_length=512,
        return_tensors="np",
    )
    batch["input_ids"] = encodings["input_ids"]
    batch["attention_mask"] = encodings["attention_mask"]
    return batch

# map_batches runs in parallel across the cluster
tokenized_ds = ds.map_batches(
    tokenize_batch,
    batch_format="numpy",  # batch is a dict of NumPy arrays
    batch_size=256,
    num_cpus=2,  # CPU resources per map task
)

# Stream directly into training (no materialization)
for batch in tokenized_ds.iter_torch_batches(batch_size=8):
    print(batch["input_ids"].shape)
    break  # Just demonstrating the interface
T.5.5 Ray Serve: Production Model Serving
Ray Serve is a scalable model serving framework that handles request batching, model multiplexing, and multi-model composition. Unlike dedicated inference engines such as vLLM or TGI (covered in Appendix S), Ray Serve is a general-purpose serving framework that can compose arbitrary Python logic with model inference. This makes it ideal for multi-step pipelines where a request must pass through a retriever, a reranker, and a generator.
from ray import serve
from ray.serve.handle import DeploymentHandle

@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_gpus": 1},
)
class LLMDeployment:
    def __init__(self, model_name: str):
        from vllm import LLM, SamplingParams
        self.llm = LLM(model=model_name, gpu_memory_utilization=0.85)
        self.default_params = SamplingParams(
            temperature=0.7, max_tokens=512
        )

    async def generate(self, prompt: str) -> str:
        outputs = self.llm.generate([prompt], self.default_params)
        return outputs[0].outputs[0].text

@serve.deployment(num_replicas=1)
class RAGPipeline:
    def __init__(self, llm_handle: DeploymentHandle):
        self.llm = llm_handle
        # Initialize retriever, reranker, etc.

    async def __call__(self, request):
        query = request.query_params["q"]
        # Step 1: Retrieve relevant documents (simplified)
        context = "Retrieved context would go here..."
        # Step 2: Generate response using the LLM
        prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
        response = await self.llm.generate.remote(prompt)
        return {"answer": response}

# Configure HTTP options, bind deployments, and start serving
serve.start(http_options={"host": "0.0.0.0", "port": 8000})
llm = LLMDeployment.bind(model_name="meta-llama/Llama-3.1-8B-Instruct")
app = RAGPipeline.bind(llm_handle=llm)
serve.run(app)
Ray Serve and vLLM solve different problems. vLLM optimizes single-model inference throughput with PagedAttention and continuous batching. Ray Serve orchestrates multi-step pipelines and manages deployment lifecycle. For best results, use Ray Serve as the orchestration layer with vLLM as the inference backend, combining the strengths of both frameworks.
Summary
Ray provides a comprehensive distributed computing stack for LLM workflows. Ray Train scales training from one GPU to hundreds with a simple wrapper around your existing training loop, with native support for DeepSpeed and FSDP. Ray Data streams large datasets through preprocessing pipelines without materializing intermediate results. Ray Serve deploys models and multi-step pipelines behind production REST endpoints with autoscaling and request batching. Together, these libraries form a unified platform that carries an LLM from data preparation through training to serving. In the next section, we examine feature stores, which formalize the management of computed features that feed into both training and inference pipelines.