Processors
Processors transform, validate, or control messages as they pass through an agent. They run at specific points in the agent's execution pipeline, allowing you to modify inputs before they reach the language model or outputs before they're returned to users.
Processors are configured as:
inputProcessors: Run before messages reach the language model.outputProcessors: Run after the language model generates a response, but before it's returned to users.
You can use individual Processor objects or compose them into workflows using Mastra's workflow primitives. Workflows give you advanced control over processor execution order, parallel processing, and conditional logic.
Some processors implement both input and output logic and can be used in either array depending on where the transformation should occur.
When to use processorsDirect link to When to use processors
Use processors to:
- Normalize or validate user input
- Add guardrails to your agent
- Detect and prevent prompt injection or jailbreak attempts
- Moderate content for safety or compliance
- Transform messages (e.g., translate languages, filter tool calls)
- Limit token usage or message history length
- Redact sensitive information (PII)
- Apply custom business logic to messages
Mastra includes several processors for common use cases. You can also create custom processors for application-specific requirements.
Adding processors to an agentDirect link to Adding processors to an agent
Import and instantiate the processor, then pass it to the agent's inputProcessors or outputProcessors array:
import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { ModerationProcessor } from "@mastra/core/processors";
export const moderatedAgent = new Agent({
name: "moderated-agent",
instructions: "You are a helpful assistant",
model: openai("gpt-4o-mini"),
inputProcessors: [
new ModerationProcessor({
model: openai("gpt-4.1-nano"),
categories: ["hate", "harassment", "violence"],
threshold: 0.7,
strategy: "block",
}),
],
});
Execution orderDirect link to Execution order
Processors run in the order they appear in the array:
inputProcessors: [
new UnicodeNormalizer(),
new PromptInjectionDetector(),
new ModerationProcessor(),
];
For output processors, the order determines the sequence of transformations applied to the model's response.
With memory enabledDirect link to With memory enabled
When memory is enabled on an agent, memory processors are automatically added to the pipeline:
Input processors:
[Memory Processors] → [Your inputProcessors]
Memory loads conversation history first, then your processors run.
Output processors:
[Your outputProcessors] → [Memory Processors]
Your processors run first, then memory persists messages.
This ordering ensures that if your output guardrail calls abort(), memory processors are skipped and no messages are saved. See Memory Processors for details.
Creating custom processorsDirect link to Creating custom processors
Custom processors implement the Processor interface:
Custom input processorDirect link to Custom input processor
import type {
Processor,
MastraDBMessage,
RequestContext,
} from "@mastra/core";
export class CustomInputProcessor implements Processor {
id = "custom-input";
async processInput({
messages,
systemMessages,
context,
}: {
messages: MastraDBMessage[];
systemMessages: CoreMessage[];
context: RequestContext;
}): Promise<MastraDBMessage[]> {
// Transform messages before they reach the LLM
return messages.map((msg) => ({
...msg,
content: {
...msg.content,
content: msg.content.content.toLowerCase(),
},
}));
}
}
The processInput method receives:
messages: User and assistant messages (not system messages)systemMessages: All system messages (agent instructions, memory context, user-provided system prompts)messageList: The full MessageList instance for advanced use casesabort: Function to stop processing and return earlyrequestContext: Execution metadata likethreadIdandresourceId
The method can return:
MastraDBMessage[]— Transformed messages array (backward compatible){ messages: MastraDBMessage[]; systemMessages: CoreMessage[] }— Both messages and modified system messages
The framework handles both return formats, so modifying system messages is optional and existing processors continue to work.
Modifying system messagesDirect link to Modifying system messages
To modify system messages (e.g., trim verbose prompts for smaller models), return an object with both messages and systemMessages:
import type { Processor, CoreMessage, MastraDBMessage } from "@mastra/core";
export class SystemTrimmer implements Processor {
id = "system-trimmer";
async processInput({
messages,
systemMessages,
}): Promise<{ messages: MastraDBMessage[]; systemMessages: CoreMessage[] }> {
// Trim system messages for smaller models
const trimmedSystemMessages = systemMessages.map((msg) => ({
...msg,
content:
typeof msg.content === "string"
? msg.content.substring(0, 500)
: msg.content,
}));
return { messages, systemMessages: trimmedSystemMessages };
}
}
This is useful for:
- Trimming verbose system prompts for models with smaller context windows
- Filtering or modifying semantic recall content to prevent "prompt too long" errors
- Dynamically adjusting system instructions based on the conversation
Per-step processing with processInputStepDirect link to Per-step processing with processInputStep
While processInput runs once at the start of agent execution, processInputStep runs at each step of the agentic loop (including tool call continuations). This enables per-step configuration changes like dynamic model switching or tool choice modifications.
import type { Processor, ProcessInputStepArgs, ProcessInputStepResult } from "@mastra/core";
export class DynamicModelProcessor implements Processor {
id = "dynamic-model";
async processInputStep({
stepNumber,
model,
toolChoice,
messageList,
}: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
// Use a fast model for initial response
if (stepNumber === 0) {
return { model: "openai/gpt-4o-mini" };
}
// Disable tools after 5 steps to force completion
if (stepNumber > 5) {
return { toolChoice: "none" };
}
// No changes for other steps
return {};
}
}
The processInputStep method receives:
stepNumber: Current step in the agentic loop (0-indexed)steps: Results from previous stepsmessages: Current messages snapshot (read-only)systemMessages: Current system messages (read-only)messageList: The full MessageList instance for mutationsmodel: Current model being usedtools: Current tools available for this steptoolChoice: Current tool choice settingactiveTools: Currently active toolsproviderOptions: Provider-specific optionsmodelSettings: Model settings like temperaturestructuredOutput: Structured output configuration
The method can return any combination of:
model: Change the model for this steptools: Replace or add tools (use spread to merge:{ tools: { ...tools, newTool } })toolChoice: Change tool selection behavioractiveTools: Filter which tools are availablemessages: Replace messages (applied to messageList)systemMessages: Replace all system messagesproviderOptions: Modify provider optionsmodelSettings: Modify model settingsstructuredOutput: Modify structured output configuration
Using prepareStep callbackDirect link to Using prepareStep callback
For simpler per-step logic, you can use the prepareStep callback on generate() or stream() instead of creating a full processor:
await agent.generate({
prompt: "Complex task",
prepareStep: async ({ stepNumber, model }) => {
if (stepNumber === 0) {
return { model: "openai/gpt-4o-mini" };
}
if (stepNumber > 5) {
return { toolChoice: "none" };
}
},
});
Custom output processorDirect link to Custom output processor
import type {
Processor,
MastraDBMessage,
RequestContext,
} from "@mastra/core";
export class CustomOutputProcessor implements Processor {
id = "custom-output";
async processOutputResult({
messages,
context,
}: {
messages: MastraDBMessage[];
context: RequestContext;
}): Promise<MastraDBMessage[]> {
// Transform messages after the LLM generates them
return messages.filter((msg) => msg.role !== "system");
}
async processOutputStream({
stream,
context,
}: {
stream: ReadableStream;
context: RequestContext;
}): Promise<ReadableStream> {
// Transform streaming responses
return stream;
}
}
Built-in Utility ProcessorsDirect link to Built-in Utility Processors
Mastra provides utility processors for common tasks:
For security and validation processors, see the Guardrails page for input/output guardrails and moderation processors. For memory-specific processors, see the Memory Processors page for processors that handle message history, semantic recall, and working memory.
TokenLimiterDirect link to TokenLimiter
Prevents context window overflow by removing older messages when the total token count exceeds a specified limit.
import { Agent } from "@mastra/core/agent";
import { TokenLimiter } from "@mastra/core/processors";
import { openai } from "@ai-sdk/openai";
const agent = new Agent({
name: "my-agent",
model: openai("gpt-4o"),
inputProcessors: [
// Ensure the total tokens don't exceed ~127k
new TokenLimiter(127000),
],
});
The TokenLimiter uses the o200k_base encoding by default (suitable for GPT-4o). You can specify other encodings for different models:
import cl100k_base from "js-tiktoken/ranks/cl100k_base";
const agent = new Agent({
name: "my-agent",
inputProcessors: [
new TokenLimiter({
limit: 16000, // Example limit for a 16k context model
encoding: cl100k_base,
}),
],
});
ToolCallFilterDirect link to ToolCallFilter
Removes tool calls from messages sent to the LLM, saving tokens by excluding potentially verbose tool interactions.
import { Agent } from "@mastra/core/agent";
import { ToolCallFilter, TokenLimiter } from "@mastra/core/processors";
import { openai } from "@ai-sdk/openai";
const agent = new Agent({
name: "my-agent",
model: openai("gpt-4o"),
inputProcessors: [
// Example 1: Remove all tool calls/results
new ToolCallFilter(),
// Example 2: Remove only specific tool calls
new ToolCallFilter({ exclude: ["generateImageTool"] }),
// Always place TokenLimiter last
new TokenLimiter(127000),
],
});
Note: The example above filters tool calls and limits tokens for the LLM, but these filtered messages will still be saved to memory. To also filter messages before they're saved to memory, manually add memory processors before utility processors. See Memory Processors for details.
Using workflows as processorsDirect link to Using workflows as processors
You can use Mastra workflows as processors to create complex processing pipelines with parallel execution, conditional branching, and error handling:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { ProcessorStepSchema } from "@mastra/core/processors";
import { Agent } from "@mastra/core/agent";
// Create a workflow that runs multiple checks in parallel
const moderationWorkflow = createWorkflow({
id: "moderation-pipeline",
inputSchema: ProcessorStepSchema,
outputSchema: ProcessorStepSchema,
})
.then(createStep(new LengthValidator({ maxLength: 10000 })))
.parallel([
createStep(new PIIDetector({ strategy: "redact" })),
createStep(new ToxicityChecker({ threshold: 0.8 })),
])
.commit();
// Use the workflow as an input processor
const agent = new Agent({
id: "moderated-agent",
name: "Moderated Agent",
model: "openai/gpt-4o",
inputProcessors: [moderationWorkflow],
});
When an agent is registered with Mastra, processor workflows are automatically registered as workflows, allowing you to view and debug them in the playground.
Retry mechanismDirect link to Retry mechanism
Processors can request that the LLM retry its response with feedback. This is useful for implementing quality checks, output validation, or iterative refinement:
import type { Processor } from "@mastra/core";
export class QualityChecker implements Processor {
id = "quality-checker";
async processOutputStep({ text, abort, retryCount }) {
const qualityScore = await evaluateQuality(text);
if (qualityScore < 0.7 && retryCount < 3) {
// Request a retry with feedback for the LLM
abort("Response quality score too low. Please provide a more detailed answer.", {
retry: true,
metadata: { score: qualityScore },
});
}
return [];
}
}
const agent = new Agent({
id: "quality-agent",
name: "Quality Agent",
model: "openai/gpt-4o",
outputProcessors: [new QualityChecker()],
maxProcessorRetries: 3, // Maximum retry attempts (default: 3)
});
The retry mechanism:
- Only works in
processOutputStepandprocessInputStepmethods - Replays the step with the abort reason added as context for the LLM
- Tracks retry count via the
retryCountparameter - Respects
maxProcessorRetrieslimit on the agent
Related documentationDirect link to Related documentation
- Guardrails - Security and validation processors
- Memory Processors - Memory-specific processors and automatic integration
- Processor Interface - Full API reference for processors