Fine-Tuning LLMs for Custom Agent Behaviors - Part 1: Preparing Training Data

Fine-tuning large language models is one of the most powerful techniques for creating AI agents with specialized behaviors and domain expertise. However, the success of your fine-tuned model depends entirely on the quality of your training data. Poor data leads to poor performance, while high-quality, well-structured training data can transform a general-purpose LLM into a domain expert.
In this comprehensive guide, you'll master the critical first step of fine-tuning: preparing training data that produces exceptional results.
What You'll Learn in This Tutorial
By the end of this tutorial, you'll have:
- ✅ Deep understanding of training data requirements for different LLM providers
- ✅ Complete data preparation pipeline with validation and quality checks
- ✅ Professional-grade tools for data collection, cleaning, and formatting
- ✅ Quality assurance framework to ensure optimal training outcomes
- ✅ Best practices for prompt engineering and completion writing
- ✅ Automated workflows for large-scale data processing
Estimated Time: 40-45 minutes
Understanding Fine-Tuning Data Requirements
Before diving into data preparation, it's crucial to understand what makes training data effective for fine-tuning.
The Foundation of Successful Fine-Tuning
Why Data Quality Matters:
- Model Behavior: Your model will mimic the patterns in your training data
- Performance: High-quality data leads to better accuracy and fewer errors
- Consistency: Well-formatted data ensures predictable model responses
- Efficiency: Good data reduces training time and computational costs
Fine-Tuning vs. Pre-Training Data
Pre-training data (used to create base models like GPT-4):
- Massive scale (trillions of tokens)
- General web content, books, articles
- Focuses on language understanding
Fine-tuning data (what we're preparing):
- Smaller scale (hundreds to thousands of examples)
- Task-specific, high-quality examples
- Focuses on specific behaviors and responses
Understanding Different Fine-Tuning Approaches
1. Instruction Following
- Teaches the model to follow specific instructions
- Format: Instruction → Response
- Best for: Task-specific agents, specialized workflows
2. Conversational Behavior
- Teaches the model conversation patterns
- Format: Multi-turn dialogue
- Best for: Chatbots, customer service agents
3. Domain Expertise
- Teaches domain-specific knowledge and reasoning
- Format: Question → Expert answer
- Best for: Medical, legal, technical assistants
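To make these three formats concrete, here is an illustrative sketch of what a single raw example for each approach might look like before any platform-specific formatting. The field names and contents below are placeholders, not a required schema:
# Illustrative raw examples for the three fine-tuning approaches above.
# Field names and contents are placeholders, not a required schema.

instruction_example = {
    "instruction": "Summarize this support ticket in two sentences.",
    "response": "The customer could not activate their license after upgrading. Support reset the key and confirmed activation."
}

conversation_example = {
    "messages": [
        {"role": "user", "content": "My API key stopped working this morning."},
        {"role": "assistant", "content": "Was the key rotated or regenerated recently?"},
        {"role": "user", "content": "Yes, yesterday."},
        {"role": "assistant", "content": "Rotating a key revokes the old one. Create a new key in your account settings and update your client."}
    ]
}

domain_example = {
    "question": "Why does my training loss become NaN after a few steps?",
    "expert_answer": "NaN loss usually indicates exploding gradients or an overly high learning rate. Lower the learning rate, enable gradient clipping, and check the input data for invalid values."
}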
Step 1: Data Collection Strategy
The first step is gathering raw data that represents the behaviors you want your model to learn.
Understanding Your Use Case
Before collecting data, clearly define your fine-tuning objectives:
# data_collection/use_case_definition.py
class FineTuningUseCase:
"""Define your fine-tuning objectives and requirements"""
def __init__(self):
self.objective = ""
self.target_behaviors = []
self.domain_expertise = []
self.conversation_style = ""
self.quality_requirements = {}
def define_objective(self, objective: str, behaviors: list, domain: list):
"""
Define what you want your model to achieve
Args:
objective: Primary goal (e.g., "Customer support agent for SaaS product")
behaviors: Specific behaviors (e.g., ["helpful", "concise", "technical"])
domain: Domain expertise (e.g., ["software troubleshooting", "billing"])
"""
self.objective = objective
self.target_behaviors = behaviors
self.domain_expertise = domain
print(f"π Objective: {objective}")
print(f"π― Target Behaviors: {', '.join(behaviors)}")
print(f"π§ Domain Expertise: {', '.join(domain)}")
def estimate_data_requirements(self):
"""Estimate how much training data you'll need"""
base_examples = 100 # Minimum for basic fine-tuning
# Add examples based on complexity
behavior_multiplier = len(self.target_behaviors) * 20
domain_multiplier = len(self.domain_expertise) * 50
estimated_examples = base_examples + behavior_multiplier + domain_multiplier
return {
"minimum_examples": estimated_examples,
"recommended_examples": estimated_examples * 2,
"optimal_examples": estimated_examples * 5
}
# Example usage
use_case = FineTuningUseCase()
use_case.define_objective(
objective="Technical support agent for AI development tools",
behaviors=["helpful", "technically accurate", "code-focused", "step-by-step"],
domain=["Python programming", "AI/ML libraries", "debugging", "best practices"]
)
requirements = use_case.estimate_data_requirements()
print(f"\nπ Data Requirements:")
print(f"Minimum: {requirements['minimum_examples']} examples")
print(f"Recommended: {requirements['recommended_examples']} examples")
print(f"Optimal: {requirements['optimal_examples']} examples")
Data Source Identification
Internal Sources (Highest Quality):
# data_collection/internal_sources.py
class InternalDataCollector:
"""Collect high-quality data from internal sources"""
def __init__(self):
self.sources = {
"customer_support": [],
"documentation": [],
"expert_knowledge": [],
"existing_workflows": []
}
def collect_customer_interactions(self, chat_logs_path: str):
"""
Extract valuable interactions from customer support logs
This is often your highest-quality data source because it represents
real user needs and expert responses.
"""
import json
import re
valuable_interactions = []
with open(chat_logs_path, 'r') as f:
logs = json.load(f)
for interaction in logs:
# Filter for high-quality interactions
if self._is_high_quality_interaction(interaction):
formatted_example = {
"instruction": interaction["user_message"],
"response": interaction["agent_response"],
"context": interaction.get("context", ""),
"quality_score": self._calculate_quality_score(interaction)
}
valuable_interactions.append(formatted_example)
return valuable_interactions
def _is_high_quality_interaction(self, interaction: dict) -> bool:
"""Determine if an interaction is suitable for training"""
# Check message length (not too short, not too long)
user_msg = interaction["user_message"]
agent_msg = interaction["agent_response"]
if len(user_msg) < 10 or len(user_msg) > 1000:
return False
if len(agent_msg) < 20 or len(agent_msg) > 2000:
return False
# Check for resolution indicators
satisfaction_indicators = [
"thank you", "solved", "fixed", "working now",
"that helped", "perfect", "exactly what I needed"
]
# Check for quality indicators
if any(indicator in interaction.get("follow_up", "").lower()
for indicator in satisfaction_indicators):
return True
# Check agent response quality
quality_indicators = [
"step-by-step", "here's how", "try this", "solution",
"example", "code", "documentation"
]
return any(indicator in agent_msg.lower() for indicator in quality_indicators)
def _calculate_quality_score(self, interaction: dict) -> float:
"""Calculate a quality score for prioritizing examples"""
score = 0.5 # Base score
# Bonus for resolution
if "resolved" in interaction.get("status", "").lower():
score += 0.3
# Bonus for code examples
if "```" in interaction["agent_response"]:
score += 0.2
# Bonus for detailed explanations
if len(interaction["agent_response"]) > 200:
score += 0.1
return min(score, 1.0)
# Example usage
collector = InternalDataCollector()
training_examples = collector.collect_customer_interactions("support_logs.json")
print(f"π Collected {len(training_examples)} high-quality examples")
External Sources (Supplement Your Data):
# data_collection/external_sources.py
import requests
import json
from typing import List, Dict
class ExternalDataCollector:
"""Collect supplementary data from external sources"""
def __init__(self):
self.rate_limits = {
"github": 5000, # requests per hour
"stackoverflow": 300, # requests per day
"reddit": 60 # requests per minute
}
def collect_github_issues(self, repo: str, labels: List[str]) -> List[Dict]:
"""
Collect resolved GitHub issues as training examples
Args:
repo: Repository in format "owner/repo"
labels: Labels to filter by (e.g., ["bug", "question"])
"""
examples = []
# GitHub API endpoint
url = f"https://api.github.com/repos/{repo}/issues"
params = {
"state": "closed",
"labels": ",".join(labels),
"per_page": 100,
"sort": "updated",
"direction": "desc"
}
headers = {
"Accept": "application/vnd.github.v3+json",
# Add your GitHub token here for higher rate limits
# "Authorization": "token YOUR_GITHUB_TOKEN"
}
try:
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
issues = response.json()
for issue in issues:
if self._is_valuable_issue(issue):
example = {
"instruction": f"Help with: {issue['title']}\n\n{issue['body'][:500]}",
"response": self._extract_solution(issue),
"source": f"GitHub-{repo}",
"quality_score": self._score_github_issue(issue)
}
examples.append(example)
print(f"π₯ Collected {len(examples)} examples from {repo}")
return examples
except requests.exceptions.RequestException as e:
print(f"β Error collecting from GitHub: {e}")
return []
def _is_valuable_issue(self, issue: Dict) -> bool:
"""Check if a GitHub issue is valuable for training"""
# Must have comments (indicating discussion/solution)
if issue["comments"] < 2:
return False
# Must be reasonably sized
if not issue["body"] or len(issue["body"]) < 50:
return False
# Filter out certain issue types
skip_keywords = ["duplicate", "invalid", "wontfix", "spam"]
title_lower = issue["title"].lower()
return not any(keyword in title_lower for keyword in skip_keywords)
def _extract_solution(self, issue: Dict) -> str:
"""Extract the solution from issue comments"""
# This is a simplified version - in practice, you'd use
# the GitHub API to fetch comments and identify the accepted solution
return f"Based on the issue '{issue['title']}', here's a structured approach to solve this problem:\n\n[This would contain the actual solution from comments]"
def _score_github_issue(self, issue: Dict) -> float:
"""Score GitHub issue quality"""
score = 0.5
# More comments usually indicate better discussion
if issue["comments"] > 5:
score += 0.2
# Issues with reactions show community engagement
if issue.get("reactions", {}).get("total_count", 0) > 3:
score += 0.1
# Longer issues often have more context
if len(issue.get("body", "")) > 200:
score += 0.2
return min(score, 1.0)
# Example usage
external_collector = ExternalDataCollector()
github_examples = external_collector.collect_github_issues(
repo="openai/openai-python",
labels=["question", "bug"]
)
print(f"Collected {len(github_examples)} examples from GitHub")
Step 2: Data Cleaning and Preprocessing
Raw data needs significant cleaning before it's suitable for training. Here's a comprehensive cleaning pipeline:
Text Cleaning and Normalization
# data_processing/text_cleaner.py
import re
import html
import unicodedata
from typing import List, Dict, Optional
import logging
class TextCleaner:
"""Comprehensive text cleaning for fine-tuning data"""
def __init__(self):
self.logger = logging.getLogger(__name__)
# Define cleaning patterns
self.patterns = {
# Remove HTML tags
'html_tags': re.compile(r'<[^>]+>'),
# Remove URLs (but preserve in special cases)
'urls': re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'),
# Remove email addresses
'emails': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
# Remove excessive whitespace
'excess_whitespace': re.compile(r'\s+'),
# Remove special characters (but preserve important ones)
'special_chars': re.compile(r'[^\w\s\.\,\!\?\;\:\'\"\-\(\)\[\]\{\}]'),
# Collapse runs of 3+ repeated characters down to 2 (e.g., "helloooo" -> "helloo")
'repeated_chars': re.compile(r'(.)\1{2,}')
}
def clean_text(self, text: str, preserve_code: bool = True) -> str:
"""
Comprehensive text cleaning pipeline
Args:
text: Raw text to clean
preserve_code: Whether to preserve code blocks
Returns:
Cleaned text suitable for training
"""
if not text or not isinstance(text, str):
return ""
original_text = text
# Step 1: Handle code blocks (preserve them)
code_blocks = []
if preserve_code:
text, code_blocks = self._extract_code_blocks(text)
# Step 2: Decode HTML entities
text = html.unescape(text)
# Step 3: Normalize Unicode characters
text = unicodedata.normalize('NFKC', text)
# Step 4: Remove HTML tags
text = self.patterns['html_tags'].sub('', text)
# Step 5: Handle URLs (remove or replace with placeholder)
text = self._handle_urls(text)
# Step 6: Remove email addresses
text = self.patterns['emails'].sub('[EMAIL]', text)
# Step 7: Fix repeated characters
text = self.patterns['repeated_chars'].sub(r'\1\1', text)
# Step 8: Clean excessive whitespace
text = self.patterns['excess_whitespace'].sub(' ', text)
# Step 9: Remove leading/trailing whitespace
text = text.strip()
# Step 10: Restore code blocks
if preserve_code and code_blocks:
text = self._restore_code_blocks(text, code_blocks)
# Step 11: Validate cleaning quality
quality_score = self._assess_cleaning_quality(original_text, text)
if quality_score < 0.5:
self.logger.warning(f"Low cleaning quality score: {quality_score}")
return text
def _extract_code_blocks(self, text: str) -> tuple:
"""Extract and preserve code blocks"""
code_blocks = []
pattern = r'```[\s\S]*?```|`[^`]+`'
def replace_code(match):
code_blocks.append(match.group(0))
return f"__CODE_BLOCK_{len(code_blocks)-1}__"
cleaned_text = re.sub(pattern, replace_code, text)
return cleaned_text, code_blocks
def _restore_code_blocks(self, text: str, code_blocks: List[str]) -> str:
"""Restore preserved code blocks"""
for i, code_block in enumerate(code_blocks):
text = text.replace(f"__CODE_BLOCK_{i}__", code_block)
return text
def _handle_urls(self, text: str) -> str:
"""Handle URLs intelligently"""
def replace_url(match):
url = match.group(0)
# Preserve documentation URLs
if any(domain in url for domain in ['docs.', 'documentation', 'github.com']):
return url
# Replace other URLs with placeholder
return '[URL]'
return self.patterns['urls'].sub(replace_url, text)
def _assess_cleaning_quality(self, original: str, cleaned: str) -> float:
"""Assess the quality of text cleaning"""
# Check length ratio (shouldn't lose too much content)
length_ratio = len(cleaned) / max(len(original), 1)
# Check if essential content is preserved
important_indicators = ['?', '.', '!', 'how', 'what', 'why', 'when']
original_indicators = sum(1 for indicator in important_indicators
if indicator.lower() in original.lower())
cleaned_indicators = sum(1 for indicator in important_indicators
if indicator.lower() in cleaned.lower())
content_preservation = (cleaned_indicators / max(original_indicators, 1)) if original_indicators > 0 else 1.0
# Overall quality score
quality_score = (length_ratio * 0.6) + (content_preservation * 0.4)
return min(quality_score, 1.0)
def batch_clean(self, texts: List[str], preserve_code: bool = True) -> List[str]:
"""Clean a batch of texts efficiently"""
return [self.clean_text(text, preserve_code) for text in texts]
# Usage example
cleaner = TextCleaner()
# Test cleaning
raw_text = """
<p>Hello!!! This is a test with <strong>HTML</strong> tags and
some URLs like https://example.com and emails like test@example.com.</p>
Here's some code:
```python
def hello():
print("Hello, world!")
And some repeated charactersssss!!!! """
cleaned = cleaner.clean_text(raw_text) print("Original:") print(raw_text) print("\nCleaned:") print(cleaned)
Data Validation and Quality Control
# data_processing/quality_validator.py
from typing import List, Dict, Tuple
import re
import json
from dataclasses import dataclass
@dataclass
class QualityMetrics:
"""Data quality metrics for training examples"""
length_score: float
diversity_score: float
coherence_score: float
completeness_score: float
overall_score: float
class DataQualityValidator:
"""Validate and score training data quality"""
def __init__(self):
self.min_instruction_length = 10
self.max_instruction_length = 1000
self.min_response_length = 20
self.max_response_length = 2000
# Common quality indicators
self.quality_indicators = {
'positive': [
'step-by-step', 'example', 'specifically', 'detailed',
'solution', 'approach', 'method', 'technique',
'implementation', 'code', 'documentation'
],
'negative': [
'unclear', 'confusing', 'maybe', 'probably',
'not sure', 'might be', 'could be'
]
}
def validate_example(self, example: Dict) -> Tuple[bool, QualityMetrics, List[str]]:
"""
Validate a single training example
Args:
example: Dictionary with 'instruction' and 'response' keys
Returns:
Tuple of (is_valid, quality_metrics, issues)
"""
issues = []
# Extract instruction and response
instruction = example.get('instruction', '')
response = example.get('response', '')
if not instruction or not response:
issues.append("Missing instruction or response")
return False, QualityMetrics(0, 0, 0, 0, 0), issues
# Validate lengths
length_score = self._validate_lengths(instruction, response, issues)
# Check diversity (uniqueness)
diversity_score = self._check_diversity(instruction, response)
# Check coherence (instruction-response alignment)
coherence_score = self._check_coherence(instruction, response, issues)
# Check completeness
completeness_score = self._check_completeness(response, issues)
# Calculate overall score
overall_score = (
length_score * 0.2 +
diversity_score * 0.2 +
coherence_score * 0.3 +
completeness_score * 0.3
)
metrics = QualityMetrics(
length_score=length_score,
diversity_score=diversity_score,
coherence_score=coherence_score,
completeness_score=completeness_score,
overall_score=overall_score
)
# Determine if example is valid
is_valid = overall_score >= 0.6 and len(issues) == 0
return is_valid, metrics, issues
def _validate_lengths(self, instruction: str, response: str, issues: List[str]) -> float:
"""Validate text lengths"""
score = 1.0
# Check instruction length
if len(instruction) < self.min_instruction_length:
issues.append(f"Instruction too short ({len(instruction)} < {self.min_instruction_length})")
score *= 0.5
elif len(instruction) > self.max_instruction_length:
issues.append(f"Instruction too long ({len(instruction)} > {self.max_instruction_length})")
score *= 0.7
# Check response length
if len(response) < self.min_response_length:
issues.append(f"Response too short ({len(response)} < {self.min_response_length})")
score *= 0.5
elif len(response) > self.max_response_length:
issues.append(f"Response too long ({len(response)} > {self.max_response_length})")
score *= 0.7
return score
def _check_diversity(self, instruction: str, response: str) -> float:
"""Check lexical diversity of the text"""
# Combine instruction and response
full_text = f"{instruction} {response}".lower()
words = re.findall(r'\b\w+\b', full_text)
if len(words) == 0:
return 0.0
# Calculate unique word ratio
unique_words = len(set(words))
total_words = len(words)
diversity_ratio = unique_words / total_words
# Normalize to 0-1 scale (0.5 is typical for natural text)
return min(diversity_ratio * 2, 1.0)
def _check_coherence(self, instruction: str, response: str, issues: List[str]) -> float:
"""Check if response addresses the instruction"""
# Extract key terms from instruction
instruction_words = set(re.findall(r'\b\w+\b', instruction.lower()))
response_words = set(re.findall(r'\b\w+\b', response.lower()))
# Remove common words
common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
instruction_words -= common_words
response_words -= common_words
if not instruction_words:
return 0.5 # Neutral score if no meaningful words
# Calculate overlap
overlap = len(instruction_words & response_words)
coherence_score = overlap / len(instruction_words)
# Check for quality indicators
positive_count = sum(1 for indicator in self.quality_indicators['positive']
if indicator in response.lower())
negative_count = sum(1 for indicator in self.quality_indicators['negative']
if indicator in response.lower())
# Adjust score based on quality indicators
if positive_count > 0:
coherence_score += 0.2 * positive_count
if negative_count > 0:
coherence_score -= 0.3 * negative_count
issues.append(f"Response contains {negative_count} negative quality indicators")
return min(max(coherence_score, 0.0), 1.0)
def _check_completeness(self, response: str, issues: List[str]) -> float:
"""Check if response is complete and helpful"""
completeness_score = 0.5 # Base score
# Check for complete sentences
sentences = re.split(r'[.!?]+', response)
complete_sentences = [s for s in sentences if len(s.strip()) > 5]
if len(complete_sentences) >= 2:
completeness_score += 0.2
# Check for explanatory elements
explanatory_patterns = [
r'\b(because|since|due to|as a result)\b', # Explanations
r'\b(first|second|then|next|finally)\b', # Step-by-step
r'\b(for example|such as|like)\b', # Examples
r'```.*```', # Code examples
r'\b(solution|approach|method)\b' # Solution-oriented
]
for pattern in explanatory_patterns:
if re.search(pattern, response, re.IGNORECASE | re.DOTALL):
completeness_score += 0.1
# Check for incomplete endings
incomplete_endings = ['...', 'etc.', 'and so on']
if any(ending in response.lower() for ending in incomplete_endings):
completeness_score -= 0.2
issues.append("Response appears incomplete")
return min(completeness_score, 1.0)
def validate_dataset(self, examples: List[Dict]) -> Dict:
"""Validate entire dataset and provide summary"""
valid_examples = []
invalid_examples = []
all_metrics = []
all_issues = []
for i, example in enumerate(examples):
is_valid, metrics, issues = self.validate_example(example)
all_metrics.append(metrics)
if is_valid:
valid_examples.append((i, example))
else:
invalid_examples.append((i, example, issues))
all_issues.extend(issues)
# Calculate dataset-level statistics
avg_metrics = self._calculate_average_metrics(all_metrics)
return {
'total_examples': len(examples),
'valid_examples': len(valid_examples),
'invalid_examples': len(invalid_examples),
'validity_rate': len(valid_examples) / max(len(examples), 1),  # guard against an empty dataset
'average_metrics': avg_metrics,
'common_issues': self._analyze_common_issues(all_issues),
'valid_data': [example for _, example in valid_examples],
'invalid_data': invalid_examples
}
def _calculate_average_metrics(self, metrics_list: List[QualityMetrics]) -> Dict:
"""Calculate average quality metrics"""
if not metrics_list:
return {}
return {
'length_score': sum(m.length_score for m in metrics_list) / len(metrics_list),
'diversity_score': sum(m.diversity_score for m in metrics_list) / len(metrics_list),
'coherence_score': sum(m.coherence_score for m in metrics_list) / len(metrics_list),
'completeness_score': sum(m.completeness_score for m in metrics_list) / len(metrics_list),
'overall_score': sum(m.overall_score for m in metrics_list) / len(metrics_list)
}
def _analyze_common_issues(self, issues: List[str]) -> Dict[str, int]:
"""Analyze common issues in the dataset"""
issue_counts = {}
for issue in issues:
# Categorize issues
if "too short" in issue:
category = "Length Issues - Too Short"
elif "too long" in issue:
category = "Length Issues - Too Long"
elif "incomplete" in issue:
category = "Completeness Issues"
elif "negative quality" in issue:
category = "Quality Issues"
else:
category = "Other Issues"
issue_counts[category] = issue_counts.get(category, 0) + 1
return issue_counts
# Usage example
validator = DataQualityValidator()
# Example training data
sample_data = [
{
"instruction": "How do I install Python packages?",
"response": "You can install Python packages using pip. Run 'pip install package_name' in your terminal. For example, to install requests: pip install requests"
},
{
"instruction": "Fix bug", # Too short
"response": "Try debugging..." # Too short and incomplete
}
]
# Validate the dataset
results = validator.validate_dataset(sample_data)
print(f"π Dataset Validation Results:")
print(f"Total examples: {results['total_examples']}")
print(f"Valid examples: {results['valid_examples']}")
print(f"Validity rate: {results['validity_rate']:.2%}")
print(f"Average quality score: {results['average_metrics']['overall_score']:.2f}")
Step 3: Data Formatting for Different Platforms
Once your data is clean and validated, you need to format it correctly for your chosen fine-tuning platform.
OpenAI Fine-Tuning Format
OpenAI expects training data as a JSONL file, with one chat-formatted example per line:
# data_formatting/openai_formatter.py
import json
import tiktoken
from typing import List, Dict, Optional
class OpenAIFormatter:
"""Format data for OpenAI fine-tuning"""
def __init__(self, model_name: str = "gpt-3.5-turbo"):
self.model_name = model_name
self.tokenizer = tiktoken.encoding_for_model(model_name)
# OpenAI fine-tuning limits
self.max_tokens_per_example = 4096
self.recommended_examples = {"minimum": 10, "recommended": 50, "optimal": 100}
def format_for_openai(self, examples: List[Dict], output_file: str) -> Dict:
"""
Format examples for OpenAI fine-tuning
Args:
examples: List of {"instruction": str, "response": str} dictionaries
output_file: Output JSONL file path
Returns:
Formatting statistics and information
"""
formatted_examples = []
stats = {
"total_examples": len(examples),
"formatted_examples": 0,
"skipped_examples": 0,
"total_tokens": 0,
"average_tokens": 0,
"issues": []
}
for i, example in enumerate(examples):
try:
formatted_example = self._format_single_example(example)
if formatted_example:
# Count tokens
token_count = self._count_tokens(formatted_example)
if token_count <= self.max_tokens_per_example:
formatted_examples.append(formatted_example)
stats["formatted_examples"] += 1
stats["total_tokens"] += token_count
else:
stats["skipped_examples"] += 1
stats["issues"].append(f"Example {i}: Too many tokens ({token_count})")
else:
stats["skipped_examples"] += 1
stats["issues"].append(f"Example {i}: Formatting failed")
except Exception as e:
stats["skipped_examples"] += 1
stats["issues"].append(f"Example {i}: Error - {str(e)}")
# Calculate average tokens
if stats["formatted_examples"] > 0:
stats["average_tokens"] = stats["total_tokens"] / stats["formatted_examples"]
# Write to JSONL file
with open(output_file, 'w', encoding='utf-8') as f:
for example in formatted_examples:
f.write(json.dumps(example, ensure_ascii=False) + '\n')
print(f"π OpenAI Format Results:")
print(f"Formatted: {stats['formatted_examples']}/{stats['total_examples']} examples")
print(f"Average tokens per example: {stats['average_tokens']:.1f}")
print(f"Output saved to: {output_file}")
if stats["issues"]:
print(f"β οΈ Issues found: {len(stats['issues'])}")
for issue in stats["issues"][:5]: # Show first 5 issues
print(f" - {issue}")
return stats
def _format_single_example(self, example: Dict) -> Optional[Dict]:
"""Format a single example for OpenAI"""
instruction = example.get("instruction", "").strip()
response = example.get("response", "").strip()
if not instruction or not response:
return None
# OpenAI fine-tuning format
formatted = {
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": instruction},
{"role": "assistant", "content": response}
]
}
return formatted
def _count_tokens(self, example: Dict) -> int:
"""Count tokens in a formatted example"""
total_tokens = 0
for message in example["messages"]:
# Count tokens for each message
content_tokens = len(self.tokenizer.encode(message["content"]))
role_tokens = len(self.tokenizer.encode(message["role"]))
total_tokens += content_tokens + role_tokens + 4 # roughly 4 tokens of per-message formatting overhead
return total_tokens + 2 # 2 tokens for message list formatting
# Usage example
formatter = OpenAIFormatter()
# Sample training data
sample_examples = [
{
"instruction": "How do I create a Python virtual environment?",
"response": "You can create a Python virtual environment using: python -m venv myenv. Then activate it with: source myenv/bin/activate (Linux/Mac) or myenv\\Scripts\\activate (Windows)."
}
]
# Format for OpenAI
stats = formatter.format_for_openai(sample_examples, "training_data.jsonl")
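For reference, each line of training_data.jsonl is one self-contained JSON object in OpenAI's chat format. The usage example above would produce a record like the following (pretty-printed here for readability; in the file it occupies a single line):
{"messages": [
  {"role": "system", "content": "You are a helpful assistant."},
  {"role": "user", "content": "How do I create a Python virtual environment?"},
  {"role": "assistant", "content": "You can create a Python virtual environment using: python -m venv myenv. Then activate it with: source myenv/bin/activate (Linux/Mac) or myenv\\Scripts\\activate (Windows)."}
]}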
Hugging Face Dataset Format
For Hugging Face Transformers, you'll typically use datasets in JSON or CSV format:
# data_formatting/huggingface_formatter.py
import json
import pandas as pd
from typing import List, Dict
from datasets import Dataset
class HuggingFaceFormatter:
"""Format data for Hugging Face fine-tuning"""
def __init__(self):
self.supported_formats = ["json", "csv", "dataset"]
def format_for_huggingface(self, examples: List[Dict],
output_format: str = "json",
output_file: str = "hf_training_data") -> str:
"""
Format examples for Hugging Face fine-tuning
Args:
examples: List of training examples
output_format: Output format ("json", "csv", or "dataset")
output_file: Output file name (without extension)
Returns:
Path to the created file
"""
if output_format not in self.supported_formats:
raise ValueError(f"Unsupported format. Use: {self.supported_formats}")
# Prepare data
formatted_data = []
for example in examples:
# Format for instruction-following
formatted_example = {
"input": example.get("instruction", ""),
"output": example.get("response", ""),
"instruction": "Please provide a helpful response to the following:",
}
# Add metadata if available
if "quality_score" in example:
formatted_example["quality_score"] = example["quality_score"]
formatted_data.append(formatted_example)
# Save in requested format
if output_format == "json":
output_path = f"{output_file}.json"
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(formatted_data, f, indent=2, ensure_ascii=False)
elif output_format == "csv":
output_path = f"{output_file}.csv"
df = pd.DataFrame(formatted_data)
df.to_csv(output_path, index=False, encoding='utf-8')
elif output_format == "dataset":
output_path = f"{output_file}_dataset"
dataset = Dataset.from_list(formatted_data)
dataset.save_to_disk(output_path)
print(f"π Hugging Face format created: {output_path}")
print(f"Total examples: {len(formatted_data)}")
return output_path
# Usage example
hf_formatter = HuggingFaceFormatter()
# Format for Hugging Face
hf_path = hf_formatter.format_for_huggingface(
sample_examples,
output_format="json",
output_file="my_agent_training_data"
)
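From here, the JSON output can be loaded back with the datasets library and split for training and evaluation. This is a minimal sketch that assumes the file produced above exists and contains enough examples to split:
# Load the formatted JSON as a Hugging Face dataset and carve out an eval split.
from datasets import load_dataset

dataset = load_dataset("json", data_files="my_agent_training_data.json", split="train")
splits = dataset.train_test_split(test_size=0.1, seed=42)

print(splits["train"][0])  # {'input': ..., 'output': ..., 'instruction': ...}
print(f"Train examples: {len(splits['train'])}, eval examples: {len(splits['test'])}")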
Step 4: Quality Assurance and Testing
Before using your training data, perform final quality assurance checks:
Automated Quality Pipeline
# quality_assurance/qa_pipeline.py
from typing import List, Dict, Tuple
import json
from pathlib import Path
class QualityAssurancePipeline:
"""Comprehensive QA pipeline for training data"""
def __init__(self):
self.qa_checks = [
self._check_data_balance,
self._check_instruction_diversity,
self._check_response_quality,
self._check_formatting_consistency,
self._check_domain_coverage
]
def run_qa_pipeline(self, examples: List[Dict]) -> Dict:
"""Run complete QA pipeline"""
qa_report = {
"total_examples": len(examples),
"qa_checks": {},
"overall_score": 0.0,
"recommendations": [],
"critical_issues": []
}
total_score = 0
print("π Running Quality Assurance Pipeline...")
for check in self.qa_checks:
check_name = check.__name__.replace("_check_", "")
print(f" Running {check_name}...")
try:
score, issues, recommendations = check(examples)
qa_report["qa_checks"][check_name] = {
"score": score,
"issues": issues,
"recommendations": recommendations
}
total_score += score
# Collect critical issues (score < 0.5)
if score < 0.5:
qa_report["critical_issues"].extend(issues)
# Collect all recommendations
qa_report["recommendations"].extend(recommendations)
except Exception as e:
qa_report["qa_checks"][check_name] = {
"score": 0.0,
"issues": [f"QA check failed: {str(e)}"],
"recommendations": ["Fix QA check implementation"]
}
# Calculate overall score
qa_report["overall_score"] = total_score / len(self.qa_checks)
# Generate final recommendations
qa_report["final_assessment"] = self._generate_final_assessment(qa_report)
return qa_report
def _check_data_balance(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
"""Check if data is balanced across different categories"""
issues = []
recommendations = []
# Check instruction length distribution
instruction_lengths = [len(ex.get("instruction", "")) for ex in examples]
avg_length = sum(instruction_lengths) / len(instruction_lengths)
# Check for extreme outliers
very_short = sum(1 for length in instruction_lengths if length < 20)
very_long = sum(1 for length in instruction_lengths if length > 1000)
balance_score = 1.0
if very_short / len(examples) > 0.2:
balance_score -= 0.3
issues.append(f"{very_short} instructions are very short (< 20 chars)")
recommendations.append("Consider expanding short instructions with more context")
if very_long / len(examples) > 0.1:
balance_score -= 0.2
issues.append(f"{very_long} instructions are very long (> 1000 chars)")
recommendations.append("Consider breaking down long instructions")
return max(balance_score, 0.0), issues, recommendations
def _check_instruction_diversity(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
"""Check diversity of instructions"""
issues = []
recommendations = []
instructions = [ex.get("instruction", "").lower() for ex in examples]
# Check for duplicate instructions
unique_instructions = len(set(instructions))
diversity_ratio = unique_instructions / len(instructions)
score = diversity_ratio
if diversity_ratio < 0.8:
issues.append(f"Low instruction diversity: {diversity_ratio:.2%}")
recommendations.append("Add more varied instruction formats and phrasings")
# Check for common starting patterns
common_starters = ["how to", "what is", "can you", "please"]
starter_counts = {}
for instruction in instructions:
for starter in common_starters:
if instruction.startswith(starter):
starter_counts[starter] = starter_counts.get(starter, 0) + 1
# If any starter appears in > 30% of instructions
for starter, count in starter_counts.items():
if count / len(instructions) > 0.3:
score *= 0.8
issues.append(f"'{starter}' appears in {count}/{len(instructions)} instructions")
recommendations.append(f"Vary instruction beginnings beyond '{starter}'")
return max(score, 0.0), issues, recommendations
def _check_response_quality(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
"""Check quality of responses"""
issues = []
recommendations = []
responses = [ex.get("response", "") for ex in examples]
# Check for empty or very short responses
short_responses = sum(1 for resp in responses if len(resp) < 30)
score = 1.0
if short_responses / len(responses) > 0.1:
score -= 0.4
issues.append(f"{short_responses} responses are too short (< 30 chars)")
recommendations.append("Expand short responses with more detail and examples")
# Check for helpful indicators
helpful_indicators = ["example", "step", "because", "solution", "approach"]
responses_with_indicators = 0
for response in responses:
if any(indicator in response.lower() for indicator in helpful_indicators):
responses_with_indicators += 1
helpful_ratio = responses_with_indicators / len(responses)
if helpful_ratio < 0.5:
score -= 0.2
issues.append(f"Only {helpful_ratio:.1%} responses contain helpful indicators")
recommendations.append("Add more explanatory content, examples, and step-by-step guidance")
return max(score, 0.0), issues, recommendations
def _check_formatting_consistency(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
"""Check formatting consistency"""
issues = []
recommendations = []
score = 1.0
# Check required fields
required_fields = ["instruction", "response"]
for i, example in enumerate(examples):
for field in required_fields:
if field not in example or not example[field]:
score -= 0.1
issues.append(f"Example {i}: Missing or empty '{field}'")
if len(issues) > 0:
recommendations.append("Ensure all examples have required fields: instruction, response")
return max(score, 0.0), issues, recommendations
def _check_domain_coverage(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
"""Check if examples cover intended domain adequately"""
issues = []
recommendations = []
# This is a simplified check - in practice, you'd use more sophisticated
# domain analysis based on your specific use case
all_text = " ".join([
ex.get("instruction", "") + " " + ex.get("response", "")
for ex in examples
]).lower()
# Check for technical terms (adjust based on your domain)
technical_terms = ["api", "function", "code", "error", "debug", "install"]
technical_coverage = sum(1 for term in technical_terms if term in all_text)
score = min(technical_coverage / len(technical_terms), 1.0)
if score < 0.5:
missing_terms = [term for term in technical_terms if term not in all_text]
issues.append(f"Limited domain coverage. Missing: {missing_terms}")
recommendations.append("Add examples covering missing domain areas")
return score, issues, recommendations
def _generate_final_assessment(self, report: Dict) -> str:
"""Generate final assessment and recommendations"""
overall_score = report["overall_score"]
if overall_score >= 0.8:
return "β
High Quality: Dataset is ready for fine-tuning"
elif overall_score >= 0.6:
return "β οΈ Good Quality: Consider addressing recommendations before fine-tuning"
elif overall_score >= 0.4:
return "π‘ Moderate Quality: Significant improvements needed before fine-tuning"
else:
return "β Low Quality: Major issues must be resolved before fine-tuning"
# Usage example
qa_pipeline = QualityAssurancePipeline()
qa_report = qa_pipeline.run_qa_pipeline(sample_examples)
print(f"\nπ Quality Assurance Report:")
print(f"Overall Score: {qa_report['overall_score']:.2f}")
print(f"Assessment: {qa_report['final_assessment']}")
if qa_report['critical_issues']:
print(f"\nπ¨ Critical Issues:")
for issue in qa_report['critical_issues']:
print(f" - {issue}")
if qa_report['recommendations']:
print(f"\nπ‘ Recommendations:")
for rec in qa_report['recommendations'][:5]: # Show top 5
print(f" - {rec}")
What You've Accomplished
Congratulations! You now have a complete, production-ready data preparation pipeline for fine-tuning LLMs:
- ✅ Strategic Data Collection - Internal and external source integration
- ✅ Professional Data Cleaning - Comprehensive text processing and validation
- ✅ Multi-Platform Formatting - OpenAI and Hugging Face compatibility
- ✅ Quality Assurance - Automated QA pipeline with scoring and recommendations
- ✅ Best Practices - Industry-standard approaches to data preparation
Key Features Implemented:
- Intelligent Data Collection with quality scoring
- Advanced Text Cleaning with code preservation
- Comprehensive Validation with detailed metrics
- Multi-Platform Support for different fine-tuning services
- Automated Quality Assurance with actionable recommendations
What's Next?
In Part 2: Fine-Tuning with OpenAI, you'll learn:
- Setting up OpenAI fine-tuning jobs
- Configuring hyperparameters for optimal results
- Monitoring training progress and metrics
- Evaluating fine-tuned model performance
- Deploying and testing your custom model
Quick Reference Commands
# Install required packages
pip install openai tiktoken datasets pandas
# Run data preparation pipeline
python data_preparation_pipeline.py
# Validate dataset quality
python quality_assurance.py --input training_data.json
# Format for OpenAI
python format_openai.py --input clean_data.json --output openai_training.jsonl
Additional Resources
- OpenAI Fine-tuning Guide: platform.openai.com/docs/guides/fine-tuning
- Hugging Face Datasets: huggingface.co/docs/datasets
- Data Quality Best Practices: papers.nips.cc/paper/2021/file/data-quality-for-machine-learning.pdf
- Fine-tuning Research: arxiv.org/abs/2109.01652
Ready to start fine-tuning with your prepared data? Continue to Part 2: Fine-Tuning with OpenAI to create your specialized AI agent!
Tutorial Navigation
- Part 1: Preparing Training Data (Current)
- Part 2: Fine-Tuning with OpenAI
- Part 3: Hugging Face Fine-Tuning
- Part 4: Model Deployment
- Part 5: Integration & Troubleshooting
This tutorial is part of our comprehensive Fine-Tuning LLMs series. Quality training data is the foundation of successful fine-tuning - invest the time to get it right, and your specialized AI agent will exceed expectations.