# Processor Interface

The `Processor` interface defines the contract for all processors in Mastra. Processors can implement one or more methods to handle different stages of the agent execution pipeline.

## When processor methods run

The five processor methods run at different points in the agent execution lifecycle:

```text
User Input
    │
    ▼
processInput              ← runs ONCE at start
    │
    ▼
Agentic Loop:
    processInputStep      ← runs at EACH step
        │
        ▼
    LLM Execution
        │
        ▼
    processOutputStream   ← runs on EACH stream chunk
        │
        ▼
    processOutputStep     ← runs after EACH LLM step
        │
        ▼
    Tool Execution (if needed)
        │
        ├── tools called ──▶ loop back to processInputStep
        ▼
processOutputResult       ← runs ONCE after completion
    │
    ▼
Final Response
```

| Method                | When it runs                                           | Use case                                                      |
| --------------------- | ------------------------------------------------------ | ------------------------------------------------------------- |
| `processInput`        | Once at the start, before the agentic loop             | Validate/transform initial user input, add context            |
| `processInputStep`    | At each step of the agentic loop, before each LLM call | Transform messages between steps, handle tool results         |
| `processOutputStream` | On each streaming chunk during LLM response            | Filter/modify streaming content, detect patterns in real time |
| `processOutputStep`   | After each LLM response, before tool execution         | Validate output quality, implement guardrails with retry      |
| `processOutputResult` | Once after generation completes                        | Post-process final response, log results                      |

## Interface definition

```typescript
interface Processor<TId extends string = string> {
  readonly id: TId;
  readonly name?: string;

  processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;
  processInputStep?(args: ProcessInputStepArgs): ProcessorMessageResult;
  processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;
  processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;
  processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;
}
```

## Properties

**id:** (`string`): Unique identifier for the processor. Used for tracing and debugging.

**name?:** (`string`): Optional display name for the processor. Falls back to `id` if not provided.

## Methods

### processInput

Processes input messages before they are sent to the LLM. Runs once at the start of agent execution.

```typescript
processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;
```

#### ProcessInputArgs

**messages:** (`MastraDBMessage[]`): User and assistant messages to process (excludes system messages).

**systemMessages:** (`CoreMessage[]`): All system messages (agent instructions, memory context, user-provided). Can be modified and returned.

**messageList:** (`MessageList`): Full `MessageList` instance for advanced message management.

**abort:** (`(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never`): Function to abort processing. Throws a TripWire error that stops execution. Pass `retry: true` to request that the LLM retry the step with feedback.

**retryCount?:** (`number`): Number of times processors have triggered a retry for this generation. Use this to limit retry attempts.
**tracingContext?:** (`TracingContext`): Tracing context for observability.

**requestContext?:** (`RequestContext`): Request-scoped context with execution metadata such as `threadId` and `resourceId`.

#### ProcessInputResult

The method can return one of three types:

**MastraDBMessage\[]:** (`array`): Transformed messages array. System messages remain unchanged.

**MessageList:** (`MessageList`): The same `messageList` instance passed in. Indicates you've mutated it directly.

**{ messages, systemMessages }:** (`object`): Object with both transformed messages and modified system messages.

***

### processInputStep

Processes input messages at each step of the agentic loop, before they are sent to the LLM. Unlike `processInput`, which runs once at the start, this runs at every step, including tool-call continuations.

```typescript
processInputStep?(args: ProcessInputStepArgs): ProcessorMessageResult;
```

#### Execution order in the agentic loop

1. `processInput` (once at start)
2. `processInputStep` from `inputProcessors` (at each step, before the LLM call)
3. `prepareStep` callback (runs as part of the `processInputStep` pipeline, after `inputProcessors`)
4. LLM execution
5. Tool execution (if needed)
6. Repeat from step 2 if tools were called

#### ProcessInputStepArgs

**messages:** (`MastraDBMessage[]`): All messages, including tool calls and results from previous steps (read-only snapshot).

**messageList:** (`MessageList`): `MessageList` instance for managing messages. Can be mutated directly or returned in the result.

**stepNumber:** (`number`): Current step number (0-indexed). Step 0 is the initial LLM call.

**steps:** (`StepResult[]`): Results from previous steps, including `text`, `toolCalls`, and `toolResults`.

**systemMessages:** (`CoreMessage[]`): All system messages (read-only snapshot). Return in the result to replace them.

**model:** (`MastraLanguageModelV2`): Current model being used. Return a different model in the result to switch.
**toolChoice?:** (`ToolChoice`): Current tool-choice setting (`'auto'`, `'none'`, `'required'`, or a specific tool).

**activeTools?:** (`string[]`): Currently active tool names. Return a filtered array to limit tools.

**tools?:** (`ToolSet`): Current tools available for this step. Return in the result to add or replace tools.

**providerOptions?:** (`SharedV2ProviderOptions`): Provider-specific options (e.g., Anthropic `cacheControl`, OpenAI `reasoningEffort`).

**modelSettings?:** (`CallSettings`): Model settings such as `temperature`, `maxTokens`, and `topP`.

**structuredOutput?:** (`StructuredOutputOptions`): Structured output configuration (schema, output mode). Return in the result to modify it.

**abort:** (`(reason?: string) => never`): Function to abort processing.

**tracingContext?:** (`TracingContext`): Tracing context for observability.

**requestContext?:** (`RequestContext`): Request-scoped context with execution metadata.

#### ProcessInputStepResult

The method can return any combination of these properties:

**model?:** (`LanguageModelV2 | string`): Change the model for this step. Can be a model instance or a router ID like `'openai/gpt-4o'`.

**toolChoice?:** (`ToolChoice`): Change tool-selection behavior for this step.

**activeTools?:** (`string[]`): Filter which tools are available for this step.

**tools?:** (`ToolSet`): Replace or modify tools for this step. Use spread to merge: `{ tools: { ...tools, newTool } }`.

**messages?:** (`MastraDBMessage[]`): Replace all messages. Cannot be used with `messageList`.

**messageList?:** (`MessageList`): Return the same `messageList` instance (indicates you mutated it). Cannot be used with `messages`.

**systemMessages?:** (`CoreMessage[]`): Replace all system messages for this step only.

**providerOptions?:** (`SharedV2ProviderOptions`): Change provider-specific options for this step.

**modelSettings?:** (`CallSettings`): Change model settings for this step.

**structuredOutput?:** (`StructuredOutputOptions`): Change the structured output configuration for this step.
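These return properties can be combined in a single object. As a minimal sketch of returning step-scoped overrides (written as a plain class so the snippet stands alone; in a real project it would `implements Processor` from `@mastra/core`, and the temperature policy here is invented for illustration):

```typescript
// Sketch: return step-scoped overrides from processInputStep.
// Hypothetical policy: deterministic first call, more variation later.
export class StepSettingsProcessor {
  id = "step-settings";

  async processInputStep({ stepNumber }: { stepNumber: number }) {
    if (stepNumber === 0) {
      // Keep the initial LLM call deterministic
      return { modelSettings: { temperature: 0 } };
    }
    // Later steps (e.g., after tool results) may allow more variation
    return { modelSettings: { temperature: 0.7 } };
  }
}
```

Because changes chain through every processor that implements `processInputStep`, a later processor in the list could still override these settings.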
#### Processor chaining

When multiple processors implement `processInputStep`, they run in order and changes chain through:

```text
Processor 1: receives { model: 'gpt-4o' }      → returns { model: 'gpt-4o-mini' }
Processor 2: receives { model: 'gpt-4o-mini' } → returns { toolChoice: 'none' }
Final:       model = 'gpt-4o-mini', toolChoice = 'none'
```

#### System message isolation

System messages are **reset to their original values** at the start of each step. Modifications made in `processInputStep` only affect the current step, not subsequent steps.

#### Use cases

- Dynamic model switching based on step number or context
- Disabling tools after a certain number of steps
- Dynamically adding or replacing tools based on conversation context
- Transforming message part types between providers (e.g., `reasoning` → `thinking` for Anthropic)
- Modifying messages based on step number or accumulated context
- Adding step-specific system instructions
- Adjusting provider options per step (e.g., cache control)
- Modifying the structured output schema based on step context

***

### processOutputStream

Processes streaming output chunks with built-in state management. Allows processors to accumulate chunks and make decisions based on the larger context.

```typescript
processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;
```

#### ProcessOutputStreamArgs

**part:** (`ChunkType`): The current stream chunk being processed.

**streamParts:** (`ChunkType[]`): All chunks seen so far in the stream.

**state:** (`Record<string, unknown>`): Mutable state object that persists across chunks within a single stream.

**abort:** (`(reason?: string) => never`): Function to abort the stream.

**messageList?:** (`MessageList`): `MessageList` instance for accessing conversation history.

**tracingContext?:** (`TracingContext`): Tracing context for observability.

**requestContext?:** (`RequestContext`): Request-scoped context with execution metadata.
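A minimal sketch of how `part`, `state`, and `abort` work together (the `{ type, textDelta }` chunk shape is a simplified stand-in for Mastra's `ChunkType`, and the character limit is invented for illustration):

```typescript
// Sketch: a guard that accumulates text length across stream chunks
// and aborts the stream once a (made-up) limit is exceeded.
export class StreamLengthGuard {
  id = "stream-length-guard";

  constructor(private maxChars = 2000) {}

  async processOutputStream({
    part,
    state,
    abort,
  }: {
    part: { type: string; textDelta?: string };
    state: Record<string, unknown>;
    abort: (reason?: string) => never;
  }) {
    if (part.type === "text-delta") {
      // `state` persists across chunks within a single stream
      const prev = typeof state.chars === "number" ? state.chars : 0;
      state.chars = prev + (part.textDelta?.length ?? 0);
      if ((state.chars as number) > this.maxChars) {
        abort(`Stream exceeded ${this.maxChars} characters`);
      }
    }
    return part;
  }
}
```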
#### Return value

- Return the `ChunkType` to emit it (possibly modified)
- Return `null` or `undefined` to skip emitting the chunk

***

### processOutputResult

Processes the complete output result after streaming or generation is finished.

```typescript
processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;
```

#### ProcessOutputResultArgs

**messages:** (`MastraDBMessage[]`): The generated response messages.

**messageList:** (`MessageList`): `MessageList` instance for managing messages.

**abort:** (`(reason?: string) => never`): Function to abort processing.

**tracingContext?:** (`TracingContext`): Tracing context for observability.

**requestContext?:** (`RequestContext`): Request-scoped context with execution metadata.

***

### processOutputStep

Processes output after each LLM response in the agentic loop, before tool execution. Unlike `processOutputResult`, which runs once at the end, this runs at every step. This is the ideal method for implementing guardrails that can trigger retries.

```typescript
processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;
```

#### ProcessOutputStepArgs

**messages:** (`MastraDBMessage[]`): All messages, including the latest LLM response.

**messageList:** (`MessageList`): `MessageList` instance for managing messages.

**stepNumber:** (`number`): Current step number (0-indexed).

**finishReason?:** (`string`): The finish reason from the LLM (`stop`, `tool-use`, `length`, etc.).

**toolCalls?:** (`ToolCallInfo[]`): Tool calls made in this step (if any).

**text?:** (`string`): Generated text from this step.

**systemMessages?:** (`CoreMessage[]`): All system messages, for read/modify access.

**abort:** (`(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never`): Function to abort processing. Pass `retry: true` to request that the LLM retry the step.

**retryCount?:** (`number`): Number of times processors have triggered a retry. Use this to limit retry attempts.
**tracingContext?:** (`TracingContext`): Tracing context for observability.

**requestContext?:** (`RequestContext`): Request-scoped context with execution metadata.

#### Use cases

- Implementing quality guardrails that can request retries
- Validating LLM output before tool execution
- Adding per-step logging or metrics
- Implementing output moderation with retry capability

#### Example: Quality guardrail with retry

```typescript
import type { Processor } from "@mastra/core";

export class QualityGuardrail implements Processor {
  id = "quality-guardrail";

  async processOutputStep({ messages, text, abort, retryCount }) {
    // evaluateResponseQuality is a placeholder for your own scoring logic
    const score = await evaluateResponseQuality(text);

    if (score < 0.7) {
      if ((retryCount ?? 0) < 3) {
        // Request retry with feedback for the LLM
        abort("Response quality too low. Please provide more detail.", {
          retry: true,
          metadata: { qualityScore: score },
        });
      } else {
        // Max retries reached, block the response
        abort("Response quality too low after multiple attempts.");
      }
    }

    // Quality is acceptable: pass the messages through unchanged
    return messages;
  }
}
```

## Processor types

Mastra provides type aliases to ensure processors implement the required methods:

```typescript
// Must implement processInput OR processInputStep (or both)
type InputProcessor = Processor &
  ({ processInput: required } | { processInputStep: required });

// Must implement processOutputStream, processOutputStep, OR
// processOutputResult (or any combination)
type OutputProcessor = Processor &
  (
    | { processOutputStream: required }
    | { processOutputStep: required }
    | { processOutputResult: required }
  );
```

## Usage examples

### Basic input processor

```typescript
import type { Processor, MastraDBMessage } from "@mastra/core";

export class LowercaseProcessor implements Processor {
  id = "lowercase";

  async processInput({ messages }): Promise<MastraDBMessage[]> {
    return messages.map((msg) => ({
      ...msg,
      content: {
        ...msg.content,
        parts: msg.content.parts?.map((part) =>
          part.type === "text" ?
            { ...part, text: part.text.toLowerCase() }
            : part
        ),
      },
    }));
  }
}
```

### Per-step processor with processInputStep

```typescript
import type {
  Processor,
  ProcessInputStepArgs,
  ProcessInputStepResult,
} from "@mastra/core";

export class DynamicModelProcessor implements Processor {
  id = "dynamic-model";

  async processInputStep({
    stepNumber,
    steps,
    toolChoice,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // Use a fast model for the initial response
    if (stepNumber === 0) {
      return { model: "openai/gpt-4o-mini" };
    }

    // Switch to a more powerful model after tool calls
    if (steps.length > 0 && steps[steps.length - 1].toolCalls?.length) {
      return { model: "openai/gpt-4o" };
    }

    // Disable tools after 5 steps to force completion
    if (stepNumber > 5) {
      return { toolChoice: "none" };
    }

    return {};
  }
}
```

### Message transformer with processInputStep

```typescript
import type { Processor, MastraDBMessage } from "@mastra/core";

export class ReasoningTransformer implements Processor {
  id = "reasoning-transformer";

  async processInputStep({ messages, messageList }) {
    // Transform reasoning parts to thinking parts at each step.
    // This is useful when switching between model providers.
    for (const msg of messages) {
      if (msg.role === "assistant" && msg.content.parts) {
        for (const part of msg.content.parts) {
          if (part.type === "reasoning") {
            (part as any).type = "thinking";
          }
        }
      }
    }
    return messageList;
  }
}
```

### Hybrid processor (input and output)

```typescript
import type { Processor, MastraDBMessage, ChunkType } from "@mastra/core";

export class ContentFilter implements Processor {
  id = "content-filter";
  private blockedWords: string[];

  constructor(blockedWords: string[]) {
    this.blockedWords = blockedWords;
  }

  async processInput({ messages, abort }): Promise<MastraDBMessage[]> {
    for (const msg of messages) {
      const text = msg.content.parts
        ?.filter((p) => p.type === "text")
        .map((p) => p.text)
        .join(" ");

      if (this.blockedWords.some((word) => text?.includes(word))) {
        abort("Blocked content detected in input");
      }
    }
    return messages;
  }
  async processOutputStream({ part, abort }): Promise<ChunkType> {
    if (part.type === "text-delta") {
      if (this.blockedWords.some((word) => part.textDelta.includes(word))) {
        abort("Blocked content detected in output");
      }
    }
    return part;
  }
}
```

### Stream accumulator with state

```typescript
import type { Processor, ChunkType } from "@mastra/core";

export class WordCounter implements Processor {
  id = "word-counter";

  async processOutputStream({ part, state }): Promise<ChunkType> {
    // Initialize state on first chunk
    if (!state.wordCount) {
      state.wordCount = 0;
    }

    // Count words in text chunks
    if (part.type === "text-delta") {
      const words = part.textDelta.split(/\s+/).filter(Boolean);
      state.wordCount += words.length;
    }

    // Log word count on finish
    if (part.type === "finish") {
      console.log(`Total words: ${state.wordCount}`);
    }

    return part;
  }
}
```

## Related

- [Processors overview](https://mastra.ai/docs/agents/processors) - Conceptual guide to processors
- [Guardrails](https://mastra.ai/docs/agents/guardrails) - Security and validation processors
- [Memory Processors](https://mastra.ai/docs/memory/memory-processors) - Memory-specific processors