Building Conversational AI with LLMs and Agents
Appendix T: Distributed ML: PySpark, Databricks, and Ray

Production Data Pipelines and Model Serving at Scale

Big Picture

Production LLM systems require more than a trained model and a serving endpoint. They need data pipelines that continuously ingest, validate, and transform data; orchestration that coordinates training, evaluation, and deployment; monitoring that detects data drift and model degradation; and infrastructure that scales to handle unpredictable traffic. This section brings together the components from Sections T.1 through T.4 into end-to-end production architectures, covering pipeline orchestration with Airflow, CI/CD for ML, multi-model serving patterns, and observability for LLM systems.

T.7.1 End-to-End Pipeline Architecture

A production LLM pipeline typically consists of four stages: data ingestion (collecting raw data from APIs, databases, and event streams), data preparation (cleaning, filtering, and tokenizing into training-ready format), model training (fine-tuning or continued pretraining on prepared data), and deployment (packaging, validating, and serving the updated model). Each stage must be automated, idempotent, and observable.

┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│  Ingest  │──▶│ Validate │──▶│ Prepare  │──▶│  Train   │──▶│  Deploy  │
│          │   │          │   │          │   │          │   │          │
│ Kafka    │   │ Great    │   │ Spark /  │   │ Ray      │   │ vLLM /   │
│ Fivetran │   │ Expect.  │   │ Ray Data │   │ Train    │   │ Ray Serve│
│ APIs     │   │ Soda     │   │ Delta    │   │ DeepSpeed│   │ K8s      │
└──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘
       │              │              │              │              │
       └──────────────┴──────────────┴──────────────┴──────────────┘
                              Airflow / Dagster / Prefect
                         (Orchestration and Monitoring)
        
Figure T.7.1: End-to-end LLM pipeline. Each stage is an independently testable component, orchestrated by a workflow engine and monitored for data quality and model performance.
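Idempotency in practice usually means keying each stage's output by a run identifier (commonly the execution date), so a retry or backfill overwrites its own partition instead of duplicating data. A minimal sketch of that convention (the paths and `_SUCCESS` marker are illustrative, not a prescribed layout):

```python
import json
from datetime import date
from pathlib import Path

def prepare_stage(run_date: date, output_root: Path) -> Path:
    """Idempotent pipeline stage: output is partitioned by run date, so
    re-running the stage overwrites the same partition rather than appending."""
    partition = output_root / f"prepared/dt={run_date.isoformat()}"
    partition.mkdir(parents=True, exist_ok=True)
    # Write a marker recording the run; a real stage writes its data here too
    (partition / "_SUCCESS").write_text(json.dumps({"dt": run_date.isoformat()}))
    return partition
```

Because the output path is a pure function of the run date, the orchestrator can retry any stage without special cleanup logic.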

T.7.2 Pipeline Orchestration with Airflow

Apache Airflow is the most widely used orchestrator for ML pipelines. It represents workflows as directed acyclic graphs (DAGs) of tasks, with built-in support for scheduling, retries, alerting, and dependency management. For LLM pipelines, Airflow coordinates the steps that connect data preparation (using Spark on Databricks, as in Section T.1) with distributed training (using Ray, as in Section T.3) and deployment to an inference engine (as covered in Appendix S).

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
    "email_on_failure": True,
    "email": ["ml-team@company.com"],
}

with DAG(
    dag_id="llm_training_pipeline",
    default_args=default_args,
    schedule="@weekly",  # replaces the deprecated schedule_interval parameter
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["llm", "training"],
) as dag:

    # Step 1: Run data validation on the latest training data
    validate_data = DatabricksRunNowOperator(
        task_id="validate_training_data",
        databricks_conn_id="databricks_default",
        job_id=12345,  # Databricks job that runs Great Expectations
    )

    # Step 2: Prepare training data (tokenize, filter, split)
    prepare_data = DatabricksRunNowOperator(
        task_id="prepare_training_data",
        databricks_conn_id="databricks_default",
        job_id=12346,
    )

    # Step 3: Launch distributed training on Ray cluster
    train_model = KubernetesPodOperator(
        task_id="train_model",
        name="ray-training-job",
        image="company/ray-train:latest",
        cmds=["python", "train.py"],
        arguments=["--config", "configs/llama_sft.yaml"],
        namespace="ml-training",
        get_logs=True,
        startup_timeout_seconds=600,
    )

    # Step 4: Evaluate the trained model
    evaluate_model = KubernetesPodOperator(
        task_id="evaluate_model",
        name="model-evaluation",
        image="company/eval-harness:latest",
        cmds=["python", "evaluate.py"],
        namespace="ml-training",
    )

    # Define task dependencies
    validate_data >> prepare_data >> train_model >> evaluate_model

Tip

Use Airflow's ShortCircuitOperator to skip downstream tasks when a validation step fails. For example, if data validation detects that fewer than 1,000 new examples have been added since the last training run, short-circuit the DAG to avoid wasting GPU hours on a training run that will produce negligible improvement.
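The short-circuit condition itself is just a Python callable that returns a boolean. A sketch of the predicate, with the operator wiring shown in comments (the `count_new_examples` helper is hypothetical — it stands in for however your pipeline counts new data):

```python
def enough_new_data(new_example_count: int, threshold: int = 1000) -> bool:
    """Return True to continue the DAG; False skips all downstream tasks."""
    return new_example_count >= threshold

# Wiring inside the DAG (ShortCircuitOperator lives in airflow.operators.python):
# check_volume = ShortCircuitOperator(
#     task_id="check_data_volume",
#     python_callable=lambda: enough_new_data(count_new_examples()),
# )
# check_volume >> validate_data
```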

T.7.3 Data Validation with Great Expectations

Data quality is the most common root cause of model degradation in production. Great Expectations is an open-source framework that lets you define expectations (assertions about your data) and run them automatically before each training run. For LLM training data, useful expectations include checking that instruction and response columns are non-null, that text lengths fall within expected ranges, and that quality scores meet a minimum threshold.

import great_expectations as gx

context = gx.get_context()

# Connect the prepared training data; `df` is the Spark DataFrame
# loaded from Delta Lake, e.g. spark.read.format("delta").load(...)
datasource = context.sources.add_spark(name="delta_lake")
data_asset = datasource.add_dataframe_asset(name="instruction_pairs")

# Build a batch request over the DataFrame and get a validator for it
batch_request = data_asset.build_batch_request(dataframe=df)
validator = context.get_validator(batch_request=batch_request)

# No null values in critical columns
validator.expect_column_values_to_not_be_null("instruction")
validator.expect_column_values_to_not_be_null("response")

# Text length within expected range (catch truncation or corruption)
validator.expect_column_value_lengths_to_be_between(
    "instruction", min_value=10, max_value=4096
)
validator.expect_column_value_lengths_to_be_between(
    "response", min_value=5, max_value=8192
)

# Quality score meets minimum threshold
validator.expect_column_values_to_be_between(
    "quality_score", min_value=0.6, max_value=1.0
)

# Run validation and check results
results = validator.validate()
if not results.success:
    failing = [r for r in results.results if not r.success]
    raise ValueError(
        f"Data validation failed: {len(failing)} expectations not met"
    )

T.7.4 CI/CD for ML: Automated Model Evaluation

Continuous integration for ML extends beyond code tests to include model evaluation. When a training pipeline produces a new model version, an automated evaluation step should run a standardized benchmark suite before the model is promoted to production. This prevents regressions where a newly trained model performs worse on specific tasks despite having a lower overall loss.

# .github/workflows/model-evaluation.yml
name: Model Evaluation Pipeline

on:
  workflow_dispatch:
    inputs:
      model_version:
        description: 'MLflow model version to evaluate'
        required: true
      model_name:
        description: 'Registered model name'
        default: 'ml_catalog.llm_models.llama_sft'

jobs:
  evaluate:
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4

      - name: Install dependencies
        run: pip install mlflow vllm lm-eval

      - name: Download model from MLflow
        run: |
          python scripts/download_model.py \
            --model-name ${{ inputs.model_name }} \
            --version ${{ inputs.model_version }} \
            --output-dir ./model

      - name: Run evaluation benchmarks
        run: |
          python -m lm_eval \
            --model vllm \
            --model_args pretrained=./model \
            --tasks mmlu,hellaswag,truthfulqa \
            --batch_size auto \
            --output_path ./eval_results

      - name: Check quality gates
        run: |
          python scripts/check_quality_gates.py \
            --results ./eval_results \
            --min-mmlu 0.65 \
            --min-hellaswag 0.75 \
            --max-regression 0.02

      - name: Promote model if gates pass
        if: success()
        run: |
          python scripts/promote_model.py \
            --model-name ${{ inputs.model_name }} \
            --version ${{ inputs.model_version }} \
            --stage production

Key Insight

The quality gate pattern is essential for production ML. Define explicit thresholds for each benchmark, and include a maximum regression check that compares the new model against the currently deployed version. A model can have excellent absolute scores but still be a regression on specific tasks that matter to your users. Always compare against the production baseline, not just a fixed threshold.
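A sketch of what a script like the `check_quality_gates.py` referenced above might implement — per-benchmark absolute floors plus a regression check against the production baseline (the function name and score format are assumptions, not a fixed interface):

```python
def check_quality_gates(new_scores: dict, baseline_scores: dict,
                        floors: dict, max_regression: float = 0.02) -> list:
    """Return human-readable gate failures; an empty list means all gates pass."""
    failures = []
    for task, score in new_scores.items():
        floor = floors.get(task)
        if floor is not None and score < floor:
            failures.append(f"{task}: {score:.3f} below floor {floor:.3f}")
        baseline = baseline_scores.get(task)
        if baseline is not None and baseline - score > max_regression:
            failures.append(
                f"{task}: regressed {baseline - score:.3f} vs production ({baseline:.3f})"
            )
    return failures
```

Note that a model can clear every floor yet still fail the regression gate — exactly the case the baseline comparison exists to catch.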

T.7.5 Multi-Model Serving Architecture

Production LLM applications rarely serve a single model. A typical deployment includes a primary generation model, an embedding model for retrieval, a reranker, and possibly a smaller classifier for intent routing. Managing these models requires an architecture that handles independent scaling, version management, and traffic routing for each model.

# Kubernetes deployment for multi-model serving with Ray Serve
# deploy_serving.py

from ray import serve
from ray.serve.config import AutoscalingConfig

@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=2,
        max_replicas=10,
        target_ongoing_requests=5,
    ),
    ray_actor_options={"num_gpus": 1},
)
class GenerationModel:
    def __init__(self):
        from vllm import LLM, SamplingParams
        self.llm = LLM(model="meta-llama/Llama-3.1-8B-Instruct")
        self.sampling_params = SamplingParams(max_tokens=512)

    async def generate(self, prompt):
        # vLLM returns one RequestOutput per input prompt
        outputs = self.llm.generate([prompt], self.sampling_params)
        return outputs[0].outputs[0].text

@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=1, max_replicas=4,
    ),
    ray_actor_options={"num_gpus": 0.5},
)
class EmbeddingModel:
    def __init__(self):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer("BAAI/bge-large-en-v1.5")

    async def embed(self, texts):
        return self.model.encode(texts).tolist()

@serve.deployment(num_replicas=1)
class Router:
    def __init__(self, generator, embedder):
        self.generator = generator
        self.embedder = embedder

    async def __call__(self, request):
        data = await request.json()
        endpoint = data.get("endpoint", "generate")

        if endpoint == "embed":
            return await self.embedder.embed.remote(data["texts"])
        elif endpoint == "generate":
            return await self.generator.generate.remote(data["prompt"])
        return {"error": f"unknown endpoint: {endpoint}"}

# Compose the application
generator = GenerationModel.bind()
embedder = EmbeddingModel.bind()
app = Router.bind(generator=generator, embedder=embedder)
serve.start(http_options={"host": "0.0.0.0", "port": 8000})
serve.run(app)

T.7.6 Observability and Monitoring

LLM systems require monitoring beyond traditional application metrics. In addition to latency, throughput, and error rates, you need to track token usage (for cost management), output quality (for detecting model degradation), and data drift (for knowing when to retrain). Prometheus and Grafana provide the infrastructure layer, while specialized LLM observability tools like LangSmith, Langfuse, or Phoenix handle trace-level analysis of prompt chains and retrieval quality.

# Prometheus metrics for LLM serving
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Request-level metrics
REQUEST_COUNT = Counter(
    "llm_requests_total", "Total LLM requests",
    ["model", "endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
    "llm_request_duration_seconds", "Request latency",
    ["model", "endpoint"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)
TOKENS_GENERATED = Counter(
    "llm_tokens_generated_total", "Total tokens generated",
    ["model"],
)
ACTIVE_REQUESTS = Gauge(
    "llm_active_requests", "Currently processing requests",
    ["model"],
)

# Start metrics endpoint
start_http_server(9090)

# Instrument your serving code (`generate` below stands in for your model call)
import time

async def serve_request(model_name, prompt):
    ACTIVE_REQUESTS.labels(model=model_name).inc()
    start = time.perf_counter()
    try:
        response = await generate(prompt)
        duration = time.perf_counter() - start

        REQUEST_COUNT.labels(
            model=model_name, endpoint="generate", status="success"
        ).inc()
        REQUEST_LATENCY.labels(
            model=model_name, endpoint="generate"
        ).observe(duration)
        TOKENS_GENERATED.labels(model=model_name).inc(
            response.usage.completion_tokens
        )
        return response
    except Exception:
        REQUEST_COUNT.labels(
            model=model_name, endpoint="generate", status="error"
        ).inc()
        raise
    finally:
        ACTIVE_REQUESTS.labels(model=model_name).dec()
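The `llm_tokens_generated_total` counter above is the raw input for cost tracking. Per-request cost is a simple function of token counts and your per-token prices (the default rates below are placeholders, not real pricing):

```python
def request_cost(prompt_tokens: int, completion_tokens: int,
                 price_in_per_1k: float = 0.0005,
                 price_out_per_1k: float = 0.0015) -> float:
    """Estimate request cost in dollars from token counts (placeholder prices)."""
    return (prompt_tokens / 1000.0) * price_in_per_1k \
         + (completion_tokens / 1000.0) * price_out_per_1k
```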

Warning

Traditional ML monitoring tools are not sufficient for LLM systems. Input/output data drift detection requires embedding-based methods (not just statistical tests on numeric features), and output quality monitoring requires either automated evaluation with a judge model or structured human feedback loops. Budget for LLM-specific observability from the start; retrofitting it after launch is significantly harder.
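As a concrete instance of the embedding-based approach, one simple drift signal is the cosine distance between the mean embedding of a reference window (for example, the data the model was evaluated on) and the current traffic window — a sketch, not a full drift detector:

```python
import numpy as np

def embedding_drift(reference: np.ndarray, current: np.ndarray) -> float:
    """Cosine distance between the mean embeddings of two windows, each of
    shape (n_samples, embedding_dim); ~0 means the window centers agree."""
    ref_mean = reference.mean(axis=0)
    cur_mean = current.mean(axis=0)
    cosine = float(np.dot(ref_mean, cur_mean)
                   / (np.linalg.norm(ref_mean) * np.linalg.norm(cur_mean)))
    return 1.0 - cosine
```

In production you would compute this over sliding windows of request embeddings and alert when the distance crosses a tuned threshold; more sensitive tests (MMD, classifier-based two-sample tests) follow the same windowed pattern.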

Summary

Production LLM systems are distributed systems that require the same rigor as any critical infrastructure: automated pipelines, data validation, CI/CD with quality gates, multi-model serving, and comprehensive observability. Airflow or similar orchestrators coordinate the flow from data ingestion through training to deployment. Great Expectations catches data quality issues before they reach the model. CI/CD pipelines with benchmark evaluation prevent regressions. Multi-model serving architectures handle the reality that production LLM applications depend on several models working in concert. Finally, Prometheus metrics and LLM-specific observability tools provide the visibility needed to maintain reliability at scale. Together with the Databricks platform (Section T.1), Delta Lake storage (Section T.2), Ray compute (Section T.3), and feature stores (Section T.4), these components form a complete infrastructure stack for production LLM applications.