# Processor interface
The Processor interface defines the contract for all processors in Mastra. Processors can implement one or more methods to handle different stages of the agent execution pipeline.
## When processor methods run
The six processor methods run at different points in the agent execution lifecycle:
```text
Agent Execution Flow

User Input
  │
  ▼
processInput               ← runs ONCE at start
  │
  ▼
┌─ Agentic Loop ──────────────────────────────────
│  processInputStep        ← runs at EACH step
│    │
│    ▼
│  LLM Execution ── API error? ──► processAPIError
│    │              (retry? ── loop back)
│    ▼
│  processOutputStream     ← runs on EACH stream chunk
│    │
│    ▼
│  processOutputStep       ← runs after EACH LLM step
│    │
│    ▼
│  Tool Execution (if needed)
│    │
│    └──── loop back to top if tools were called
└─────────────────────────────────────────────────
  │
  ▼
processOutputResult        ← runs ONCE after completion
  │
  ▼
Final Response
```
| Method | When it runs | Use case |
|---|---|---|
| `processInput` | Once at the start, before the agentic loop | Validate/transform initial user input, add context |
| `processInputStep` | At each step of the agentic loop, before each LLM call | Transform messages between steps, handle tool results |
| `processAPIError` | When an LLM API call fails | Inspect API rejections, optionally mutate state/messages, and request a retry |
| `processOutputStream` | On each streaming chunk during LLM response | Filter/modify streaming content, detect patterns in real time |
| `processOutputStep` | After each LLM response, before tool execution | Validate output quality, implement guardrails with retry |
| `processOutputResult` | Once after generation completes | Post-process final response, log results |
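To get a feel for the shape before diving into the full interface, here is a standalone sketch. It mocks a minimal `Processor`-like type locally (`MiniProcessor` and `Message` are simplified stand-ins for illustration, not Mastra's real types, which are defined in the next section) and implements two of the lifecycle methods:

```typescript
// Simplified stand-ins for the real Mastra types, for illustration only.
type Message = { role: string; text: string }

interface MiniProcessor {
  id: string
  processInput?(args: { messages: Message[] }): Message[]
  processOutputResult?(args: { messages: Message[] }): Message[]
}

// A processor that trims whitespace on input and tags the final response.
const tidy: MiniProcessor = {
  id: 'tidy',
  processInput({ messages }) {
    return messages.map(m => ({ ...m, text: m.text.trim() }))
  },
  processOutputResult({ messages }) {
    return messages.map(m => (m.role === 'assistant' ? { ...m, text: `${m.text} [tidy]` } : m))
  },
}

const input = tidy.processInput!({ messages: [{ role: 'user', text: '  hi  ' }] })
const output = tidy.processOutputResult!({ messages: [{ role: 'assistant', text: 'hello' }] })
```

A real processor implements the same pattern against the `Processor` interface below; any method it omits is simply skipped for that stage.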
## Interface definition

```ts
interface Processor<TId extends string = string, TTripwireMetadata = unknown> {
  readonly id: TId
  readonly name?: string
  readonly description?: string

  /** Index of this processor in the workflow (set at runtime when combining processors). */
  processorIndex?: number

  /** When true, processOutputStream also receives `data-*` chunks. Default: false. */
  processDataParts?: boolean

  processInput?(
    args: ProcessInputArgs<TTripwireMetadata>,
  ): Promise<ProcessInputResult> | ProcessInputResult

  processInputStep?(
    args: ProcessInputStepArgs<TTripwireMetadata>,
  ):
    | Promise<ProcessInputStepResult | MessageList | MastraDBMessage[] | undefined | void>
    | ProcessInputStepResult
    | MessageList
    | MastraDBMessage[]
    | void
    | undefined

  processAPIError?(
    args: ProcessAPIErrorArgs<TTripwireMetadata>,
  ): Promise<ProcessAPIErrorResult | void> | ProcessAPIErrorResult | void

  processOutputStream?(
    args: ProcessOutputStreamArgs<TTripwireMetadata>,
  ): Promise<ChunkType | null | undefined>

  processOutputStep?(args: ProcessOutputStepArgs<TTripwireMetadata>): ProcessorMessageResult

  processOutputResult?(args: ProcessOutputResultArgs<TTripwireMetadata>): ProcessorMessageResult
}
```
## Properties

- `id`: unique identifier for the processor.
- `name?`: optional human-readable name.
- `description?`: optional description of what the processor does.
- `processorIndex?`: index of this processor in the workflow; set at runtime when processors are combined.
- `processDataParts?`: when `true`, `processOutputStream` also receives `data-*` chunks. Defaults to `false`.
## Message arguments

Most processor methods receive both `messages` and `messageList`. They point to the same underlying conversation but expose it differently.
### `messages` vs `messageList`

- `messages`: a plain array of `MastraDBMessage` objects, scoped to the current stage. For `processInput` and `processInputStep` this excludes system messages. For `processOutputResult` and `processOutputStep` this includes the latest LLM response. The array is backed by `messageList`, so editing a message's `content.parts` in place is visible to downstream processors and to persistence.
- `messageList`: the live `MessageList` instance backing the run. It exposes filtered views (input, response, remembered, all), multiple output formats (db, ui, core), and methods for mutating the conversation.
Use `messages` when you only need to read, map over, or lightly edit fields on the current stage's messages. Use `messageList` when you need to:

- Read messages from another stage, for example input messages while processing output.
- Add, remove, or replace whole messages.
- Convert to another format such as UI or core messages for a third-party API.

`messages` is always derived from `messageList`, so mutating `messageList` is the canonical way to add, remove, or reorder messages. For in-place edits to a message's content (for example, rewriting `content.parts`), mutating `messages` directly is equivalent. If you return a new messages array, Mastra reconciles it against `messageList` for the current stage.
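The shared-reference semantics can be shown in isolation. The sketch below uses a locally defined, simplified message shape (the real `MastraDBMessage` has more fields); because the derived `messages` array holds the same objects as the backing list, an in-place edit through one view is visible through the other:

```typescript
// Simplified message shape for illustration; the real MastraDBMessage is richer.
type TextPart = { type: 'text'; text: string }
type Msg = { id: string; content: { parts: TextPart[] } }

// The backing list holds the canonical message objects...
const backing: Msg[] = [{ id: 'm1', content: { parts: [{ type: 'text', text: 'Hello' }] } }]

// ...and `messages` is a derived array pointing at the same objects.
const messages: Msg[] = [...backing]

// An in-place edit through `messages` is visible through the backing list too.
messages[0]!.content.parts[0]!.text = 'Hello, world'
```

This is why rewriting `content.parts` in place is enough for downstream processors and persistence to see the change, with no return value needed.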
### Persistence

When memory is enabled, only what ends up in `messageList` after all processors finish is persisted to storage. The two return styles are equivalent for persistence:

- Mutating `messageList` directly (or returning the same `MessageList` instance) — recorded mutations are applied in place, so the saved conversation reflects your changes.
- Returning a `MastraDBMessage[]` or `{ messages, systemMessages }` — Mastra reconciles the returned array against `messageList` for the current stage, removing missing messages and replacing system messages.

Returning a different `MessageList` instance is an error; always mutate the one passed to your processor.
### Reading text from a message

`MastraDBMessage.content` is a structured object, not a string. The canonical way to read user or assistant text is `content.parts`:

```ts
import type { MastraDBMessage } from '@mastra/core/memory'

function getText(message: MastraDBMessage): string {
  let text = ''
  if (message.content.parts) {
    for (const part of message.content.parts) {
      if (part.type === 'text' && typeof part.text === 'string') {
        text += part.text
      }
    }
  }
  // Fallback for legacy messages that only have the flattened `content` string
  if (!text && typeof message.content.content === 'string') {
    text = message.content.content
  }
  return text
}
```

Key points:

- `message.content.parts` is the primary source. A single message can contain multiple parts, including non-text parts such as tool calls, tool results, and file parts. Filter by `part.type === 'text'` before reading `part.text`.
- `message.content.content` is a flattened string kept for backward compatibility. Use it only as a fallback when `parts` is empty or missing.
- `message.content` itself is never a plain string on `MastraDBMessage`. Legacy `CoreMessage` shapes may be strings, but processors always receive `MastraDBMessage`.
## Methods

### `processInput`

Processes input messages before they're sent to the LLM. Runs once at the start of agent execution.

```ts
processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;
```
#### `ProcessInputArgs`

- `messages`: the input messages for the current stage as `MastraDBMessage[]` (excludes system messages).
- `systemMessages`: the system messages for the run.
- `messageList`: the live `MessageList` instance backing the run.
- `abort`: throws a `TripWire` error that stops processing (see Aborting and tripwire chunks).
- `retryCount`: the number of processor-requested retries so far for this request.
- `tracingContext?`: tracing context for the current run.
- `requestContext?`: the per-request `RequestContext`.

#### `ProcessInputResult`

The method can return one of three types:

- `MastraDBMessage[]`: a transformed messages array that replaces the stage's messages.
- `MessageList`: the same `messageList` instance, returned after mutating it in place.
- `{ messages, systemMessages }`: replaces both the messages and the system messages for the stage.
### `processInputStep`

Processes input messages at each step of the agentic loop, before they're sent to the LLM. Unlike `processInput`, which runs once at the start, this runs at every step, including tool call continuations.

```ts
processInputStep?<TTripwireMetadata = unknown>(
  args: ProcessInputStepArgs<TTripwireMetadata>,
):
  | Promise<ProcessInputStepResult | MessageList | MastraDBMessage[] | void | undefined>
  | ProcessInputStepResult
  | MessageList
  | MastraDBMessage[]
  | void
  | undefined;
```
#### Execution order in the agentic loop

1. `processInput` (once at start)
2. `processInputStep` from `inputProcessors` (at each step, before the LLM call)
3. `prepareStep` callback (runs as part of the `processInputStep` pipeline, after `inputProcessors`)
4. LLM execution
5. Tool execution (if needed)
6. Repeat from step 2 if tools were called
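The once-versus-per-step distinction can be seen in a toy driver. The sketch below is not Mastra's real loop; it just records which hooks fire in the order described above, with `simulate` standing in for the agent run:

```typescript
// Toy driver mimicking the ordering above; not Mastra's real loop.
const calls: string[] = []

const processor = {
  processInput: () => calls.push('processInput'),
  processInputStep: (step: number) => calls.push(`processInputStep:${step}`),
}

function simulate(toolSteps: number): void {
  processor.processInput() // once, before the loop
  let step = 0
  let needsTools = true
  while (needsTools) {
    processor.processInputStep(step) // every step, before each LLM call
    // (LLM execution and tool execution would happen here)
    needsTools = step < toolSteps
    step++
  }
}

simulate(2) // two tool-call continuations → three steps total
```

After the run, `calls` contains one `processInput` entry followed by one `processInputStep` entry per step, which is exactly the asymmetry the numbered list describes.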
#### `ProcessInputStepArgs`

- `messages`: the current step's messages as `MastraDBMessage[]` (excludes system messages).
- `messageList`: the live `MessageList` instance backing the run.
- `stepNumber`: zero-based index of the current step in the agentic loop.
- `steps`: results of the previous steps, including any tool calls.
- `systemMessages`: the system messages for this step.
- `model`: the model that will be used for this step.
- `toolChoice?`, `activeTools?`, `tools?`, `providerOptions?`, `modelSettings?`, `structuredOutput?`: the current values of the corresponding call options for this step.
- `abort`: throws a `TripWire` error that stops processing.
- `retryCount`: the number of processor-requested retries so far.
- `tracingContext?`: tracing context for the current run.
- `requestContext?`: the per-request `RequestContext`.

#### `ProcessInputStepResult`

`processInputStep` can return several shapes:

- A `ProcessInputStepResult` object — override any combination of the properties below for this step.
- `MessageList` — return the same `messageList` instance to signal you mutated messages in place.
- `MastraDBMessage[]` — return a transformed messages array; replaces the step's messages.
- `void` or `undefined` — return nothing to leave the step unchanged.

The object form can include any combination of these optional properties, each of which overrides the corresponding value for the current step: `model?`, `toolChoice?`, `activeTools?`, `tools?`, `messages?`, `messageList?`, `systemMessages?`, `providerOptions?`, `modelSettings?`, `structuredOutput?`.
#### Processor chaining

When multiple processors implement `processInputStep`, they run in order and changes chain through:

```text
Processor 1: receives { model: 'gpt-5.4' }      → returns { model: 'gpt-5.4-mini' }
Processor 2: receives { model: 'gpt-5.4-mini' } → returns { toolChoice: 'none' }
Final:       model = 'gpt-5.4-mini', toolChoice = 'none'
```
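The chaining behavior can be modeled as a fold over the processors, where each processor's returned overrides are merged into the options handed to the next one. This is a sketch of the merge semantics only (`StepOptions` is a two-field stand-in; Mastra's real merge covers all the override properties listed above):

```typescript
// Two-field stand-in for the step options a processor can override.
type StepOptions = { model: string; toolChoice: 'auto' | 'none' }
type StepProcessor = (opts: StepOptions) => Partial<StepOptions>

const processors: StepProcessor[] = [
  // Processor 1: downgrade the model.
  opts => (opts.model === 'gpt-5.4' ? { model: 'gpt-5.4-mini' } : {}),
  // Processor 2: disable tools.
  () => ({ toolChoice: 'none' }),
]

// Each processor sees the options as modified by the ones before it.
const finalOpts = processors.reduce<StepOptions>(
  (opts, proc) => ({ ...opts, ...proc(opts) }),
  { model: 'gpt-5.4', toolChoice: 'auto' },
)
```

Processor 2 receives `model: 'gpt-5.4-mini'` (already rewritten by processor 1), matching the chain shown above.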
#### System message isolation
System messages are reset to their original values at the start of each step. Modifications made in processInputStep only affect the current step, not subsequent steps.
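The reset semantics amount to "start each step from a fresh copy." The sketch below is illustrative only: `originalSystem` stands in for the agent's configured system messages, and the step-0 rule is a hypothetical override:

```typescript
// `originalSystem` stands in for the agent's configured system messages.
const originalSystem = ['You are a helpful assistant.']

function systemForStep(step: number): string[] {
  // Each step starts from a fresh copy of the original system messages...
  const system = [...originalSystem]
  // ...so an override applied on one step (a hypothetical rule) never leaks into the next.
  if (step === 0) system.push('Answer in one sentence.')
  return system
}

const step0 = systemForStep(0)
const step1 = systemForStep(1)
```

Step 0 sees the extra instruction; step 1 sees only the original system message, and the original is never mutated.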
#### Use cases

- Dynamic model switching based on step number or context
- Disabling tools after a certain number of steps
- Dynamically adding or replacing tools based on conversation context
- Transforming message part types between providers (e.g., `reasoning` → `thinking` for Anthropic)
- Modifying messages based on step number or accumulated context
- Adding step-specific system instructions
- Adjusting provider options per step (e.g., cache control)
- Modifying structured output schema based on step context
### `processAPIError`

Handles LLM API rejection errors before they surface as final errors. It runs when the API call fails with a non-retryable error (such as a 400 or 422 status code). Unlike `processOutputStep`, which runs after successful responses, this runs when the API rejects the request.

Add processors that implement `processAPIError` to an agent's `errorProcessors` array.

Processors can inspect the error, modify the request (for example, by appending messages to the `messageList`), and return `{ retry: true }` to signal a retry with the modified state.

```ts
processAPIError?(args: ProcessAPIErrorArgs): Promise<ProcessAPIErrorResult | void> | ProcessAPIErrorResult | void;
```
#### `ProcessAPIErrorArgs`

- `error`: the error thrown by the LLM API call.
- `messages`: the current messages as `MastraDBMessage[]`.
- `messageList`: the live `MessageList` instance backing the run.
- `stepNumber`: zero-based index of the step that failed.
- `steps`: results of the previous steps.
- `state`: the processor's per-request state object (see State lifecycle).
- `retryCount`: the number of retries so far.
- `abort`: throws a `TripWire` error that stops processing.
- `writer?`: writer for emitting custom `data-*` chunks (see Emitting custom data chunks).
- `requestContext?`: the per-request `RequestContext`.
- `abortSignal?`: the `AbortSignal` for the request.

#### `ProcessAPIErrorResult`

- `retry`: when `true`, the failed step is retried with the current (possibly mutated) messages and state.
#### Use cases
- Handling API-specific rejections by modifying the request and retrying
- Converting non-retryable errors into retryable ones with request modifications
- Implementing model-specific error recovery strategies
#### Example: Custom error recovery

```ts
import { APICallError } from '@ai-sdk/provider'
import type { Processor, ProcessAPIErrorArgs, ProcessAPIErrorResult } from '@mastra/core/processors'

export class ErrorRecoveryProcessor implements Processor {
  id = 'error-recovery'

  processAPIError({
    error,
    messageList,
    retryCount,
  }: ProcessAPIErrorArgs): ProcessAPIErrorResult | void {
    // Only retry once
    if (retryCount > 0) return

    // Check for a specific API error
    if (APICallError.isInstance(error) && error.message.includes('context length exceeded')) {
      // Trim older messages to fit within context
      const messages = messageList.get.all.db()
      if (messages.length > 4) {
        messageList.removeByIds([messages[1]!.id, messages[2]!.id])
        return { retry: true }
      }
    }
  }
}
```
### `processOutputStream`

Processes streaming output chunks with built-in state management. Allows processors to accumulate chunks and make decisions based on larger context.

```ts
processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;
```
#### `ProcessOutputStreamArgs`

- `part`: the current stream chunk (`ChunkType`).
- `streamParts`: the chunks received so far in this stream.
- `state`: the processor's per-request state object (see State lifecycle).
- `abort`: throws a `TripWire` error that stops processing.
- `retryCount`: the number of retries so far.
- `messageList?`: the live `MessageList` instance, when available.
- `tracingContext?`: tracing context for the current run.
- `requestContext?`: the per-request `RequestContext`.
- `writer?`: writer for emitting custom `data-*` chunks.
#### Return value

`processOutputStream` returns `Promise<ChunkType | null | undefined>`.

- Return a `ChunkType` to emit the chunk: return the original `part` to emit it unchanged, or a new `ChunkType` to emit a modified chunk.
- Return `null` to drop the chunk. Nothing is sent to the next processor or the client.
- Return `undefined` (including the implicit `undefined` from a bare `return;` or a method that falls off the end) to drop the chunk. `null` and `undefined` behave the same way.
Dropping a chunk only affects that single chunk. The stream continues and the next chunk is still processed. To stop the stream entirely, call `abort()`.
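The drop-one-chunk semantics look like this in isolation. The sketch uses a locally defined, simplified chunk shape (the real `ChunkType` carries more fields), and returns `null` only for the offending chunk while every other chunk passes through:

```typescript
// Standalone sketch with a simplified chunk shape; the real ChunkType is richer.
type Chunk = { type: string; payload: { text?: string } }

class Redactor {
  id = 'redactor'
  constructor(private blocked: string[]) {}

  // Returning null drops just this chunk; the stream itself keeps flowing.
  processOutputStream({ part }: { part: Chunk }): Chunk | null {
    const text = part.type === 'text-delta' ? part.payload.text : undefined
    if (text && this.blocked.some(w => text.includes(w))) {
      return null
    }
    return part
  }
}

const redactor = new Redactor(['secret'])
const kept = redactor.processOutputStream({ part: { type: 'text-delta', payload: { text: 'hi' } } })
const dropped = redactor.processOutputStream({
  part: { type: 'text-delta', payload: { text: 'the secret plan' } },
})
```

To block the whole response rather than a single chunk, a real processor would call `abort()` instead of returning `null`.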
### `processOutputResult`

Processes the complete output result after streaming or generation is finished.

```ts
processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;
```
#### `ProcessOutputResultArgs`

- `messages`: the final messages, including the latest LLM response, as `MastraDBMessage[]`.
- `messageList`: the live `MessageList` instance backing the run.
- `state`: the processor's per-request state object (see State lifecycle).
- `result`: the complete generation result.
- `abort`: throws a `TripWire` error that stops processing.
- `retryCount`: the number of retries so far.
- `tracingContext?`: tracing context for the current run.
- `requestContext?`: the per-request `RequestContext`.
- `writer?`: writer for emitting custom `data-*` chunks.
### `processOutputStep`

Processes output after each LLM response in the agentic loop, before tool execution. Unlike `processOutputResult`, which runs once at the end, this runs at every step. This is the ideal method for implementing guardrails that can trigger retries.

```ts
processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;
```
#### `ProcessOutputStepArgs`

- `messages`: this step's messages, including the latest LLM response, as `MastraDBMessage[]`.
- `messageList`: the live `MessageList` instance backing the run.
- `stepNumber`: zero-based index of the current step.
- `finishReason?`: why the LLM stopped for this step.
- `toolCalls?`: tool calls made in this step, if any.
- `text?`: the text of this step's response.
- `usage`: token usage for this step.
- `systemMessages`: the system messages for this step.
- `steps`: results of the previous steps.
- `state`: the processor's per-request state object (see State lifecycle).
- `abort`: throws a `TripWire` error that stops processing.
- `retryCount`: the number of retries so far.
- `tracingContext?`: tracing context for the current run.
- `requestContext?`: the per-request `RequestContext`.
#### Use cases
- Implementing quality guardrails that can request retries
- Validating LLM output before tool execution
- Adding per-step logging or metrics
- Implementing output moderation with retry capability
#### Example: Quality guardrail with retry

```ts
import type { Processor } from '@mastra/core/processors'

export class QualityGuardrail implements Processor {
  id = 'quality-guardrail'

  async processOutputStep({ text, abort, retryCount }) {
    // evaluateResponseQuality is your own scoring function
    const score = await evaluateResponseQuality(text)

    if (score < 0.7) {
      if (retryCount < 3) {
        // Request retry with feedback for the LLM
        abort('Response quality too low. Please provide more detail.', {
          retry: true,
          metadata: { qualityScore: score },
        })
      } else {
        // Max retries reached, block the response
        abort('Response quality too low after multiple attempts.')
      }
    }
    return []
  }
}
```
## Processor types

Mastra provides type aliases to ensure processors implement the required methods:

```ts
// Must implement processInput OR processInputStep (or both)
type InputProcessor = Processor & ({ processInput: required } | { processInputStep: required })

// Must implement processOutputStream, processOutputStep, OR processOutputResult (or any combination)
type OutputProcessor = Processor &
  (
    | { processOutputStream: required }
    | { processOutputStep: required }
    | { processOutputResult: required }
  )

// Must implement processAPIError
type ErrorProcessor = Processor & { processAPIError: required }
```
Configure processors that implement `processAPIError` in `errorProcessors`:

```ts
const agent = new Agent({
  // ...
  errorProcessors: [new PrefillErrorHandler()],
})
```
## Usage examples

### Basic input processor

```ts
import type { Processor } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'

export class LowercaseProcessor implements Processor {
  id = 'lowercase'

  async processInput({ messages }): Promise<MastraDBMessage[]> {
    return messages.map(msg => ({
      ...msg,
      content: {
        ...msg.content,
        parts: msg.content.parts?.map(part =>
          part.type === 'text' ? { ...part, text: part.text.toLowerCase() } : part,
        ),
      },
    }))
  }
}
```
### Per-step processor with `processInputStep`

```ts
import type {
  Processor,
  ProcessInputStepArgs,
  ProcessInputStepResult,
} from '@mastra/core/processors'

export class DynamicModelProcessor implements Processor {
  id = 'dynamic-model'

  async processInputStep({
    stepNumber,
    steps,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // Use a fast model for initial response
    if (stepNumber === 0) {
      return { model: 'openai/gpt-5-mini' }
    }

    // Switch to powerful model after tool calls
    if (steps.length > 0 && steps[steps.length - 1].toolCalls?.length) {
      return { model: 'openai/gpt-5.4' }
    }

    // Disable tools after 5 steps to force completion
    if (stepNumber > 5) {
      return { toolChoice: 'none' }
    }

    return {}
  }
}
```
### Message transformer with `processInputStep`

```ts
import type { Processor } from '@mastra/core/processors'

export class ReasoningTransformer implements Processor {
  id = 'reasoning-transformer'

  async processInputStep({ messages, messageList }) {
    // Transform reasoning parts to thinking parts at each step.
    // This is useful when switching between model providers.
    for (const msg of messages) {
      if (msg.role === 'assistant' && msg.content.parts) {
        for (const part of msg.content.parts) {
          if (part.type === 'reasoning') {
            ;(part as any).type = 'thinking'
          }
        }
      }
    }
    return messageList
  }
}
```
### Hybrid processor (input and output)

```ts
import type { Processor } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'
import type { ChunkType } from '@mastra/core/stream'

export class ContentFilter implements Processor {
  id = 'content-filter'
  private blockedWords: string[]

  constructor(blockedWords: string[]) {
    this.blockedWords = blockedWords
  }

  async processInput({ messages, abort }): Promise<MastraDBMessage[]> {
    for (const msg of messages) {
      const text = msg.content.parts
        ?.filter(p => p.type === 'text')
        .map(p => p.text)
        .join(' ')
      if (this.blockedWords.some(word => text?.includes(word))) {
        abort('Blocked content detected in input')
      }
    }
    return messages
  }

  async processOutputStream({ part, abort }): Promise<ChunkType | null> {
    if (part.type === 'text-delta') {
      if (this.blockedWords.some(word => part.payload.text.includes(word))) {
        abort('Blocked content detected in output')
      }
    }
    return part
  }
}
```
### Stream accumulator with state

```ts
import type { Processor } from '@mastra/core/processors'
import type { ChunkType } from '@mastra/core/stream'

export class WordCounter implements Processor {
  id = 'word-counter'

  async processOutputStream({ part, state }): Promise<ChunkType> {
    // Initialize state on first chunk
    if (!state.wordCount) {
      state.wordCount = 0
    }

    // Count words in text chunks
    if (part.type === 'text-delta') {
      const words = part.payload.text.split(/\s+/).filter(Boolean)
      state.wordCount += words.length
    }

    // Log word count on finish
    if (part.type === 'finish') {
      console.log(`Total words: ${state.wordCount}`)
    }

    return part
  }
}
```
## State lifecycle

Every processor receives a `state` object in `processOutputStream`, `processOutputStep`, `processOutputResult`, and `processAPIError`. State has three important properties:

- Per-processor: each processor gets its own `state` object, keyed by the processor's `id`. Two processors with different ids cannot read or overwrite each other's state.
- Per-request: a fresh state object is created at the start of every `agent.generate()` or `agent.stream()` call. State does not leak between requests or between users.
- Shared across methods: within one request, the same `state` object is passed to `processOutputStream` (for every chunk), `processOutputStep` (after every LLM step), `processOutputResult` (once at the end), and `processAPIError` (when an LLM call fails). Accumulate data in `processOutputStream` and read it in `processOutputResult` or `processAPIError`.
Initialize fields defensively on first access, because state starts as an empty object:

```ts
import type { Processor } from '@mastra/core/processors'

export class WordCounter implements Processor {
  id = 'word-counter'

  async processOutputStream({ part, state }) {
    state.wordCount ??= 0
    if (part.type === 'text-delta') {
      state.wordCount += part.payload.text.split(/\s+/).filter(Boolean).length
    }
    return part
  }
}
```
## Aborting and tripwire chunks

The `abort` function on each method throws a `TripWire` error that stops processing and emits a `tripwire` chunk on the output stream. Clients can detect the chunk to distinguish a blocked response from a normal finish.

```ts
abort('Blocked content detected', { retry: false, metadata: { category: 'pii' } })
```

- `reason`: a human-readable explanation. Appears as `tripwire.payload.reason`.
- `retry`: when `true`, the agent retries the same step with `reason` fed back as feedback. Retries only run when `maxProcessorRetries` is set on the agent or call; otherwise the request aborts. When `errorProcessors` are configured, `maxProcessorRetries` defaults to `10` for that call.
- `metadata`: optional structured data attached to the `tripwire` chunk for downstream consumers.
The emitted tripwire chunk has the shape:

```ts
type TripwireChunk = {
  type: 'tripwire'
  runId: string
  from: 'AGENT'
  payload: {
    reason: string
    retry?: boolean
    metadata?: unknown
    processorId: string
  }
}
```
In non-streaming calls (`agent.generate()`), the result exposes the same information as `result.tripwire` and `result.finishReason === 'other'`.
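A client can branch on that payload to tell a blocked run from a normal finish. The sketch below types only the fields shown above (`GenerateResult` is a simplified stand-in for the real result type, and `describe` is a hypothetical helper):

```typescript
// Simplified stand-ins typing only the fields documented above.
type TripwirePayload = { reason: string; retry?: boolean; metadata?: unknown; processorId: string }
type GenerateResult = { text: string; finishReason: string; tripwire?: TripwirePayload }

// Distinguish a processor-blocked run from a normal completion.
function describe(result: GenerateResult): string {
  if (result.tripwire && result.finishReason === 'other') {
    return `blocked by ${result.tripwire.processorId}: ${result.tripwire.reason}`
  }
  return result.text
}

const blocked = describe({
  text: '',
  finishReason: 'other',
  tripwire: { reason: 'PII detected', processorId: 'pii-guard' },
})
const ok = describe({ text: 'All good.', finishReason: 'stop' })
```

Streaming clients can apply the same branch to the `tripwire` chunk itself instead of the final result.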
## Emitting custom data chunks

Processors with access to `writer` can stream custom `data-*` chunks to the client by calling `writer.custom(chunk)`. Tools can do the same through their own `writer`. This is the only way for a processor to emit content outside of normal text and tool chunks.

```ts
await writer.custom({
  type: 'data-moderation',
  runId,
  from: 'AGENT',
  data: { level: 'warn', reason: 'Possibly unsafe' },
})
```
By default, processors do not see `data-*` chunks in `processOutputStream`, so they don't accidentally process tool telemetry or their own output. Opt in by setting `processDataParts: true` on the processor:

```ts
class ModerationCollector implements Processor {
  id = 'moderation-collector'
  processDataParts = true

  async processOutputStream({ part, state }) {
    if (part.type === 'data-moderation') {
      state.warnings ??= []
      state.warnings.push(part.data)
    }
    return part
  }
}
```
The chunk type must start with `data-` to be treated as a custom data chunk. Returning `null` or `undefined` from `processOutputStream` still drops the chunk, so a processor can inspect, modify, or filter custom data the same way it filters text chunks.
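Since the `data-` prefix is the only marker, a small type guard is enough to route custom chunks (`isCustomDataChunk` is a hypothetical helper for illustration, not part of Mastra's API):

```typescript
// Minimal chunk shape for illustration.
type StreamChunk = { type: string; data?: unknown }

// A chunk is "custom data" purely by its type prefix.
function isCustomDataChunk(chunk: StreamChunk): boolean {
  return chunk.type.startsWith('data-')
}

const custom = isCustomDataChunk({ type: 'data-moderation', data: { level: 'warn' } })
const normal = isCustomDataChunk({ type: 'text-delta' })
```

A processor with `processDataParts: true` could use a guard like this to handle custom chunks on one path and pass everything else through untouched.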
## Configuring processors on an agent

Processors are attached to an agent through three arrays:

```ts
import { Agent } from '@mastra/core/agent'
import { PrefillErrorHandler } from '@mastra/core/processors'

const agent = new Agent({
  name: 'support-agent',
  model: 'openai/gpt-5',
  instructions: '...',
  inputProcessors: [new ContentFilter(['secret'])],
  outputProcessors: [new WordCounter()],
  errorProcessors: [new PrefillErrorHandler()],
  maxProcessorRetries: 3,
})
```
- `inputProcessors`: run before the LLM. Receive input messages.
- `outputProcessors`: run during or after the LLM response. Receive output chunks or messages.
- `errorProcessors`: run when the LLM API call throws. Receive the raw error.
Each array also accepts a function, so processors can be built per-request from `RequestContext`:

```ts
new Agent({
  // ...
  inputProcessors: ({ requestContext }) => {
    const blockedWords = requestContext.get('blockedWords') ?? []
    return [new ContentFilter(blockedWords)]
  },
})
```
### Per-call overrides

`agent.generate()` and `agent.stream()` accept `inputProcessors`, `outputProcessors`, `errorProcessors`, and `maxProcessorRetries`. When any processor array is set on the call, it replaces the matching array configured on the agent for that request. Memory, workspace, skill, channel, and browser processors that Mastra adds automatically are always preserved and run around your array.

```ts
await agent.stream('Summarize this', {
  outputProcessors: [new StreamFilter()],
  maxProcessorRetries: 5,
})
```

`maxProcessorRetries` passed on the call overrides the agent default. If neither is set, processor-requested retries are treated as aborts.
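That precedence can be expressed as a tiny resolution helper. This is a sketch of the ordering only, not Mastra's actual code, and it simplifies by ignoring the `errorProcessors`-related default described earlier:

```typescript
// Resolution order sketch: call-level value wins, then the agent default,
// else 0 — meaning processor-requested retries are treated as aborts.
function resolveMaxRetries(callValue?: number, agentValue?: number): number {
  return callValue ?? agentValue ?? 0
}

const fromCall = resolveMaxRetries(5, 3) // call override wins
const fromAgent = resolveMaxRetries(undefined, 3) // falls back to agent default
const unset = resolveMaxRetries() // neither set: no retries
```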
## Related
- Processors overview: Conceptual guide to processors
- Guardrails: Security and validation processors
- Memory Processors: Memory-specific processors