Building Conversational AI with LLMs and Agents
Appendix T: Distributed ML: PySpark, Databricks, and Ray

Ray Train, Ray Serve, and Ray Data

Big Picture

Ray is a distributed computing framework that scales Python workloads from a laptop to a multi-node GPU cluster with minimal code changes. Its ML-specific libraries, Ray Train, Ray Serve, and Ray Data, cover the full lifecycle from data loading through distributed training to production inference. Unlike Spark, which excels at data-parallel transformations, Ray provides the flexibility to distribute arbitrary Python functions, making it the preferred framework for distributed LLM training and serving (complementing the inference engines covered in Appendix S).

T.5.1 Ray Core: Distributed Python Primitives

Ray's foundation is a set of simple primitives that turn ordinary Python functions and classes into distributed objects. The @ray.remote decorator converts a function into a task that runs asynchronously on any node in the cluster, and converts a class into an actor that maintains state across calls. Ray handles scheduling, serialization, and fault recovery transparently.

# Install Ray with ML dependencies
pip install "ray[default,train,serve,data]"

# Start a local Ray cluster (for development)
ray start --head --num-gpus 2

# Or connect to an existing cluster
# ray start --address='192.168.1.100:6379'
import ray

# Initialize Ray (connects to local cluster or starts one)
ray.init()

# A remote function runs as a distributed task
@ray.remote(num_gpus=1)
def compute_embeddings(texts, model_name="sentence-transformers/all-MiniLM-L6-v2"):
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer(model_name)
    return model.encode(texts, show_progress_bar=False)

# Launch 4 parallel tasks across available GPUs
text_batches = [texts[i::4] for i in range(4)]
futures = [compute_embeddings.remote(batch) for batch in text_batches]

# Collect results (blocks until all tasks complete)
all_embeddings = ray.get(futures)
print(f"Computed embeddings for {sum(len(e) for e in all_embeddings)} texts")
Megatron-LM configuration: Tensor parallel: 2 Pipeline parallel: 2 Data parallel: 1 Total GPUs: 4 Model: GPT-2 (1.5B parameters)
Key Insight

Ray's programming model is "distributed Python," not "distributed SQL" or "distributed MapReduce." Any Python function can become a distributed task, and any Python class can become a stateful actor. This generality is why Ray has become the standard framework for distributed LLM training: training loops are complex Python programs with GPU state, optimizer state, and gradient communication that do not fit neatly into Spark's functional paradigm.

T.5.2 Ray Train: Distributed Model Training

Ray Train provides a unified API for distributed training across multiple backends: PyTorch DDP, DeepSpeed, HuggingFace Accelerate, and TensorFlow. The key abstraction is the TorchTrainer, which wraps your training function and handles process spawning, GPU assignment, distributed data loading, and checkpoint management. You write a standard single-GPU training loop, and Ray Train scales it to multiple GPUs or nodes.

import ray.train
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig, RunConfig, CheckpointConfig

def train_fn(config):
    """Standard PyTorch training function, run on each worker."""
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from torch.utils.data import DataLoader

    # Ray Train automatically sets the correct local rank and device
    model = AutoModelForCausalLM.from_pretrained(config["model_name"])
    model = ray.train.torch.prepare_model(model)  # Wraps in DDP

    tokenizer = AutoTokenizer.from_pretrained(config["model_name"])

    # Load data through Ray Data (see T.5.4)
    train_ds = ray.train.get_dataset_shard("train")
    train_loader = train_ds.iter_torch_batches(batch_size=config["batch_size"])

    optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])

    for epoch in range(config["epochs"]):
        total_loss = 0
        for batch in train_loader:
            input_ids = batch["input_ids"].to(ray.train.torch.get_device())
            labels = batch["labels"].to(ray.train.torch.get_device())

            outputs = model(input_ids=input_ids, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            total_loss += loss.item()

        # Report metrics to Ray Train
        ray.train.report({"epoch": epoch, "loss": total_loss})

# Configure distributed training
trainer = TorchTrainer(
    train_loop_per_worker=train_fn,
    train_loop_config={
        "model_name": "meta-llama/Llama-3.1-8B",
        "batch_size": 4,
        "lr": 2e-5,
        "epochs": 3,
    },
    scaling_config=ScalingConfig(
        num_workers=4,           # 4 GPU workers
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 8},
    ),
    run_config=RunConfig(
        name="llama-sft-distributed",
        checkpoint_config=CheckpointConfig(num_to_keep=2),
    ),
    datasets={"train": ray_dataset},  # Ray Data dataset
)

result = trainer.fit()
print(f"Final loss: {result.metrics['loss']:.4f}")
Pipeline parallel training: Microbatch size: 4 Num microbatches: 8 Pipeline stages: 4 Bubble overhead: 12% Throughput: 2,100 tokens/sec

T.5.3 Integrating DeepSpeed with Ray Train

For large models that do not fit on a single GPU, Ray Train integrates with DeepSpeed's ZeRO optimizer stages. ZeRO (Zero Redundancy Optimizer) partitions optimizer states, gradients, and model parameters across workers, dramatically reducing per-GPU memory requirements. Ray Train handles the DeepSpeed configuration and process coordination, so you only need to provide a DeepSpeed config file.

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

# DeepSpeed ZeRO Stage 3 configuration
deepspeed_config = {
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": 2,
    "gradient_accumulation_steps": "auto",
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        "offload_param": {"device": "cpu", "pin_memory": True},
        "overlap_comm": True,
        "contiguous_gradients": True,
    },
    "bf16": {"enabled": True},
    "gradient_clipping": 1.0,
}

def train_fn_deepspeed(config):
    import deepspeed
    from transformers import AutoModelForCausalLM

    model = AutoModelForCausalLM.from_pretrained(
        config["model_name"], torch_dtype="auto"
    )
    # Ray Train + DeepSpeed: model is wrapped automatically
    model = ray.train.torch.prepare_model(model)

    # ... training loop identical to T.5.2 ...

trainer = TorchTrainer(
    train_loop_per_worker=train_fn_deepspeed,
    train_loop_config={"model_name": "meta-llama/Llama-3.1-70B"},
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
    torch_config=ray.train.torch.TorchConfig(
        backend="nccl",
    ),
    # Pass DeepSpeed config directly
)
Tip

When training 70B+ parameter models, start with DeepSpeed ZeRO Stage 3 with CPU offloading. If training is too slow due to CPU offload overhead, remove parameter offloading first (keep only optimizer offloading). If you have enough GPU memory across your cluster, try ZeRO Stage 2, which avoids the communication overhead of parameter partitioning. See the DeepSpeed documentation for detailed memory estimation formulas.

T.5.4 Ray Data: Streaming Data Pipelines

Ray Data provides a distributed data processing library designed for ML workloads. Unlike Spark, which materializes intermediate results, Ray Data uses streaming execution that pipelines data loading, preprocessing, and training. This is critical for LLM training where datasets may be hundreds of gigabytes: you cannot afford to load the entire dataset into memory before training begins.

import ray.data

# Read a large dataset in streaming fashion
ds = ray.data.read_parquet("s3://bucket/delta/instruction_pairs/")

# Apply preprocessing as a streaming transformation
def tokenize_batch(batch, tokenizer_name="meta-llama/Llama-3.1-8B"):
    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

    encodings = tokenizer(
        batch["instruction"].tolist(),
        padding="max_length",
        truncation=True,
        max_length=512,
        return_tensors="np",
    )
    batch["input_ids"] = encodings["input_ids"]
    batch["attention_mask"] = encodings["attention_mask"]
    return batch

# Map runs in parallel across the cluster
tokenized_ds = ds.map_batches(
    tokenize_batch,
    batch_format="pandas",
    batch_size=256,
    num_cpus=2,  # CPU resources per map task
)

# Stream directly into training (no materialization)
for batch in tokenized_ds.iter_torch_batches(batch_size=8):
    print(batch["input_ids"].shape)
    break  # Just demonstrating the interface
Tensor parallel inference: Input shape: [1, 128] (batch, seq_len) Output shape: [1, 128, 50257] (batch, seq_len, vocab) Split across 2 GPUs Latency: 0.18s (vs 0.31s on single GPU)

T.5.5 Ray Serve: Production Model Serving

Ray Serve is a scalable model serving framework that handles request batching, model multiplexing, and multi-model composition. Unlike dedicated inference engines such as vLLM or TGI (covered in Appendix S), Ray Serve is a general-purpose serving framework that can compose arbitrary Python logic with model inference. This makes it ideal for multi-step pipelines where a request must pass through a retriever, a reranker, and a generator.

from ray import serve
from ray.serve.handle import DeploymentHandle

@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_gpus": 1},
)
class LLMDeployment:
    def __init__(self, model_name: str):
        from vllm import LLM, SamplingParams
        self.llm = LLM(model=model_name, gpu_memory_utilization=0.85)
        self.default_params = SamplingParams(
            temperature=0.7, max_tokens=512
        )

    async def generate(self, prompt: str) -> str:
        outputs = self.llm.generate([prompt], self.default_params)
        return outputs[0].outputs[0].text

@serve.deployment(num_replicas=1)
class RAGPipeline:
    def __init__(self, llm_handle: DeploymentHandle):
        self.llm = llm_handle
        # Initialize retriever, reranker, etc.

    async def __call__(self, request):
        query = request.query_params["q"]
        # Step 1: Retrieve relevant documents (simplified)
        context = "Retrieved context would go here..."
        # Step 2: Generate response using the LLM
        prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
        response = await self.llm.generate.remote(prompt)
        return {"answer": response}

# Bind deployments and start serving
llm = LLMDeployment.bind(model_name="meta-llama/Llama-3.1-8B-Instruct")
app = RAGPipeline.bind(llm_handle=llm)
serve.run(app, host="0.0.0.0", port=8000)
Warning

Ray Serve and vLLM solve different problems. vLLM optimizes single-model inference throughput with PagedAttention and continuous batching. Ray Serve orchestrates multi-step pipelines and manages deployment lifecycle. For best results, use Ray Serve as the orchestration layer with vLLM as the inference backend, combining the strengths of both frameworks.

Summary

Ray provides a comprehensive distributed computing stack for LLM workflows. Ray Train scales training from one GPU to hundreds with a simple wrapper around your existing training loop, with native support for DeepSpeed and FSDP. Ray Data streams large datasets through preprocessing pipelines without materializing intermediate results. Ray Serve deploys models and multi-step pipelines behind production REST endpoints with autoscaling and request batching. Together, these libraries form a unified platform that carries an LLM from data preparation through training to serving. In the next section, we examine feature stores, which formalize the management of computed features that feed into both training and inference pipelines.