"Your model is only as good as the data you feed it. The hard part is not generating data; it is converting the chaos of production logs into something a model can learn from."
Synth, Log-Wrangling AI Agent
Enterprise data is a goldmine that most teams leave buried. Every production LLM system generates logs: user queries, model responses, latency traces, tool invocations, and feedback signals. These raw artifacts contain exactly the signal needed for fine-tuning, evaluation, and preference alignment, but only if you can transform them into properly formatted, validated, and balanced datasets. This section covers the end-to-end pipeline: extracting training examples from production traces, converting raw transcripts into structured formats, constructing preference pairs, designing tool-use datasets, enforcing data contracts, filtering for quality, and mixing datasets for optimal training outcomes.
Prerequisites
This section builds on synthetic data principles from Section 13.1: Principles of Synthetic Data Generation, alignment techniques from Chapter 17: Alignment, RLHF, and DPO, and LLM API concepts from Section 10.1: API Landscape and Architecture.
1. Log-to-Dataset Pipelines
Production LLM systems generate an enormous volume of structured and semi-structured logs. A chatbot serving 10,000 daily users might produce 50,000 to 200,000 conversation turns per day (5 to 20 turns per user). Each turn contains a user query, system prompt, model response, latency measurement, and occasionally an explicit feedback signal. The challenge is not volume; it is extracting clean, representative training examples from noisy, privacy-sensitive, and often poorly formatted production data.
The log-to-dataset pipeline has four stages: extraction, normalization, filtering, and formatting. Each stage introduces potential failure modes that compound downstream. For example, a timestamp that extraction fails to normalize can break deduplication during filtering. Repeated examples then slip through and bias the model toward common queries.
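To make the compounding failure concrete, here is a minimal sketch. It assumes a hypothetical dedup key built from conversation ID, timestamp, and user message; this key design is an illustration, not one used elsewhere in this section. The same event logged in three timestamp formats hashes to three keys unless extraction first normalizes the timestamp to canonical UTC.

```python
import hashlib
from datetime import datetime, timezone


def normalize_timestamp(raw: str) -> str:
    """Parse an ISO-8601 timestamp and re-emit it in canonical UTC form."""
    # datetime.fromisoformat rejects a trailing "Z" before Python 3.11
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    ts = datetime.fromisoformat(raw)
    if ts.tzinfo is None:
        # Assumption: naive timestamps were logged in UTC
        ts = ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc).isoformat()


def dedup_key(record: dict, normalize: bool) -> str:
    """Hypothetical dedup key: conversation ID + timestamp + user message."""
    ts = normalize_timestamp(record["timestamp"]) if normalize else record["timestamp"]
    payload = f"{record['conversation_id']}|{ts}|{record['user_message']}"
    return hashlib.sha256(payload.encode()).hexdigest()


# One event written by three loggers with different timestamp formats
records = [
    {"conversation_id": "c1", "timestamp": "2025-03-15T10:00:00Z",
     "user_message": "Reset my password"},
    {"conversation_id": "c1", "timestamp": "2025-03-15 10:00:00+00:00",
     "user_message": "Reset my password"},
    {"conversation_id": "c1", "timestamp": "2025-03-15T11:00:00+01:00",
     "user_message": "Reset my password"},
]
raw_keys = {dedup_key(r, normalize=False) for r in records}
normalized_keys = {dedup_key(r, normalize=True) for r in records}
print(len(raw_keys), len(normalized_keys))  # 3 1
```

Normalizing in the extraction stage, rather than inside the filter, means every downstream consumer sees the same canonical value.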
Before building any log-to-dataset pipeline, instrument your production system to log the full conversation context (system prompt, user turns, assistant turns, tool calls) in a structured format from day one. Retrofitting structured logging into an existing system is painful. Teams that start with good logging infrastructure can begin fine-tuning within weeks; teams that do not often spend months just getting their data into a usable format.
Think of dataset engineering as an oil refinery. Crude oil (raw production logs) enters one end, and refined products (instruction datasets, preference pairs, evaluation sets) come out the other. Each processing stage removes impurities and separates the crude into useful fractions. Skip a stage and you get contaminated fuel that damages the engine. The analogy extends further: just as different engines need different fuel grades, different training objectives need different dataset formats. SFT needs instruction/response pairs. DPO needs (prompt, chosen, rejected) preference triplets. Tool-use fine-tuning needs structured call/response schemas.
# Code Fragment 12.8.5: Extracting training examples from production logs
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path


@dataclass
class ConversationTurn:
    """A single turn extracted from production logs."""
    conversation_id: str
    turn_index: int
    user_message: str
    assistant_message: str
    model: str
    latency_ms: float
    timestamp: datetime
    feedback: str | None = None  # "thumbs_up", "thumbs_down", or None
    tool_calls: list[dict] = field(default_factory=list)


def extract_turns_from_jsonl(log_path: Path) -> list[ConversationTurn]:
    """Parse production JSONL logs into structured conversation turns."""
    turns = []
    with open(log_path, encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            if not line.strip():
                continue  # tolerate blank lines between records
            try:
                record = json.loads(line)
                # Skip health checks, system events, and incomplete exchanges
                if record.get("event_type") != "completion":
                    continue
                response = record.get("response") or {}  # may be missing or null
                if not response.get("content"):
                    continue
                turn = ConversationTurn(
                    conversation_id=record["conversation_id"],
                    turn_index=record.get("turn_index", 0),
                    user_message=record["request"]["messages"][-1]["content"],
                    assistant_message=response["content"],
                    model=record["request"]["model"],
                    latency_ms=record.get("latency_ms", 0.0),
                    # fromisoformat rejects a trailing "Z" before Python 3.11
                    timestamp=datetime.fromisoformat(
                        record["timestamp"].replace("Z", "+00:00")
                    ),
                    feedback=(record.get("feedback") or {}).get("signal"),
                    tool_calls=response.get("tool_calls") or [],
                )
                turns.append(turn)
            except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
                # ValueError also covers json.JSONDecodeError and bad timestamps
                print(f"Skipping malformed record at line {line_num}: {e}")
    return turns


# Usage: extract from a day's logs
turns = extract_turns_from_jsonl(Path("logs/2025-03-15.jsonl"))
print(f"Extracted {len(turns)} turns from production logs")
2. Conversation Formatting
Raw production transcripts rarely match the format expected by training frameworks. A user conversation might span 15 turns, include system messages that should not appear in training data, contain PII that must be redacted, and follow a chat template that differs from your target model's expected format. Conversation formatting converts these raw transcripts into the specific structure your training pipeline requires.
The two most common target formats are single-turn instruction/response pairs (for SFT on individual exchanges) and multi-turn chat templates (for training conversational models). Each format has distinct requirements for how system prompts, tool outputs, and multi-turn context are handled.
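To see why template mismatches matter, the sketch below renders a message list into ChatML text. ChatML is the `<|im_start|>role ... <|im_end|>` convention used by some open models. The sketch also records the spans of assistant content, because training frameworks typically compute loss only on assistant tokens. Here the spans are character offsets rather than token offsets, a simplification. Some setups also train on the closing `<|im_end|>` so the model learns when to stop; this sketch does not. Check your target model's own chat template rather than assuming ChatML.

```python
def render_chatml(messages: list[dict]) -> tuple[str, list[tuple[int, int]]]:
    """Render messages as ChatML text and return assistant content spans.

    Spans are (start, end) character offsets of assistant content, which a
    trainer could map to token positions for loss masking.
    """
    parts: list[str] = []
    spans: list[tuple[int, int]] = []
    offset = 0
    for msg in messages:
        header = f"<|im_start|>{msg['role']}\n"
        body = msg["content"]
        footer = "<|im_end|>\n"
        if msg["role"] == "assistant":
            start = offset + len(header)
            spans.append((start, start + len(body)))
        chunk = header + body + footer
        parts.append(chunk)
        offset += len(chunk)
    return "".join(parts), spans


messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is 2 + 2?"},
    {"role": "assistant", "content": "4"},
]
text, spans = render_chatml(messages)
print(repr(text))
for start, end in spans:
    print("train on:", repr(text[start:end]))  # train on: '4'
```

If the same messages were rendered with a different model's template, the special tokens and span offsets would change, which is why formatting must target the model you intend to fine-tune.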
# Code Fragment 12.8.9: Converting raw transcripts to training formats
import re


def redact_pii(text: str) -> str:
    """Mask emails, US-style phone numbers, and SSNs with placeholder tokens."""
    text = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.]+\b', '[EMAIL]', text)
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
    return text


def turns_to_instruction_pairs(
    turns: list[ConversationTurn],
    max_context_turns: int = 3,
) -> list[dict]:
    """Convert conversation turns into instruction/response training pairs.

    Each pair includes up to max_context_turns of prior conversation
    as context, formatted as a single instruction string.
    """
    pairs = []
    # Group turns by conversation
    convos: dict[str, list[ConversationTurn]] = {}
    for turn in turns:
        convos.setdefault(turn.conversation_id, []).append(turn)
    for conv_id, conv_turns in convos.items():
        conv_turns.sort(key=lambda t: t.turn_index)
        for i, turn in enumerate(conv_turns):
            # Build context from up to max_context_turns previous turns
            context_start = max(0, i - max_context_turns)
            context_lines = []
            for prev in conv_turns[context_start:i]:
                context_lines.append(f"User: {redact_pii(prev.user_message)}")
                context_lines.append(f"Assistant: {redact_pii(prev.assistant_message)}")
            instruction = redact_pii(turn.user_message)
            if context_lines:
                context_block = "\n".join(context_lines)
                instruction = f"Previous conversation:\n{context_block}\n\nCurrent request: {instruction}"
            pairs.append({
                "instruction": instruction,
                "response": redact_pii(turn.assistant_message),
                "source": "production_logs",
                "conversation_id": conv_id,
                "turn_index": turn.turn_index,  # original index, for tracing back to logs
            })
    return pairs


def turns_to_chatml(
    turns: list[ConversationTurn],
    system_prompt: str = "You are a helpful assistant.",
) -> list[dict]:
    """Convert turns into ChatML-style multi-turn messages for chat fine-tuning."""
    convos: dict[str, list[ConversationTurn]] = {}
    for turn in turns:
        convos.setdefault(turn.conversation_id, []).append(turn)
    examples = []
    for conv_id, conv_turns in convos.items():
        conv_turns.sort(key=lambda t: t.turn_index)
        # Replace the production system prompt with a fixed training prompt
        messages = [{"role": "system", "content": system_prompt}]
        for turn in conv_turns:
            messages.append({"role": "user", "content": redact_pii(turn.user_message)})
            messages.append({"role": "assistant", "content": redact_pii(turn.assistant_message)})
        examples.append({"messages": messages, "conversation_id": conv_id})
    return examples
3. Preference Data Construction
Direct Preference Optimization (DPO) and related algorithms require datasets of pairwise comparisons: for a given prompt, which of two responses does the human prefer? Production systems generate this signal through several channels: explicit thumbs up/down feedback, response regeneration (the user asked for a redo, implying the first response was inadequate), A/B test results, and manual annotation by quality reviewers. The challenge is converting these sparse, noisy signals into clean preference pairs with sufficient volume for training.
A common pitfall is treating thumbs-down as a universal rejection signal. In practice, users click thumbs-down for many reasons: the response was factually wrong, it was too long, it used the wrong tone, or the user simply misclicked. Without understanding why a response was rejected, preference pairs can teach the model contradictory lessons. The best practice is to combine feedback signals with heuristic quality scores to produce preference pairs where the chosen and rejected responses differ along a clear quality dimension.
# Code Fragment 12.8.7: Building DPO preference pairs from feedback signals
from dataclasses import dataclass


@dataclass
class PreferencePair:
    prompt: str
    chosen: str
    rejected: str
    source: str  # "feedback", "regeneration", "quality_score"
    margin: float  # confidence in the preference ordering


def normalize_prompt(text: str) -> str:
    """Lowercase and collapse whitespace so trivially different prompts match."""
    return " ".join(text.lower().split())


def build_preference_pairs_from_feedback(
    turns: list[ConversationTurn],
) -> list[PreferencePair]:
    """Create DPO pairs from thumbs up/down feedback signals.

    Strategy: for each thumbs_down response, find a thumbs_up response
    to the same prompt. This simplified version matches prompts exactly
    after normalization. In production, use embedding similarity to match
    semantically similar prompts, and fall back to quality scoring when no
    feedback pair exists (neither is shown here).
    """
    # Index positive examples by normalized prompt
    thumbs_up_by_prompt: dict[str, list[ConversationTurn]] = {}
    for t in turns:
        if t.feedback == "thumbs_up":
            thumbs_up_by_prompt.setdefault(normalize_prompt(t.user_message), []).append(t)
    pairs = []
    for rejected_turn in (t for t in turns if t.feedback == "thumbs_down"):
        # A chosen response must answer the same prompt and differ from the rejected one
        candidates = [
            t for t in thumbs_up_by_prompt.get(normalize_prompt(rejected_turn.user_message), [])
            if t.assistant_message != rejected_turn.assistant_message
        ]
        if not candidates:
            continue
        chosen_turn = candidates[0]  # simplified: take the first match
        pairs.append(PreferencePair(
            prompt=redact_pii(rejected_turn.user_message),
            chosen=redact_pii(chosen_turn.assistant_message),
            rejected=redact_pii(rejected_turn.assistant_message),
            source="feedback",
            margin=1.0,
        ))
    return pairs


def build_preference_pairs_from_regeneration(
    turns: list[ConversationTurn],
) -> list[PreferencePair]:
    """Create DPO pairs from regenerated responses.

    When a user regenerates a response and then continues the conversation
    with the new version, the original is 'rejected' and the regeneration
    is 'chosen'.
    """
    convos: dict[str, list[ConversationTurn]] = {}
    for turn in turns:
        convos.setdefault(turn.conversation_id, []).append(turn)
    pairs = []
    for conv_turns in convos.values():
        conv_turns.sort(key=lambda t: t.timestamp)
        for i in range(1, len(conv_turns)):
            prev, curr = conv_turns[i - 1], conv_turns[i]
            # Detect regeneration: same user message, different response
            is_regeneration = (
                prev.user_message == curr.user_message
                and prev.assistant_message != curr.assistant_message
            )
            # Require that the user then moved on to a new message; otherwise
            # curr may itself have been regenerated or abandoned
            nxt = conv_turns[i + 1] if i + 1 < len(conv_turns) else None
            user_continued = nxt is not None and nxt.user_message != curr.user_message
            if is_regeneration and user_continued:
                pairs.append(PreferencePair(
                    prompt=redact_pii(prev.user_message),
                    chosen=redact_pii(curr.assistant_message),
                    rejected=redact_pii(prev.assistant_message),
                    source="regeneration",
                    margin=0.7,  # weaker signal than explicit feedback
                ))
    return pairs
Three common mistakes undermine preference dataset quality. First, model-source bias: if chosen responses always come from a stronger model and rejected responses from a weaker one, the preference signal may reflect model capability rather than response quality for the specific prompt. Second, length bias: longer responses often receive higher ratings regardless of content quality, so include length-controlled pairs where the shorter response is preferred. Third, temporal contamination: user expectations shift as users learn what the model can do, so preference data from month one may not reflect the quality standards of month six.
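A quick audit can catch the length bias described above. The sketch below measures how often the chosen response is longer than the rejected one, counting words. A rate far above 0.5 suggests that length, rather than quality, is driving the preferences. The `length_bias_report` helper and its 0.7 alert threshold are illustrative assumptions. The same counting approach works for model-source bias if each pair records which model produced each side.

```python
def length_bias_report(pairs: list[dict], alert_threshold: float = 0.7) -> dict:
    """Summarize how often 'chosen' is longer than 'rejected', in words."""
    if not pairs:
        return {"chosen_longer_rate": 0.0, "mean_length_ratio": 0.0, "alert": False}
    longer = 0
    ratios = []
    for p in pairs:
        chosen_len = len(p["chosen"].split())
        rejected_len = len(p["rejected"].split())
        if chosen_len > rejected_len:
            longer += 1
        ratios.append(chosen_len / max(rejected_len, 1))  # avoid division by zero
    rate = longer / len(pairs)
    return {
        "chosen_longer_rate": rate,
        "mean_length_ratio": sum(ratios) / len(ratios),
        "alert": rate > alert_threshold,
    }


# Word counts are what matter here, so single letters stand in for real text
pairs = [
    {"chosen": "a b c d e f", "rejected": "a b"},
    {"chosen": "a b c d", "rejected": "a b c"},
    {"chosen": "a b c", "rejected": "a"},
    {"chosen": "a", "rejected": "a b c d"},
]
print(length_bias_report(pairs))
```

If the alert fires, adding length-controlled pairs where the shorter response wins is one way to rebalance the signal.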
4. Tool-Use Dataset Design
Fine-tuning models for tool calling requires datasets that capture the full lifecycle of a tool interaction: the user intent, the model's decision to invoke a tool, the structured tool call, the tool's response, and the model's synthesis of the tool output into a final answer. Unlike standard instruction/response pairs, tool-use datasets must encode function schemas, argument validation, error handling, and multi-step tool chains.
A practical approach extracts tool-use examples from production traces where the model successfully completed a task using tools. These examples capture real argument patterns, realistic error conditions, and natural user intents that synthetic generation often misses.
# Code Fragment 12.8.9: Extracting tool-use training examples from production traces
import json


def extract_tool_use_examples(
    turns: list[ConversationTurn],
    tool_schemas: dict[str, dict],
) -> list[dict]:
    """Build tool-use fine-tuning examples from production tool call logs.

    Each example includes the user query, available tool definitions,
    the model's tool call(s), tool response(s), and final answer.
    tool_schemas maps each function name to its definition
    (name, description, parameters).
    """
    # Offer every available tool, not only the ones called, so the model
    # learns to choose among them
    available_tools = [
        {"type": "function", "function": schema} for schema in tool_schemas.values()
    ]
    examples = []
    for turn in turns:
        if not turn.tool_calls:
            continue
        # Keep only calls to functions with a known schema
        valid_calls = [
            call for call in turn.tool_calls
            if call.get("function", {}).get("name", "") in tool_schemas
        ]
        if not valid_calls:
            continue
        # Assign each call an ID once so tool responses reference the same ID
        call_ids = [call.get("id") or f"call_{i}" for i, call in enumerate(valid_calls)]
        tool_call_entries = []
        for call_id, call in zip(call_ids, valid_calls):
            args = call["function"].get("arguments", {})
            tool_call_entries.append({
                "id": call_id,
                "type": "function",
                "function": {
                    "name": call["function"]["name"],
                    # The format stores arguments as a JSON string; avoid double-encoding
                    "arguments": args if isinstance(args, str) else json.dumps(args),
                },
            })
        # Build the training example in OpenAI function-calling format
        example = {
            "messages": [
                {"role": "user", "content": redact_pii(turn.user_message)},
                {"role": "assistant", "content": None, "tool_calls": tool_call_entries},
            ],
            "tools": available_tools,
        }
        # Add tool responses if available
        for call_id, call in zip(call_ids, valid_calls):
            if "response" in call:
                result = call["response"]
                example["messages"].append({
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": result if isinstance(result, str) else json.dumps(result),
                })
        # Add final assistant response
        if turn.assistant_message:
            example["messages"].append({
                "role": "assistant",
                "content": redact_pii(turn.assistant_message),
            })
        examples.append(example)
    return examples
5. Data Contracts and Schema Design
A data contract is a formal specification of what a dataset must contain, how it is structured, and what quality criteria it must meet before entering a training pipeline. Without data contracts, dataset engineering devolves into ad hoc scripting where every team member makes slightly different formatting decisions, validation rules drift over time, and breaking changes propagate silently into model training.
The contract should specify the schema (field names, types, constraints), validation rules (minimum/maximum lengths, required fields, format patterns), versioning policy (how schema changes are tracked), and quality thresholds (minimum examples per category, maximum duplication rate, required feedback coverage).
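Per-example schemas cannot express dataset-level thresholds such as a minimum number of examples per category or a maximum duplication rate. A minimal sketch of such a check follows. It assumes each example is a dict with `instruction` and optional `category` keys. The `check_dataset_thresholds` name and the threshold values are hypothetical. A CI job could fail the build whenever the returned list is non-empty.

```python
from collections import Counter


def check_dataset_thresholds(
    examples: list[dict],
    min_per_category: int = 2,
    max_duplication_rate: float = 0.05,
) -> list[str]:
    """Return contract violations for a whole dataset (empty list means pass)."""
    if not examples:
        return ["dataset is empty"]
    violations = []
    # Minimum examples per category
    counts = Counter(ex.get("category") or "uncategorized" for ex in examples)
    for category, n in sorted(counts.items()):
        if n < min_per_category:
            violations.append(f"category '{category}' has {n} examples (< {min_per_category})")
    # Maximum duplication rate on case- and whitespace-normalized instructions
    normalized = [" ".join(ex["instruction"].lower().split()) for ex in examples]
    dup_rate = 1 - len(set(normalized)) / len(normalized)
    if dup_rate > max_duplication_rate:
        violations.append(f"duplication rate {dup_rate:.2%} exceeds {max_duplication_rate:.2%}")
    return violations


dataset = [
    {"instruction": "Reset my password", "category": "account"},
    {"instruction": "reset  my password", "category": "account"},
    {"instruction": "Explain my invoice", "category": "billing"},
]
for v in check_dataset_thresholds(dataset):
    print(v)
```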
# Code Fragment 12.8.6: Enforcing data contracts with Pydantic schemas
from datetime import datetime
from enum import Enum

from pydantic import BaseModel, Field, ValidationInfo, field_validator


class DatasetFormat(str, Enum):
    INSTRUCTION = "instruction"
    CHATML = "chatml"
    DPO = "dpo"
    TOOL_USE = "tool_use"


class InstructionExample(BaseModel):
    """Schema for a single instruction fine-tuning example."""
    instruction: str = Field(min_length=10, max_length=4096)
    response: str = Field(min_length=10, max_length=8192)
    source: str = Field(pattern=r"^(production_logs|synthetic|human_annotated)$")
    category: str | None = None
    difficulty: int = Field(default=1, ge=1, le=5)

    @field_validator("instruction")
    @classmethod
    def instruction_not_empty_after_strip(cls, v: str) -> str:
        # Length constraints run first, so whitespace padding alone could pass them
        if not v.strip():
            raise ValueError("Instruction must contain non-whitespace content")
        return v.strip()

    @field_validator("response")
    @classmethod
    def response_not_boilerplate(cls, v: str) -> str:
        boilerplate = ["I cannot help with that", "As an AI language model"]
        for phrase in boilerplate:
            if v.strip().startswith(phrase):
                raise ValueError(f"Response appears to be boilerplate: starts with '{phrase}'")
        return v


class DPOExample(BaseModel):
    """Schema for a DPO preference pair."""
    prompt: str = Field(min_length=10, max_length=4096)
    chosen: str = Field(min_length=10, max_length=8192)
    rejected: str = Field(min_length=10, max_length=8192)
    margin: float = Field(ge=0.0, le=2.0)

    @field_validator("rejected")
    @classmethod
    def chosen_and_rejected_differ(cls, v: str, info: ValidationInfo) -> str:
        # info.data holds fields declared (and validated) earlier, so
        # "chosen" is present unless it failed its own validation
        if "chosen" in info.data and v.strip() == info.data["chosen"].strip():
            raise ValueError("Chosen and rejected responses must differ")
        return v


class DatasetManifest(BaseModel):
    """Top-level manifest validating an entire dataset release."""
    version: str = Field(pattern=r"^\d+\.\d+\.\d+$")
    format: DatasetFormat
    num_examples: int = Field(ge=100)
    created_at: datetime
    sources: list[str]
    quality_metrics: dict[str, float]  # e.g. {"dedup_rate": 0.02, "avg_length": 342}
Data contracts should be versioned alongside your code using semantic versioning. A field rename is a breaking change (major version bump). Adding an optional field is a minor change. Tightening a validation rule counts as a patch only if every existing dataset still passes; if it rejects data that was previously valid, treat it as breaking. Store contracts in your repository and validate every dataset against the contract in CI before it can be used for training. This prevents silent data quality regressions, which are notoriously difficult to debug after a model has already been trained.
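The field-level versioning rules can be encoded so that CI suggests the required bump. Here is a minimal sketch. It assumes a contract is summarized as a mapping from field name to `(type, required)`. A rename shows up as a removal plus an addition. The sketch cannot see changes to validation rules, which still need human review.

```python
FieldSpec = tuple[str, bool]  # (type name, required?)


def required_bump(old: dict[str, FieldSpec], new: dict[str, FieldSpec]) -> str:
    """Classify a schema change as 'major', 'minor', or 'patch'."""
    removed = old.keys() - new.keys()
    added = new.keys() - old.keys()
    shared = old.keys() & new.keys()
    retyped = {f for f in shared if old[f][0] != new[f][0]}
    newly_required = {f for f in shared if new[f][1] and not old[f][1]}
    # Removing, renaming, or retyping a field, or adding a required one,
    # breaks existing producers or consumers
    if removed or retyped or newly_required or any(new[f][1] for f in added):
        return "major"
    if added:
        return "minor"  # only optional fields were added
    return "patch"  # no field-level change; review rule changes manually


def bump_version(version: str, kind: str) -> str:
    """Apply a semantic-version bump to a 'MAJOR.MINOR.PATCH' string."""
    major, minor, patch = (int(x) for x in version.split("."))
    if kind == "major":
        return f"{major + 1}.0.0"
    if kind == "minor":
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"


v1 = {"instruction": ("str", True), "response": ("str", True)}
v2 = {**v1, "category": ("str", False)}                     # optional field added
v3 = {"prompt": ("str", True), "response": ("str", True)}   # 'instruction' renamed
print(required_bump(v1, v2), bump_version("1.0.0", required_bump(v1, v2)))  # minor 1.1.0
print(required_bump(v1, v3), bump_version("1.0.0", required_bump(v1, v3)))  # major 2.0.0
```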
6. Quality Filtering
Raw datasets extracted from production logs contain noise at every level: duplicate entries from retry logic, toxic content from adversarial users, trivially easy examples that waste training compute, and excessively long responses that skew length distributions. Quality filtering removes these problems through a multi-stage pipeline that applies deduplication, toxicity detection, difficulty calibration, and length balancing in sequence.
The order of operations matters. Deduplication should come first because it reduces the volume of data that downstream filters must process. Toxicity filtering comes second because toxic examples should never enter the difficulty calibration stage where they might be scored as "hard" and preserved. Length balancing comes last because earlier stages may shift the length distribution.
# Code Fragment 12.8.3: Multi-stage quality filtering pipeline
import hashlib
import random
from collections import Counter


class QualityFilter:
    """Multi-stage quality filtering for LLM training datasets."""

    def __init__(self, toxicity_threshold: float = 0.7, seed: int = 42):
        # Intended for a real classifier's score; the keyword placeholder ignores it
        self.toxicity_threshold = toxicity_threshold
        self.rng = random.Random(seed)  # seeded so bucket sampling is reproducible
        self.seen_hashes: set[str] = set()
        self.stats: Counter = Counter()

    def content_hash(self, text: str) -> str:
        """Normalize case and whitespace, then hash text for deduplication."""
        normalized = " ".join(text.lower().split())
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]

    def deduplicate(self, examples: list[dict], key: str = "instruction") -> list[dict]:
        """Remove duplicates that match after case and whitespace normalization.

        Near-duplicates (paraphrases, small edits) need a fuzzy method such
        as MinHash and pass through this step.
        """
        unique = []
        for ex in examples:
            h = self.content_hash(ex[key])
            if h not in self.seen_hashes:
                self.seen_hashes.add(h)
                unique.append(ex)
            else:
                self.stats["duplicates_removed"] += 1
        return unique

    def filter_toxicity(self, examples: list[dict]) -> list[dict]:
        """Remove toxic or adversarial examples.

        Placeholder: matches a few prompt-injection phrases. In production,
        replace it with a real toxicity classifier such as Perspective API
        or a fine-tuned model, scored against self.toxicity_threshold.
        """
        toxic_patterns = [
            "ignore previous instructions",
            "you are now",
            "jailbreak",
        ]
        clean = []
        for ex in examples:
            text = f"{ex.get('instruction', '')} {ex.get('response', '')}"
            if any(p in text.lower() for p in toxic_patterns):
                self.stats["toxic_removed"] += 1
            else:
                clean.append(ex)
        return clean

    def filter_difficulty(
        self, examples: list[dict], min_response_words: int = 20
    ) -> list[dict]:
        """Remove trivially easy examples, using response length as a proxy."""
        filtered = []
        for ex in examples:
            word_count = len(ex.get("response", "").split())
            if word_count >= min_response_words:
                filtered.append(ex)
            else:
                self.stats["trivial_removed"] += 1
        return filtered

    def balance_lengths(
        self,
        examples: list[dict],
        max_per_bucket: int = 5000,
        buckets: list[tuple[float, float]] | None = None,
    ) -> list[dict]:
        """Cap each response-length bucket (in words) at max_per_bucket examples."""
        if buckets is None:
            buckets = [(0, 100), (100, 300), (300, 700), (700, 1500), (1500, float("inf"))]
        bucket_contents: dict[int, list[dict]] = {i: [] for i in range(len(buckets))}
        for ex in examples:
            word_count = len(ex.get("response", "").split())
            for i, (lo, hi) in enumerate(buckets):
                if lo <= word_count < hi:
                    bucket_contents[i].append(ex)
                    break
        balanced = []
        for i, items in bucket_contents.items():
            overflow = max(0, len(items) - max_per_bucket)
            self.stats[f"bucket_{i}_overflow"] += overflow
            if overflow:
                # Sample instead of truncating so the kept examples are not
                # biased toward whichever logs were read first
                items = self.rng.sample(items, max_per_bucket)
            balanced.extend(items)
        return balanced

    def run_pipeline(self, examples: list[dict]) -> list[dict]:
        """Execute the full quality filtering pipeline in order."""
        self.stats.clear()
        self.seen_hashes.clear()
        result = self.deduplicate(examples)
        result = self.filter_toxicity(result)
        result = self.filter_difficulty(result)
        result = self.balance_lengths(result)
        self.stats["final_count"] = len(result)
        self.stats["original_count"] = len(examples)
        return result
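The `deduplicate` method above only catches duplicates that match after case and whitespace normalization. Near-duplicates, such as a retried prompt with one word added, need a fuzzy comparison. The sketch below uses word-shingle Jaccard similarity with a pairwise scan. That scan is quadratic and only suits small batches; at production scale, MinHash with locality-sensitive hashing approximates the same similarity efficiently. The 0.7 threshold and 3-word shingles are illustrative assumptions.

```python
def shingles(text: str, n: int = 3) -> set[tuple[str, ...]]:
    """Return the set of n-word shingles in lowercased text."""
    words = text.lower().split()
    if len(words) < n:
        return {tuple(words)}
    return {tuple(words[i:i + n]) for i in range(len(words) - n + 1)}


def jaccard(a: set, b: set) -> float:
    """Size of the intersection divided by size of the union."""
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


def near_dedup(texts: list[str], threshold: float = 0.7, n: int = 3) -> list[str]:
    """Keep each text unless it is a near-duplicate of one already kept (O(n^2))."""
    kept: list[str] = []
    kept_shingles: list[set] = []
    for text in texts:
        s = shingles(text, n)
        if any(jaccard(s, other) >= threshold for other in kept_shingles):
            continue
        kept.append(text)
        kept_shingles.append(s)
    return kept


texts = [
    "How do I reset my password on the mobile app",
    "How do I reset my password on the mobile app please",
    "What payment methods do you accept",
]
print(near_dedup(texts))
```

The first two prompts share 8 of 9 distinct shingles (similarity of about 0.89), so the second is dropped.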
7. Data Mixing Strategies
A fine-tuning dataset is rarely a single monolithic collection. In practice, it is a mixture of instruction types (coding, writing, analysis, conversation), difficulty levels (simple lookups through complex multi-step reasoning), and data sources (production logs, synthetic generation, human annotation). The proportions in this mixture directly affect model behavior: over-representing coding examples produces a model that tries to write code for every query, while under-representing refusal examples produces a model that attempts dangerous tasks.
The mixing strategy defines the sampling weights for each category. Two approaches are common. Proportional mixing samples each category in proportion to its natural frequency in production traffic, producing a model that mirrors real usage patterns. Stratified mixing oversamples rare but important categories (safety refusals, multi-step tool use, complex reasoning) to ensure the model handles edge cases well, even if they represent a small fraction of production traffic.
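Both strategies reduce to choosing sampling weights. Here is a minimal sketch that assumes per-category example counts are known. Proportional mixing uses each category's share of the data. For the stratified side, this sketch swaps in temperature-based sampling, one common way to upweight rare categories. Each weight is proportional to n_i^(1/T), so T = 1 reproduces proportional mixing and larger T flattens the mix toward uniform. The category names and counts are made up for illustration.

```python
def mixing_weights(counts: dict[str, int], temperature: float = 1.0) -> dict[str, float]:
    """Sampling weights proportional to count ** (1 / temperature)."""
    scaled = {cat: n ** (1.0 / temperature) for cat, n in counts.items()}
    total = sum(scaled.values())
    return {cat: s / total for cat, s in scaled.items()}


def allocate(weights: dict[str, float], target_size: int) -> dict[str, int]:
    """Turn weights into integer example counts that sum to target_size."""
    raw = {cat: w * target_size for cat, w in weights.items()}
    alloc = {cat: int(r) for cat, r in raw.items()}
    # Largest-remainder rounding: give leftover slots to the biggest fractions
    leftover = target_size - sum(alloc.values())
    for cat in sorted(raw, key=lambda c: raw[c] - alloc[c], reverse=True)[:leftover]:
        alloc[cat] += 1
    return alloc


# Hypothetical production category counts
counts = {"conversation": 8000, "coding": 1500, "tool_use": 400, "safety_refusal": 100}
proportional = mixing_weights(counts, temperature=1.0)
flattened = mixing_weights(counts, temperature=2.0)
print({c: round(w, 3) for c, w in proportional.items()})
print({c: round(w, 3) for c, w in flattened.items()})
print(allocate(flattened, target_size=1000))
```

Allocating more examples than a category contains means repeating examples, which risks overfitting on them. Cap repetition, or generate or annotate more data for rare categories instead.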
Data mixing is nutritional planning for your model. A diet of only desserts (easy, common queries) produces a model that is pleasant but incapable of anything demanding. A diet of only protein (hard reasoning tasks) produces a model that over-thinks simple questions. The goal is a balanced mix where every category is represented in proportion to its importance (not its frequency). Just as a nutritionist adjusts macros based on an athlete's goals, you adjust mixing ratios based on your model's intended use case. A customer support model needs heavier conversation and refusal ratios; a coding assistant needs heavier code and tool-use ratios.
Lab: Production Logs to DPO Dataset
This lab walks through the complete pipeline: loading production chat logs, extracting conversation turns, building preference pairs, applying quality filters, and producing a validated DPO training dataset. The output is a JSONL file ready for training with the TRL library's DPO trainer.
# Code Fragment 12.8.4: End-to-end lab: production logs to DPO dataset
import json
import random
from datetime import datetime
from pathlib import Path

from pydantic import ValidationError


def full_pipeline(
    log_dir: Path,
    output_path: Path,
    max_examples: int = 10000,
    seed: int = 42,
):
    """Convert production logs into a validated DPO training dataset.

    Steps:
    1. Extract conversation turns from JSONL logs
    2. Build preference pairs from feedback and regeneration signals
    3. Apply quality filtering (dedup, toxicity, difficulty, length balance)
    4. Validate against data contract schema
    5. Write validated JSONL output and its manifest
    """
    random.seed(seed)
    # Step 1: Extract turns from all log files
    log_files = sorted(log_dir.glob("*.jsonl"))
    all_turns = []
    for log_file in log_files:
        all_turns.extend(extract_turns_from_jsonl(log_file))
    print(f"Step 1: Extracted {len(all_turns)} turns from {len(log_files)} files")
    # Step 2: Build preference pairs from multiple signals
    feedback_pairs = build_preference_pairs_from_feedback(all_turns)
    regen_pairs = build_preference_pairs_from_regeneration(all_turns)
    all_pairs = feedback_pairs + regen_pairs
    print(f"Step 2: Built {len(all_pairs)} preference pairs "
          f"({len(feedback_pairs)} feedback, {len(regen_pairs)} regeneration)")
    # Step 3: Convert to dict format for filtering
    pair_dicts = [
        {
            "instruction": p.prompt,  # QualityFilter deduplicates on 'instruction'
            "prompt": p.prompt,
            "chosen": p.chosen,
            "rejected": p.rejected,
            "response": p.chosen,  # difficulty and length filters read 'response'
            "source": p.source,
            "margin": p.margin,
        }
        for p in all_pairs
    ]
    quality_filter = QualityFilter()
    filtered = quality_filter.run_pipeline(pair_dicts)
    print(f"Step 3: Quality filter stats: {dict(quality_filter.stats)}")
    # Step 4: Validate against the DPO schema, stopping once max_examples pass
    validated = []
    validation_errors = 0
    for item in filtered:
        if len(validated) >= max_examples:
            break
        try:
            example = DPOExample(
                prompt=item["prompt"],
                chosen=item["chosen"],
                rejected=item["rejected"],
                margin=item["margin"],
            )
            validated.append(example.model_dump())
        except ValidationError:
            validation_errors += 1
    print(f"Step 4: Validated {len(validated)} examples ({validation_errors} failed validation)")
    # Step 5: Build the manifest before writing, so a contract violation
    # (for example, fewer than 100 examples) stops the run without output
    manifest = DatasetManifest(
        version="1.0.0",
        format=DatasetFormat.DPO,
        num_examples=len(validated),
        created_at=datetime.now(),
        sources=["production_logs"],
        quality_metrics={
            "dedup_rate": quality_filter.stats.get("duplicates_removed", 0) / max(len(pair_dicts), 1),
            "toxic_rate": quality_filter.stats.get("toxic_removed", 0) / max(len(pair_dicts), 1),
            "validation_pass_rate": len(validated) / max(len(validated) + validation_errors, 1),
        },
    )
    random.shuffle(validated)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        for example in validated:
            f.write(json.dumps(example) + "\n")
    manifest_path = output_path.with_suffix(".manifest.json")
    with open(manifest_path, "w", encoding="utf-8") as f:
        f.write(manifest.model_dump_json(indent=2))
    print(f"Step 5: Wrote {len(validated)} examples to {output_path}")
    print(f"  Manifest written to {manifest_path}")
    return validated


# Run the pipeline
# dataset = full_pipeline(Path("logs/"), Path("datasets/dpo_v1.jsonl"))
Production logs often contain personally identifiable information (PII), proprietary business data, and content subject to data retention policies. Before extracting training data from production logs, verify that your data processing agreement covers model training use cases, implement robust PII redaction (not just regex patterns; consider named entity recognition for addresses and names), obtain legal review for any data that crosses jurisdictional boundaries, and maintain an audit trail mapping each training example back to its source log entry with the user's consent status. The GDPR right to erasure means you may need to retrain models if a user requests deletion of their data.
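An audit trail makes erasure requests tractable. The sketch below assumes a hypothetical lineage record for each training example. The record holds the source log file, the line number, a pseudonymized user ID, the consent status, and the dataset versions that include the example. Given a user ID, the sketch finds the examples to remove and the affected dataset versions, and by extension any models trained on those versions. Keying an HMAC with a secret salt is an assumption about your setup, and the salt shown is a placeholder.

```python
import hashlib
import hmac
from dataclasses import dataclass, field


def hash_user_id(user_id: str, salt: str = "replace-with-secret-salt") -> str:
    """Pseudonymize a user ID so lineage records do not store it in clear text."""
    return hmac.new(salt.encode(), user_id.encode(), hashlib.sha256).hexdigest()


@dataclass
class LineageRecord:
    """Maps one training example back to its source log entry (hypothetical schema)."""
    example_id: str
    source_file: str
    source_line: int
    user_hash: str
    consent_for_training: bool
    dataset_versions: set[str] = field(default_factory=set)


def erasure_impact(records: list[LineageRecord], user_id: str) -> dict:
    """List the examples to delete and the dataset versions that contain them."""
    target = hash_user_id(user_id)
    hits = [r for r in records if r.user_hash == target]
    return {
        "example_ids": sorted(r.example_id for r in hits),
        "affected_versions": sorted(set().union(*(r.dataset_versions for r in hits))),
    }


def trainable(records: list[LineageRecord]) -> list[str]:
    """Example IDs whose source user consented to training use."""
    return [r.example_id for r in records if r.consent_for_training]


records = [
    LineageRecord("ex-1", "logs/2025-03-15.jsonl", 17, hash_user_id("u-100"), True, {"1.0.0", "1.1.0"}),
    LineageRecord("ex-2", "logs/2025-03-15.jsonl", 42, hash_user_id("u-200"), False, {"1.0.0"}),
    LineageRecord("ex-3", "logs/2025-03-16.jsonl", 5, hash_user_id("u-100"), True, {"1.1.0"}),
]
print(erasure_impact(records, "u-100"))
print(trainable(records))
```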
