Creating Custom Data Processors
This guide explains how to implement custom data processors for the LMMS Engine. Data processors are responsible for transforming raw multimodal data (images, audio, videos, text) into tokenized sequences suitable for model training.
Architecture Overview
The LMMS Engine processor hierarchy is designed for flexibility and reusability:
```text
ProcessorConfig
      ↓
BaseProcessor (AeroDataProcessor)
      ↓
Specialized Processors (Qwen2_5-based, Text-only, etc.)
      ↓
YourCustomProcessor
```
Key Components
- ProcessorConfig: configuration container holding the processor name, type, and optional extra_kwargs
- Base processors: define common functionality for tokenization and tensor creation
- Specialized processors: handle model-specific requirements (Qwen, LLaVA, etc.)
- Custom processors: your implementation, inheriting from the appropriate base class
When to Create a Custom Processor
Create a custom processor when you need to:
- Support a new model architecture: different models have different tokenization and formatting requirements
- Handle a new modality combination, e.g., text + image + audio + video
- Implement custom tokenization logic, such as special-token handling or chat templates
- Add model-specific preprocessing, such as image resizing or audio normalization
Processor Roles
A processor typically handles:
| Task | Responsibility |
|---|---|
| Building | Load the tokenizer and media processors from HuggingFace |
| Processing | Transform raw data into model-compatible tensors |
| Tokenization | Convert text to token IDs using chat templates |
| Token Expansion | Expand special media tokens to the appropriate lengths |
| Label Masking | Create training labels, masking positions that should not contribute to the loss with -100 |
Architecture: Three Main Approaches
1. Text-Only Processor (Simplest)
Use when you only need to handle text data:
```python
from typing import List

from transformers import AutoTokenizer

from lmms_engine.mapping_func import register_processor
from lmms_engine.datasets.processor.config import ProcessorConfig


@register_processor("my_text_processor")
class MyTextProcessor:
    def __init__(self, config: ProcessorConfig) -> None:
        self.config = config

    def build(self):
        # Load the tokenizer named in the config (HF model id or local path)
        self.tokenizer = AutoTokenizer.from_pretrained(self.config.processor_name)

    def process(self, texts: List[str], **kwargs):
        # Tokenize, truncating and padding every sequence to 2048 tokens
        return self.tokenizer(
            texts,
            truncation=True,
            padding="max_length",
            max_length=2048,
            return_tensors="pt",
        )

    def save_pretrained(self, path: str):
        self.tokenizer.save_pretrained(path)
```
2. Audio-Only Processor
For audio modality:
```python
from typing import List

import numpy as np
from transformers import AutoFeatureExtractor, AutoTokenizer

from lmms_engine.mapping_func import register_processor
from lmms_engine.datasets.processor.config import ProcessorConfig


@register_processor("my_audio_processor")
class MyAudioProcessor:
    def __init__(self, config: ProcessorConfig) -> None:
        self.config = config

    def build(self):
        # Load the audio feature extractor and tokenizer from transformers
        self.audio_processor = AutoFeatureExtractor.from_pretrained(
            self.config.processor_name
        )
        self.tokenizer = AutoTokenizer.from_pretrained(self.config.processor_name)

    def process(self, audios: List[np.ndarray], sampling_rate: int, **kwargs):
        # Convert raw waveforms into model input features
        audio_inputs = self.audio_processor(
            audios,
            sampling_rate=sampling_rate,
            return_tensors="pt",
        )
        # Tokenize text and integrate audio
        # ... implementation (must define input_ids and labels) ...
        return {
            "input_ids": input_ids,
            # The output key depends on the extractor; Whisper-style extractors
            # return "input_features"
            "audio_values": audio_inputs["input_features"],
            "labels": labels,
        }
```
3. Multimodal Processor (Most Common)
To handle images, audio, and/or video, inherit from a base processor such as AeroDataProcessor:
```python
from typing import List, Optional

import numpy as np
from PIL import Image

from lmms_engine.mapping_func import register_processor
from lmms_engine.datasets.processor.aero_processor import AeroDataProcessor


@register_processor("my_multimodal_processor")
class MyMultimodalProcessor(AeroDataProcessor):
    def _build_processor(self):
        # Load the model-specific processor from transformers
        # (MyModelProcessor is a placeholder for your model's processor class)
        from transformers import MyModelProcessor

        processor = MyModelProcessor.from_pretrained(
            self.config.processor_name
        )
        return processor

    def process(
        self,
        images: List[Image.Image],
        hf_messages,
        audios: Optional[List[np.ndarray]] = None,
        sampling_rate: Optional[int] = None,
        videos=None,
        add_system_prompt=True,
        **kwargs,
    ):
        # Process each modality (illustrative helpers; each must return a dict,
        # empty when its modality is absent, so the update() calls below work)
        image_inputs = self._process_images(images)
        video_inputs = self._process_videos(videos)
        audio_inputs = self._process_audio(audios, sampling_rate)

        # Tokenize messages and expand media tokens
        inputs = self._tokenize_and_expand(
            hf_messages,
            image_inputs,
            video_inputs,
            audio_inputs,
            add_system_prompt,
        )

        # Merge all inputs
        inputs.update(image_inputs)
        inputs.update(video_inputs)
        inputs.update(audio_inputs)
        return inputs
```
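The helper names in this example (`_process_images`, `_process_videos`, `_process_audio`, `_tokenize_and_expand`) are illustrative, not a confirmed `AeroDataProcessor` API. The final `inputs.update(...)` calls need a dict for every modality, so one workable convention is for each helper to return an empty dict when its modality is absent.

The plain-Python sketch below shows that convention. It uses hypothetical `process_modality` and `merge_inputs` functions, and a fake feature function stands in for a real image processor:

```python
from typing import Any, Callable, Dict, List, Optional

ModalityFn = Callable[[List[Any]], Dict[str, Any]]


def process_modality(items: Optional[List[Any]], fn: ModalityFn) -> Dict[str, Any]:
    """Run fn on items, or return {} when this modality is absent."""
    if not items:
        return {}
    return fn(items)


def merge_inputs(*parts: Dict[str, Any]) -> Dict[str, Any]:
    """Merge per-modality dicts, raising instead of silently overwriting a key."""
    merged: Dict[str, Any] = {}
    for part in parts:
        for key, value in part.items():
            if key in merged:
                raise KeyError(f"key {key!r} produced by more than one modality")
            merged[key] = value
    return merged


# Fake image "processor": one 4-value feature row per image (stand-in only).
def fake_image_fn(images: List[Any]) -> Dict[str, Any]:
    return {"pixel_values": [[0.0] * 4 for _ in images]}


text_inputs = {"input_ids": [1, 2, 3], "labels": [-100, 2, 3]}
inputs = merge_inputs(
    text_inputs,
    process_modality(["img0"], fake_image_fn),  # one image present
    process_modality(None, fake_image_fn),      # e.g. no videos: contributes {}
)
print(sorted(inputs))  # ['input_ids', 'labels', 'pixel_values']
```

Unlike `dict.update`, which silently overwrites, `merge_inputs` fails loudly if two modalities emit the same key.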
Quick Start: Creating a Vision-Only Processor
Here’s a complete minimal example for a vision processor:
Step 1: Create the Processor Class
```python
from typing import List, Optional

import numpy as np
import torch
from PIL import Image
from transformers import Qwen2_5_VLProcessor

from lmms_engine.mapping_func import register_processor
from lmms_engine.datasets.processor.config import ProcessorConfig
from lmms_engine.datasets.processor.base_qwen2_5_processor import BaseQwen2_5_DataProcessor


@register_processor("qwen_vision_simple")
class QwenVisionSimpleProcessor(BaseQwen2_5_DataProcessor):
    """Simplified vision processor for the Qwen2.5-VL model."""

    def _build_processor(self):
        """Build the underlying Qwen processor."""
        processor = Qwen2_5_VLProcessor.from_pretrained(
            self.config.processor_name
        )
        # Customize processor parameters if needed
        if hasattr(processor, "image_processor"):
            image_max_pixels = self.config.extra_kwargs.get("image_max_pixels")
            if image_max_pixels:
                processor.image_processor.max_pixels = image_max_pixels
        return processor

    def process(
        self,
        images: List[Image.Image],  # Image is the module; Image.Image is the class
        hf_messages,
        audios: Optional[List[np.ndarray]] = None,
        sampling_rate: Optional[int] = None,
        videos=None,
        system_message: str = "You are a helpful assistant",
        add_system_prompt=True,
        add_generation_prompt=False,
        **kwargs,
    ):
        """Process vision data."""
        # Audio is not supported in this simple example
        assert audios is None, "This processor does not support audio"
        # Call the parent implementation, which handles images and video
        return super().process(
            images=images,
            hf_messages=hf_messages,
            audios=None,
            videos=videos,
            system_message=system_message,
            add_system_prompt=add_system_prompt,
            add_generation_prompt=add_generation_prompt,
            **kwargs,
        )
```
Step 2: Register Configuration
Add the processor to your training config:
```yaml
processor_config:
  processor_type: qwen_vision_simple
  processor_name: Qwen/Qwen2.5-VL-7B-Instruct
  extra_kwargs:
    image_max_pixels: 1000000
```
Required Methods
Every processor must provide these methods, either directly or by inheriting them from a base class:
1. __init__(config: ProcessorConfig)
Initialize processor with configuration:
```python
def __init__(self, config: ProcessorConfig) -> None:
    self.config = config
    self.processor = None  # Will be set in build()
```
2. build()
Load model components (tokenizer, image processor, etc.):
```python
def build(self):
    self.processor = self._build_processor()
    # Optional: set a custom chat template
    self.processor.chat_template = self.chat_template_custom
```
3. _build_processor()
Load the actual processor from transformers:
```python
def _build_processor(self):
    from transformers import Qwen3VLProcessor

    return Qwen3VLProcessor.from_pretrained(self.config.processor_name)
```
4. process(images, hf_messages, ...)
Main method that transforms data:
```python
def process(
    self,
    images: List[Image.Image],
    hf_messages,
    audios: Optional[List[np.ndarray]] = None,
    sampling_rate: Optional[int] = None,
    videos=None,
    **kwargs,
):
    """
    Transform multimodal data into model-ready tensors.

    Args:
        images: List of PIL Images or None
        hf_messages: Messages in HuggingFace format with roles
        audios: List of audio arrays or None
        sampling_rate: Audio sampling rate
        videos: List of video frames or None
        **kwargs: Additional arguments

    Returns:
        Dictionary with keys:
            - input_ids: Token IDs tensor
            - attention_mask: Attention mask tensor
            - labels: Training labels tensor
            - pixel_values: Image tensors (if images present)
            - audio_values: Audio tensors (if audio present)
            - video_pixel_values: Video tensors (if videos present)
    """
    # Implementation
    pass
```
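5. save_pretrained(save_directory: str)
Save the processor state:
```python
def save_pretrained(self, save_directory: str):
    # __init__ always sets self.processor (to None), so hasattr() would never
    # catch an unbuilt processor; check for None instead.
    if getattr(self, "processor", None) is None:
        raise ValueError("Processor has not been built yet")
    self.processor.save_pretrained(save_directory)
```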
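Taken together, these five methods form a small interface. The sketch below writes that interface down as a `typing.Protocol`, under the hypothetical name `DataProcessorLike` (not a class shipped by lmms_engine).

It then implements the interface with a toy processor. The toy's "processor" is just a word-to-id dict, so the example runs without downloading a model:

```python
import json
import os
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class DataProcessorLike(Protocol):
    """Hypothetical structural type for the required methods (besides __init__)."""

    def build(self) -> None: ...

    def _build_processor(self) -> Any: ...

    def process(
        self, images: Optional[List[Any]], hf_messages: List[Dict[str, Any]], **kwargs: Any
    ) -> Dict[str, Any]: ...

    def save_pretrained(self, save_directory: str) -> None: ...


class ToyProcessor:
    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config
        self.processor = None  # set in build()

    def build(self) -> None:
        self.processor = self._build_processor()

    def _build_processor(self) -> Dict[str, int]:
        # Stand-in for a real from_pretrained() call: a tiny word -> id vocabulary.
        return {"<unk>": 0, "hello": 1, "world": 2}

    def process(self, images, hf_messages, **kwargs) -> Dict[str, Any]:
        # Whitespace "tokenization" of every text part; unknown words map to <unk>.
        ids = [
            self.processor.get(word, self.processor["<unk>"])
            for message in hf_messages
            for part in message["content"]
            if part["type"] == "text"
            for word in part["text"].split()
        ]
        return {"input_ids": ids, "attention_mask": [1] * len(ids), "labels": list(ids)}

    def save_pretrained(self, save_directory: str) -> None:
        if getattr(self, "processor", None) is None:
            raise ValueError("Processor has not been built yet")
        os.makedirs(save_directory, exist_ok=True)
        with open(os.path.join(save_directory, "vocab.json"), "w") as f:
            json.dump(self.processor, f)


proc = ToyProcessor({"processor_name": "toy"})
assert isinstance(proc, DataProcessorLike)  # runtime check: the methods exist
proc.build()
out = proc.process(
    None, [{"role": "user", "content": [{"type": "text", "text": "hello there world"}]}]
)
print(out["input_ids"])  # [1, 0, 2]
```

A runtime-checkable protocol only verifies that the methods exist, not their signatures. Treat the `isinstance` check as a smoke test, not a guarantee.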
Message Format
Input messages should follow the HuggingFace format:
```python
hf_messages = [
    {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": "path/to/image.jpg"}},
            {"type": "text", "text": "What's in this image?"},
        ],
    },
    {
        "role": "assistant",
        "content": [
            {"type": "text", "text": "This image shows..."},
        ],
    },
]
```
Content types supported:
- `text`: text content
- `image_url`: image content
- `audio_url`: audio content
- `video_url`: video content
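A common source of mismatches is a message list whose media entries do not line up with the data passed to `process()`. This check assumes that each `image_url` entry corresponds to one item in the `images` list, and likewise for audio and video.

Under that assumption, a quick pre-check can count the content types. `count_content_types` is a hypothetical helper, not part of lmms_engine:

```python
from typing import Any, Dict, List


def count_content_types(hf_messages: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count content parts of each type across all messages."""
    counts: Dict[str, int] = {}
    for message in hf_messages:
        content = message["content"]
        if isinstance(content, str):
            # Plain-string content counts as a single text part.
            counts["text"] = counts.get("text", 0) + 1
            continue
        for part in content:
            counts[part["type"]] = counts.get(part["type"], 0) + 1
    return counts


hf_messages = [
    {"role": "user", "content": [
        {"type": "image_url", "image_url": {"url": "path/to/image.jpg"}},
        {"type": "text", "text": "What's in this image?"},
    ]},
    {"role": "assistant", "content": [{"type": "text", "text": "This image shows..."}]},
]
images = ["<decoded image>"]  # stand-in for a list of PIL images

counts = count_content_types(hf_messages)
print(counts)  # {'image_url': 1, 'text': 2}
assert counts.get("image_url", 0) == len(images), "image placeholders != images supplied"
```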
Key Implementation Patterns
Pattern 1: Simple Inheritance from Base
When your model is a variant of Qwen2.5-VL:
```python
@register_processor("my_variant")
class MyVariantProcessor(BaseQwen2_5_DataProcessor):
    def _build_processor(self):
        # MyProcessor is a placeholder for your model's processor class
        processor = MyProcessor.from_pretrained(self.config.processor_name)
        # Custom configuration
        return processor

    # Usually there is no need to override process() if the parent handles it
```
Pattern 2: Custom Processing Logic
When you need special tokenization:
```python
import torch


def get_qwen_template_labels(
    self,
    hf_messages,
    num_image_tokens,
    num_video_tokens,
    system_message="You are a helpful assistant",
    add_system_prompt=True,
):
    """Custom label generation with your logic."""
    input_ids = []
    labels = []

    # Add the system prompt if needed
    if add_system_prompt:
        system_tokens = self.tokenizer.encode(system_message)
        input_ids.extend(system_tokens)
        labels.extend([-100] * len(system_tokens))  # Mask the system prompt

    # Process each message
    for message in hf_messages:
        role = message["role"]
        # Encode the message content (_encode_message is a helper you write)
        message_tokens = self._encode_message(message)
        input_ids.extend(message_tokens)

        # Label masking: user/system messages masked, assistant messages kept
        if role in ["user", "system"]:
            labels.extend([-100] * len(message_tokens))
        else:
            labels.extend(message_tokens)

    return {
        "input_ids": torch.tensor(input_ids, dtype=torch.long),
        "labels": torch.tensor(labels, dtype=torch.long),
    }
```
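The masking rule can be exercised without a real tokenizer. In the sketch below, a toy `toy_encode` (one role-marker id followed by character codes) replaces the hypothetical `_encode_message`, and labels are kept only for assistant turns.

Unlike the example above, the sketch masks every non-assistant role (including any `tool` role), not just `user` and `system`:

```python
from typing import Callable, Dict, List

IGNORE_INDEX = -100  # PyTorch's cross-entropy loss ignores this label by default
ROLE_IDS = {"system": 1, "user": 2, "assistant": 3}  # toy role-marker ids


def toy_encode(message: Dict) -> List[int]:
    """Toy stand-in for a real encoder: role marker + one id per character."""
    text = "".join(p["text"] for p in message["content"] if p["type"] == "text")
    return [ROLE_IDS[message["role"]]] + [ord(ch) for ch in text]


def build_labels(
    hf_messages: List[Dict], encode: Callable[[Dict], List[int]]
) -> Dict[str, List[int]]:
    input_ids: List[int] = []
    labels: List[int] = []
    for message in hf_messages:
        tokens = encode(message)
        input_ids.extend(tokens)
        if message["role"] == "assistant":
            labels.extend(tokens)  # train on the assistant's tokens
        else:
            labels.extend([IGNORE_INDEX] * len(tokens))  # mask everything else
    return {"input_ids": input_ids, "labels": labels}


messages = [
    {"role": "user", "content": [{"type": "text", "text": "hi"}]},
    {"role": "assistant", "content": [{"type": "text", "text": "ok"}]},
]
print(build_labels(messages, toy_encode))
# {'input_ids': [2, 104, 105, 3, 111, 107], 'labels': [-100, -100, -100, 3, 111, 107]}
```

Here the assistant's role marker (3) is trained on as well. Many chat fine-tuning recipes also mask the role header, so that only the response text contributes to the loss.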
Pattern 3: Token Expansion
Handle special media tokens that need expansion:
```python
def _expand_encode_id_image_tokens(
    self,
    encode_id: List[int],
    image_token_num: List[int],
    start_from: int = 0,
):
    """
    Expand image placeholder tokens to the actual token count.
    Example: if <|image|> represents 196 tokens, expand it to 196 copies.
    """
    image_pos = [i for i, x in enumerate(encode_id) if x == self.image_token_id]
    expanded_encode_id = []
    prev = 0
    for idx, pos in enumerate(image_pos):
        # Add tokens before the image position
        expanded_encode_id.extend(encode_id[prev:pos])
        # Expand the image token
        token_count = image_token_num[idx + start_from]
        expanded_encode_id.extend([self.image_token_id] * token_count)
        prev = pos + 1
    # Add the remaining tokens
    expanded_encode_id.extend(encode_id[prev:])
    return expanded_encode_id, len(image_pos)
```
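The `start_from` argument lets each turn be expanded separately while indexing into a single list of per-image token counts for the whole conversation. The standalone version below replaces `self.image_token_id` with a hypothetical `IMAGE_TOKEN_ID` of 99.

It expands two turns that contain one image each. For the second turn, it passes the number of images already consumed as `start_from`:

```python
from typing import List, Tuple

IMAGE_TOKEN_ID = 99  # hypothetical id of the <|image|> placeholder


def expand_image_tokens(
    encode_id: List[int], image_token_num: List[int], start_from: int = 0
) -> Tuple[List[int], int]:
    """Replace the k-th placeholder with image_token_num[k + start_from] copies."""
    image_pos = [i for i, x in enumerate(encode_id) if x == IMAGE_TOKEN_ID]
    expanded: List[int] = []
    prev = 0
    for idx, pos in enumerate(image_pos):
        expanded.extend(encode_id[prev:pos])
        expanded.extend([IMAGE_TOKEN_ID] * image_token_num[idx + start_from])
        prev = pos + 1
    expanded.extend(encode_id[prev:])
    return expanded, len(image_pos)


image_token_num = [2, 3]  # image 0 -> 2 tokens, image 1 -> 3 tokens
turn1, used = expand_image_tokens([10, 99, 11], image_token_num, start_from=0)
turn2, _ = expand_image_tokens([12, 99, 13], image_token_num, start_from=used)
print(turn1)  # [10, 99, 99, 11]
print(turn2)  # [12, 99, 99, 99, 13]
```

After expansion, the total number of placeholder ids (here 2 + 3 = 5) must equal the number of feature tokens the vision encoder produces. Otherwise, merging image features into the text embeddings will fail or misalign.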
Properties and Utilities
Define useful properties for token access:
```python
@property
def image_token_id(self):
    """Get the special token ID for images."""
    image_token = getattr(self.processor, "image_token", None)
    if image_token is None:
        return None
    return self.processor.tokenizer.convert_tokens_to_ids(image_token)

@property
def audio_token_id(self):
    """Get the special token ID for audio."""
    audio_token = getattr(self.processor, "audio_token", None)
    if audio_token is None:
        return None
    return self.processor.tokenizer.convert_tokens_to_ids(audio_token)

@property
def video_token_id(self):
    """Get the special token ID for videos."""
    video_token = getattr(self.processor, "video_token", None)
    if video_token is None:
        return None
    return self.processor.tokenizer.convert_tokens_to_ids(video_token)

@property
def tokenizer(self):
    """Get the underlying tokenizer."""
    return self.processor.tokenizer

@property
def sampling_rate(self):
    """Get the audio sampling rate."""
    if hasattr(self.processor, "audio_processor"):
        return self.processor.audio_processor.sampling_rate
    return None
```
Chat Templates
Customize how conversations are formatted:
```python
@property
def chat_template_custom(self):
    """Define how messages are formatted for the model."""
    return (
        "{% for message in messages %}"
        "<|im_start|>{{ message['role'] }}\n"
        "{% if message['content'] is string %}"
        "{{ message['content'] }}<|im_end|>\n"
        "{% else %}"
        "{% for content in message['content'] %}"
        "{% if content['type'] == 'text' %}"
        "{{ content['text'] }}"
        "{% elif content['type'] == 'image_url' %}"
        "<|image|>"
        "{% elif content['type'] == 'audio_url' %}"
        "<|audio|>"
        "{% endif %}"
        "{% endfor %}"
        "<|im_end|>\n"
        "{% endif %}"
        "{% endfor %}"
    )
```
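To see what this template produces, the plain-Python function below mirrors its logic. It is a debugging aid written for this guide, not something the processor calls.

Like the template, it emits nothing for `video_url` parts. A processor that accepts video therefore needs an extra branch in its template:

```python
from typing import Any, Dict, List

# Placeholder strings used by the example template; other part types emit nothing.
MEDIA_PLACEHOLDERS = {"image_url": "<|image|>", "audio_url": "<|audio|>"}


def render_chat(messages: List[Dict[str, Any]]) -> str:
    """Plain-Python equivalent of chat_template_custom, for eyeballing output."""
    out: List[str] = []
    for message in messages:
        out.append(f"<|im_start|>{message['role']}\n")
        content = message["content"]
        if isinstance(content, str):
            out.append(content)
        else:
            for part in content:
                if part["type"] == "text":
                    out.append(part["text"])
                else:
                    out.append(MEDIA_PLACEHOLDERS.get(part["type"], ""))
        out.append("<|im_end|>\n")
    return "".join(out)


msgs = [
    {"role": "user", "content": [
        {"type": "image_url", "image_url": {"url": "path/to/image.jpg"}},
        {"type": "text", "text": "What's in this image?"},
    ]},
    {"role": "assistant", "content": "A cat."},
]
print(render_chat(msgs))
# <|im_start|>user
# <|image|>What's in this image?<|im_end|>
# <|im_start|>assistant
# A cat.<|im_end|>
```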
ProcessorConfig
Configure your processor in the dataset config:
```yaml
processor_config:
  processor_type: my_custom_processor  # Name used in @register_processor
  processor_name: org/model-name       # HF model identifier
  extra_kwargs:                        # Optional custom parameters
    image_max_pixels: 1000000
    video_max_frames: 100
    custom_param: value
```
Access extra kwargs in your processor:
```python
def _build_processor(self):
    max_pixels = self.config.extra_kwargs.get("image_max_pixels", 1000000)
    # Use max_pixels for configuration
```
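The engine finds your class through the `processor_type` value, which must match the name given to `@register_processor`. The sketch below shows the usual decorator-registry pattern using hypothetical names (`PROCESSOR_REGISTRY`, `create_processor`, `ToyProcessorConfig`).

lmms_engine's real `register_processor` and `ProcessorConfig` may differ in detail:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Type

# Hypothetical name -> class registry filled in by the decorator.
PROCESSOR_REGISTRY: Dict[str, Type] = {}


def register_processor(name: str) -> Callable[[Type], Type]:
    def decorator(cls: Type) -> Type:
        if name in PROCESSOR_REGISTRY:
            raise ValueError(f"processor {name!r} is already registered")
        PROCESSOR_REGISTRY[name] = cls
        return cls  # the class itself is returned unchanged
    return decorator


@dataclass
class ToyProcessorConfig:
    processor_type: str
    processor_name: str
    extra_kwargs: Dict[str, Any] = field(default_factory=dict)


@register_processor("my_custom_processor")
class MyCustomProcessor:
    def __init__(self, config: ToyProcessorConfig) -> None:
        self.config = config
        # Optional setting with a default, read from extra_kwargs
        self.max_pixels = config.extra_kwargs.get("image_max_pixels", 1000000)


def create_processor(config: ToyProcessorConfig):
    """Look up the class registered under processor_type and instantiate it."""
    try:
        cls = PROCESSOR_REGISTRY[config.processor_type]
    except KeyError:
        raise ValueError(f"unknown processor_type {config.processor_type!r}") from None
    return cls(config)


cfg = ToyProcessorConfig(
    processor_type="my_custom_processor",
    processor_name="org/model-name",
    extra_kwargs={"image_max_pixels": 500000},
)
proc = create_processor(cfg)
print(type(proc).__name__, proc.max_pixels)  # MyCustomProcessor 500000
```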
Testing Your Processor
Test your implementation:
```python
import numpy as np
from PIL import Image

from lmms_engine.datasets.processor.config import ProcessorConfig

# MyProcessor is your processor class; import it from the module where you defined it.

# Setup
config = ProcessorConfig(
    processor_type="my_processor",
    processor_name="Qwen/Qwen2.5-VL-7B-Instruct",
)
processor = MyProcessor(config)
processor.build()

# Test with sample data
images = [Image.new("RGB", (224, 224))]
messages = [
    {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": "placeholder"}},
            {"type": "text", "text": "What is this?"},
        ],
    }
]

# Process
output = processor.process(
    images=images,
    hf_messages=messages,
)

# Verify output
assert "input_ids" in output
assert "labels" in output
assert output["input_ids"].shape[0] > 0
print("✓ Processor test passed")
```
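Beyond checking that the required keys exist, it helps to check that the labels line up with `input_ids`. The helper below is a hypothetical sketch that assumes unshifted labels: each label is either -100 or equal to the token at the same position, as in the label-masking pattern above.

It works on plain lists, so convert tensors with `.tolist()` first. Take the first row if your processor returns a batch dimension:

```python
from typing import List, Optional

IGNORE_INDEX = -100


def check_alignment(
    input_ids: List[int],
    labels: List[int],
    image_token_id: Optional[int] = None,
    expected_image_tokens: Optional[int] = None,
) -> List[str]:
    """Return a list of human-readable problems (empty if none found)."""
    problems: List[str] = []
    if len(input_ids) != len(labels):
        problems.append(f"length mismatch: {len(input_ids)} input_ids vs {len(labels)} labels")
    for i, (tok, lab) in enumerate(zip(input_ids, labels)):
        # Unmasked labels should copy the input token (labels assumed unshifted).
        if lab != IGNORE_INDEX and lab != tok:
            problems.append(f"position {i}: label {lab} != input_id {tok}")
    if all(lab == IGNORE_INDEX for lab in labels):
        problems.append("every label is masked; nothing will be trained")
    if image_token_id is not None and expected_image_tokens is not None:
        found = input_ids.count(image_token_id)
        if found != expected_image_tokens:
            problems.append(f"found {found} image tokens, expected {expected_image_tokens}")
    return problems


# Toy example: 3 image placeholder tokens (id 99) followed by one answer token.
print(check_alignment([99, 99, 99, 5], [-100, -100, -100, 5],
                      image_token_id=99, expected_image_tokens=3))  # []
```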
Best Practices
1. Inherit from the appropriate base class
   - Text-only: no base class is needed
   - Multimodal: inherit from `AeroDataProcessor` or `BaseQwen2_5_DataProcessor`
2. Always implement `_build_processor()`
   - This is where model-specific loading happens
   - Keep it focused and readable
3. Document modality support
   - State clearly in docstrings which modalities your processor supports
   - Use assertions to validate inputs
4. Handle edge cases, such as a missing modality:
   ```python
   if images is None:
       image_inputs = {}
   else:
       # Process images
       ...
   ```
5. Use the `@register_processor` decorator
   - This makes your processor discoverable
   - Use lowercase with underscores for processor names
6. Mask appropriately for training
   - User/system messages are typically masked (-100)
   - Assistant responses are left unmasked for training
   - Handle special tokens carefully
7. Return a consistent output format
   - Always include `input_ids`, `attention_mask`, and `labels`
   - Include modality-specific keys when applicable
Common Issues and Solutions
Issue: KeyError for `input_ids`
Cause: The processor does not return the required keys
Solution: Ensure `process()` returns a dict containing `input_ids` and `labels`
Issue: Token count mismatch
Cause: Media tokens are expanded incorrectly
Solution: Verify the token-counting logic and check that the `_expand_encode_id_*` methods are correct
Issue: Chat template not applied
Cause: The template was not set in `build()`
Solution: Add `self.processor.chat_template = self.chat_template_custom` in `build()`
Issue: Audio/video not processed
Cause: Processor components are missing
Solution: Verify that `_build_processor()` loads all needed components
Advanced: Custom Collator Integration
Processors often work with custom collators:
```python
# In your dataset
def get_collator(self):
    return CustomCollator(self.processor)


# Define a collator that works with processor outputs
class CustomCollator:
    def __init__(self, processor):
        self.processor = processor

    def __call__(self, batch):
        # batch is a list of processor outputs
        # Stack tensors appropriately
        pass
```
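One way to fill in `__call__` is to right-pad the text fields of each sample to the longest sequence in the batch, as sketched below. The sketch rests on several assumptions:
- Each processor output holds 1-D `input_ids` and `labels` sequences.
- Results stay as plain lists so the code runs anywhere; with PyTorch you would wrap them in `torch.tensor`.
- Media tensors such as `pixel_values` are left out, because how they are batched is model-specific.

`PaddingCollator` is a hypothetical name. In practice, the pad id could come from `self.processor.tokenizer.pad_token_id`:

```python
from typing import Dict, List

IGNORE_INDEX = -100


class PaddingCollator:
    """Hypothetical collator that right-pads the text fields of a batch."""

    def __init__(self, pad_token_id: int) -> None:
        self.pad_token_id = pad_token_id

    def __call__(self, batch: List[Dict[str, List[int]]]) -> Dict[str, List[List[int]]]:
        max_len = max(len(sample["input_ids"]) for sample in batch)
        out: Dict[str, List[List[int]]] = {"input_ids": [], "attention_mask": [], "labels": []}
        for sample in batch:
            length = len(sample["input_ids"])
            n_pad = max_len - length
            out["input_ids"].append(list(sample["input_ids"]) + [self.pad_token_id] * n_pad)
            # 1 for real tokens, 0 for padding, built from the length (not the pad id).
            out["attention_mask"].append([1] * length + [0] * n_pad)
            # Padded positions are excluded from the loss.
            out["labels"].append(list(sample["labels"]) + [IGNORE_INDEX] * n_pad)
        return out


collator = PaddingCollator(pad_token_id=0)
batch_out = collator([
    {"input_ids": [1, 2, 3], "labels": [-100, 2, 3]},
    {"input_ids": [4], "labels": [4]},
])
print(batch_out["input_ids"])       # [[1, 2, 3], [4, 0, 0]]
print(batch_out["attention_mask"])  # [[1, 1, 1], [1, 0, 0]]
print(batch_out["labels"])          # [[-100, 2, 3], [4, -100, -100]]
```

The attention mask is built from sequence lengths rather than by comparing tokens against `pad_token_id`. Some tokenizers reuse a real token (often end-of-sequence) as padding, which would make that comparison wrong.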