Fine-Tuning LLMs for Custom Agent Behaviors - Part 2: Fine-Tuning with OpenAI

Now that you have high-quality training data from Part 1, it's time to create your specialized AI agent using OpenAI's fine-tuning platform. OpenAI's managed fine-tuning service is one of the most accessible paths to a custom model, but getting good results still requires understanding hyperparameters, monitoring training progress, and implementing a proper evaluation strategy.
This comprehensive guide will take you through the complete OpenAI fine-tuning process, from initial setup to deploying your custom model in production.
What You'll Learn in This Tutorial
By the end of this tutorial, you'll have:
- ✅ Complete OpenAI fine-tuning workflow from data upload to model deployment
- ✅ Advanced hyperparameter optimization for optimal model performance
- ✅ Professional monitoring and evaluation techniques for training jobs
- ✅ Production deployment strategies with version management
- ✅ Cost optimization techniques to minimize fine-tuning expenses
- ✅ Troubleshooting framework for common fine-tuning issues
Estimated Time: 45-50 minutes
Understanding OpenAI Fine-Tuning Architecture
OpenAI's fine-tuning service builds upon their base models (GPT-3.5-turbo, GPT-4) to create specialized versions trained on your specific data.
How OpenAI Fine-Tuning Works
The Fine-Tuning Process:
1. Base Model Selection: Choose from available OpenAI models
2. Data Upload: Submit your JSONL training data (see the example below)
3. Training Job Creation: Configure hyperparameters and start training
4. Model Training: OpenAI trains your model on their infrastructure
5. Model Evaluation: Test performance and validate results
6. Model Deployment: Deploy for production use with API access
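Before uploading anything, it helps to know exactly what OpenAI expects: each line of the JSONL file is one complete chat-formatted training example. A minimal, hypothetical customer-support example (pretty-printed here for readability; in the actual file each example must sit on a single line) looks like this:

{"messages": [
  {"role": "system", "content": "You are a concise, friendly support agent for AcmeCo."},
  {"role": "user", "content": "How do I reset my password?"},
  {"role": "assistant", "content": "Go to Settings > Security, click 'Reset password', and follow the link in the email we send you."}
]}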
Key Advantages of OpenAI Fine-Tuning:
- No Infrastructure Management: OpenAI handles all compute resources
- Production-Ready Models: Automatically deployed and scaled
- Built-in Monitoring: Training metrics and logs provided
- Cost-Effective: Pay only for training time and usage
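If you want to see the raw API surface before building the full manager class in Step 1, here is a minimal sketch of the same workflow using the current openai Python SDK (version 1.x). Note that the manager class below uses the older pre-1.0 module-level API (openai.File, openai.FineTuningJob), so match your installed SDK version to whichever style you follow; the file name here is an assumption.

# minimal_finetune.py -- bare-bones sketch assuming openai>=1.0
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Upload the JSONL training file (path is an assumption)
training_file = client.files.create(
    file=open("training_data.jsonl", "rb"),
    purpose="fine-tune",
)

# Start the fine-tuning job with explicit hyperparameters
job = client.fine_tuning.jobs.create(
    training_file=training_file.id,
    model="gpt-3.5-turbo-1106",
    hyperparameters={"n_epochs": 3},
)

# Poll for status; job.fine_tuned_model is populated once it succeeds
job = client.fine_tuning.jobs.retrieve(job.id)
print(job.status, job.fine_tuned_model)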
Step 1: Setting Up OpenAI Fine-Tuning Environment
Let's create a comprehensive fine-tuning management system for OpenAI:
OpenAI Fine-Tuning Manager
# openai_finetuning/fine_tune_manager.py
import openai
import json
import time
import os
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import logging
from dataclasses import dataclass
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class FineTuningConfig:
    """Configuration for fine-tuning jobs"""
    model: str = "gpt-3.5-turbo-1106"                 # Base model to fine-tune
    n_epochs: int = 3                                 # Number of training epochs
    batch_size: Optional[int] = None                  # Auto-selected if None
    learning_rate_multiplier: Optional[float] = None  # Auto-selected if None
    prompt_loss_weight: float = 0.01                  # Weight for prompt tokens in loss
    validation_file: Optional[str] = None             # Validation data file
    suffix: Optional[str] = None                      # Model name suffix

    # Advanced parameters
    early_stopping: bool = True
    temperature: float = 1.0
    max_tokens: int = 256
class OpenAIFineTuningManager:
    """Comprehensive OpenAI fine-tuning management system"""

    def __init__(self, api_key: str = None):
        """
        Initialize OpenAI fine-tuning manager

        Args:
            api_key: OpenAI API key (if None, uses OPENAI_API_KEY env var)
        """
        # Set up OpenAI client
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key is required. Set OPENAI_API_KEY environment variable.")
        openai.api_key = self.api_key

        # Initialize tracking
        self.active_jobs = {}
        self.completed_jobs = {}
        self.uploaded_files = {}

        # Validation settings
        self.min_training_examples = 10
        self.max_training_examples = 3000
        self.recommended_examples = 50

        logger.info("✅ OpenAI Fine-Tuning Manager initialized")
    def validate_training_data(self, file_path: str) -> Dict[str, Any]:
        """
        Validate training data format and content for OpenAI

        Args:
            file_path: Path to JSONL training data file

        Returns:
            Validation report with issues and recommendations
        """
        logger.info(f"🔍 Validating training data: {file_path}")

        validation_report = {
            "is_valid": True,
            "total_examples": 0,
            "issues": [],
            "warnings": [],
            "recommendations": [],
            "estimated_cost": 0.0,
            "estimated_training_time": "Unknown"
        }

        try:
            # Read and validate JSONL format
            examples = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    try:
                        example = json.loads(line.strip())
                        examples.append(example)

                        # Validate example format
                        self._validate_example_format(example, line_num, validation_report)
                    except json.JSONDecodeError as e:
                        validation_report["is_valid"] = False
                        validation_report["issues"].append(
                            f"Line {line_num}: Invalid JSON - {str(e)}"
                        )

            validation_report["total_examples"] = len(examples)

            # Validate dataset size
            self._validate_dataset_size(len(examples), validation_report)

            # Estimate costs and training time
            if len(examples) > 0:
                validation_report["estimated_cost"] = self._estimate_training_cost(examples)
                validation_report["estimated_training_time"] = self._estimate_training_time(examples)

            # Check for common issues
            self._check_common_issues(examples, validation_report)

            # Generate recommendations
            self._generate_data_recommendations(examples, validation_report)

        except FileNotFoundError:
            validation_report["is_valid"] = False
            validation_report["issues"].append(f"Training data file not found: {file_path}")
        except Exception as e:
            validation_report["is_valid"] = False
            validation_report["issues"].append(f"Validation error: {str(e)}")

        # Log validation results
        if validation_report["is_valid"]:
            logger.info(f"✅ Training data validated: {validation_report['total_examples']} examples")
        else:
            logger.error(f"❌ Training data validation failed: {len(validation_report['issues'])} issues found")

        return validation_report
    def _validate_example_format(self, example: Dict, line_num: int, report: Dict):
        """Validate individual example format"""
        # Check required structure
        if "messages" not in example:
            report["is_valid"] = False
            report["issues"].append(f"Line {line_num}: Missing 'messages' field")
            return

        messages = example["messages"]
        if not isinstance(messages, list) or len(messages) < 2:
            report["is_valid"] = False
            report["issues"].append(f"Line {line_num}: 'messages' must be a list with at least 2 messages")
            return

        # Validate message format
        for msg_idx, message in enumerate(messages):
            if not isinstance(message, dict):
                report["issues"].append(f"Line {line_num}, Message {msg_idx}: Must be a dictionary")
                continue

            if "role" not in message or "content" not in message:
                report["issues"].append(f"Line {line_num}, Message {msg_idx}: Missing 'role' or 'content'")
                continue

            # Check role validity
            valid_roles = {"system", "user", "assistant"}
            if message["role"] not in valid_roles:
                report["issues"].append(f"Line {line_num}, Message {msg_idx}: Invalid role '{message['role']}'")

            # Check content length
            if len(message["content"]) == 0:
                report["warnings"].append(f"Line {line_num}, Message {msg_idx}: Empty content")
            elif len(message["content"]) > 8000:
                report["warnings"].append(f"Line {line_num}, Message {msg_idx}: Very long content ({len(message['content'])} chars)")
    def _validate_dataset_size(self, num_examples: int, report: Dict):
        """Validate dataset size and provide recommendations"""
        if num_examples < self.min_training_examples:
            report["is_valid"] = False
            report["issues"].append(f"Too few training examples: {num_examples} (minimum: {self.min_training_examples})")
        elif num_examples > self.max_training_examples:
            report["warnings"].append(f"Very large dataset: {num_examples} examples (may be expensive)")
        elif num_examples < self.recommended_examples:
            report["warnings"].append(f"Small dataset: {num_examples} examples (recommended: {self.recommended_examples}+)")
    def _estimate_training_cost(self, examples: List[Dict]) -> float:
        """
        Estimate fine-tuning cost based on token count

        OpenAI pricing (as of 2024):
        - GPT-3.5-turbo fine-tuning: $0.008 per 1K tokens
        - GPT-4 fine-tuning: $0.080 per 1K tokens
        """
        import tiktoken

        # Get tokenizer for the model
        try:
            tokenizer = tiktoken.encoding_for_model("gpt-3.5-turbo")
        except Exception:
            tokenizer = tiktoken.get_encoding("cl100k_base")

        total_tokens = 0
        for example in examples:
            for message in example.get("messages", []):
                content = message.get("content", "")
                tokens = len(tokenizer.encode(content))
                total_tokens += tokens

        # Estimate cost (using GPT-3.5-turbo pricing)
        cost_per_1k_tokens = 0.008
        estimated_cost = (total_tokens / 1000) * cost_per_1k_tokens

        return round(estimated_cost, 2)
    def _estimate_training_time(self, examples: List[Dict]) -> str:
        """Estimate training time based on dataset size"""
        num_examples = len(examples)

        # Rough estimates based on experience
        if num_examples <= 50:
            return "5-10 minutes"
        elif num_examples <= 200:
            return "10-30 minutes"
        elif num_examples <= 500:
            return "30-60 minutes"
        elif num_examples <= 1000:
            return "1-3 hours"
        else:
            return "3+ hours"
    def _check_common_issues(self, examples: List[Dict], report: Dict):
        """Check for common fine-tuning issues"""
        # Nothing to check for an empty dataset (also avoids division by zero below)
        if not examples:
            return

        # Check for duplicate examples
        seen_contents = set()
        duplicates = 0

        for example in examples:
            messages = example.get("messages", [])
            if len(messages) > 0:
                # Create hash of conversation
                content_hash = hash(str(messages))
                if content_hash in seen_contents:
                    duplicates += 1
                seen_contents.add(content_hash)

        if duplicates > 0:
            report["warnings"].append(f"Found {duplicates} potential duplicate examples")

        # Check conversation balance
        avg_messages_per_example = sum(len(ex.get("messages", [])) for ex in examples) / len(examples)
        if avg_messages_per_example < 2:
            report["warnings"].append("Very short conversations - consider adding more context")
        elif avg_messages_per_example > 10:
            report["warnings"].append("Very long conversations - may be expensive to train")
    def _generate_data_recommendations(self, examples: List[Dict], report: Dict):
        """Generate recommendations for improving training data"""
        num_examples = len(examples)

        if num_examples < self.recommended_examples:
            report["recommendations"].append(
                f"Add more training examples for better performance (current: {num_examples}, recommended: {self.recommended_examples}+)"
            )

        # Check for system message usage
        has_system_messages = any(
            any(msg.get("role") == "system" for msg in ex.get("messages", []))
            for ex in examples
        )
        if not has_system_messages:
            report["recommendations"].append("Consider adding system messages to define agent behavior and personality")

        # Check assistant message quality
        short_responses = 0
        for example in examples:
            for message in example.get("messages", []):
                if message.get("role") == "assistant" and len(message.get("content", "")) < 20:
                    short_responses += 1

        if short_responses > num_examples * 0.2:
            report["recommendations"].append("Many assistant responses are very short - consider adding more detailed responses")
    async def upload_training_data(self, file_path: str, purpose: str = "fine-tune") -> Dict:
        """
        Upload training data to OpenAI

        Args:
            file_path: Path to JSONL training data
            purpose: Purpose of the file (usually "fine-tune")

        Returns:
            Upload result with file ID and metadata
        """
        logger.info(f"📤 Uploading training data: {file_path}")

        # Validate data first
        validation_report = self.validate_training_data(file_path)
        if not validation_report["is_valid"]:
            raise ValueError(f"Training data validation failed: {validation_report['issues']}")

        if validation_report["warnings"]:
            logger.warning("⚠️ Training data warnings:")
            for warning in validation_report["warnings"]:
                logger.warning(f" - {warning}")

        try:
            # Upload file to OpenAI
            with open(file_path, 'rb') as f:
                upload_response = openai.File.create(
                    file=f,
                    purpose=purpose
                )

            file_info = {
                "file_id": upload_response.id,
                "filename": upload_response.filename,
                "bytes": upload_response.bytes,
                "purpose": upload_response.purpose,
                "status": upload_response.status,
                "uploaded_at": datetime.now().isoformat(),
                "validation_report": validation_report
            }

            # Store file info for tracking
            self.uploaded_files[upload_response.id] = file_info

            logger.info(f"✅ File uploaded successfully: {upload_response.id}")
            logger.info(f"File size: {upload_response.bytes} bytes")
            logger.info(f"Examples: {validation_report['total_examples']}")
            logger.info(f"Estimated cost: ${validation_report['estimated_cost']}")

            return file_info

        except Exception as e:
            logger.error(f"❌ File upload failed: {str(e)}")
            raise
    async def create_fine_tuning_job(self,
                                     training_file_id: str,
                                     config: FineTuningConfig,
                                     validation_file_id: str = None) -> Dict:
        """
        Create and start a fine-tuning job

        Args:
            training_file_id: ID of uploaded training file
            config: Fine-tuning configuration
            validation_file_id: Optional validation file ID

        Returns:
            Fine-tuning job information
        """
        logger.info(f"🚀 Creating fine-tuning job with model: {config.model}")

        try:
            # Prepare job parameters
            job_params = {
                "training_file": training_file_id,
                "model": config.model
            }

            # Add hyperparameters if specified
            hyperparameters = {}
            if config.n_epochs:
                hyperparameters["n_epochs"] = config.n_epochs
            if config.batch_size:
                hyperparameters["batch_size"] = config.batch_size
            if config.learning_rate_multiplier:
                hyperparameters["learning_rate_multiplier"] = config.learning_rate_multiplier

            if hyperparameters:
                job_params["hyperparameters"] = hyperparameters

            # Add validation file if provided
            if validation_file_id:
                job_params["validation_file"] = validation_file_id

            # Add suffix for model naming
            if config.suffix:
                job_params["suffix"] = config.suffix

            # Create the fine-tuning job
            fine_tune_response = openai.FineTuningJob.create(**job_params)

            # Track job information
            job_info = {
                "job_id": fine_tune_response.id,
                "model": fine_tune_response.model,
                "status": fine_tune_response.status,
                "created_at": datetime.fromtimestamp(fine_tune_response.created_at).isoformat(),
                "training_file": training_file_id,
                "validation_file": validation_file_id,
                "hyperparameters": fine_tune_response.hyperparameters,
                "config": config.__dict__,
                "estimated_completion": self._estimate_completion_time(config)
            }

            self.active_jobs[fine_tune_response.id] = job_info

            logger.info(f"✅ Fine-tuning job created: {fine_tune_response.id}")
            logger.info(f"Status: {fine_tune_response.status}")
            logger.info(f"Model: {fine_tune_response.model}")
            logger.info(f"Estimated completion: {job_info['estimated_completion']}")

            return job_info

        except Exception as e:
            logger.error(f"❌ Failed to create fine-tuning job: {str(e)}")
            raise
    def _estimate_completion_time(self, config: FineTuningConfig) -> str:
        """Estimate when fine-tuning will complete"""
        # Base time estimates (these are rough approximations)
        base_minutes = 10  # Minimum time for any job

        # Add time based on epochs
        epoch_minutes = config.n_epochs * 5

        # Add time based on model complexity
        if "gpt-4" in config.model:
            base_minutes *= 3  # GPT-4 takes longer

        total_minutes = base_minutes + epoch_minutes
        completion_time = datetime.now() + timedelta(minutes=total_minutes)

        return completion_time.strftime("%Y-%m-%d %H:%M:%S")
    async def monitor_fine_tuning_job(self, job_id: str, poll_interval: int = 60) -> Dict:
        """
        Monitor fine-tuning job progress with real-time updates

        Args:
            job_id: Fine-tuning job ID
            poll_interval: How often to check status (seconds)

        Returns:
            Final job status and results
        """
        logger.info(f"👀 Monitoring fine-tuning job: {job_id}")

        start_time = time.time()
        last_status = None

        while True:
            try:
                # Get current job status
                job_status = openai.FineTuningJob.retrieve(job_id)

                # Update tracking
                if job_id in self.active_jobs:
                    self.active_jobs[job_id].update({
                        "status": job_status.status,
                        "last_checked": datetime.now().isoformat()
                    })

                # Log status changes
                if job_status.status != last_status:
                    logger.info(f"📊 Job {job_id} status: {job_status.status}")
                    last_status = job_status.status

                    # Log additional details based on status
                    if hasattr(job_status, 'trained_tokens') and job_status.trained_tokens:
                        logger.info(f"Trained tokens: {job_status.trained_tokens:,}")

                # Check if job completed
                if job_status.status in ["succeeded", "failed", "cancelled"]:
                    logger.info(f"🏁 Job {job_id} completed with status: {job_status.status}")

                    # Move to completed jobs
                    if job_id in self.active_jobs:
                        final_job_info = self.active_jobs.pop(job_id)
                        final_job_info.update({
                            "final_status": job_status.status,
                            "completed_at": datetime.now().isoformat(),
                            "total_time": f"{(time.time() - start_time) / 60:.1f} minutes",
                            "fine_tuned_model": getattr(job_status, 'fine_tuned_model', None),
                            "result_files": getattr(job_status, 'result_files', []),
                            "trained_tokens": getattr(job_status, 'trained_tokens', None)
                        })
                        self.completed_jobs[job_id] = final_job_info

                    # Return final results
                    return await self._process_job_completion(job_id, job_status)

                # Wait before next check
                logger.debug(f"Checking again in {poll_interval} seconds...")
                time.sleep(poll_interval)

            except KeyboardInterrupt:
                logger.info("🔴 Monitoring interrupted by user")
                return {"status": "monitoring_interrupted", "job_id": job_id}
            except Exception as e:
                logger.error(f"❌ Error monitoring job {job_id}: {str(e)}")
                time.sleep(poll_interval)  # Continue monitoring despite errors
    async def _process_job_completion(self, job_id: str, job_status) -> Dict:
        """Process completed fine-tuning job"""
        completion_result = {
            "job_id": job_id,
            "status": job_status.status,
            "fine_tuned_model": getattr(job_status, 'fine_tuned_model', None),
            "metrics": {},
            "recommendations": []
        }

        if job_status.status == "succeeded":
            logger.info("🎉 Fine-tuning succeeded!")
            logger.info(f"Model ID: {job_status.fine_tuned_model}")

            # Get training metrics if available
            if hasattr(job_status, 'result_files') and job_status.result_files:
                completion_result["metrics"] = await self._extract_training_metrics(
                    job_status.result_files
                )

            # Generate success recommendations
            completion_result["recommendations"] = [
                "Test your fine-tuned model with validation examples",
                "Compare performance with base model",
                "Set up production deployment with proper monitoring",
                "Consider creating backup checkpoints"
            ]

        elif job_status.status == "failed":
            logger.error("💥 Fine-tuning failed!")

            # Analyze failure and provide recommendations
            completion_result["recommendations"] = await self._analyze_failure(job_id, job_status)

        return completion_result
    async def _extract_training_metrics(self, result_files: List) -> Dict:
        """Extract and parse training metrics from result files"""
        metrics = {
            "training_loss": [],
            "validation_loss": [],
            "final_loss": None,
            "convergence_analysis": {}
        }

        try:
            for file_info in result_files:
                if file_info.purpose == "fine-tune-results":
                    # Download and parse metrics file
                    file_content = openai.File.download(file_info.id)

                    # Parse metrics (format may vary)
                    lines = file_content.decode('utf-8').strip().split('\n')
                    for line in lines:
                        try:
                            metric_data = json.loads(line)
                            if "train_loss" in metric_data:
                                metrics["training_loss"].append(metric_data["train_loss"])
                            if "valid_loss" in metric_data:
                                metrics["validation_loss"].append(metric_data["valid_loss"])
                        except json.JSONDecodeError:
                            continue

            # Calculate final metrics
            if metrics["training_loss"]:
                metrics["final_loss"] = metrics["training_loss"][-1]
                metrics["convergence_analysis"] = self._analyze_convergence(metrics["training_loss"])

        except Exception as e:
            logger.warning(f"Failed to extract training metrics: {str(e)}")

        return metrics
    def _analyze_convergence(self, training_losses: List[float]) -> Dict:
        """Analyze training convergence from loss values"""
        if len(training_losses) < 3:
            return {"status": "insufficient_data"}

        # Check if loss is decreasing
        recent_losses = training_losses[-3:]
        is_decreasing = all(recent_losses[i] > recent_losses[i+1] for i in range(len(recent_losses)-1))

        # Calculate loss reduction
        initial_loss = training_losses[0]
        final_loss = training_losses[-1]
        loss_reduction = ((initial_loss - final_loss) / initial_loss) * 100

        # Determine convergence status
        if loss_reduction > 20:
            convergence_status = "good_convergence"
        elif loss_reduction > 5:
            convergence_status = "moderate_convergence"
        else:
            convergence_status = "poor_convergence"

        return {
            "status": convergence_status,
            "loss_reduction_percent": round(loss_reduction, 2),
            "is_decreasing": is_decreasing,
            "final_loss": final_loss,
            "training_epochs": len(training_losses)
        }
    async def _analyze_failure(self, job_id: str, job_status) -> List[str]:
        """Analyze failed fine-tuning job and provide recommendations"""
        recommendations = [
            "Check training data format and validation results",
            "Verify API key permissions and billing status",
            "Review OpenAI service status for platform issues"
        ]

        # Try to get more detailed error information
        try:
            # Get job events for more details
            events = openai.FineTuningJob.list_events(id=job_id, limit=10)

            for event in events.data:
                if event.level == "error":
                    logger.error(f"Error event: {event.message}")

                    # Add specific recommendations based on error
                    if "invalid" in event.message.lower():
                        recommendations.append("Fix data format issues identified in validation")
                    elif "quota" in event.message.lower():
                        recommendations.append("Check API usage limits and billing status")
                    elif "permission" in event.message.lower():
                        recommendations.append("Verify API key has fine-tuning permissions")

        except Exception as e:
            logger.warning(f"Could not retrieve detailed error information: {str(e)}")

        return recommendations
    async def evaluate_fine_tuned_model(self, model_id: str, test_cases: List[Dict],
                                        config: Optional[FineTuningConfig] = None) -> Dict:
        """
        Evaluate fine-tuned model performance

        Args:
            model_id: ID of fine-tuned model
            test_cases: List of test examples with expected responses
            config: Generation settings (max_tokens, temperature); defaults used if None

        Returns:
            Evaluation results with metrics and analysis
        """
        logger.info(f"🧪 Evaluating fine-tuned model: {model_id}")

        config = config or FineTuningConfig()

        evaluation_results = {
            "model_id": model_id,
            "test_cases": len(test_cases),
            "accuracy_metrics": {},
            "response_quality": {},
            "comparison_with_base": {},
            "recommendations": []
        }

        correct_responses = 0
        response_times = []
        response_qualities = []

        for i, test_case in enumerate(test_cases):
            try:
                # Test fine-tuned model
                start_time = time.time()
                fine_tuned_response = openai.ChatCompletion.create(
                    model=model_id,
                    messages=test_case["messages"][:-1],  # Exclude expected response
                    max_tokens=config.max_tokens,
                    temperature=config.temperature
                )
                response_time = time.time() - start_time
                response_times.append(response_time)

                # Extract response
                generated_response = fine_tuned_response.choices[0].message.content
                expected_response = test_case["messages"][-1]["content"]

                # Evaluate response quality
                quality_score = self._evaluate_response_quality(
                    generated_response,
                    expected_response
                )
                response_qualities.append(quality_score)

                # Check if response is acceptable (quality > 0.7)
                if quality_score > 0.7:
                    correct_responses += 1

                # Log progress
                if (i + 1) % 10 == 0:
                    logger.info(f"Evaluated {i + 1}/{len(test_cases)} test cases")

            except Exception as e:
                logger.error(f"Error evaluating test case {i}: {str(e)}")

        # Aggregate simple accuracy and latency metrics across all test cases
        if test_cases:
            evaluation_results["accuracy_metrics"] = {
                "accuracy": correct_responses / len(test_cases),
                "avg_response_time": sum(response_times) / len(response_times) if response_times else None,
            }
            evaluation_results["response_quality"] = {
                "avg_quality_score": sum(response_qualities) / len(response_qualities) if response_qualities else None,
            }

        return evaluation_results
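To tie the pieces together, here is a short, hypothetical driver script showing how the manager class above might be used end to end. The training file path, model suffix, and import path (derived from the fine_tune_manager.py header comment) are assumptions; adjust them to your project layout.

# run_finetune.py -- example driver for OpenAIFineTuningManager (a sketch, not a fixed API)
import asyncio
from openai_finetuning.fine_tune_manager import OpenAIFineTuningManager, FineTuningConfig

async def main():
    manager = OpenAIFineTuningManager()  # uses OPENAI_API_KEY from the environment

    # 1. Validate and upload the JSONL dataset produced in Part 1 (path is an assumption)
    file_info = await manager.upload_training_data("training_data.jsonl")

    # 2. Start the job with explicit hyperparameters and a custom model suffix
    config = FineTuningConfig(model="gpt-3.5-turbo-1106", n_epochs=3, suffix="support-agent")
    job_info = await manager.create_fine_tuning_job(file_info["file_id"], config)

    # 3. Poll until the job finishes (checks every 60 seconds by default)
    result = await manager.monitor_fine_tuning_job(job_info["job_id"])
    print("Fine-tuned model:", result.get("fine_tuned_model"))

asyncio.run(main())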