Building Conversational AI with LLMs and Agents
Appendix T: Distributed ML: PySpark, Databricks, and Ray

Delta Lake and Lakehouse Architecture

Big Picture

Delta Lake is an open-source storage layer that brings ACID transactions, schema enforcement, and time travel to data lakes built on Parquet files. The Lakehouse architecture combines the low-cost scalability of data lakes with the reliability and performance features of data warehouses, creating a single platform for both analytics and ML. For LLM teams, Delta Lake provides versioned, auditable training datasets, efficient streaming ingestion of user feedback, and the foundation for reproducible training pipelines.

T.2.1 Why Data Lakes Alone Are Not Enough

Traditional data lakes store files (CSV, JSON, Parquet) in object storage like S3 or ADLS. While cheap and scalable, they lack the guarantees that ML pipelines demand. Without transactions, a concurrent write can corrupt a dataset mid-training. Without schema enforcement, an upstream change can silently add a column that breaks your tokenizer. Without versioning, you cannot answer the question "what exact data did we use to train model v2.3?"

Delta Lake solves these problems by adding a transaction log (the _delta_log directory) on top of Parquet files. Every write operation, whether an insert, update, delete, or schema change, is recorded as a JSON commit in this log. Readers consult the log to determine which Parquet files constitute the current version of the table, providing snapshot isolation without a database server.

my_table/
  _delta_log/
    00000000000000000000.json   # Initial commit: added file-001.parquet
    00000000000000000001.json   # Added file-002.parquet, file-003.parquet
    00000000000000000002.json   # Deleted file-001.parquet (compaction)
    00000000000000000010.checkpoint.parquet  # Checkpoint for fast reads
  file-002.parquet
  file-003.parquet
  file-004.parquet
        
Figure T.2.1: Delta Lake table structure. The _delta_log directory contains ordered JSON commits that track every mutation. Checkpoint files accelerate log replay for tables with many commits.
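The reader-side log replay described above can be illustrated in plain Python. The commit entries below are hand-written stand-ins for real _delta_log JSON (an actual commit also carries protocol, commitInfo, and per-file statistics); the sketch only shows how `add` and `remove` actions fold into the current set of live data files.

```python
import json

# Hand-written stand-ins for _delta_log commits, mirroring Figure T.2.1.
# Each commit is a file of newline-delimited JSON actions.
commits = [
    '{"add": {"path": "file-001.parquet"}}',
    '{"add": {"path": "file-002.parquet"}}\n'
    '{"add": {"path": "file-003.parquet"}}',
    '{"remove": {"path": "file-001.parquet"}}',
]

def replay(commit_jsons):
    """Fold add/remove actions into the set of live data files."""
    live = set()
    for commit in commit_jsons:
        for line in commit.splitlines():
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live

print(sorted(replay(commits)))  # → ['file-002.parquet', 'file-003.parquet']
```

Checkpoint files exist precisely so that readers do not have to replay every commit from version zero: a checkpoint is a Parquet snapshot of the accumulated state, and replay resumes from the latest checkpoint.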

T.2.2 ACID Transactions and Concurrent Writes

Delta Lake uses optimistic concurrency control. When two writers attempt to modify the same table simultaneously, the first to commit wins; the second detects the new commit, checks whether its changes logically conflict with it (two blind appends, for example, do not), and either recommits against the updated state or fails with a conflict exception. Readers always see a consistent snapshot, even during active writes. For ML teams, this means a data engineer can append new training examples while a training job reads the table, without either operation blocking or corrupting the other.

from delta import DeltaTable
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

# Write a DataFrame as a Delta table
training_data = spark.read.json("s3://bucket/raw/instruction_pairs/")
training_data.write.format("delta").mode("overwrite").save(
    "s3://bucket/delta/instruction_pairs"
)

# Append new data (transactional, safe with concurrent readers)
new_examples = spark.read.json("s3://bucket/raw/new_batch_2024_03/")
new_examples.write.format("delta").mode("append").save(
    "s3://bucket/delta/instruction_pairs"
)
Key Insight

Delta Lake's ACID transactions solve the "training data corruption" problem that plagues raw data lakes. Without transactions, a crash during a multi-file write can leave the dataset in an inconsistent state, which produces silent data quality issues that surface only as degraded model performance weeks later.

T.2.3 Time Travel for Reproducible Training

Every commit in the Delta log creates a new version of the table. You can query any historical version by specifying a version number or a timestamp. This feature, called time travel, is indispensable for ML reproducibility. When you log the Delta table version alongside your MLflow run (see Section T.1), you can reconstruct the exact training dataset months or years later.

# Read a specific version of the Delta table
df_v5 = (spark.read.format("delta")
    .option("versionAsOf", 5)
    .load("s3://bucket/delta/instruction_pairs"))

# Read the table as it existed at a specific timestamp
df_historical = (spark.read.format("delta")
    .option("timestampAsOf", "2025-01-15 00:00:00")
    .load("s3://bucket/delta/instruction_pairs"))

# View the full commit history
delta_table = DeltaTable.forPath(spark, "s3://bucket/delta/instruction_pairs")
history = delta_table.history()
history.select("version", "timestamp", "operation", "operationMetrics").show(10)

# Restore the table to a previous version (useful for rollbacks)
delta_table.restoreToVersion(5)
Tip

In your training scripts, always log the Delta table version alongside your model artifacts. A simple mlflow.log_param("delta_version", delta_table.history(1).first()["version"]) creates a permanent link between the model and its training data, enabling full reproducibility.

T.2.4 Schema Evolution and Enforcement

Delta Lake enforces schemas by default: if you attempt to write data with columns that do not match the table schema, the write fails with a clear error. This prevents the subtle bugs that occur when an upstream data source adds or renames a field. When schema changes are intentional, you can enable schema evolution with a single option, allowing new columns to be added automatically.

# Schema enforcement: this will FAIL because "quality_label" is not in the schema
try:
    bad_data = spark.createDataFrame([
        {"instruction": "test", "response": "test", "quality_label": "good"}
    ])
    bad_data.write.format("delta").mode("append").save(
        "s3://bucket/delta/instruction_pairs"
    )
except Exception as e:
    print(f"Schema enforcement caught: {e}")

# Schema evolution: allow the new column to be added
new_data_with_extra_col = spark.createDataFrame([
    {"instruction": "test", "response": "test", "category": "general"}
])
(new_data_with_extra_col.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("s3://bucket/delta/instruction_pairs"))
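The enforcement-versus-evolution decision is, at its core, set logic over column names (the real implementation also checks types and nullability). A minimal pure-Python sketch of the rule, under the simplifying assumption that only column additions are in play; the function name is hypothetical:

```python
def check_write_schema(table_cols, incoming_cols, merge_schema=False):
    """Mimic Delta's append-time schema check for added columns only.

    Raises on unknown columns unless merge_schema is set, in which
    case the evolved schema (table columns plus additions) is returned.
    """
    extra = [c for c in incoming_cols if c not in table_cols]
    if extra and not merge_schema:
        raise ValueError(f"Schema enforcement: unknown columns {extra}")
    return list(table_cols) + extra

table = ["instruction", "response"]

# Enforcement path: fails, like the quality_label write above.
try:
    check_write_schema(table, ["instruction", "response", "quality_label"])
except ValueError as e:
    print(e)

# Evolution path: the new column is merged into the schema.
evolved = check_write_schema(
    table, ["instruction", "response", "category"], merge_schema=True
)  # → ["instruction", "response", "category"]
```

Note that evolution is additive: existing columns keep their types, and readers of old versions still see the pre-evolution schema for those versions.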

T.2.5 The Lakehouse Architecture

The Lakehouse architecture, popularized by Databricks, layers structured metadata and governance on top of open file formats in object storage. Unlike a traditional data warehouse (which copies data into a proprietary format), the Lakehouse keeps all data in open formats (Parquet via Delta Lake) while providing warehouse features: ACID transactions, indexing, caching, and SQL access.

For LLM teams, the Lakehouse pattern has three practical advantages. First, training data and analytics data live in the same platform, eliminating ETL between systems. Second, the open format means any tool (Spark, pandas, DuckDB, PyArrow) can read the data directly. Third, the governance layer (Unity Catalog) applies consistently across all access patterns, whether a data scientist reads data from a notebook or a training pipeline reads it from a scheduled job.

┌──────────────────────────────────────────────────────────┐
│                    Unity Catalog                          │
│          (Governance, Access Control, Lineage)           │
├──────────────────────────────────────────────────────────┤
│                      Delta Lake                           │
│     (ACID Transactions, Schema, Time Travel, Z-Order)    │
├──────────────────────────────────────────────────────────┤
│                   Object Storage                          │
│          (S3 / ADLS / GCS, Parquet Files)                │
└──────────────────────────────────────────────────────────┘
        
Figure T.2.2: The Lakehouse architecture stack. Unity Catalog governs access, Delta Lake provides transactional semantics, and object storage provides durable, low-cost persistence.

T.2.6 Optimizing Delta Tables for ML Workloads

Delta Lake includes several optimization features that improve read performance for large training datasets. Z-ordering co-locates related data in the same Parquet files based on column values, which accelerates filtered reads. Compaction (the OPTIMIZE command) merges small files into larger ones, reducing the overhead of reading thousands of tiny files. Liquid clustering, a newer feature, automatically manages data layout without manual Z-order specifications.
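Under the hood, Z-ordering maps tuples of column values onto a Z-shaped space-filling curve, so that rows close in multi-column space land in the same Parquet files. A minimal sketch of the core idea, bit-interleaved (Morton) encoding of two integer keys; Delta's actual implementation operates on range-partitioned column values rather than raw integers:

```python
def morton2(x, y, bits=16):
    """Interleave the bits of x and y: x takes even positions, y odd."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)       # bit i of x -> position 2i
        code |= ((y >> i) & 1) << (2 * i + 1)   # bit i of y -> position 2i+1
    return code

# Sorting rows by their Morton code keeps rows that are close in BOTH
# dimensions close on disk, so a filter on either column prunes files:
points = [(0, 0), (1, 1), (2, 2), (5, 3)]
codes = sorted(morton2(x, y) for x, y in points)  # → [0, 3, 12, 27]
```

This is why Z-ordering by (source, quality_score) in the commands below speeds up reads that filter on either column: data skipping can discard whole files whose min/max statistics fall outside the predicate.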

-- Compact small files into larger ones (target ~1 GB per file)
OPTIMIZE delta.`s3://bucket/delta/instruction_pairs`;

-- Z-order by columns commonly used for filtering
OPTIMIZE delta.`s3://bucket/delta/instruction_pairs`
  ZORDER BY (source, quality_score);

-- Enable liquid clustering on a new table (Databricks Runtime 13.3+)
CREATE TABLE ml_catalog.llm_data.embeddings (
  doc_id BIGINT,
  chunk_id INT,
  embedding ARRAY<FLOAT>,
  source STRING,
  created_at TIMESTAMP
)
USING DELTA
CLUSTER BY (source, created_at);

-- Vacuum old files (default retention: 7 days)
VACUUM delta.`s3://bucket/delta/instruction_pairs` RETAIN 168 HOURS;
Warning

The VACUUM command permanently deletes old Parquet files that are no longer referenced by the Delta log. After vacuuming, time travel queries to versions older than the retention period will fail. Always set the retention period longer than your longest-running training job, and keep MLflow logs that record the exact Delta version used for each training run.
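The retention arithmetic in the warning above is worth making explicit. A small helper (the function name is hypothetical) that checks whether a planned time-travel read is still within the VACUUM retention window; in practice the true cutoff also depends on when VACUUM last actually ran, so treat this as a conservative pre-check:

```python
from datetime import datetime, timedelta

def time_travel_is_safe(version_timestamp, retention_hours, now=None):
    """True if a version's files are still within the VACUUM retention window."""
    now = now or datetime.utcnow()
    return now - version_timestamp <= timedelta(hours=retention_hours)

now = datetime(2025, 3, 10, 12, 0, 0)

# A version committed 5 days ago is safe under the 7-day (168 h) default...
safe = time_travel_is_safe(datetime(2025, 3, 5, 12, 0, 0), 168, now=now)

# ...but a 10-day-old version may already have had its files deleted.
stale = time_travel_is_safe(datetime(2025, 2, 28, 12, 0, 0), 168, now=now)
```

Running a check like this at the start of a training job that pins an old Delta version turns a confusing mid-job FileNotFoundError into an immediate, actionable failure.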

Summary

Delta Lake transforms raw object storage into a reliable, versioned data platform suitable for production ML. ACID transactions prevent data corruption during concurrent reads and writes, time travel enables reproducible training by pinning exact dataset versions, and schema enforcement catches upstream data drift before it reaches your model. Combined with Unity Catalog's governance layer, the Lakehouse architecture provides a single source of truth for both analytics and ML workloads. In the next section, we explore Ray, which complements Databricks by providing distributed compute for training and serving workloads that extend beyond Spark's data-parallel paradigm.