
Fine-Tuning LLMs for Custom Agent Behaviors - Part 1: Preparing Training Data

By AgentForge Hub | 8/14/2025 | 24 min read
Advanced

Part 1 of 5 in the series: Fine-Tuning LLMs for Custom Agent Behaviors

Fine-tuning large language models is one of the most powerful techniques for creating AI agents with specialized behaviors and domain expertise. However, the success of a fine-tuned model depends heavily on the quality of its training data: the model reproduces whatever patterns the data contains, so noisy or inconsistent examples yield noisy or inconsistent behavior, while well-structured examples can turn a general-purpose LLM into a domain specialist.

This guide covers the critical first step of fine-tuning: preparing the training data.

What You'll Learn in This Tutorial

By the end of this tutorial, you'll have:

  • ✅ Deep understanding of training data requirements for different LLM providers
  • ✅ Complete data preparation pipeline with validation and quality checks
  • ✅ Professional-grade tools for data collection, cleaning, and formatting
  • ✅ Quality assurance framework to ensure optimal training outcomes
  • ✅ Best practices for prompt engineering and completion writing
  • ✅ Automated workflows for large-scale data processing

Estimated Time: 40-45 minutes


Understanding Fine-Tuning Data Requirements

Before diving into data preparation, it's crucial to understand what makes training data effective for fine-tuning.

The Foundation of Successful Fine-Tuning

Why Data Quality Matters:

  • Model Behavior: Your model will mimic the patterns in your training data
  • Performance: High-quality data leads to better accuracy and fewer errors
  • Consistency: Well-formatted data ensures predictable model responses
  • Efficiency: Good data reduces training time and computational costs

Fine-Tuning vs. Pre-Training Data

Pre-training data (used to create base models like GPT-4):

  • Massive scale (trillions of tokens)
  • General web content, books, articles
  • Focuses on language understanding

Fine-tuning data (what we're preparing):

  • Smaller scale (hundreds to thousands of examples)
  • Task-specific, high-quality examples
  • Focuses on specific behaviors and responses

Understanding Different Fine-Tuning Approaches

1. Instruction Following

  • Teaches the model to follow specific instructions
  • Format: Instruction → Response
  • Best for: Task-specific agents, specialized workflows

2. Conversational Behavior

  • Teaches the model conversation patterns
  • Format: Multi-turn dialogue
  • Best for: Chatbots, customer service agents

3. Domain Expertise

  • Teaches domain-specific knowledge and reasoning
  • Format: Question → Expert answer
  • Best for: Medical, legal, technical assistants
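To make the three formats concrete, here is a minimal sketch of one record per approach. The field names are illustrative assumptions (the rest of this tutorial works with `instruction`/`response` pairs), and the sample text is invented; your platform may expect a different layout.

```python
import json

# Hypothetical records, one per approach. Field names are illustrative,
# not a requirement of any particular fine-tuning platform.

# 1. Instruction following: a single instruction paired with a response
instruction_example = {
    "instruction": "Summarize the following error log in one sentence.",
    "response": "The service failed to start because port 8080 was already in use.",
}

# 2. Conversational behavior: an ordered list of dialogue turns
conversation_example = {
    "messages": [
        {"role": "user", "content": "My deployment keeps timing out."},
        {"role": "assistant", "content": "Which step times out: build, push, or rollout?"},
        {"role": "user", "content": "The rollout step."},
        {"role": "assistant", "content": "Check the readiness probe first; a failing probe blocks the rollout."},
    ]
}

# 3. Domain expertise: a question paired with an expert-level answer
domain_example = {
    "question": "Why does my training loss become NaN after a few steps?",
    "expert_answer": "A common cause is a learning rate that is too high; lower it and enable gradient clipping.",
}

for name, record in [
    ("instruction", instruction_example),
    ("conversation", conversation_example),
    ("domain", domain_example),
]:
    # Each record serializes to one JSON object, i.e. one line of a JSONL file
    print(name, "->", json.dumps(record)[:70])
```

Whichever shape you pick, keep it consistent across the whole dataset so the later cleaning and validation steps can rely on the same keys.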

Step 1: Data Collection Strategy

The first step is gathering raw data that represents the behaviors you want your model to learn.

Understanding Your Use Case

Before collecting data, clearly define your fine-tuning objectives:

# data_collection/use_case_definition.py

class FineTuningUseCase:
    """Define your fine-tuning objectives and requirements"""
    
    def __init__(self):
        self.objective = ""
        self.target_behaviors = []
        self.domain_expertise = []
        self.conversation_style = ""
        self.quality_requirements = {}
    
    def define_objective(self, objective: str, behaviors: list, domain: list):
        """
        Define what you want your model to achieve
        
        Args:
            objective: Primary goal (e.g., "Customer support agent for SaaS product")
            behaviors: Specific behaviors (e.g., ["helpful", "concise", "technical"])
            domain: Domain expertise (e.g., ["software troubleshooting", "billing"])
        """
        self.objective = objective
        self.target_behaviors = behaviors
        self.domain_expertise = domain
        
        print(f"📋 Objective: {objective}")
        print(f"🎯 Target Behaviors: {', '.join(behaviors)}")
        print(f"🧠 Domain Expertise: {', '.join(domain)}")
    
    def estimate_data_requirements(self):
        """Estimate how much training data you'll need"""
        
        base_examples = 100  # Minimum for basic fine-tuning
        
        # Add examples based on complexity
        behavior_multiplier = len(self.target_behaviors) * 20
        domain_multiplier = len(self.domain_expertise) * 50
        
        estimated_examples = base_examples + behavior_multiplier + domain_multiplier
        
        return {
            "minimum_examples": estimated_examples,
            "recommended_examples": estimated_examples * 2,
            "optimal_examples": estimated_examples * 5
        }

# Example usage
use_case = FineTuningUseCase()
use_case.define_objective(
    objective="Technical support agent for AI development tools",
    behaviors=["helpful", "technically accurate", "code-focused", "step-by-step"],
    domain=["Python programming", "AI/ML libraries", "debugging", "best practices"]
)

requirements = use_case.estimate_data_requirements()
print("\n📊 Data Requirements:")
print(f"Minimum: {requirements['minimum_examples']} examples")
print(f"Recommended: {requirements['recommended_examples']} examples")
print(f"Optimal: {requirements['optimal_examples']} examples")
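
As a sanity check on the heuristic, the sketch below recomputes the estimate for the example use case (4 target behaviors, 4 domain areas). The function `estimate_examples` is a hypothetical standalone copy of `estimate_data_requirements()`; the constants are this tutorial's rule of thumb, not a provider requirement.

```python
def estimate_examples(num_behaviors: int, num_domains: int) -> dict:
    """Standalone copy of the estimate_data_requirements() heuristic."""
    base_examples = 100  # baseline for basic fine-tuning
    # 20 extra examples per behavior, 50 extra per domain area
    minimum = base_examples + num_behaviors * 20 + num_domains * 50
    return {
        "minimum": minimum,
        "recommended": minimum * 2,
        "optimal": minimum * 5,
    }

# 100 + 4*20 + 4*50 = 380
estimate = estimate_examples(num_behaviors=4, num_domains=4)
print(estimate)  # {'minimum': 380, 'recommended': 760, 'optimal': 1900}
```

Treat these numbers as a starting point: the right dataset size depends on how varied the target behavior is, and it is usually cheaper to start near the minimum and add examples where evaluation shows gaps.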

Data Source Identification

Internal Sources (Highest Quality):

# data_collection/internal_sources.py

class InternalDataCollector:
    """Collect high-quality data from internal sources"""
    
    def __init__(self):
        self.sources = {
            "customer_support": [],
            "documentation": [],
            "expert_knowledge": [],
            "existing_workflows": []
        }
    
    def collect_customer_interactions(self, chat_logs_path: str):
        """
        Extract valuable interactions from customer support logs
        
        This is often your highest-quality data source because it represents
        real user needs and expert responses.
        """
        import json
        
        valuable_interactions = []
        
        # Expects a JSON array of interaction objects; read as UTF-8 explicitly
        with open(chat_logs_path, 'r', encoding='utf-8') as f:
            logs = json.load(f)
        
        for interaction in logs:
            # Filter for high-quality interactions
            if self._is_high_quality_interaction(interaction):
                formatted_example = {
                    "instruction": interaction["user_message"],
                    "response": interaction["agent_response"],
                    "context": interaction.get("context", ""),
                    "quality_score": self._calculate_quality_score(interaction)
                }
                valuable_interactions.append(formatted_example)
        
        return valuable_interactions
    
    def _is_high_quality_interaction(self, interaction: dict) -> bool:
        """Determine if an interaction is suitable for training"""
        
        # Check message length (not too short, not too long)
        user_msg = interaction["user_message"]
        agent_msg = interaction["agent_response"]
        
        if len(user_msg) < 10 or len(user_msg) > 1000:
            return False
        
        if len(agent_msg) < 20 or len(agent_msg) > 2000:
            return False
        
        # Check for resolution indicators
        satisfaction_indicators = [
            "thank you", "solved", "fixed", "working now", 
            "that helped", "perfect", "exactly what I needed"
        ]
        
        # Check for quality indicators
        if any(indicator in interaction.get("follow_up", "").lower() 
               for indicator in satisfaction_indicators):
            return True
        
        # Check agent response quality
        quality_indicators = [
            "step-by-step", "here's how", "try this", "solution",
            "example", "code", "documentation"
        ]
        
        return any(indicator in agent_msg.lower() for indicator in quality_indicators)
    
    def _calculate_quality_score(self, interaction: dict) -> float:
        """Calculate a quality score for prioritizing examples"""
        score = 0.5  # Base score
        
        # Bonus for resolution
        if "resolved" in interaction.get("status", "").lower():
            score += 0.3
        
        # Bonus for code examples
        if "```" in interaction["agent_response"]:
            score += 0.2
        
        # Bonus for detailed explanations
        if len(interaction["agent_response"]) > 200:
            score += 0.1
        
        return min(score, 1.0)

# Example usage
collector = InternalDataCollector()
training_examples = collector.collect_customer_interactions("support_logs.json")
print(f"📚 Collected {len(training_examples)} high-quality examples")
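
The collector assumes a particular log layout: a JSON array whose records carry `user_message` and `agent_response`, plus optional `context`, `follow_up`, and `status` fields. The sketch below writes one such record to a temporary `support_logs.json` and checks for missing keys before any filtering runs. The sample content and the `find_malformed` helper are hypothetical.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical sample record using the keys the collector reads
sample_logs = [
    {
        "user_message": "My API key stopped working after I rotated it.",
        "agent_response": "Here's how to fix it: update the key in your environment, then restart the worker.",
        "context": "plan: pro",
        "follow_up": "Thank you, that helped!",
        "status": "resolved",
    }
]

# These two keys are accessed with [] in the collector, so they must exist
REQUIRED_KEYS = {"user_message", "agent_response"}

def find_malformed(logs: list) -> list:
    """Return the indexes of records that lack a required key."""
    return [i for i, record in enumerate(logs) if not REQUIRED_KEYS.issubset(record)]

# Write the sample to disk and read it back, as the collector would
log_path = Path(tempfile.mkdtemp()) / "support_logs.json"
log_path.write_text(json.dumps(sample_logs, indent=2), encoding="utf-8")

loaded = json.loads(log_path.read_text(encoding="utf-8"))
malformed = find_malformed(loaded)
print(f"{len(loaded)} records loaded, {len(malformed)} malformed")
```

Running a check like this first turns a `KeyError` deep inside the quality filter into an explicit list of records to fix or drop.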

External Sources (Supplement Your Data):

# data_collection/external_sources.py

import requests
import json
from typing import List, Dict

class ExternalDataCollector:
    """Collect supplementary data from external sources"""
    
    def __init__(self):
        self.rate_limits = {
            "github": 5000,  # requests per hour
            "stackoverflow": 300,  # requests per day
            "reddit": 60  # requests per minute
        }
    
    def collect_github_issues(self, repo: str, labels: List[str]) -> List[Dict]:
        """
        Collect resolved GitHub issues as training examples
        
        Args:
            repo: Repository in format "owner/repo"
            labels: Labels to filter by (e.g., ["bug", "question"])
        """
        examples = []
        
        # GitHub API endpoint
        url = f"https://api.github.com/repos/{repo}/issues"
        
        params = {
            "state": "closed",
            "labels": ",".join(labels),
            "per_page": 100,
            "sort": "updated",
            "direction": "desc"
        }
        
        headers = {
            "Accept": "application/vnd.github.v3+json",
            # Add your GitHub token here for higher rate limits
            # "Authorization": "token YOUR_GITHUB_TOKEN"
        }
        
        try:
            response = requests.get(url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            
            issues = response.json()
            
            for issue in issues:
                if self._is_valuable_issue(issue):
                    example = {
                        "instruction": f"Help with: {issue['title']}\n\n{issue['body'][:500]}",
                        "response": self._extract_solution(issue),
                        "source": f"GitHub-{repo}",
                        "quality_score": self._score_github_issue(issue)
                    }
                    examples.append(example)
            
            print(f"📥 Collected {len(examples)} examples from {repo}")
            return examples
            
        except requests.exceptions.RequestException as e:
            print(f"❌ Error collecting from GitHub: {e}")
            return []
    
    def _is_valuable_issue(self, issue: Dict) -> bool:
        """Check if a GitHub issue is valuable for training"""
        
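        # The issues endpoint also returns pull requests; skip them
        if "pull_request" in issue:
            return False
        
        # Must have comments (indicating discussion/solution)
        if issue["comments"] < 2:
            return False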
        
        # Must be reasonably sized
        if not issue["body"] or len(issue["body"]) < 50:
            return False
        
        # Filter out certain issue types
        skip_keywords = ["duplicate", "invalid", "wontfix", "spam"]
        title_lower = issue["title"].lower()
        
        return not any(keyword in title_lower for keyword in skip_keywords)
    
    def _extract_solution(self, issue: Dict) -> str:
        """Extract the solution from issue comments"""
        # This is a simplified version - in practice, you'd use
        # the GitHub API to fetch comments and identify the accepted solution
        
        return f"Based on the issue '{issue['title']}', here's a structured approach to solve this problem:\n\n[This would contain the actual solution from comments]"
    
    def _score_github_issue(self, issue: Dict) -> float:
        """Score GitHub issue quality"""
        score = 0.5
        
        # More comments usually indicate better discussion
        if issue["comments"] > 5:
            score += 0.2
        
        # Issues with reactions show community engagement
        if issue.get("reactions", {}).get("total_count", 0) > 3:
            score += 0.1
        
        # Longer issues often have more context (body can be None)
        if len(issue.get("body") or "") > 200:
            score += 0.2
        
        return min(score, 1.0)

# Example usage
external_collector = ExternalDataCollector()
github_examples = external_collector.collect_github_issues(
    repo="openai/openai-python",
    labels=["question", "bug"]
)
print(f"Collected {len(github_examples)} examples from GitHub")
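
The `_extract_solution` method above is a placeholder. One possible way to fill it in, sketched here under stated assumptions, is to follow the issue's `comments_url` and pick the comment with the most reactions. Treating reaction count as a proxy for "the accepted solution" is a heuristic chosen for this sketch, not something GitHub marks for you. The names `fetch_json` and `extract_solution` are hypothetical, and the fetcher is injectable so the logic can be tried without network access.

```python
import json
import urllib.request
from typing import Callable, Dict, List

def fetch_json(url: str) -> List[Dict]:
    """Default fetcher: unauthenticated GET that returns parsed JSON."""
    request = urllib.request.Request(
        url, headers={"Accept": "application/vnd.github+json"}
    )
    with urllib.request.urlopen(request, timeout=30) as resp:
        return json.load(resp)

def extract_solution(issue: Dict, fetch: Callable[[str], List[Dict]] = fetch_json) -> str:
    """Return the body of the most-reacted comment, or "" if none qualifies."""
    comments = fetch(issue["comments_url"])
    # Ignore comments with an empty or missing body
    candidates = [c for c in comments if c.get("body")]
    if not candidates:
        return ""
    best = max(
        candidates,
        key=lambda c: c.get("reactions", {}).get("total_count", 0),
    )
    return best["body"].strip()

# Offline demonstration with made-up data instead of a real API call
fake_issue = {"comments_url": "https://example.invalid/issues/1/comments"}
fake_comments = [
    {"body": "Same problem here.", "reactions": {"total_count": 1}},
    {"body": " Pinning the dependency version fixed it for me. ", "reactions": {"total_count": 7}},
    {"body": None},
]
solution = extract_solution(fake_issue, fetch=lambda url: fake_comments)
print(solution)
```

Review a sample of the extracted answers by hand before training on them; the most popular comment is not always the correct one.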

Step 2: Data Cleaning and Preprocessing

Raw data needs significant cleaning before it's suitable for training. Here's a comprehensive cleaning pipeline:

Text Cleaning and Normalization

# data_processing/text_cleaner.py

import re
import html
import unicodedata
from typing import List, Dict, Optional
import logging

class TextCleaner:
    """Comprehensive text cleaning for fine-tuning data"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
        # Define cleaning patterns
        self.patterns = {
            # Remove HTML tags
            'html_tags': re.compile(r'<[^>]+>'),
            
            # Remove URLs (but preserve in special cases)
            'urls': re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'),
            
            # Remove email addresses
            'emails': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            
            # Remove excessive whitespace
            'excess_whitespace': re.compile(r'\s+'),
            
            # Remove special characters (but preserve important ones)
            'special_chars': re.compile(r'[^\w\s\.\,\!\?\;\:\'\"\-\(\)\[\]\{\}]'),
            
            # Collapse runs of 3+ identical letters or !/? down to two
            # (e.g., "helloooo" -> "helloo"); digits are excluded so "1000" survives
            'repeated_chars': re.compile(r'([^\W\d_]|[!?])\1{2,}')
        }
    
    def clean_text(self, text: str, preserve_code: bool = True) -> str:
        """
        Comprehensive text cleaning pipeline
        
        Args:
            text: Raw text to clean
            preserve_code: Whether to preserve code blocks
            
        Returns:
            Cleaned text suitable for training
        """
        if not text or not isinstance(text, str):
            return ""
        
        original_text = text
        
        # Step 1: Handle code blocks (preserve them)
        code_blocks = []
        if preserve_code:
            text, code_blocks = self._extract_code_blocks(text)
        
        # Step 2: Decode HTML entities
        text = html.unescape(text)
        
        # Step 3: Normalize Unicode characters
        text = unicodedata.normalize('NFKC', text)
        
        # Step 4: Remove HTML tags
        text = self.patterns['html_tags'].sub('', text)
        
        # Step 5: Handle URLs (remove or replace with placeholder)
        text = self._handle_urls(text)
        
        # Step 6: Remove email addresses
        text = self.patterns['emails'].sub('[EMAIL]', text)
        
        # Step 7: Fix repeated characters
        text = self.patterns['repeated_chars'].sub(r'\1\1', text)
        
        # Step 8: Clean excessive whitespace
        text = self.patterns['excess_whitespace'].sub(' ', text)
        
        # Step 9: Remove leading/trailing whitespace
        text = text.strip()
        
        # Step 10: Restore code blocks
        if preserve_code and code_blocks:
            text = self._restore_code_blocks(text, code_blocks)
        
        # Step 11: Validate cleaning quality
        quality_score = self._assess_cleaning_quality(original_text, text)
        
        if quality_score < 0.5:
            self.logger.warning(f"Low cleaning quality score: {quality_score}")
        
        return text
    
    def _extract_code_blocks(self, text: str) -> tuple:
        """Extract and preserve code blocks"""
        code_blocks = []
        pattern = r'```[\s\S]*?```|`[^`]+`'
        
        def replace_code(match):
            code_blocks.append(match.group(0))
            return f"__CODE_BLOCK_{len(code_blocks)-1}__"
        
        cleaned_text = re.sub(pattern, replace_code, text)
        return cleaned_text, code_blocks
    
    def _restore_code_blocks(self, text: str, code_blocks: List[str]) -> str:
        """Restore preserved code blocks"""
        for i, code_block in enumerate(code_blocks):
            text = text.replace(f"__CODE_BLOCK_{i}__", code_block)
        return text
    
    def _handle_urls(self, text: str) -> str:
        """Handle URLs intelligently"""
        def replace_url(match):
            url = match.group(0)
            
            # Preserve documentation URLs
            if any(domain in url for domain in ['docs.', 'documentation', 'github.com']):
                return url
            
            # Replace other URLs with placeholder
            return '[URL]'
        
        return self.patterns['urls'].sub(replace_url, text)
    
    def _assess_cleaning_quality(self, original: str, cleaned: str) -> float:
        """Assess the quality of text cleaning"""
        
        # Check length ratio (shouldn't lose too much content)
        length_ratio = len(cleaned) / max(len(original), 1)
        
        # Check if essential content is preserved
        important_indicators = ['?', '.', '!', 'how', 'what', 'why', 'when']
        original_indicators = sum(1 for indicator in important_indicators 
                                if indicator.lower() in original.lower())
        cleaned_indicators = sum(1 for indicator in important_indicators 
                               if indicator.lower() in cleaned.lower())
        
        content_preservation = (cleaned_indicators / max(original_indicators, 1)) if original_indicators > 0 else 1.0
        
        # Overall quality score
        quality_score = (length_ratio * 0.6) + (content_preservation * 0.4)
        
        return min(quality_score, 1.0)
    
    def batch_clean(self, texts: List[str], preserve_code: bool = True) -> List[str]:
        """Clean a batch of texts efficiently"""
        return [self.clean_text(text, preserve_code) for text in texts]

# Usage example
cleaner = TextCleaner()

# Test cleaning
raw_text = """
<p>Hello!!! This is a test with <strong>HTML</strong> tags and 
some URLs like https://example.com and emails like test@example.com.</p>

Here's some code:
```python
def hello():
    print("Hello, world!")
```

And some repeated charactersssss!!!!
"""

cleaned = cleaner.clean_text(raw_text)
print("Original:")
print(raw_text)
print("\nCleaned:")
print(cleaned)
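
The key idea in the pipeline is the protect-and-restore step: code is swapped out for placeholders, the surrounding prose is normalized, and the code is put back untouched. The sketch below isolates that technique with whitespace collapsing as the only cleaning step. The function name `clean_preserving_code` is hypothetical, and the fence marker is built programmatically only to keep this listing readable.

```python
import re

FENCE = "`" * 3  # three backticks, the markdown code-fence marker

# Matches fenced blocks first, then inline code spans
CODE_PATTERN = re.compile(FENCE + r"[\s\S]*?" + FENCE + r"|`[^`]+`")

def clean_preserving_code(text: str) -> str:
    """Collapse whitespace in prose while leaving code blocks untouched."""
    blocks = []

    def stash(match: re.Match) -> str:
        # Store the code verbatim and leave a numbered placeholder behind
        blocks.append(match.group(0))
        return f"__CODE_BLOCK_{len(blocks) - 1}__"

    text = CODE_PATTERN.sub(stash, text)
    text = re.sub(r"\s+", " ", text).strip()  # the only "cleaning" step here
    for i, block in enumerate(blocks):
        text = text.replace(f"__CODE_BLOCK_{i}__", block)
    return text

sample = (
    "Run   this:\n\n"
    + FENCE + "python\nfor i in range(3):\n    print(i)\n" + FENCE
    + "\n\nthen   retry."
)
result = clean_preserving_code(sample)
print(result)
```

The prose around the block is flattened to single spaces, while the newlines and indentation inside the block survive, which matters because indentation is significant in languages such as Python.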


Data Validation and Quality Control

# data_processing/quality_validator.py

from typing import List, Dict, Tuple
import re
import json
from dataclasses import dataclass

@dataclass
class QualityMetrics:
    """Data quality metrics for training examples"""
    length_score: float
    diversity_score: float
    coherence_score: float
    completeness_score: float
    overall_score: float

class DataQualityValidator:
    """Validate and score training data quality"""
    
    def __init__(self):
        self.min_instruction_length = 10
        self.max_instruction_length = 1000
        self.min_response_length = 20
        self.max_response_length = 2000
        
        # Common quality indicators
        self.quality_indicators = {
            'positive': [
                'step-by-step', 'example', 'specifically', 'detailed',
                'solution', 'approach', 'method', 'technique',
                'implementation', 'code', 'documentation'
            ],
            'negative': [
                'unclear', 'confusing', 'maybe', 'probably',
                'not sure', 'might be', 'could be'
            ]
        }
    
    def validate_example(self, example: Dict) -> Tuple[bool, QualityMetrics, List[str]]:
        """
        Validate a single training example
        
        Args:
            example: Dictionary with 'instruction' and 'response' keys
            
        Returns:
            Tuple of (is_valid, quality_metrics, issues)
        """
        issues = []
        
        # Extract instruction and response
        instruction = example.get('instruction', '')
        response = example.get('response', '')
        
        if not instruction or not response:
            issues.append("Missing instruction or response")
            return False, QualityMetrics(0, 0, 0, 0, 0), issues
        
        # Validate lengths
        length_score = self._validate_lengths(instruction, response, issues)
        
        # Check diversity (uniqueness)
        diversity_score = self._check_diversity(instruction, response)
        
        # Check coherence (instruction-response alignment)
        coherence_score = self._check_coherence(instruction, response, issues)
        
        # Check completeness
        completeness_score = self._check_completeness(response, issues)
        
        # Calculate overall score
        overall_score = (
            length_score * 0.2 +
            diversity_score * 0.2 +
            coherence_score * 0.3 +
            completeness_score * 0.3
        )
        
        metrics = QualityMetrics(
            length_score=length_score,
            diversity_score=diversity_score,
            coherence_score=coherence_score,
            completeness_score=completeness_score,
            overall_score=overall_score
        )
        
        # Determine if example is valid
        is_valid = overall_score >= 0.6 and len(issues) == 0
        
        return is_valid, metrics, issues
    
    def _validate_lengths(self, instruction: str, response: str, issues: List[str]) -> float:
        """Validate text lengths"""
        score = 1.0
        
        # Check instruction length
        if len(instruction) < self.min_instruction_length:
            issues.append(f"Instruction too short ({len(instruction)} < {self.min_instruction_length})")
            score *= 0.5
        elif len(instruction) > self.max_instruction_length:
            issues.append(f"Instruction too long ({len(instruction)} > {self.max_instruction_length})")
            score *= 0.7
        
        # Check response length
        if len(response) < self.min_response_length:
            issues.append(f"Response too short ({len(response)} < {self.min_response_length})")
            score *= 0.5
        elif len(response) > self.max_response_length:
            issues.append(f"Response too long ({len(response)} > {self.max_response_length})")
            score *= 0.7
        
        return score
    
    def _check_diversity(self, instruction: str, response: str) -> float:
        """Check lexical diversity of the text"""
        
        # Combine instruction and response
        full_text = f"{instruction} {response}".lower()
        words = re.findall(r'\b\w+\b', full_text)
        
        if len(words) == 0:
            return 0.0
        
        # Calculate unique word ratio
        unique_words = len(set(words))
        total_words = len(words)
        diversity_ratio = unique_words / total_words
        
        # Normalize to 0-1 scale (0.5 is typical for natural text)
        return min(diversity_ratio * 2, 1.0)
    
    def _check_coherence(self, instruction: str, response: str, issues: List[str]) -> float:
        """Check if response addresses the instruction"""
        
        # Extract key terms from instruction
        instruction_words = set(re.findall(r'\b\w+\b', instruction.lower()))
        response_words = set(re.findall(r'\b\w+\b', response.lower()))
        
        # Remove common words
        common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
        instruction_words -= common_words
        response_words -= common_words
        
        if not instruction_words:
            return 0.5  # Neutral score if no meaningful words
        
        # Calculate overlap
        overlap = len(instruction_words & response_words)
        coherence_score = overlap / len(instruction_words)
        
        # Check for quality indicators
        positive_count = sum(1 for indicator in self.quality_indicators['positive'] 
                           if indicator in response.lower())
        negative_count = sum(1 for indicator in self.quality_indicators['negative'] 
                           if indicator in response.lower())
        
        # Adjust score based on quality indicators
        if positive_count > 0:
            coherence_score += 0.2 * positive_count
        if negative_count > 0:
            coherence_score -= 0.3 * negative_count
            issues.append(f"Response contains {negative_count} negative quality indicators")
        
        return min(max(coherence_score, 0.0), 1.0)
    
    def _check_completeness(self, response: str, issues: List[str]) -> float:
        """Check if response is complete and helpful"""
        
        completeness_score = 0.5  # Base score
        
        # Check for complete sentences
        sentences = re.split(r'[.!?]+', response)
        complete_sentences = [s for s in sentences if len(s.strip()) > 5]
        
        if len(complete_sentences) >= 2:
            completeness_score += 0.2
        
        # Check for explanatory elements
        explanatory_patterns = [
            r'\b(because|since|due to|as a result)\b',  # Explanations
            r'\b(first|second|then|next|finally)\b',    # Step-by-step
            r'\b(for example|such as|like)\b',          # Examples
            r'```.*```',                                 # Code examples
            r'\b(solution|approach|method)\b'           # Solution-oriented
        ]
        
        for pattern in explanatory_patterns:
            if re.search(pattern, response, re.IGNORECASE | re.DOTALL):
                completeness_score += 0.1
        
        # Check for incomplete endings (only at the very end of the response)
        incomplete_endings = ('...', 'etc.', 'and so on', 'and so on.')
        if response.lower().rstrip().endswith(incomplete_endings):
            completeness_score -= 0.2
            issues.append("Response appears incomplete")
        
        return min(completeness_score, 1.0)
    
    def validate_dataset(self, examples: List[Dict]) -> Dict:
        """Validate entire dataset and provide summary"""
        
        valid_examples = []
        invalid_examples = []
        all_metrics = []
        all_issues = []
        
        for i, example in enumerate(examples):
            is_valid, metrics, issues = self.validate_example(example)
            
            all_metrics.append(metrics)
            
            if is_valid:
                valid_examples.append((i, example))
            else:
                invalid_examples.append((i, example, issues))
                all_issues.extend(issues)
        
        # Calculate dataset-level statistics
        avg_metrics = self._calculate_average_metrics(all_metrics)
        
        return {
            'total_examples': len(examples),
            'valid_examples': len(valid_examples),
            'invalid_examples': len(invalid_examples),
            'validity_rate': len(valid_examples) / len(examples) if examples else 0.0,
            'average_metrics': avg_metrics,
            'common_issues': self._analyze_common_issues(all_issues),
            'valid_data': [example for _, example in valid_examples],
            'invalid_data': invalid_examples
        }
    
    def _calculate_average_metrics(self, metrics_list: List[QualityMetrics]) -> Dict:
        """Calculate average quality metrics"""
        if not metrics_list:
            return {}
        
        return {
            'length_score': sum(m.length_score for m in metrics_list) / len(metrics_list),
            'diversity_score': sum(m.diversity_score for m in metrics_list) / len(metrics_list),
            'coherence_score': sum(m.coherence_score for m in metrics_list) / len(metrics_list),
            'completeness_score': sum(m.completeness_score for m in metrics_list) / len(metrics_list),
            'overall_score': sum(m.overall_score for m in metrics_list) / len(metrics_list)
        }
    
    def _analyze_common_issues(self, issues: List[str]) -> Dict[str, int]:
        """Analyze common issues in the dataset"""
        issue_counts = {}
        for issue in issues:
            # Categorize issues
            if "too short" in issue:
                category = "Length Issues - Too Short"
            elif "too long" in issue:
                category = "Length Issues - Too Long"
            elif "incomplete" in issue:
                category = "Completeness Issues"
            elif "negative quality" in issue:
                category = "Quality Issues"
            else:
                category = "Other Issues"
            
            issue_counts[category] = issue_counts.get(category, 0) + 1
        
        return issue_counts

# Usage example
validator = DataQualityValidator()

# Example training data
sample_data = [
    {
        "instruction": "How do I install Python packages?",
        "response": "You can install Python packages using pip. Run 'pip install package_name' in your terminal. For example, to install requests: pip install requests"
    },
    {
        "instruction": "Fix bug",  # Too short
        "response": "Try debugging..."  # Too short and incomplete
    }
]

# Validate the dataset
results = validator.validate_dataset(sample_data)
print("📊 Dataset Validation Results:")
print(f"Total examples: {results['total_examples']}")
print(f"Valid examples: {results['valid_examples']}")
print(f"Validity rate: {results['validity_rate']:.2%}")
print(f"Average quality score: {results['average_metrics']['overall_score']:.2f}")
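
To see how the diversity metric behaves, the sketch below reimplements the `_check_diversity` formula as a standalone function (the name `diversity_score` is hypothetical) and applies it to a repetitive text and a natural one.

```python
import re

def diversity_score(text: str) -> float:
    """Unique-word ratio, doubled and capped at 1.0 (same formula as the validator)."""
    words = re.findall(r"\b\w+\b", text.lower())
    if not words:
        return 0.0
    ratio = len(set(words)) / len(words)
    return min(ratio * 2, 1.0)

# 2 unique words out of 6 -> ratio 1/3 -> score 2/3
repetitive = diversity_score("pip install pip install pip install")

# 7 unique words out of 7 -> ratio 1.0 -> capped at 1.0
natural = diversity_score("Use pip to install the requests package")

print(f"repetitive: {repetitive:.2f}, natural: {natural:.2f}")
```

Because the ratio is doubled before capping, any text in which at least half the words are unique receives the maximum score. The metric therefore only penalizes heavily repetitive examples; it does not distinguish between moderately and highly varied ones.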

Step 3: Data Formatting for Different Platforms

Once your data is clean and validated, you need to format it correctly for your chosen fine-tuning platform.

OpenAI Fine-Tuning Format

OpenAI expects data in JSONL format with specific structure:
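
As a point of reference before the formatter code, here is a minimal sketch of the chat-style record that OpenAI documents for fine-tuning chat models such as gpt-3.5-turbo: one JSON object per line, each with a `messages` list of role/content pairs. The helper name `to_chat_record` and the system prompt text are assumptions made for this sketch.

```python
import json
from typing import Dict, Optional

def to_chat_record(instruction: str, response: str,
                   system_prompt: Optional[str] = None) -> Dict:
    """Convert an instruction/response pair into a chat-format record."""
    messages = []
    if system_prompt:
        # Optional system message that sets the agent's overall behavior
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": instruction})
    messages.append({"role": "assistant", "content": response})
    return {"messages": messages}

record = to_chat_record(
    instruction="How do I install Python packages?",
    response="Use pip: run 'pip install package_name' in your terminal.",
    system_prompt="You are a technical support agent for AI development tools.",
)

# One record becomes exactly one line of the JSONL file
jsonl_line = json.dumps(record, ensure_ascii=False)
print(jsonl_line)
```

The assistant message is the text the model learns to produce, so it should be the cleaned, validated response from the earlier steps.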

# data_formatting/openai_formatter.py

import json
import tiktoken
from typing import List, Dict, Optional

class OpenAIFormatter:
    """Format data for OpenAI fine-tuning"""
    
    def __init__(self, model_name: str = "gpt-3.5-turbo"):
        self.model_name = model_name
        self.tokenizer = tiktoken.encoding_for_model(model_name)
        
        # OpenAI fine-tuning limits
        self.max_tokens_per_example = 4096
        self.recommended_examples = {"minimum": 10, "recommended": 50, "optimal": 100}
    
    def format_for_openai(self, examples: List[Dict], output_file: str) -> Dict:
        """
        Format examples for OpenAI fine-tuning
        
        Args:
            examples: List of {"instruction": str, "response": str} dictionaries
            output_file: Output JSONL file path
            
        Returns:
            Formatting statistics and information
        """
        formatted_examples = []
        stats = {
            "total_examples": len(examples),
            "formatted_examples": 0,
            "skipped_examples": 0,
            "total_tokens": 0,
            "average_tokens": 0,
            "issues": []
        }
        
        for i, example in enumerate(examples):
            try:
                formatted_example = self._format_single_example(example)
                
                if formatted_example:
                    # Count tokens
                    token_count = self._count_tokens(formatted_example)
                    
                    if token_count <= self.max_tokens_per_example:
                        formatted_examples.append(formatted_example)
                        stats["formatted_examples"] += 1
                        stats["total_tokens"] += token_count
                    else:
                        stats["skipped_examples"] += 1
                        stats["issues"].append(f"Example {i}: Too many tokens ({token_count})")
                else:
                    stats["skipped_examples"] += 1
                    stats["issues"].append(f"Example {i}: Formatting failed")
                    
            except Exception as e:
                stats["skipped_examples"] += 1
                stats["issues"].append(f"Example {i}: Error - {str(e)}")
        
        # Calculate average tokens
        if stats["formatted_examples"] > 0:
            stats["average_tokens"] = stats["total_tokens"] / stats["formatted_examples"]
        
        # Write to JSONL file
        with open(output_file, 'w', encoding='utf-8') as f:
            for example in formatted_examples:
                f.write(json.dumps(example, ensure_ascii=False) + '\n')
        
        print("📄 OpenAI Format Results:")
        print(f"Formatted: {stats['formatted_examples']}/{stats['total_examples']} examples")
        print(f"Average tokens per example: {stats['average_tokens']:.1f}")
        print(f"Output saved to: {output_file}")
        
        if stats["issues"]:
            print(f"⚠️  Issues found: {len(stats['issues'])}")
            for issue in stats["issues"][:5]:  # Show first 5 issues
                print(f"  - {issue}")
        
        return stats
    
    def _format_single_example(self, example: Dict) -> Optional[Dict]:
        """Format a single example for OpenAI"""
        
        instruction = example.get("instruction", "").strip()
        response = example.get("response", "").strip()
        
        if not instruction or not response:
            return None
        
        # OpenAI fine-tuning format
        formatted = {
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": instruction},
                {"role": "assistant", "content": response}
            ]
        }
        
        return formatted
    
    def _count_tokens(self, example: Dict) -> int:
        """Estimate the token count of a formatted example"""
        
        total_tokens = 0
        
        for message in example["messages"]:
            # Tokens in the message text and in the role name
            content_tokens = len(self.tokenizer.encode(message["content"]))
            role_tokens = len(self.tokenizer.encode(message["role"]))
            # Rough allowance of 4 tokens for per-message chat formatting
            total_tokens += content_tokens + role_tokens + 4
        
        # Rough allowance of 2 tokens for the enclosing message list
        return total_tokens + 2

# Usage example
formatter = OpenAIFormatter()

# Sample training data
sample_examples = [
    {
        "instruction": "How do I create a Python virtual environment?",
        "response": "You can create a Python virtual environment using: python -m venv myenv. Then activate it with: source myenv/bin/activate (Linux/Mac) or myenv\\Scripts\\activate (Windows)."
    }
]

# Format for OpenAI
stats = formatter.format_for_openai(sample_examples, "training_data.jsonl")
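Before uploading the file, it can help to read it back and confirm that every line parses and follows the messages structure used above. The following is a minimal sketch rather than an official validator: the check_chat_jsonl name and the specific rules it applies (known roles, non-empty string content, final turn from the assistant) are assumptions chosen for illustration.

```python
import json
from typing import List

ALLOWED_ROLES = {"system", "user", "assistant"}

def check_chat_jsonl(path: str) -> List[str]:
    """Return human-readable problems found in a chat-format JSONL file."""
    problems = []
    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                problems.append(f"Line {line_no}: empty line")
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                problems.append(f"Line {line_no}: invalid JSON ({e.msg})")
                continue
            messages = record.get("messages") if isinstance(record, dict) else None
            if not isinstance(messages, list) or not messages:
                problems.append(f"Line {line_no}: missing or empty 'messages' list")
                continue
            for message in messages:
                if not isinstance(message, dict) or message.get("role") not in ALLOWED_ROLES:
                    problems.append(f"Line {line_no}: message with unknown role")
                elif not isinstance(message.get("content"), str) or not message["content"].strip():
                    problems.append(f"Line {line_no}: message with empty content")
            # Assumed rule: each example should end with the assistant's reply
            last = messages[-1]
            if isinstance(last, dict) and last.get("role") != "assistant":
                problems.append(f"Line {line_no}: last message is not from the assistant")
    return problems
```

Calling check_chat_jsonl("training_data.jsonl") after the formatter has run should return an empty list when the file is well formed; any entries in the list point to the line that needs attention.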

Hugging Face Dataset Format

For Hugging Face Transformers, you'll typically use datasets in JSON or CSV format:

# data_formatting/huggingface_formatter.py

import json
import pandas as pd
from typing import List, Dict
from datasets import Dataset

class HuggingFaceFormatter:
    """Format data for Hugging Face fine-tuning"""
    
    def __init__(self):
        self.supported_formats = ["json", "csv", "dataset"]
    
    def format_for_huggingface(self, examples: List[Dict], 
                             output_format: str = "json",
                             output_file: str = "hf_training_data") -> str:
        """
        Format examples for Hugging Face fine-tuning
        
        Args:
            examples: List of training examples
            output_format: Output format ("json", "csv", or "dataset")
            output_file: Output file name (without extension)
            
        Returns:
            Path to the created file
        """
        
        if output_format not in self.supported_formats:
            raise ValueError(f"Unsupported format. Use: {self.supported_formats}")
        
        # Prepare data
        formatted_data = []
        
        for example in examples:
            # Format for instruction-following
            formatted_example = {
                "input": example.get("instruction", ""),
                "output": example.get("response", ""),
                "instruction": "Please provide a helpful response to the following:",
            }
            
            # Add metadata if available
            if "quality_score" in example:
                formatted_example["quality_score"] = example["quality_score"]
            
            formatted_data.append(formatted_example)
        
        # Save in requested format
        if output_format == "json":
            output_path = f"{output_file}.json"
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(formatted_data, f, indent=2, ensure_ascii=False)
                
        elif output_format == "csv":
            output_path = f"{output_file}.csv"
            df = pd.DataFrame(formatted_data)
            df.to_csv(output_path, index=False, encoding='utf-8')
            
        elif output_format == "dataset":
            output_path = f"{output_file}_dataset"
            dataset = Dataset.from_list(formatted_data)
            dataset.save_to_disk(output_path)
        
        print(f"📊 Hugging Face format created: {output_path}")
        print(f"Total examples: {len(formatted_data)}")
        
        return output_path

# Usage example
hf_formatter = HuggingFaceFormatter()

# Format for Hugging Face
hf_path = hf_formatter.format_for_huggingface(
    sample_examples, 
    output_format="json",
    output_file="my_agent_training_data"
)
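Records with instruction, input, and output fields are usually rendered into a single text string before tokenization. A minimal sketch of that step follows. The template and the render_training_text helper are hypothetical, and the right template depends on the base model and training script you use.

```python
from typing import Dict

# Hypothetical Alpaca-style template; replace with the format your base model expects.
PROMPT_TEMPLATE = (
    "### Instruction:\n{instruction}\n\n"
    "### Input:\n{input}\n\n"
    "### Response:\n{output}"
)

REQUIRED_FIELDS = ("instruction", "input", "output")

def render_training_text(record: Dict[str, str]) -> str:
    """Combine one formatted record into a single training string."""
    missing = [field for field in REQUIRED_FIELDS if not record.get(field, "").strip()]
    if missing:
        raise ValueError(f"Record is missing fields: {missing}")
    # Extra keys such as quality_score are ignored here
    return PROMPT_TEMPLATE.format(**{field: record[field].strip() for field in REQUIRED_FIELDS})

# Illustrative record using the field names produced by HuggingFaceFormatter
example_record = {
    "instruction": "Please provide a helpful response to the following:",
    "input": "How do I create a Python virtual environment?",
    "output": "Run: python -m venv myenv",
}
print(render_training_text(example_record))
```

Raising on empty fields, rather than silently emitting a partial prompt, keeps malformed records out of the training text.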

Step 4: Quality Assurance and Testing

Before using your training data, perform final quality assurance checks:

Automated Quality Pipeline

# quality_assurance/qa_pipeline.py

from typing import List, Dict, Tuple
import json
from pathlib import Path

class QualityAssurancePipeline:
    """Comprehensive QA pipeline for training data"""
    
    def __init__(self):
        self.qa_checks = [
            self._check_data_balance,
            self._check_instruction_diversity,
            self._check_response_quality,
            self._check_formatting_consistency,
            self._check_domain_coverage
        ]
    
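    def run_qa_pipeline(self, examples: List[Dict]) -> Dict:
        """Run complete QA pipeline"""
        
        # Every check divides by the number of examples, so reject empty input early
        if not examples:
            raise ValueError("Cannot run the QA pipeline on an empty dataset")
        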
        qa_report = {
            "total_examples": len(examples),
            "qa_checks": {},
            "overall_score": 0.0,
            "recommendations": [],
            "critical_issues": []
        }
        
        total_score = 0
        
        print("🔍 Running Quality Assurance Pipeline...")
        
        for check in self.qa_checks:
            check_name = check.__name__.replace("_check_", "")
            print(f"  Running {check_name}...")
            
            try:
                score, issues, recommendations = check(examples)
                
                qa_report["qa_checks"][check_name] = {
                    "score": score,
                    "issues": issues,
                    "recommendations": recommendations
                }
                
                total_score += score
                
                # Collect critical issues (score < 0.5)
                if score < 0.5:
                    qa_report["critical_issues"].extend(issues)
                
                # Collect all recommendations
                qa_report["recommendations"].extend(recommendations)
                
            except Exception as e:
                qa_report["qa_checks"][check_name] = {
                    "score": 0.0,
                    "issues": [f"QA check failed: {str(e)}"],
                    "recommendations": ["Fix QA check implementation"]
                }
        
        # Calculate overall score
        qa_report["overall_score"] = total_score / len(self.qa_checks)
        
        # Generate final recommendations
        qa_report["final_assessment"] = self._generate_final_assessment(qa_report)
        
        return qa_report
    
    def _check_data_balance(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
        """Check if data is balanced across different categories"""
        
        issues = []
        recommendations = []
        
        # Check instruction length distribution
        instruction_lengths = [len(ex.get("instruction", "")) for ex in examples]
        avg_length = sum(instruction_lengths) / len(instruction_lengths)
        
        # Check for extreme outliers
        very_short = sum(1 for length in instruction_lengths if length < 20)
        very_long = sum(1 for length in instruction_lengths if length > 1000)
        
        balance_score = 1.0
        
        if very_short / len(examples) > 0.2:
            balance_score -= 0.3
            issues.append(f"{very_short} instructions are very short (< 20 chars)")
            recommendations.append("Consider expanding short instructions with more context")
        
        if very_long / len(examples) > 0.1:
            balance_score -= 0.2
            issues.append(f"{very_long} instructions are very long (> 1000 chars)")
            recommendations.append("Consider breaking down long instructions")
        
        return max(balance_score, 0.0), issues, recommendations
    
    def _check_instruction_diversity(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
        """Check diversity of instructions"""
        
        issues = []
        recommendations = []
        
        instructions = [ex.get("instruction", "").lower() for ex in examples]
        
        # Check for duplicate instructions
        unique_instructions = len(set(instructions))
        diversity_ratio = unique_instructions / len(instructions)
        
        score = diversity_ratio
        
        if diversity_ratio < 0.8:
            issues.append(f"Low instruction diversity: {diversity_ratio:.2%}")
            recommendations.append("Add more varied instruction formats and phrasings")
        
        # Check for common starting patterns
        common_starters = ["how to", "what is", "can you", "please"]
        starter_counts = {}
        
        for instruction in instructions:
            for starter in common_starters:
                if instruction.startswith(starter):
                    starter_counts[starter] = starter_counts.get(starter, 0) + 1
        
        # If any starter appears in > 30% of instructions
        for starter, count in starter_counts.items():
            if count / len(instructions) > 0.3:
                score *= 0.8
                issues.append(f"'{starter}' appears in {count}/{len(instructions)} instructions")
                recommendations.append(f"Vary instruction beginnings beyond '{starter}'")
        
        return max(score, 0.0), issues, recommendations
    
    def _check_response_quality(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
        """Check quality of responses"""
        
        issues = []
        recommendations = []
        
        responses = [ex.get("response", "") for ex in examples]
        
        # Check for empty or very short responses
        short_responses = sum(1 for resp in responses if len(resp) < 30)
        
        score = 1.0
        
        if short_responses / len(responses) > 0.1:
            score -= 0.4
            issues.append(f"{short_responses} responses are too short (< 30 chars)")
            recommendations.append("Expand short responses with more detail and examples")
        
        # Check for helpful indicators
        helpful_indicators = ["example", "step", "because", "solution", "approach"]
        responses_with_indicators = 0
        
        for response in responses:
            if any(indicator in response.lower() for indicator in helpful_indicators):
                responses_with_indicators += 1
        
        helpful_ratio = responses_with_indicators / len(responses)
        
        if helpful_ratio < 0.5:
            score -= 0.2
            issues.append(f"Only {helpful_ratio:.1%} responses contain helpful indicators")
            recommendations.append("Add more explanatory content, examples, and step-by-step guidance")
        
        return max(score, 0.0), issues, recommendations
    
    def _check_formatting_consistency(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
        """Check formatting consistency"""
        
        issues = []
        recommendations = []
        
        score = 1.0
        
        # Check required fields
        required_fields = ["instruction", "response"]
        
        for i, example in enumerate(examples):
            for field in required_fields:
                if field not in example or not example[field]:
                    score -= 0.1
                    issues.append(f"Example {i}: Missing or empty '{field}'")
        
        if len(issues) > 0:
            recommendations.append("Ensure all examples have required fields: instruction, response")
        
        return max(score, 0.0), issues, recommendations
    
    def _check_domain_coverage(self, examples: List[Dict]) -> Tuple[float, List[str], List[str]]:
        """Check if examples cover intended domain adequately"""
        
        issues = []
        recommendations = []
        
        # This is a simplified check - in practice, you'd use more sophisticated
        # domain analysis based on your specific use case
        
        all_text = " ".join([
            ex.get("instruction", "") + " " + ex.get("response", "") 
            for ex in examples
        ]).lower()
        
        # Check for technical terms (adjust based on your domain)
        technical_terms = ["api", "function", "code", "error", "debug", "install"]
        technical_coverage = sum(1 for term in technical_terms if term in all_text)
        
        score = min(technical_coverage / len(technical_terms), 1.0)
        
        if score < 0.5:
            missing_terms = [term for term in technical_terms if term not in all_text]
            issues.append(f"Limited domain coverage. Missing: {missing_terms}")
            recommendations.append("Add examples covering missing domain areas")
        
        return score, issues, recommendations
    
    def _generate_final_assessment(self, report: Dict) -> str:
        """Generate final assessment and recommendations"""
        
        overall_score = report["overall_score"]
        
        if overall_score >= 0.8:
            return "✅ High Quality: Dataset is ready for fine-tuning"
        elif overall_score >= 0.6:
            return "⚠️  Good Quality: Consider addressing recommendations before fine-tuning"
        elif overall_score >= 0.4:
            return "🟡 Moderate Quality: Significant improvements needed before fine-tuning"
        else:
            return "❌ Low Quality: Major issues must be resolved before fine-tuning"

# Usage example
qa_pipeline = QualityAssurancePipeline()
qa_report = qa_pipeline.run_qa_pipeline(sample_examples)

print("\n📋 Quality Assurance Report:")
print(f"Overall Score: {qa_report['overall_score']:.2f}")
print(f"Assessment: {qa_report['final_assessment']}")

if qa_report['critical_issues']:
    print("\n🚨 Critical Issues:")
    for issue in qa_report['critical_issues']:
        print(f"  - {issue}")

if qa_report['recommendations']:
    print("\n💡 Recommendations:")
    for rec in qa_report['recommendations'][:5]:  # Show top 5
        print(f"  - {rec}")

What You've Accomplished

Congratulations! You now have a working, end-to-end data preparation pipeline for fine-tuning LLMs:

  • ✅ Strategic Data Collection - Internal and external source integration
  • ✅ Professional Data Cleaning - Comprehensive text processing and validation
  • ✅ Multi-Platform Formatting - OpenAI and Hugging Face compatibility
  • ✅ Quality Assurance - Automated QA pipeline with scoring and recommendations
  • ✅ Best Practices - Industry-standard approaches to data preparation

Key Features Implemented:

  1. Intelligent Data Collection with quality scoring
  2. Advanced Text Cleaning with code preservation
  3. Comprehensive Validation with detailed metrics
  4. Multi-Platform Support for different fine-tuning services
  5. Automated Quality Assurance with actionable recommendations

What's Next?

In Part 2: Fine-Tuning with OpenAI, you'll learn:

  • Setting up OpenAI fine-tuning jobs
  • Configuring hyperparameters for optimal results
  • Monitoring training progress and metrics
  • Evaluating fine-tuned model performance
  • Deploying and testing your custom model

Quick Reference Commands

# Install required packages
pip install openai tiktoken datasets pandas

# Run data preparation pipeline
python data_preparation_pipeline.py

# Validate dataset quality
python quality_assurance.py --input training_data.json

# Format for OpenAI
python format_openai.py --input clean_data.json --output openai_training.jsonl

Additional Resources

Ready to start fine-tuning with your prepared data? Continue to Part 2: Fine-Tuning with OpenAI to create your specialized AI agent!


This tutorial is part of our comprehensive Fine-Tuning LLMs series. Quality training data is the foundation of successful fine-tuning - invest the time to get it right, and your specialized AI agent will exceed expectations.
