Production LLM systems require more than a trained model and a serving endpoint. They need data pipelines that continuously ingest, validate, and transform data; orchestration that coordinates training, evaluation, and deployment; monitoring that detects data drift and model degradation; and infrastructure that scales to handle unpredictable traffic. This section brings together the components from Sections T.1 through T.4 into end-to-end production architectures, covering pipeline orchestration with Airflow, CI/CD for ML, multi-model serving patterns, and observability for LLM systems.
T.7.1 End-to-End Pipeline Architecture
A production LLM pipeline typically consists of four stages: data ingestion (collecting raw data from APIs, databases, and event streams), data preparation (cleaning, filtering, and tokenizing into training-ready format), model training (fine-tuning or continued pretraining on prepared data), and deployment (packaging, validating, and serving the updated model). Each stage must be automated, idempotent, and observable.
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Ingest │──▶│ Validate │──▶│ Prepare │──▶│ Train │──▶│ Deploy │
│ │ │ │ │ │ │ │ │ │
│ Kafka │ │ Great │ │ Spark / │ │ Ray │ │ vLLM / │
│ Fivetran │ │ Expect. │ │ Ray Data │ │ Train │ │ Ray Serve│
│ APIs │ │ Soda │ │ Delta │ │ DeepSpeed│ │ K8s │
└──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │ │ │
└──────────────┴──────────────┴──────────────┴──────────────┘
Airflow / Dagster / Prefect
(Orchestration and Monitoring)
T.7.2 Pipeline Orchestration with Airflow
Apache Airflow is the most widely used orchestrator for ML pipelines. It represents workflows as directed acyclic graphs (DAGs) of tasks, with built-in support for scheduling, retries, alerting, and dependency management. For LLM pipelines, Airflow coordinates the steps that connect data preparation (using Spark on Databricks, as in Section T.1) with distributed training (using Ray, as in Section T.3) and deployment to an inference engine (as covered in Appendix S).
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import (
DatabricksRunNowOperator,
)
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta
default_args = {
"owner": "ml-engineering",
"retries": 2,
"retry_delay": timedelta(minutes=10),
"email_on_failure": True,
"email": ["ml-team@company.com"],
}
with DAG(
dag_id="llm_training_pipeline",
default_args=default_args,
    schedule="@weekly",  # "schedule" replaces schedule_interval (deprecated in Airflow 2.4)
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["llm", "training"],
) as dag:
# Step 1: Run data validation on the latest training data
validate_data = DatabricksRunNowOperator(
task_id="validate_training_data",
databricks_conn_id="databricks_default",
job_id=12345, # Databricks job that runs Great Expectations
)
# Step 2: Prepare training data (tokenize, filter, split)
prepare_data = DatabricksRunNowOperator(
task_id="prepare_training_data",
databricks_conn_id="databricks_default",
job_id=12346,
)
# Step 3: Launch distributed training on Ray cluster
train_model = KubernetesPodOperator(
task_id="train_model",
name="ray-training-job",
image="company/ray-train:latest",
cmds=["python", "train.py"],
arguments=["--config", "configs/llama_sft.yaml"],
namespace="ml-training",
get_logs=True,
startup_timeout_seconds=600,
)
# Step 4: Evaluate the trained model
evaluate_model = KubernetesPodOperator(
task_id="evaluate_model",
name="model-evaluation",
image="company/eval-harness:latest",
cmds=["python", "evaluate.py"],
namespace="ml-training",
)
# Define task dependencies
validate_data >> prepare_data >> train_model >> evaluate_model
Use Airflow's ShortCircuitOperator to skip downstream tasks when a validation step fails. For example, if data validation detects that fewer than 1,000 new examples have been added since the last training run, short-circuit the DAG to avoid wasting GPU hours on a training run that will produce negligible improvement.
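The gating logic itself is an ordinary Python callable; a minimal sketch, with a hypothetical count_new_examples() helper standing in for the Delta table query:

```python
MIN_NEW_EXAMPLES = 1000  # illustrative threshold from the scenario above

def count_new_examples() -> int:
    # Hypothetical helper: in practice, query the Delta table for rows
    # added since the last successful DAG run.
    return 750

def enough_new_data() -> bool:
    """python_callable for a ShortCircuitOperator: returning False marks
    this task as succeeded but skips every downstream task."""
    return count_new_examples() >= MIN_NEW_EXAMPLES

# Inside the DAG body:
# check_new_data = ShortCircuitOperator(
#     task_id="check_new_data", python_callable=enough_new_data
# )
# validate_data >> check_new_data >> prepare_data >> train_model
```

With the stub value of 750 new examples, the callable returns False and the prepare, train, and evaluate tasks are all skipped.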
T.7.3 Data Validation with Great Expectations
Data quality is the most common root cause of model degradation in production. Great Expectations is an open-source framework that lets you define expectations (assertions about your data) and run them automatically before each training run. For LLM training data, useful expectations include checking that instruction and response columns are non-null, that text lengths fall within expected ranges, and that quality scores meet a minimum threshold.
import great_expectations as gx

context = gx.get_context()

# Connect to a Delta Lake data source (Spark); train_df is assumed to be
# the prepared Spark DataFrame of instruction/response pairs
data_source = context.data_sources.add_spark(name="delta_lake")
data_asset = data_source.add_dataframe_asset(name="instruction_pairs")
batch_definition = data_asset.add_batch_definition_whole_dataframe("latest")

# Define expectations for training data quality
suite = context.suites.add(gx.ExpectationSuite(name="training_data_quality"))

# No null values in critical columns
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="instruction")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="response")
)

# Text length within expected range (catch truncation or corruption)
suite.add_expectation(gx.expectations.ExpectColumnValueLengthsToBeBetween(
    column="instruction", min_value=10, max_value=4096
))
suite.add_expectation(gx.expectations.ExpectColumnValueLengthsToBeBetween(
    column="response", min_value=5, max_value=8192
))

# Quality score meets minimum threshold
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="quality_score", min_value=0.6, max_value=1.0
))

# Run validation and check results
batch = batch_definition.get_batch(batch_parameters={"dataframe": train_df})
results = batch.validate(suite)
if not results.success:
    failing = [r for r in results.results if not r.success]
    raise ValueError(
        f"Data validation failed: {len(failing)} expectations not met"
    )
T.7.4 CI/CD for ML: Automated Model Evaluation
Continuous integration for ML extends beyond code tests to include model evaluation. When a training pipeline produces a new model version, an automated evaluation step should run a standardized benchmark suite before the model is promoted to production. This prevents regressions where a newly trained model performs worse on specific tasks despite having a lower overall loss.
# .github/workflows/model-evaluation.yml
name: Model Evaluation Pipeline
on:
workflow_dispatch:
inputs:
model_version:
description: 'MLflow model version to evaluate'
required: true
model_name:
description: 'Registered model name'
default: 'ml_catalog.llm_models.llama_sft'
jobs:
evaluate:
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v4
- name: Install dependencies
run: pip install mlflow vllm lm-eval
- name: Download model from MLflow
run: |
python scripts/download_model.py \
--model-name ${{ inputs.model_name }} \
--version ${{ inputs.model_version }} \
--output-dir ./model
- name: Run evaluation benchmarks
run: |
python -m lm_eval \
--model vllm \
--model_args pretrained=./model \
--tasks mmlu,hellaswag,truthfulqa \
--batch_size auto \
--output_path ./eval_results
- name: Check quality gates
run: |
python scripts/check_quality_gates.py \
--results ./eval_results \
--min-mmlu 0.65 \
--min-hellaswag 0.75 \
--max-regression 0.02
- name: Promote model if gates pass
if: success()
run: |
python scripts/promote_model.py \
--model-name ${{ inputs.model_name }} \
--version ${{ inputs.model_version }} \
--stage production
The quality gate pattern is essential for production ML. Define explicit thresholds for each benchmark, and include a maximum regression check that compares the new model against the currently deployed version. A model can have excellent absolute scores but still be a regression on specific tasks that matter to your users. Always compare against the production baseline, not just a fixed threshold.
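A sketch of what scripts/check_quality_gates.py could implement; the gate logic is the point here, while the score values, benchmark names, and results format are illustrative:

```python
def check_gates(scores, baseline, min_scores, max_regression):
    """Return a list of gate failures (an empty list means all gates pass)."""
    failures = []
    # Absolute thresholds per benchmark
    for task, minimum in min_scores.items():
        score = scores.get(task, 0.0)
        if score < minimum:
            failures.append(f"{task}: {score:.3f} below minimum {minimum:.3f}")
    # Regression check against the currently deployed model
    for task, base in baseline.items():
        score = scores.get(task, 0.0)
        if score < base - max_regression:
            failures.append(f"{task}: {score:.3f} regressed from baseline {base:.3f}")
    return failures

# Illustrative scores: good absolute numbers, but a regression on hellaswag
new_scores = {"mmlu": 0.68, "hellaswag": 0.79, "truthfulqa": 0.48}
prod_baseline = {"mmlu": 0.67, "hellaswag": 0.82, "truthfulqa": 0.45}
failures = check_gates(
    new_scores, prod_baseline,
    min_scores={"mmlu": 0.65, "hellaswag": 0.75},
    max_regression=0.02,
)
for failure in failures:
    print(failure)
```

Note that the new model clears every absolute threshold yet still fails the gate: its hellaswag score dropped more than the allowed 0.02 relative to the production baseline, which is exactly the case a fixed threshold alone would miss.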
T.7.5 Multi-Model Serving Architecture
Production LLM applications rarely serve a single model. A typical deployment includes a primary generation model, an embedding model for retrieval, a reranker, and possibly a smaller classifier for intent routing. Managing these models requires an architecture that handles independent scaling, version management, and traffic routing for each model.
# Kubernetes deployment for multi-model serving with Ray Serve
# deploy_serving.py
from ray import serve
from ray.serve.config import AutoscalingConfig
@serve.deployment(
autoscaling_config=AutoscalingConfig(
min_replicas=2,
max_replicas=10,
target_ongoing_requests=5,
),
ray_actor_options={"num_gpus": 1},
)
class GenerationModel:
    def __init__(self):
        from vllm import LLM, SamplingParams
        self.llm = LLM(model="meta-llama/Llama-3.1-8B-Instruct")
        self.sampling_params = SamplingParams(temperature=0.7, max_tokens=512)

    async def generate(self, prompt: str) -> str:
        # vLLM's offline generation API; called by the Router below
        outputs = self.llm.generate([prompt], self.sampling_params)
        return outputs[0].outputs[0].text
@serve.deployment(
autoscaling_config=AutoscalingConfig(
min_replicas=1, max_replicas=4,
),
ray_actor_options={"num_gpus": 0.5},
)
class EmbeddingModel:
def __init__(self):
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer("BAAI/bge-large-en-v1.5")
async def embed(self, texts):
return self.model.encode(texts).tolist()
@serve.deployment(num_replicas=1)
class Router:
def __init__(self, generator, embedder):
self.generator = generator
self.embedder = embedder
    async def __call__(self, request):
        data = await request.json()
        endpoint = data.get("endpoint", "generate")
        if endpoint == "embed":
            return await self.embedder.embed.remote(data["texts"])
        elif endpoint == "generate":
            return await self.generator.generate.remote(data["prompt"])
        return {"error": f"unknown endpoint: {endpoint}"}
# Compose the application
generator = GenerationModel.bind()
embedder = EmbeddingModel.bind()
app = Router.bind(generator=generator, embedder=embedder)
serve.start(http_options={"host": "0.0.0.0", "port": 8000})
serve.run(app)
T.7.6 Observability and Monitoring
LLM systems require monitoring beyond traditional application metrics. In addition to latency, throughput, and error rates, you need to track token usage (for cost management), output quality (for detecting model degradation), and data drift (for knowing when to retrain). Prometheus and Grafana provide the infrastructure layer, while specialized LLM observability tools like LangSmith, Langfuse, or Phoenix handle trace-level analysis of prompt chains and retrieval quality.
# Prometheus metrics for LLM serving
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Request-level metrics
REQUEST_COUNT = Counter(
"llm_requests_total", "Total LLM requests",
["model", "endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
"llm_request_duration_seconds", "Request latency",
["model", "endpoint"],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)
TOKENS_GENERATED = Counter(
"llm_tokens_generated_total", "Total tokens generated",
["model"],
)
ACTIVE_REQUESTS = Gauge(
"llm_active_requests", "Currently processing requests",
["model"],
)
# Start metrics endpoint
start_http_server(9090)
# Instrument your serving code; `generate` below stands in for your
# application's model inference call
import time

async def serve_request(model_name, prompt):
ACTIVE_REQUESTS.labels(model=model_name).inc()
start = time.perf_counter()
try:
response = await generate(prompt)
duration = time.perf_counter() - start
REQUEST_COUNT.labels(
model=model_name, endpoint="generate", status="success"
).inc()
REQUEST_LATENCY.labels(
model=model_name, endpoint="generate"
).observe(duration)
TOKENS_GENERATED.labels(model=model_name).inc(
response.usage.completion_tokens
)
return response
    except Exception:
REQUEST_COUNT.labels(
model=model_name, endpoint="generate", status="error"
).inc()
raise
finally:
ACTIVE_REQUESTS.labels(model=model_name).dec()
Traditional ML monitoring tools are not sufficient for LLM systems. Input/output data drift detection requires embedding-based methods (not just statistical tests on numeric features), and output quality monitoring requires either automated evaluation with a judge model or structured human feedback loops. Budget for LLM-specific observability from the start; retrofitting it after launch is significantly harder.
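As a concrete illustration of embedding-based drift detection, the sketch below compares the mean embedding of a reference window (e.g., training data) against a recent window of production inputs using cosine distance. The synthetic embeddings, dimensions, and alert threshold are all illustrative; production systems often use stronger two-sample tests such as MMD or a domain classifier.

```python
import numpy as np

def drift_score(reference: np.ndarray, current: np.ndarray) -> float:
    """Cosine distance between the mean embeddings of two input windows."""
    ref_mean = reference.mean(axis=0)
    cur_mean = current.mean(axis=0)
    cosine = np.dot(ref_mean, cur_mean) / (
        np.linalg.norm(ref_mean) * np.linalg.norm(cur_mean)
    )
    return float(1.0 - cosine)

# Synthetic stand-ins for sentence embeddings of user prompts
rng = np.random.default_rng(seed=0)
reference = rng.normal(1.0, 0.5, size=(500, 64))  # training-time inputs
similar = rng.normal(1.0, 0.5, size=(500, 64))    # same distribution
shifted = rng.normal(1.0, 0.5, size=(500, 64))
shifted[:, :32] += 2.0                            # topic shift in half the dims

print(drift_score(reference, similar))  # near zero: no drift
print(drift_score(reference, shifted))  # clearly larger: would trigger an alert
```

In practice the reference window is fixed at training time, the current window slides over recent production traffic, and the score is exported as a Prometheus gauge so the same alerting stack covers both infrastructure and data health.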
Summary
Production LLM systems are distributed systems that require the same rigor as any critical infrastructure: automated pipelines, data validation, CI/CD with quality gates, multi-model serving, and comprehensive observability. Airflow or similar orchestrators coordinate the flow from data ingestion through training to deployment. Great Expectations catches data quality issues before they reach the model. CI/CD pipelines with benchmark evaluation prevent regressions. Multi-model serving architectures handle the reality that production LLM applications depend on several models working in concert. Finally, Prometheus metrics and LLM-specific observability tools provide the visibility needed to maintain reliability at scale. Together with the Databricks platform (Section T.1), Delta Lake storage (Section T.2), Ray compute (Section T.3), and feature stores (Section T.4), these components form a complete infrastructure stack for production LLM applications.