Creating Custom Data Processors

This guide explains how to implement custom data processors for the LMMS Engine. Data processors are responsible for transforming raw multimodal data (images, audio, videos, text) into tokenized sequences suitable for model training.

Architecture Overview

The LMMS Engine processor hierarchy is designed for flexibility and reusability:

ProcessorConfig
    ↓
BaseProcessor (AeroDataProcessor)
    ↓
Specialized Processors (Qwen2_5-based, Text-only, etc.)
    ↓
YourCustomProcessor

Key Components

  • ProcessorConfig: Configuration container with processor name and type

  • Base Processors: Define common functionality for tokenization and tensor creation

  • Specialized Processors: Handle specific model requirements (Qwen, Llava, etc.)

  • Custom Processors: Your implementation inheriting from appropriate base class

When to Create a Custom Processor

Create a custom processor when you need to:

  1. Support a new model architecture - Different models have different tokenization and formatting requirements

  2. Handle a new modality combination - e.g., text + image + audio + video

  3. Implement custom tokenization logic - Special token handling or chat templates

  4. Add model-specific preprocessing - Image resizing, audio normalization, etc.

Processor Roles

A processor typically handles:

Task

Responsibility

Building

Load tokenizer and media processors from HuggingFace

Processing

Transform raw data into model-compatible tensors

Tokenization

Convert text to token IDs using chat templates

Token Expansion

Expand special media tokens to appropriate lengths

Label Masking

Create training labels with attention masking

Architecture: Three Main Approaches

1. Text-Only Processor (Simplest)

Use when you only need to handle text data:

from lmms_engine.mapping_func import register_processor
from lmms_engine.datasets.processor.config import ProcessorConfig
from transformers import AutoTokenizer

@register_processor("my_text_processor")
class MyTextProcessor:
    def __init__(self, config: ProcessorConfig) -> None:
        self.config = config
    
    def build(self):
        self.tokenizer = AutoTokenizer.from_pretrained(self.config.processor_name)
    
    def process(self, texts: List[str], **kwargs):
        return self.tokenizer(
            texts,
            truncation=True,
            padding='max_length',
            max_length=2048,
            return_tensors='pt'
        )
    
    def save_pretrained(self, path: str):
        self.tokenizer.save_pretrained(path)

2. Audio-Only Processor

For audio modality:

@register_processor("my_audio_processor")
class MyAudioProcessor:
    def __init__(self, config: ProcessorConfig) -> None:
        self.config = config
    
    def build(self):
        # Load audio processor from transformers
        from transformers import AutoFeatureExtractor
        self.audio_processor = AutoFeatureExtractor.from_pretrained(
            self.config.processor_name
        )
        self.tokenizer = AutoTokenizer.from_pretrained(self.config.processor_name)
    
    def process(self, audios: List[np.ndarray], sampling_rate: int, **kwargs):
        # Process audio with feature extractor
        audio_inputs = self.audio_processor(
            audios,
            sampling_rate=sampling_rate,
            return_tensors='pt'
        )
        
        # Tokenize text and integrate audio
        # ... implementation ...
        
        return {
            'input_ids': input_ids,
            'audio_values': audio_inputs['input_features'],
            'labels': labels
        }

3. Multimodal Processor (Most Common)

Handle images, audio, and/or videos - inherit from base processor:

from lmms_engine.datasets.processor.aero_processor import AeroDataProcessor

@register_processor("my_multimodal_processor")
class MyMultimodalProcessor(AeroDataProcessor):
    def _build_processor(self):
        # Load model-specific processor from transformers
        from transformers import MyModelProcessor
        processor = MyModelProcessor.from_pretrained(
            self.config.processor_name
        )
        return processor
    
    def process(
        self,
        images: List[Image.Image],
        hf_messages,
        audios: Optional[List[np.ndarray]] = None,
        sampling_rate: Optional[int] = None,
        videos=None,
        add_system_prompt=True,
        **kwargs,
    ):
        # Process each modality
        image_inputs = self._process_images(images)
        video_inputs = self._process_videos(videos)
        audio_inputs = self._process_audio(audios, sampling_rate)
        
        # Tokenize messages and expand media tokens
        inputs = self._tokenize_and_expand(
            hf_messages,
            image_inputs,
            video_inputs,
            audio_inputs,
            add_system_prompt
        )
        
        # Merge all inputs
        inputs.update(image_inputs)
        inputs.update(video_inputs)
        inputs.update(audio_inputs)
        
        return inputs

Quick Start: Creating a Vision-Only Processor

Here’s a complete minimal example for a vision processor:

Step 1: Create the Processor Class

from typing import List, Optional
import torch
import numpy as np
from PIL import Image
from transformers import Qwen2_5_VLProcessor

from lmms_engine.mapping_func import register_processor
from lmms_engine.datasets.processor.config import ProcessorConfig
from lmms_engine.datasets.processor.base_qwen2_5_processor import BaseQwen2_5_DataProcessor


@register_processor("qwen_vision_simple")
class QwenVisionSimpleProcessor(BaseQwen2_5_DataProcessor):
    """Simplified vision processor for Qwen2.5-VL model."""
    
    def _build_processor(self):
        """Build the underlying Qwen processor."""
        processor = Qwen2_5_VLProcessor.from_pretrained(
            self.config.processor_name
        )
        
        # Customize processor parameters if needed
        if hasattr(processor, 'image_processor'):
            image_max_pixels = self.config.extra_kwargs.get("image_max_pixels")
            if image_max_pixels:
                processor.image_processor.max_pixels = image_max_pixels
        
        return processor
    
    def process(
        self,
        images: List[Image],
        hf_messages,
        audios: Optional[List[np.ndarray]] = None,
        sampling_rate: Optional[int] = None,
        videos=None,
        system_message: str = "You are a helpful assistant",
        add_system_prompt=True,
        add_generation_prompt=False,
        **kwargs,
    ):
        """Process vision data."""
        # Don't support audio in this simple example
        assert audios is None, "This processor does not support audio"
        
        # Call parent implementation which handles vision + video
        return super().process(
            images=images,
            hf_messages=hf_messages,
            audios=None,
            videos=videos,
            system_message=system_message,
            add_system_prompt=add_system_prompt,
            add_generation_prompt=add_generation_prompt,
            **kwargs,
        )

Step 2: Register Configuration

Add to your training config:

processor_config:
  processor_type: qwen_vision_simple
  processor_name: Qwen/Qwen2.5-VL-7B
  extra_kwargs:
    image_max_pixels: 1000000

Required Methods

Every processor must implement these methods:

1. __init__(config: ProcessorConfig)

Initialize processor with configuration:

def __init__(self, config: ProcessorConfig) -> None:
    self.config = config
    self.processor = None  # Will be set in build()

2. build()

Load model components (tokenizer, image processor, etc.):

def build(self):
    self.processor = self._build_processor()
    # Optional: set custom chat template
    self.processor.chat_template = self.chat_template_custom

3. _build_processor()

Load the actual processor from transformers:

def _build_processor(self):
    from transformers import Qwen3VLProcessor
    return Qwen3VLProcessor.from_pretrained(self.config.processor_name)

4. process(images, hf_messages, ...)

Main method that transforms data:

def process(
    self,
    images: List[Image.Image],
    hf_messages,
    audios: Optional[List[np.ndarray]] = None,
    sampling_rate: Optional[int] = None,
    videos=None,
    **kwargs,
):
    """
    Transform multimodal data into model-ready tensors.
    
    Args:
        images: List of PIL Images or None
        hf_messages: Messages in HuggingFace format with roles
        audios: List of audio arrays or None
        sampling_rate: Audio sampling rate
        videos: List of video frames or None
        **kwargs: Additional arguments
    
    Returns:
        Dictionary with keys:
        - input_ids: Token IDs tensor
        - attention_mask: Attention mask tensor
        - labels: Training labels tensor
        - pixel_values: Image tensors (if images present)
        - audio_values: Audio tensors (if audio present)
        - video_pixel_values: Video tensors (if videos present)
    """
    # Implementation
    pass

5. save_pretrained(path: str)

Save processor state:

def save_pretrained(self, save_directory: str):
    if not hasattr(self, "processor"):
        raise ValueError("Processor has not been built yet")
    self.processor.save_pretrained(save_directory)

Message Format

Input messages should follow the HuggingFace format:

hf_messages = [
    {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": "path/to/image.jpg"}},
            {"type": "text", "text": "What's in this image?"}
        ]
    },
    {
        "role": "assistant",
        "content": [
            {"type": "text", "text": "This image shows..."}
        ]
    }
]

Content types supported:

  • text - Text content

  • image_url - Image content

  • audio_url - Audio content

  • video_url - Video content

Key Implementation Patterns

Pattern 1: Simple Inheritance from Base

When your model is a variant of Qwen2.5-VL:

@register_processor("my_variant")
class MyVariantProcessor(BaseQwen2_5_DataProcessor):
    def _build_processor(self):
        processor = MyProcessor.from_pretrained(self.config.processor_name)
        # Custom configuration
        return processor
    
    # Usually no need to override process() if parent handles it

Pattern 2: Custom Processing Logic

When you need special tokenization:

def get_qwen_template_labels(
    self,
    hf_messages,
    num_image_tokens,
    num_video_tokens,
    system_message="You are a helpful assistant",
    add_system_prompt=True,
):
    """Custom label generation with your logic."""
    input_ids = []
    labels = []
    
    # Add system prompt if needed
    if add_system_prompt:
        system_tokens = self.tokenizer.encode(system_message)
        input_ids.extend(system_tokens)
        labels.extend([-100] * len(system_tokens))  # Mask system
    
    # Process each message
    for message in hf_messages:
        role = message["role"]
        
        # Encode message content
        message_tokens = self._encode_message(message)
        input_ids.extend(message_tokens)
        
        # Label masking: user messages masked, assistant unmasked
        if role in ["user", "system"]:
            labels.extend([-100] * len(message_tokens))
        else:
            labels.extend(message_tokens)
    
    return {
        "input_ids": torch.tensor(input_ids, dtype=torch.long),
        "labels": torch.tensor(labels, dtype=torch.long),
    }

Pattern 3: Token Expansion

Handle special media tokens that need expansion:

def _expand_encode_id_image_tokens(
    self,
    encode_id: List[int],
    image_token_num: List[int],
    start_from: int = 0,
):
    """
    Expand image placeholder tokens to actual token count.
    
    Example: If <|image|> represents 196 tokens, expand it to 196 copies.
    """
    image_pos = [i for i, x in enumerate(encode_id) if x == self.image_token_id]
    expanded_encode_id = []
    prev = 0
    
    for idx, pos in enumerate(image_pos):
        # Add tokens before image position
        expanded_encode_id.extend(encode_id[prev:pos])
        
        # Expand the image token
        token_count = image_token_num[idx + start_from]
        expanded_encode_id.extend([self.image_token_id] * token_count)
        
        prev = pos + 1
    
    # Add remaining tokens
    expanded_encode_id.extend(encode_id[prev:])
    
    return expanded_encode_id, len(image_pos)

Properties and Utilities

Define useful properties for token access:

@property
def image_token_id(self):
    """Get the special token ID for images."""
    image_token = getattr(self.processor, "image_token", None)
    if image_token is None:
        return None
    return self.processor.tokenizer.convert_tokens_to_ids(image_token)

@property
def audio_token_id(self):
    """Get the special token ID for audio."""
    audio_token = getattr(self.processor, "audio_token", None)
    if audio_token is None:
        return None
    return self.processor.tokenizer.convert_tokens_to_ids(audio_token)

@property
def video_token_id(self):
    """Get the special token ID for videos."""
    video_token = getattr(self.processor, "video_token", None)
    if video_token is None:
        return None
    return self.processor.tokenizer.convert_tokens_to_ids(video_token)

@property
def tokenizer(self):
    """Get the underlying tokenizer."""
    return self.processor.tokenizer

@property
def sampling_rate(self):
    """Get audio sampling rate."""
    if hasattr(self.processor, 'audio_processor'):
        return self.processor.audio_processor.sampling_rate
    return None

Chat Templates

Customize how conversations are formatted:

@property
def chat_template_custom(self):
    """Define how messages are formatted for the model."""
    return (
        "{% for message in messages %}"
        "<|im_start|>{{ message['role'] }}\n"
        "{% if message['content'] is string %}"
        "{{ message['content'] }}<|im_end|>\n"
        "{% else %}"
        "{% for content in message['content'] %}"
        "{% if content['type'] == 'text' %}"
        "{{ content['text'] }}"
        "{% elif content['type'] == 'image_url' %}"
        "<|image|>"
        "{% elif content['type'] == 'audio_url' %}"
        "<|audio|>"
        "{% endif %}"
        "{% endfor %}"
        "<|im_end|>\n"
        "{% endif %}"
        "{% endfor %}"
    )

ProcessorConfig

Configure your processor in dataset config:

processor_config:
  processor_type: my_custom_processor      # Name used in @register_processor
  processor_name: org/model-name           # HF model identifier
  extra_kwargs:                            # Optional custom parameters
    image_max_pixels: 1000000
    video_max_frames: 100
    custom_param: value

Access extra kwargs in your processor:

def _build_processor(self):
    max_pixels = self.config.extra_kwargs.get("image_max_pixels", 1000000)
    # Use max_pixels for configuration

Testing Your Processor

Test your implementation:

from lmms_engine.datasets.processor.config import ProcessorConfig
from PIL import Image
import numpy as np

# Setup
config = ProcessorConfig(
    processor_type="my_processor",
    processor_name="Qwen/Qwen2.5-VL-7B"
)
processor = MyProcessor(config)
processor.build()

# Test with sample data
images = [Image.new('RGB', (224, 224))]
messages = [
    {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": "placeholder"}},
            {"type": "text", "text": "What is this?"}
        ]
    }
]

# Process
output = processor.process(
    images=images,
    hf_messages=messages
)

# Verify output
assert "input_ids" in output
assert "labels" in output
assert output["input_ids"].shape[0] > 0
print("✓ Processor test passed")

Best Practices

  1. Inherit from appropriate base class

    • Text-only: Inherit from no base

    • Multimodal: Inherit from AeroDataProcessor or BaseQwen2_5_DataProcessor

  2. Always implement _build_processor()

    • This is where model-specific loading happens

    • Keep it focused and readable

  3. Document modality support

    • Clearly state what your processor supports in docstrings

    • Use assertions to validate inputs

  4. Handle edge cases

    if images is None:
        image_inputs = {}
    else:
        # Process images
    
  5. Use the @register_processor decorator

    • This makes your processor discoverable

    • Use lowercase with underscores for processor names

  6. Mask appropriately for training

    • User/system messages typically masked (-100)

    • Assistant responses unmasked for training

    • Special tokens handled carefully

  7. Return consistent output format

    • Always include: input_ids, attention_mask, labels

    • Include modality-specific keys when applicable

Common Issues and Solutions

Issue: KeyError for ‘input_ids’

Cause: Processor not returning required keys Solution: Ensure process() returns dict with input_ids and labels

Issue: Token count mismatch

Cause: Media token expansion incorrect Solution: Verify token counting logic and that _expand_encode_id_* methods are correct

Issue: Chat template not applied

Cause: Forgot to set template in build() Solution: Add self.processor.chat_template = self.chat_template_custom in build()

Issue: Audio/Video not processed

Cause: Missing processor components Solution: Verify _build_processor() loads all needed components

Advanced: Custom Collator Integration

Processors often work with custom collators:

# In your dataset
def get_collator(self):
    return CustomCollator(self.processor)

# Define collator to work with processor outputs
class CustomCollator:
    def __init__(self, processor):
        self.processor = processor
    
    def __call__(self, batch):
        # Batch is list of processor outputs
        # Stack tensors appropriately
        pass

See Also