# Processors

Processors transform, validate, or control messages as they pass through an agent. They run at specific points in the agent's execution pipeline, allowing you to modify inputs before they reach the language model or outputs before they're returned to users.

Processors are configured as:

- **`inputProcessors`**: Run before messages reach the language model.
- **`outputProcessors`**: Run after the language model generates a response, but before it's returned to users.

You can use individual `Processor` objects or compose them into workflows using Mastra's workflow primitives. Workflows give you advanced control over processor execution order, parallel processing, and conditional logic.

Some processors implement both input and output logic and can be used in either array depending on where the transformation should occur.

## When to use processors

Use processors to:

- Normalize or validate user input
- Add guardrails to your agent
- Detect and prevent prompt injection or jailbreak attempts
- Moderate content for safety or compliance
- Transform messages (e.g., translate languages, filter tool calls)
- Limit token usage or message history length
- Redact sensitive information (PII)
- Apply custom business logic to messages

Mastra includes several processors for common use cases. You can also create custom processors for application-specific requirements.

## Adding processors to an agent

Import and instantiate the processor, then pass it to the agent's `inputProcessors` or `outputProcessors` array:

```typescript
import { Agent } from "@mastra/core/agent";
import { ModerationProcessor } from "@mastra/core/processors";

export const moderatedAgent = new Agent({
  name: "moderated-agent",
  instructions: "You are a helpful assistant",
  model: "openai/gpt-4o-mini",
  inputProcessors: [
    new ModerationProcessor({
      model: "openai/gpt-4.1-nano",
      categories: ["hate", "harassment", "violence"],
      threshold: 0.7,
      strategy: "block",
    }),
  ],
});
```

## Execution order

Processors run in the order they appear in the array:

```typescript
inputProcessors: [
  new UnicodeNormalizer(),
  new PromptInjectionDetector(),
  new ModerationProcessor(),
];
```

For output processors, the order determines the sequence of transformations applied to the model's response.

### With memory enabled

When memory is enabled on an agent, memory processors are automatically added to the pipeline:

**Input processors:**

```text
[Memory Processors] → [Your inputProcessors]
```

Memory loads message history first, then your processors run.

**Output processors:**

```text
[Your outputProcessors] → [Memory Processors]
```

Your processors run first, then memory persists messages. This ordering ensures that if your output guardrail calls `abort()`, memory processors are skipped and no messages are saved. See [Memory Processors](https://mastra.ai/docs/memory/memory-processors) for details.
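As a minimal sketch of this ordering (assuming the `Memory` class from `@mastra/memory`, and that `PIIDetector` is exported from `@mastra/core/processors` like the processors above), an agent with memory and an output guardrail looks like this:

```typescript
import { Agent } from "@mastra/core/agent";
import { PIIDetector } from "@mastra/core/processors"; // assumed export
import { Memory } from "@mastra/memory"; // assumed package

// Effective output pipeline: [PIIDetector] → [memory persistence].
// PIIDetector runs first, so memory stores the redacted messages
// rather than the raw model output.
export const privateAgent = new Agent({
  name: "private-agent",
  instructions: "You are a helpful assistant",
  model: "openai/gpt-4o-mini",
  memory: new Memory(),
  outputProcessors: [new PIIDetector({ strategy: "redact" })],
});
```

Because your output processors run before the memory processors, whatever they transform (or abort) is exactly what memory sees.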
## Creating custom processors

Custom processors implement the `Processor` interface:

### Custom input processor

```typescript
import type {
  Processor,
  MastraDBMessage,
  CoreMessage,
  RequestContext,
} from "@mastra/core";

export class CustomInputProcessor implements Processor {
  id = "custom-input";

  async processInput({
    messages,
    systemMessages,
    context,
  }: {
    messages: MastraDBMessage[];
    systemMessages: CoreMessage[];
    context: RequestContext;
  }): Promise<MastraDBMessage[]> {
    // Transform messages before they reach the LLM
    return messages.map((msg) => ({
      ...msg,
      content: {
        ...msg.content,
        content: msg.content.content.toLowerCase(),
      },
    }));
  }
}
```

The `processInput` method receives:

- `messages`: User and assistant messages (not system messages)
- `systemMessages`: All system messages (agent instructions, memory context, user-provided system prompts)
- `messageList`: The full MessageList instance for advanced use cases
- `abort`: Function to stop processing and return early
- `context`: Execution metadata like `threadId` and `resourceId`, typed as `RequestContext`

The method can return either:

- `MastraDBMessage[]` — Transformed messages array (backward compatible)
- `{ messages: MastraDBMessage[]; systemMessages: CoreMessage[] }` — Both messages and modified system messages

The framework handles both return formats, so modifying system messages is optional and existing processors continue to work.

### Modifying system messages

To modify system messages (e.g., trim verbose prompts for smaller models), return an object with both `messages` and `systemMessages`:

```typescript
import type { Processor, CoreMessage, MastraDBMessage } from "@mastra/core";

export class SystemTrimmer implements Processor {
  id = "system-trimmer";

  async processInput({
    messages,
    systemMessages,
  }: {
    messages: MastraDBMessage[];
    systemMessages: CoreMessage[];
  }): Promise<{ messages: MastraDBMessage[]; systemMessages: CoreMessage[] }> {
    // Trim system messages for smaller models
    const trimmedSystemMessages = systemMessages.map((msg) => ({
      ...msg,
      content:
        typeof msg.content === "string"
          ? msg.content.substring(0, 500)
          : msg.content,
    }));

    return { messages, systemMessages: trimmedSystemMessages };
  }
}
```

This is useful for:

- Trimming verbose system prompts for models with smaller context windows
- Filtering or modifying semantic recall content to prevent "prompt too long" errors
- Dynamically adjusting system instructions based on the conversation

### Per-step processing with processInputStep

While `processInput` runs once at the start of agent execution, `processInputStep` runs at **each step** of the agentic loop (including tool call continuations). This enables per-step configuration changes like dynamic model switching or tool choice modifications.
```typescript
import type {
  Processor,
  ProcessInputStepArgs,
  ProcessInputStepResult,
} from "@mastra/core";

export class DynamicModelProcessor implements Processor {
  id = "dynamic-model";

  async processInputStep({
    stepNumber,
    model,
    toolChoice,
    messageList,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // Use a fast model for the initial response
    if (stepNumber === 0) {
      return { model: "openai/gpt-4o-mini" };
    }

    // Disable tools after 5 steps to force completion
    if (stepNumber > 5) {
      return { toolChoice: "none" };
    }

    // No changes for other steps
    return {};
  }
}
```

The `processInputStep` method receives:

- `stepNumber`: Current step in the agentic loop (0-indexed)
- `steps`: Results from previous steps
- `messages`: Current messages snapshot (read-only)
- `systemMessages`: Current system messages (read-only)
- `messageList`: The full MessageList instance for mutations
- `model`: Current model being used
- `tools`: Current tools available for this step
- `toolChoice`: Current tool choice setting
- `activeTools`: Currently active tools
- `providerOptions`: Provider-specific options
- `modelSettings`: Model settings like temperature
- `structuredOutput`: Structured output configuration

The method can return any combination of:

- `model`: Change the model for this step
- `tools`: Replace or add tools (use spread to merge: `{ tools: { ...tools, newTool } }`)
- `toolChoice`: Change tool selection behavior
- `activeTools`: Filter which tools are available
- `messages`: Replace messages (applied to messageList)
- `systemMessages`: Replace all system messages
- `providerOptions`: Modify provider options
- `modelSettings`: Modify model settings
- `structuredOutput`: Modify structured output configuration

#### Ensuring a final response with maxSteps

When using `maxSteps` to limit agent execution, the agent may return an empty response if it attempts a tool call on the final step. Use `processInputStep` to force a text response on the last step:

```typescript
import type {
  Processor,
  ProcessInputStepArgs,
  ProcessInputStepResult,
} from "@mastra/core/processors";

export class EnsureFinalResponseProcessor implements Processor {
  readonly id = "ensure-final-response";
  private maxSteps: number;

  constructor(maxSteps: number) {
    this.maxSteps = maxSteps;
  }

  async processInputStep({
    stepNumber,
    systemMessages,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // On the last step, prevent tool calls and instruct the LLM to summarize
    if (stepNumber === this.maxSteps - 1) {
      return {
        tools: {},
        toolChoice: "none",
        systemMessages: [
          ...systemMessages,
          {
            role: "system",
            content:
              "You have reached the maximum number of steps. Summarize your progress so far and provide a best-effort response. If the task is incomplete, clearly indicate what remains to be done.",
          },
        ],
      };
    }

    return {};
  }
}
```

Use it with your agent:

```typescript
import { Agent } from "@mastra/core/agent";
import { EnsureFinalResponseProcessor } from "../processors/ensure-final-response";

const MAX_STEPS = 5;

const agent = new Agent({
  id: "bounded-agent",
  name: "Bounded Agent",
  model: "openai/gpt-4o-mini",
  tools: {
    /* your tools */
  },
  inputProcessors: [new EnsureFinalResponseProcessor(MAX_STEPS)],
});

// Pass maxSteps when calling generate/stream
const result = await agent.generate("Your prompt", { maxSteps: MAX_STEPS });
```

This ensures that on the final allowed step (step 4 when `maxSteps` is 5, since steps are 0-indexed), the LLM generates a summary instead of attempting another tool call, and clearly indicates if the task is incomplete.
#### Using prepareStep callback

For simpler per-step logic, you can use the `prepareStep` callback on `generate()` or `stream()` instead of creating a full processor:

```typescript
await agent.generate("Complex task", {
  prepareStep: async ({ stepNumber, model }) => {
    if (stepNumber === 0) {
      return { model: "openai/gpt-4o-mini" };
    }
    if (stepNumber > 5) {
      return { toolChoice: "none" };
    }
  },
});
```

### Custom output processor

```typescript
import type {
  Processor,
  MastraDBMessage,
  RequestContext,
} from "@mastra/core";

export class CustomOutputProcessor implements Processor {
  id = "custom-output";

  async processOutputResult({
    messages,
    context,
  }: {
    messages: MastraDBMessage[];
    context: RequestContext;
  }): Promise<MastraDBMessage[]> {
    // Transform messages after the LLM generates them
    return messages.filter((msg) => msg.role !== "system");
  }

  async processOutputStream({
    stream,
    context,
  }: {
    stream: ReadableStream;
    context: RequestContext;
  }): Promise<ReadableStream> {
    // Transform streaming responses (returned unchanged here)
    return stream;
  }
}
```

#### Adding metadata in output processors

You can add custom metadata to messages in `processOutputResult`. This metadata is accessible via the response object:

```typescript
import type { Processor, MastraDBMessage } from "@mastra/core";

export class MetadataProcessor implements Processor {
  id = "metadata-processor";

  async processOutputResult({
    messages,
  }: {
    messages: MastraDBMessage[];
  }): Promise<MastraDBMessage[]> {
    return messages.map((msg) => {
      if (msg.role === "assistant") {
        return {
          ...msg,
          content: {
            ...msg.content,
            metadata: {
              ...msg.content.metadata,
              processedAt: new Date().toISOString(),
              customData: "your data here",
            },
          },
        };
      }
      return msg;
    });
  }
}
```

Access the metadata with `generate()`:

```typescript
const result = await agent.generate("Hello");

// The response includes uiMessages with processor-added metadata
const assistantMessage = result.response?.uiMessages?.find(
  (m) => m.role === "assistant",
);
console.log(assistantMessage?.metadata?.customData);
```

Access the metadata when streaming:

```typescript
const stream = await agent.stream("Hello");

for await (const chunk of stream.fullStream) {
  if (chunk.type === "finish") {
    // Access response with processor-added metadata from the finish chunk
    const uiMessages = chunk.payload.response?.uiMessages;
    const assistantMessage = uiMessages?.find((m) => m.role === "assistant");
    console.log(assistantMessage?.metadata?.customData);
  }
}

// Or via the response promise after consuming the stream
const response = await stream.response;
console.log(response.uiMessages);
```

## Built-in utility processors

Mastra provides utility processors for common tasks:

**For security and validation processors**, see the [Guardrails](https://mastra.ai/docs/agents/guardrails) page for input/output guardrails and moderation processors.

**For memory-specific processors**, see the [Memory Processors](https://mastra.ai/docs/memory/memory-processors) page for processors that handle message history, semantic recall, and working memory.

### TokenLimiter

Prevents context window overflow by removing older messages when the total token count exceeds a specified limit.

```typescript
import { Agent } from "@mastra/core/agent";
import { TokenLimiter } from "@mastra/core/processors";

const agent = new Agent({
  name: "my-agent",
  model: "openai/gpt-4o",
  inputProcessors: [
    // Ensure the total tokens don't exceed ~127k
    new TokenLimiter(127000),
  ],
});
```

The `TokenLimiter` uses the `o200k_base` encoding by default (suitable for GPT-4o).
You can specify other encodings for different models:

```typescript
import { Agent } from "@mastra/core/agent";
import { TokenLimiter } from "@mastra/core/processors";
import cl100k_base from "js-tiktoken/ranks/cl100k_base";

const agent = new Agent({
  name: "my-agent",
  model: "openai/gpt-3.5-turbo",
  inputProcessors: [
    new TokenLimiter({
      limit: 16000, // Example limit for a 16k context model
      encoding: cl100k_base,
    }),
  ],
});
```

### ToolCallFilter

Removes tool calls from messages sent to the LLM, saving tokens by excluding potentially verbose tool interactions.

```typescript
import { Agent } from "@mastra/core/agent";
import { ToolCallFilter, TokenLimiter } from "@mastra/core/processors";

const agent = new Agent({
  name: "my-agent",
  model: "openai/gpt-4o",
  inputProcessors: [
    // Example 1: Remove all tool calls/results
    new ToolCallFilter(),
    // Example 2: Remove only specific tool calls
    new ToolCallFilter({ exclude: ["generateImageTool"] }),
    // Always place TokenLimiter last
    new TokenLimiter(127000),
  ],
});
```

> **Note:** The example above filters tool calls and limits tokens for the LLM, but these filtered messages will still be saved to memory. To also filter messages before they're saved to memory, manually add memory processors before utility processors. See [Memory Processors](https://mastra.ai/docs/memory/memory-processors) for details.

### ToolSearchProcessor

Enables dynamic tool discovery and loading for agents with large tool libraries. Instead of providing all tools upfront, the agent searches for tools by keyword and loads them on demand, reducing context token usage.

```typescript
import { Agent } from "@mastra/core/agent";
import { ToolSearchProcessor } from "@mastra/core/processors";

const agent = new Agent({
  name: "my-agent",
  model: "openai/gpt-4o",
  inputProcessors: [
    new ToolSearchProcessor({
      tools: {
        createIssue: githubTools.createIssue,
        sendEmail: emailTools.send,
        // ... hundreds of tools
      },
      search: { topK: 5, minScore: 0.1 },
    }),
  ],
});
```

The processor gives the agent two meta-tools: `search_tools` to find tools by keyword and `load_tool` to add a tool to the conversation. Loaded tools persist within the thread. See the [ToolSearchProcessor reference](https://mastra.ai/reference/processors/tool-search-processor) for full configuration options.

## Using workflows as processors

You can use Mastra workflows as processors to create complex processing pipelines with parallel execution, conditional branching, and error handling:

```typescript
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { ProcessorStepSchema } from "@mastra/core/processors";
import { Agent } from "@mastra/core/agent";

// Create a workflow that runs multiple checks in parallel.
// LengthValidator, PIIDetector, and ToxicityChecker are illustrative
// processors; import or define them in your project.
const moderationWorkflow = createWorkflow({
  id: "moderation-pipeline",
  inputSchema: ProcessorStepSchema,
  outputSchema: ProcessorStepSchema,
})
  .then(createStep(new LengthValidator({ maxLength: 10000 })))
  .parallel([
    createStep(new PIIDetector({ strategy: "redact" })),
    createStep(new ToxicityChecker({ threshold: 0.8 })),
  ])
  .commit();

// Use the workflow as an input processor
const agent = new Agent({
  id: "moderated-agent",
  name: "Moderated Agent",
  model: "openai/gpt-4o",
  inputProcessors: [moderationWorkflow],
});
```

When an agent is registered with Mastra, its processor workflows are automatically registered as well, allowing you to view and debug them in the [Studio](https://mastra.ai/docs/getting-started/studio).

## Retry mechanism

Processors can request that the LLM retry its response with feedback.
This is useful for implementing quality checks, output validation, or iterative refinement:

```typescript
import { Agent } from "@mastra/core/agent";
import type { Processor } from "@mastra/core";

export class QualityChecker implements Processor {
  id = "quality-checker";

  async processOutputStep({ text, abort, retryCount }) {
    // evaluateQuality is a placeholder for your own scoring logic
    const qualityScore = await evaluateQuality(text);

    if (qualityScore < 0.7 && retryCount < 3) {
      // Request a retry with feedback for the LLM
      abort("Response quality score too low. Please provide a more detailed answer.", {
        retry: true,
        metadata: { score: qualityScore },
      });
    }

    // No changes when the quality is acceptable
    return [];
  }
}

const agent = new Agent({
  id: "quality-agent",
  name: "Quality Agent",
  model: "openai/gpt-4o",
  outputProcessors: [new QualityChecker()],
  maxProcessorRetries: 3, // Maximum retry attempts (default: 3)
});
```

The retry mechanism:

- Only works in `processOutputStep` and `processInputStep` methods
- Replays the step with the abort reason added as context for the LLM
- Tracks the retry count via the `retryCount` parameter
- Respects the agent's `maxProcessorRetries` limit

## Related documentation

- [Guardrails](https://mastra.ai/docs/agents/guardrails) - Security and validation processors
- [Memory Processors](https://mastra.ai/docs/memory/memory-processors) - Memory-specific processors and automatic integration
- [Processor Interface](https://mastra.ai/reference/processors/processor-interface) - Full API reference for processors
- [ToolSearchProcessor Reference](https://mastra.ai/reference/processors/tool-search-processor) - API reference for dynamic tool search