
Processor Interface

The Processor interface defines the contract for all processors in Mastra. Processors can implement one or more methods to handle different stages of the agent execution pipeline.

When processor methods run

The five processor methods run at different points in the agent execution lifecycle:

┌───────────────────────────────────────────────────────────────┐
│                     Agent Execution Flow                      │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  User Input                                                   │
│      │                                                        │
│      ▼                                                        │
│  ┌──────────────┐                                             │
│  │ processInput │ ← Runs ONCE at start                        │
│  └──────┬───────┘                                             │
│         │                                                     │
│         ▼                                                     │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │ Agentic Loop                                            │  │
│  │  ┌──────────────────┐                                   │  │
│  │  │ processInputStep │ ← Runs at EACH step               │  │
│  │  └────────┬─────────┘                                   │  │
│  │           │                                             │  │
│  │           ▼                                             │  │
│  │      LLM Execution                                      │  │
│  │           │                                             │  │
│  │           ▼                                             │  │
│  │  ┌─────────────────────┐                                │  │
│  │  │ processOutputStream │ ← Runs on EACH stream chunk    │  │
│  │  └──────────┬──────────┘                                │  │
│  │             │                                           │  │
│  │             ▼                                           │  │
│  │  ┌───────────────────┐                                  │  │
│  │  │ processOutputStep │ ← Runs after EACH LLM step       │  │
│  │  └─────────┬─────────┘                                  │  │
│  │            │                                            │  │
│  │            ▼                                            │  │
│  │     Tool Execution (if needed)                          │  │
│  │            │                                            │  │
│  │            └──── Loop back if tools called ────         │  │
│  └─────────────────────────────────────────────────────────┘  │
│         │                                                     │
│         ▼                                                     │
│  ┌─────────────────────┐                                      │
│  │ processOutputResult │ ← Runs ONCE after completion         │
│  └─────────────────────┘                                      │
│         │                                                     │
│         ▼                                                     │
│  Final Response                                               │
│                                                               │
└───────────────────────────────────────────────────────────────┘
| Method | When it runs | Use case |
| --- | --- | --- |
| processInput | Once at the start, before the agentic loop | Validate/transform initial user input, add context |
| processInputStep | At each step of the agentic loop, before each LLM call | Transform messages between steps, handle tool results |
| processOutputStream | On each streaming chunk during the LLM response | Filter/modify streaming content, detect patterns in real time |
| processOutputStep | After each LLM response, before tool execution | Validate output quality, implement guardrails with retry |
| processOutputResult | Once after generation completes | Post-process the final response, log results |

Interface definition

interface Processor<TId extends string = string> {
  readonly id: TId;
  readonly name?: string;

  processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;
  processInputStep?(args: ProcessInputStepArgs): ProcessorMessageResult;
  processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;
  processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;
  processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;
}

Properties

id:

string
Unique identifier for the processor. Used for tracing and debugging.

name?:

string
Optional display name for the processor. Falls back to id if not provided.

Methods

processInput

Processes input messages before they are sent to the LLM. Runs once at the start of agent execution.

processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;

ProcessInputArgs

messages:

MastraDBMessage[]
User and assistant messages to process (excludes system messages).

systemMessages:

CoreMessage[]
All system messages (agent instructions, memory context, user-provided). Can be modified and returned.

messageList:

MessageList
Full MessageList instance for advanced message management.

abort:

(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never
Function to abort processing. Throws a TripWire error that stops execution. Pass `retry: true` to request the LLM retry the step with feedback.

retryCount?:

number
Number of times processors have triggered retry for this generation. Use this to limit retry attempts.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata like threadId and resourceId.

ProcessInputResult

The method can return one of three types:

MastraDBMessage[]:

array
Transformed messages array. System messages remain unchanged.

MessageList:

MessageList
The same messageList instance passed in. Indicates you've mutated it directly.

{ messages, systemMessages }:

object
Object with both transformed messages and modified system messages.
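As a sketch, the object form lets one processor transform the user messages and the system messages in a single pass. The types below are simplified stand-ins for illustration, not Mastra's actual MastraDBMessage/CoreMessage definitions:

```typescript
// Simplified stand-in types (NOT Mastra's real definitions)
type TextPart = { type: "text"; text: string };
type Msg = { role: string; content: { parts: TextPart[] } };
type SysMsg = { role: "system"; content: string };

// processInput sketch: trim user text and append one system instruction,
// returned together via the { messages, systemMessages } shape
function processInputSketch(args: { messages: Msg[]; systemMessages: SysMsg[] }) {
  const messages = args.messages.map((m) => ({
    ...m,
    content: {
      ...m.content,
      parts: m.content.parts.map((p) =>
        p.type === "text" ? { ...p, text: p.text.trim() } : p,
      ),
    },
  }));
  return {
    messages,
    systemMessages: [
      ...args.systemMessages,
      { role: "system" as const, content: "Answer concisely." },
    ],
  };
}
```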

processInputStep

Processes input messages at each step of the agentic loop, before they are sent to the LLM. Unlike processInput which runs once at the start, this runs at every step including tool call continuations.

processInputStep?(args: ProcessInputStepArgs): ProcessorMessageResult;

Execution order in the agentic loop

  1. processInput (once at start)
  2. processInputStep from inputProcessors (at each step, before LLM call)
  3. prepareStep callback (runs as part of the processInputStep pipeline, after inputProcessors)
  4. LLM execution
  5. Tool execution (if needed)
  6. Repeat from step 2 if tools were called

ProcessInputStepArgs

messages:

MastraDBMessage[]
All messages including tool calls and results from previous steps (read-only snapshot).

messageList:

MessageList
MessageList instance for managing messages. Can mutate directly or return in result.

stepNumber:

number
Current step number (0-indexed). Step 0 is the initial LLM call.

steps:

StepResult[]
Results from previous steps, including text, toolCalls, and toolResults.

systemMessages:

CoreMessage[]
All system messages (read-only snapshot). Return in result to replace.

model:

MastraLanguageModelV2
Current model being used. Return a different model in result to switch.

toolChoice?:

ToolChoice
Current tool choice setting ('auto', 'none', 'required', or specific tool).

activeTools?:

string[]
Currently active tool names. Return filtered array to limit tools.

tools?:

ToolSet
Current tools available for this step. Return in result to add/replace tools.

providerOptions?:

SharedV2ProviderOptions
Provider-specific options (e.g., Anthropic cacheControl, OpenAI reasoningEffort).

modelSettings?:

CallSettings
Model settings like temperature, maxTokens, topP.

structuredOutput?:

StructuredOutputOptions
Structured output configuration (schema, output mode). Return in result to modify.

abort:

(reason?: string) => never
Function to abort processing.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.

ProcessInputStepResult

The method can return any combination of these properties:

model?:

LanguageModelV2 | string
Change the model for this step. Can be a model instance or router ID like 'openai/gpt-4o'.

toolChoice?:

ToolChoice
Change tool selection behavior for this step.

activeTools?:

string[]
Filter which tools are available for this step.

tools?:

ToolSet
Replace or modify tools for this step. Use spread to merge: { tools: { ...tools, newTool } }.

messages?:

MastraDBMessage[]
Replace all messages. Cannot be used with messageList.

messageList?:

MessageList
Return the same messageList instance (indicates you mutated it). Cannot be used with messages.

systemMessages?:

CoreMessage[]
Replace all system messages for this step only.

providerOptions?:

SharedV2ProviderOptions
Change provider-specific options for this step.

modelSettings?:

CallSettings
Change model settings for this step.

structuredOutput?:

StructuredOutputOptions
Change structured output configuration for this step.

Processor chaining

When multiple processors implement processInputStep, they run in order and changes chain through:

Processor 1: receives { model: 'gpt-4o' } → returns { model: 'gpt-4o-mini' }
Processor 2: receives { model: 'gpt-4o-mini' } → returns { toolChoice: 'none' }
Final: model = 'gpt-4o-mini', toolChoice = 'none'
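This merge behavior can be sketched generically: each processor sees the step config as left by its predecessor, and each partial result is shallow-merged on top. This is a simplified model of the pipeline, not Mastra's internals:

```typescript
// Simplified step config and processor signature (illustration only)
type StepConfig = { model: string; toolChoice?: string };
type StepProcessor = (config: StepConfig) => Partial<StepConfig>;

// Run processors in order, shallow-merging each partial result
function runChain(initial: StepConfig, processors: StepProcessor[]): StepConfig {
  let config = { ...initial };
  for (const p of processors) {
    config = { ...config, ...p(config) };
  }
  return config;
}

const final = runChain({ model: "gpt-4o" }, [
  () => ({ model: "gpt-4o-mini" }), // Processor 1 switches the model
  () => ({ toolChoice: "none" }),   // Processor 2 sees gpt-4o-mini, disables tools
]);
// final combines both changes: { model: "gpt-4o-mini", toolChoice: "none" }
```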

System message isolation

System messages are reset to their original values at the start of each step. Modifications made in processInputStep only affect the current step, not subsequent steps.
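A minimal sketch of what this enables: because the reset happens every step, a processor can add a step-specific instruction without worrying about it accumulating. The types and the step threshold below are illustrative, not part of Mastra's API:

```typescript
// Simplified stand-in for Mastra's CoreMessage (illustration only)
type SysMsg = { role: "system"; content: string };

// Append a wrap-up instruction only on later steps; earlier steps are
// untouched because system messages reset at the start of each step
function processInputStepSketch(args: {
  stepNumber: number;
  systemMessages: SysMsg[];
}): { systemMessages?: SysMsg[] } {
  if (args.stepNumber >= 3) {
    return {
      systemMessages: [
        ...args.systemMessages,
        { role: "system" as const, content: "Wrap up now; do not call more tools." },
      ],
    };
  }
  return {};
}
```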

Use cases

  • Dynamic model switching based on step number or context
  • Disabling tools after a certain number of steps
  • Dynamically adding or replacing tools based on conversation context
  • Transforming message part types between providers (e.g., reasoning → thinking for Anthropic)
  • Modifying messages based on step number or accumulated context
  • Adding step-specific system instructions
  • Adjusting provider options per step (e.g., cache control)
  • Modifying structured output schema based on step context

processOutputStream

Processes streaming output chunks with built-in state management. Allows processors to accumulate chunks and make decisions based on larger context.

processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;

ProcessOutputStreamArgs

part:

ChunkType
The current stream chunk being processed.

streamParts:

ChunkType[]
All chunks seen so far in the stream.

state:

Record<string, unknown>
Mutable state object that persists across chunks within a single stream.

abort:

(reason?: string) => never
Function to abort the stream.

messageList?:

MessageList
MessageList instance for accessing conversation history.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.

Return value

  • Return the ChunkType to emit it (possibly modified)
  • Return null or undefined to skip emitting the chunk
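A minimal sketch of the emit/skip contract, with a simplified chunk type standing in for Mastra's ChunkType:

```typescript
// Simplified stand-in for Mastra's ChunkType (illustration only)
type Chunk = { type: string; textDelta?: string };

// Return the chunk to emit it, or null to drop it from the stream
function filterChunk(part: Chunk): Chunk | null {
  if (part.type === "text-delta" && part.textDelta === "") {
    return null; // skip empty deltas
  }
  return part; // emit everything else unchanged
}
```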

processOutputResult

Processes the complete output result after streaming or generation is finished.

processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;

ProcessOutputResultArgs

messages:

MastraDBMessage[]
The generated response messages.

messageList:

MessageList
MessageList instance for managing messages.

abort:

(reason?: string) => never
Function to abort processing.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.
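As a sketch, a post-processing pass over the final messages might append a disclaimer to every assistant text part. The types are simplified stand-ins for Mastra's MastraDBMessage, and the disclaimer text is an arbitrary example:

```typescript
// Simplified stand-in types (NOT Mastra's real definitions)
type TextPart = { type: "text"; text: string };
type Msg = { role: string; content: { parts: TextPart[] } };

// processOutputResult sketch: append a disclaimer to assistant text parts
function processOutputResultSketch(messages: Msg[]): Msg[] {
  return messages.map((m) =>
    m.role === "assistant"
      ? {
          ...m,
          content: {
            ...m.content,
            parts: m.content.parts.map((p) =>
              p.type === "text" ? { ...p, text: `${p.text}\n\n(AI-generated)` } : p,
            ),
          },
        }
      : m,
  );
}
```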

processOutputStep

Processes output after each LLM response in the agentic loop, before tool execution. Unlike processOutputResult which runs once at the end, this runs at every step. This is the ideal method for implementing guardrails that can trigger retries.

processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;

ProcessOutputStepArgs

messages:

MastraDBMessage[]
All messages including the latest LLM response.

messageList:

MessageList
MessageList instance for managing messages.

stepNumber:

number
Current step number (0-indexed).

finishReason?:

string
The finish reason from the LLM (stop, tool-use, length, etc.).

toolCalls?:

ToolCallInfo[]
Tool calls made in this step (if any).

text?:

string
Generated text from this step.

systemMessages?:

CoreMessage[]
All system messages for read/modify access.

abort:

(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never
Function to abort processing. Pass `retry: true` to request the LLM retry the step.

retryCount?:

number
Number of times processors have triggered retry. Use this to limit retry attempts.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.

Use cases

  • Implementing quality guardrails that can request retries
  • Validating LLM output before tool execution
  • Adding per-step logging or metrics
  • Implementing output moderation with retry capability

Example: Quality guardrail with retry

src/mastra/processors/quality-guardrail.ts
import type { Processor } from "@mastra/core";

export class QualityGuardrail implements Processor {
  id = "quality-guardrail";

  async processOutputStep({ text, abort, retryCount }) {
    // evaluateResponseQuality is a placeholder for your own scoring logic (0–1)
    const score = await evaluateResponseQuality(text);

    if (score < 0.7) {
      // retryCount is optional, so default it before comparing
      if ((retryCount ?? 0) < 3) {
        // Request a retry with feedback for the LLM
        abort("Response quality too low. Please provide more detail.", {
          retry: true,
          metadata: { qualityScore: score },
        });
      } else {
        // Max retries reached, block the response
        abort("Response quality too low after multiple attempts.");
      }
    }

    return [];
  }
}

Processor types

Mastra provides type aliases to ensure processors implement the required methods:

// Must implement processInput OR processInputStep (or both)
type InputProcessor = Processor &
  (
    | Required<Pick<Processor, "processInput">>
    | Required<Pick<Processor, "processInputStep">>
  );

// Must implement processOutputStream, processOutputStep, OR processOutputResult (or any combination)
type OutputProcessor = Processor &
  (
    | Required<Pick<Processor, "processOutputStream">>
    | Required<Pick<Processor, "processOutputStep">>
    | Required<Pick<Processor, "processOutputResult">>
  );

Usage examples

Basic input processor

src/mastra/processors/lowercase.ts
import type { Processor, MastraDBMessage } from "@mastra/core";

export class LowercaseProcessor implements Processor {
  id = "lowercase";

  async processInput({ messages }): Promise<MastraDBMessage[]> {
    return messages.map((msg) => ({
      ...msg,
      content: {
        ...msg.content,
        parts: msg.content.parts?.map((part) =>
          part.type === "text"
            ? { ...part, text: part.text.toLowerCase() }
            : part,
        ),
      },
    }));
  }
}

Per-step processor with processInputStep

src/mastra/processors/dynamic-model.ts
import type { Processor, ProcessInputStepArgs, ProcessInputStepResult } from "@mastra/core";

export class DynamicModelProcessor implements Processor {
  id = "dynamic-model";

  async processInputStep({
    stepNumber,
    steps,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // Use a fast model for the initial response
    if (stepNumber === 0) {
      return { model: "openai/gpt-4o-mini" };
    }

    // Disable tools after 5 steps to force completion
    // (checked first so it takes precedence over the model switch below)
    if (stepNumber > 5) {
      return { toolChoice: "none" };
    }

    // Switch to a more capable model after tool calls
    if (steps.length > 0 && steps[steps.length - 1].toolCalls?.length) {
      return { model: "openai/gpt-4o" };
    }

    return {};
  }
}

Message transformer with processInputStep

src/mastra/processors/reasoning-transformer.ts
import type { Processor, MastraDBMessage } from "@mastra/core";

export class ReasoningTransformer implements Processor {
  id = "reasoning-transformer";

  async processInputStep({ messages, messageList }) {
    // Transform reasoning parts to thinking parts at each step.
    // Useful when switching between model providers mid-conversation.
    for (const msg of messages) {
      if (msg.role === "assistant" && msg.content.parts) {
        for (const part of msg.content.parts) {
          if (part.type === "reasoning") {
            (part as any).type = "thinking";
          }
        }
      }
    }
    // Returning the messageList signals that it was mutated in place
    return messageList;
  }
}

Hybrid processor (input and output)

src/mastra/processors/content-filter.ts
import type { Processor, MastraDBMessage, ChunkType } from "@mastra/core";

export class ContentFilter implements Processor {
  id = "content-filter";
  private blockedWords: string[];

  constructor(blockedWords: string[]) {
    this.blockedWords = blockedWords;
  }

  async processInput({ messages, abort }): Promise<MastraDBMessage[]> {
    for (const msg of messages) {
      const text = msg.content.parts
        ?.filter((p) => p.type === "text")
        .map((p) => p.text)
        .join(" ");

      if (text && this.blockedWords.some((word) => text.includes(word))) {
        abort("Blocked content detected in input");
      }
    }
    return messages;
  }

  async processOutputStream({ part, abort }): Promise<ChunkType | null> {
    if (part.type === "text-delta") {
      if (this.blockedWords.some((word) => part.textDelta.includes(word))) {
        abort("Blocked content detected in output");
      }
    }
    return part;
  }
}

Stream accumulator with state

src/mastra/processors/word-counter.ts
import type { Processor, ChunkType } from "@mastra/core";

export class WordCounter implements Processor {
  id = "word-counter";

  async processOutputStream({ part, state }): Promise<ChunkType> {
    // Initialize state on the first chunk
    if (typeof state.wordCount !== "number") {
      state.wordCount = 0;
    }

    // Count words in text chunks
    if (part.type === "text-delta") {
      const words = part.textDelta.split(/\s+/).filter(Boolean);
      state.wordCount = (state.wordCount as number) + words.length;
    }

    // Log the total when the stream finishes
    if (part.type === "finish") {
      console.log(`Total words: ${state.wordCount}`);
    }

    return part;
  }
}