Skip to main content

Processors

Processors transform, validate, or control messages as they pass through an agent. They run at specific points in the agent's execution pipeline, allowing you to modify inputs before they reach the language model or outputs before they're returned to users.

Processors are configured as:

  • inputProcessors: Run before messages reach the language model.
  • outputProcessors: Run after the language model generates a response, but before it's returned to users.

You can use individual Processor objects or compose them into workflows using Mastra's workflow primitives. Workflows give you advanced control over processor execution order, parallel processing, and conditional logic.

Some processors implement both input and output logic and can be used in either array depending on where the transformation should occur.

Some built-in processors also persist hidden system reminder messages using <system-reminder>...</system-reminder> text plus metadata.systemReminder. These reminders stay available in raw memory history and retry/prompt reconstruction paths, but standard UI-facing message conversions and default memory recall hide them unless you explicitly opt in.

When to use processors
Direct link to When to use processors

Use processors to:

  • Normalize or validate user input
  • Add guardrails to your agent
  • Detect and prevent prompt injection or jailbreak attempts
  • Moderate content for safety or compliance
  • Transform messages (e.g., translate languages, filter tool calls)
  • Limit token usage or message history length
  • Redact sensitive information (PII)
  • Apply custom business logic to messages

Mastra includes several processors for common use cases. You can also create custom processors for application-specific requirements.

Quickstart
Direct link to Quickstart

Import and instantiate the processor, then pass it to the agent's inputProcessors or outputProcessors array:

src/mastra/agents/moderated-agent.ts
import { Agent } from '@mastra/core/agent'
import { ModerationProcessor } from '@mastra/core/processors'

export const moderatedAgent = new Agent({
name: 'moderated-agent',
instructions: 'You are a helpful assistant',
model: 'openai/gpt-5-mini',
inputProcessors: [
new ModerationProcessor({
model: 'openai/gpt-5-mini',
categories: ['hate', 'harassment', 'violence'],
threshold: 0.7,
strategy: 'block',
}),
],
})

Execution order
Direct link to Execution order

Processors run in the order they appear in the array:

inputProcessors: [new UnicodeNormalizer(), new PromptInjectionDetector(), new ModerationProcessor()]

For output processors, the order determines the sequence of transformations applied to the model's response.

With memory enabled
Direct link to With memory enabled

When memory is enabled on an agent, memory processors are automatically added to the pipeline:

Input processors:

[Memory Processors] → [Your inputProcessors]

Memory loads message history first, then your processors run.

Output processors:

[Your outputProcessors] → [Memory Processors]

Your processors run first, then memory persists messages.

This ordering ensures that if your output guardrail calls abort(), memory processors are skipped and no messages are saved. See Memory Processors for details.

Attach processors to an agent
Direct link to Attach processors to an agent

Processors are configured on the agent through three arrays:

import { Agent } from '@mastra/core/agent'
import { PrefillErrorHandler, TokenLimiter, ModerationProcessor } from '@mastra/core/processors'

const agent = new Agent({
name: 'support-agent',
model: 'openai/gpt-5',
instructions: '...',
inputProcessors: [
new TokenLimiter(4000),
new ModerationProcessor({ model: 'openai/gpt-4.1-nano' }),
],
outputProcessors: [new ModerationProcessor({ model: 'openai/gpt-4.1-nano' })],
errorProcessors: [new PrefillErrorHandler()],
})
  • inputProcessors run before the LLM.
  • outputProcessors run during and after the LLM response.
  • errorProcessors run when the LLM API call throws, so they can recover from provider errors.

Each array also accepts a function that returns an array, so processors can be built per-request from RequestContext:

new Agent({
// ...
inputProcessors: ({ requestContext }) => {
const limit = requestContext.get('tokenLimit') ?? 4000
return [new TokenLimiter(limit)]
},
})

Override processors per call
Direct link to Override processors per call

agent.generate() and agent.stream() accept the same three arrays. When you pass one, it replaces the matching array on the agent for that call only. Memory, workspace, and other framework-managed processors still run around your array.

await agent.stream('Summarize this', {
inputProcessors: [new TokenLimiter(2000)],
maxProcessorRetries: 5,
})

Create custom processors
Direct link to Create custom processors

Custom processors implement the Processor interface.

Processor methods receive two arguments for accessing the conversation:

  • messages: A snapshot array of MastraDBMessage objects for the current stage.
  • messageList: The live MessageList instance. Use it to read other stages, or to add, remove, or replace messages in place.

Text lives in message.content.parts, not on message.content itself. Iterate parts and filter by part.type === 'text' to read user or assistant text. A flattened message.content.content string exists for legacy compatibility and can be used as a fallback. See Message arguments in the Processor reference for full details.

Transform input messages
Direct link to Transform input messages

src/mastra/processors/custom-input.ts
import type { Processor, ProcessInputArgs } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'

export class CustomInputProcessor implements Processor {
id = 'custom-input'

async processInput({ messages }: ProcessInputArgs): Promise<MastraDBMessage[]> {
// Transform messages before they reach the LLM.
// Text lives in content.parts — iterate parts and rewrite text parts only.
return messages.map(msg => ({
...msg,
content: {
...msg.content,
parts: msg.content.parts?.map(part =>
part.type === 'text' ? { ...part, text: part.text.toLowerCase() } : part,
),
},
}))
}
}

The processInput() method receives messages, systemMessages, and an abort() function. Return a MastraDBMessage[] to replace messages, or { messages, systemMessages } to also modify system messages.

See the Processor reference for all available arguments and return types.

Control each step
Direct link to Control each step

While processInput() runs once at the start of agent execution, processInputStep() runs at each step of the agentic loop (including tool call continuations). This enables per-step configuration changes like dynamic model switching or tool choice modifications.

src/mastra/processors/step-processor.ts
import type {
Processor,
ProcessInputStepArgs,
ProcessInputStepResult,
} from '@mastra/core/processors'

export class DynamicModelProcessor implements Processor {
id = 'dynamic-model'

async processInputStep({
stepNumber,
model,
toolChoice,
messageList,
}: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
// Use a fast model for initial response
if (stepNumber === 0) {
return { model: 'openai/gpt-5-mini' }
}

// Disable tools after 5 steps to force completion
if (stepNumber > 5) {
return { toolChoice: 'none' }
}

// No changes for other steps
return {}
}
}

The method receives the current stepNumber, model, tools, toolChoice, messages, and more. Return an object with any properties you want to override for that step, for example { model, toolChoice, tools, systemMessages }.

See the Processor reference for all available arguments and return types.

Use the prepareStep() callback
Direct link to use-the-preparestep-callback

The prepareStep() callback on generate() or stream() is a shorthand for processInputStep(). Internally, Mastra wraps it in a processor that calls your function at each step. It accepts the same arguments and return type as processInputStep(), but doesn't require creating a class:

await agent.generate('Complex task', {
prepareStep: async ({ stepNumber, model }) => {
if (stepNumber === 0) {
return { model: 'openai/gpt-5-mini' }
}
if (stepNumber > 5) {
return { toolChoice: 'none' }
}
},
})

Transform output messages
Direct link to Transform output messages

src/mastra/processors/custom-output.ts
import type { Processor } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'

export class CustomOutputProcessor implements Processor {
id = 'custom-output'

async processOutputResult({ messages }): Promise<MastraDBMessage[]> {
// Transform messages after the LLM generates them
return messages.filter(msg => msg.role !== 'system')
}
}

The method also receives a result object with the full generation data — text, usage (token counts), finishReason, and steps (each containing toolCalls, toolResults, etc.). Use it to track usage or inspect tool calls:

src/mastra/processors/usage-tracker.ts
import type { Processor } from '@mastra/core/processors'

export class UsageTracker implements Processor {
id = 'usage-tracker'

async processOutputResult({ messages, result }) {
console.log(`Tokens: ${result.usage.inputTokens} in, ${result.usage.outputTokens} out`)
console.log(`Finish reason: ${result.finishReason}`)
return messages
}
}

Filter streamed output
Direct link to Filter streamed output

The processOutputStream() method transforms or filters streaming chunks before they reach the client:

src/mastra/processors/stream-filter.ts
import type { Processor } from '@mastra/core/processors'
import type { ChunkType } from '@mastra/core/stream'

export class StreamFilter implements Processor {
id = 'stream-filter'

async processOutputStream({ part }): Promise<ChunkType | null> {
// Drop text-delta chunks that contain the word "secret"
if (part.type === 'text-delta' && part.payload.text.includes('secret')) {
return null
}

// Return the (possibly modified) chunk to emit it
return part
}
}

Return values:

  • A ChunkType emits that chunk. Return the original part to pass it through unchanged.
  • null or undefined drops the chunk. Both behave the same way, so a method that falls through without returning also drops the chunk.
  • Dropping only affects one chunk. To stop the stream entirely, call abort().

To also receive custom data-* chunks emitted by tools via writer.custom(), set processDataParts = true on your processor. This lets you inspect, modify, or block tool-emitted data chunks before they reach the client.

Validate each response
Direct link to Validate each response

The processOutputStep() method runs after each LLM step, allowing you to validate the response and optionally request a retry:

src/mastra/processors/response-validator.ts
import type { Processor } from '@mastra/core/processors'

export class ResponseValidator implements Processor {
id = 'response-validator'

async processOutputStep({ text, abort, retryCount }) {
const isValid = await validateResponse(text)

if (!isValid && retryCount < 3) {
abort('Response did not meet requirements. Try again.', { retry: true })
}

return []
}
}

For more on retry behavior, see Retry mechanism in Advanced patterns.

Persist data across chunks and steps
Direct link to Persist data across chunks and steps

Output methods receive a state object that persists for the lifetime of one request. State is keyed by the processor's id, so each processor sees only its own data, and it is shared between processOutputStream, processOutputStep, and processOutputResult. A new state object is created for every new agent.generate() or agent.stream() call.

src/mastra/processors/word-counter.ts
import type { Processor } from '@mastra/core/processors'

export class WordCounter implements Processor {
id = 'word-counter'

async processOutputStream({ part, state }) {
state.wordCount ??= 0
if (part.type === 'text-delta') {
state.wordCount += part.payload.text.split(/\s+/).filter(Boolean).length
}
return part
}

async processOutputResult({ messages, state }) {
console.log(`Total words: ${state.wordCount}`)
return messages
}
}

Built-in utility processors
Direct link to Built-in utility processors

Mastra provides utility processors for common tasks:

For security and validation processors, see the Guardrails page for input/output guardrails and moderation processors. For memory-specific processors, see the Memory Processors page for processors that handle message history, semantic recall, and working memory.

TokenLimiter
Direct link to tokenlimiter

Prevents context window overflow by removing older messages when the total token count exceeds a specified limit. Prioritizes recent messages and preserves system messages.

import { Agent } from '@mastra/core/agent'
import { TokenLimiter } from '@mastra/core/processors'

const agent = new Agent({
name: 'my-agent',
model: 'openai/gpt-5.4',
inputProcessors: [new TokenLimiter(127000)],
})

See the TokenLimiterProcessor reference for custom encoding, strategy, and count mode options.

ToolCallFilter
Direct link to toolcallfilter

Removes tool calls and results from messages sent to the LLM, saving tokens on verbose tool interactions. Optionally exclude only specific tools. This filter only affects the LLM input, filtered messages are still saved to memory.

By default, ToolCallFilter filters the initial input before the agent loop starts. Use filterAfterToolSteps to also filter during each loop step while preserving recent tool-producing steps.

new ToolCallFilter({
filterAfterToolSteps: 2,
})

See the ToolCallFilter reference for configuration options and the Memory Processors page for pre-memory filtering.

ToolSearchProcessor
Direct link to toolsearchprocessor

Enables dynamic tool discovery for agents with large tool libraries. Instead of providing all tools upfront, the processor gives the agent search_tools and load_tool meta-tools to find and load tools by keyword on demand, reducing context token usage.

See the ToolSearchProcessor reference for configuration options and usage examples.

Advanced patterns
Direct link to Advanced patterns

Ensure a final response with maxSteps
Direct link to ensure-a-final-response-with-maxsteps

When using maxSteps to limit agent execution, the agent may return an empty response if it attempts a tool call on the final step. Use processInputStep() to force a text response on the last step:

src/mastra/processors/ensure-final-response.ts
import type {
Processor,
ProcessInputStepArgs,
ProcessInputStepResult,
} from '@mastra/core/processors'

export class EnsureFinalResponseProcessor implements Processor {
readonly id = 'ensure-final-response'

private maxSteps: number

constructor(maxSteps: number) {
this.maxSteps = maxSteps
}

async processInputStep({
stepNumber,
systemMessages,
}: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
// On the last step, prevent tool calls and instruct the LLM to summarize
if (stepNumber === this.maxSteps - 1) {
return {
tools: {},
toolChoice: 'none',
systemMessages: [
...systemMessages,
{
role: 'system',
content:
'You have reached the maximum number of steps. Summarize your progress so far and provide a best-effort response. If the task is incomplete, clearly indicate what remains to be done.',
},
],
}
}
return {}
}
}

Add it to inputProcessors and pass the same maxSteps value to generate() or stream():

const MAX_STEPS = 5

const agent = new Agent({
inputProcessors: [new EnsureFinalResponseProcessor(MAX_STEPS)],
// ...
})

await agent.generate('Your prompt', { maxSteps: MAX_STEPS })

Emit custom stream events
Direct link to Emit custom stream events

Output processors receive a writer object that lets you emit custom data chunks back to the client during streaming. This is useful for use cases like streaming moderation results or sending UI update signals without blocking the original stream.

src/mastra/processors/moderation-processor.ts
import type { Processor } from '@mastra/core/processors'

export class ModerationProcessor implements Processor {
id = 'moderation'

async processOutputResult({ messages, writer }) {
// Run moderation on the final output
const text = messages
.filter(m => m.role === 'assistant')
.flatMap(m => m.content.parts?.filter(p => p.type === 'text'))
.map(p => p.text)
.join(' ')

const result = await runModeration(text)

if (result.requiresChange) {
// Emit a custom event to the client with the moderated text
await writer?.custom({
type: 'data-moderation-update',
data: {
originalText: text,
moderatedText: result.moderatedText,
reason: result.reason,
},
})
}

return messages
}
}

On the client, listen for the custom chunk type in the stream:

const stream = await agent.stream('Hello')

for await (const chunk of stream.fullStream) {
if (chunk.type === 'data-moderation-update') {
// Update the UI with moderated text
updateDisplayedMessage(chunk.data.moderatedText)
}
}

Custom chunk types must use the data- prefix (e.g., data-moderation-update, data-status).

By default, processOutputStream() skips data-* chunks so it does not accidentally operate on tool telemetry or other processors' output. To inspect, modify, or block these chunks in a processor, set processDataParts = true on that processor:

class ModerationCollector implements Processor {
id = 'moderation-collector'
processDataParts = true

async processOutputStream({ part, state }) {
if (part.type === 'data-moderation-update') {
state.warnings ??= []
state.warnings.push(part.data)
}
return part
}
}

Add metadata to messages
Direct link to Add metadata to messages

You can add custom metadata to messages in processOutputResult. This metadata is accessible via the response object:

src/mastra/processors/metadata-processor.ts
import type { Processor } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'

export class MetadataProcessor implements Processor {
id = 'metadata-processor'

async processOutputResult({
messages,
}: {
messages: MastraDBMessage[]
}): Promise<MastraDBMessage[]> {
return messages.map(msg => {
if (msg.role === 'assistant') {
return {
...msg,
content: {
...msg.content,
metadata: {
...msg.content.metadata,
processedAt: new Date().toISOString(),
customData: 'your data here',
},
},
}
}
return msg
})
}
}

Access the metadata with generate():

const result = await agent.generate('Hello')

// The response includes uiMessages with processor-added metadata
const assistantMessage = result.response?.uiMessages?.find(m => m.role === 'assistant')
console.log(assistantMessage?.metadata?.customData)

For streaming, access metadata from the finish chunk payload or the stream.response promise.

Use workflows as processors
Direct link to Use workflows as processors

You can use Mastra workflows as processors to create complex processing pipelines with parallel execution, conditional branching, and error handling:

src/mastra/processors/moderation-workflow.ts
import { createWorkflow, createStep } from '@mastra/core/workflows'
import {
ProcessorStepSchema,
PromptInjectionDetector,
PIIDetector,
ModerationProcessor,
} from '@mastra/core/processors'
import { Agent } from '@mastra/core/agent'

// Create a workflow that runs multiple checks in parallel
const moderationWorkflow = createWorkflow({
id: 'moderation-pipeline',
inputSchema: ProcessorStepSchema,
outputSchema: ProcessorStepSchema,
})
.parallel([
createStep(
new PIIDetector({
strategy: 'redact',
}),
),
createStep(
new PromptInjectionDetector({
strategy: 'block',
}),
),
createStep(
new ModerationProcessor({
strategy: 'block',
}),
),
])
.map(async ({ inputData }) => {
return inputData['processor:pii-detector']
})
.commit()

// Use the workflow as an input processor
const agent = new Agent({
id: 'moderated-agent',
name: 'Moderated Agent',
model: 'openai/gpt-5.4',
inputProcessors: [moderationWorkflow],
})

After a .parallel() step, each branch result is keyed by its processor ID (e.g. processor:pii-detector). Use .map() to select the branch whose output the next step should receive.

If a branch uses a mutating strategy like redact, map to that branch so its transformed messages carry forward. If all branches only block, any branch works. Pick any one since none of them modify the messages.

When an agent is registered with Mastra, processor workflows are automatically registered as workflows, allowing you to view and debug them in the Studio.

Retry mechanism
Direct link to Retry mechanism

Processors can request that the LLM retry its response with feedback. This is useful for implementing quality checks, output validation, or iterative refinement:

src/mastra/processors/quality-checker.ts
import type { Processor } from '@mastra/core/processors'

export class QualityChecker implements Processor {
id = 'quality-checker'

async processOutputStep({ text, abort, retryCount }) {
const qualityScore = await evaluateQuality(text)

if (qualityScore < 0.7 && retryCount < 3) {
// Request a retry with feedback for the LLM
abort('Response quality score too low. Please provide a more detailed answer.', {
retry: true,
metadata: { score: qualityScore },
})
}

return []
}
}

const agent = new Agent({
id: 'quality-agent',
name: 'Quality Agent',
model: 'openai/gpt-5.4',
outputProcessors: [new QualityChecker()],
maxProcessorRetries: 3, // Maximum retry attempts. If unset, retries are disabled (unless errorProcessors are configured, in which case it defaults to 10).
})

The retry mechanism:

  • Only works in processOutputStep() and processInputStep() methods
  • Replays the step with the abort reason added as context for the LLM
  • Tracks retry count via the retryCount parameter
  • Respects maxProcessorRetries limit on the agent

Abort and tripwire chunks
Direct link to Abort and tripwire chunks

Calling abort(reason, options) throws a TripWire error that ends processing. On streams, Mastra emits a tripwire chunk clients can detect:

for await (const chunk of stream.fullStream) {
if (chunk.type === 'tripwire') {
console.log('Blocked by', chunk.payload.processorId, '-', chunk.payload.reason)
break
}
}

For agent.generate(), the result exposes the same information as result.tripwire with result.finishReason === 'other'.

abort accepts a second options argument:

  • retry: true asks the agent to retry instead of ending. Retries require maxProcessorRetries to be set on the agent or call.
  • metadata attaches structured data to the tripwire chunk so downstream consumers can branch on categories like pii, quality, or moderation.

API error handling
Direct link to API error handling

The processAPIError method handles LLM API rejections — errors where the API rejects the request (such as 400 or 422 status codes) rather than network or server failures. This lets you modify the request and retry when the API rejects the message format.

src/mastra/processors/api-error-handler.ts
import { APICallError } from '@ai-sdk/provider'
import type { Processor, ProcessAPIErrorArgs, ProcessAPIErrorResult } from '@mastra/core/processors'

export class ContextLengthHandler implements Processor {
id = 'context-length-handler'

processAPIError({
error,
messageList,
retryCount,
}: ProcessAPIErrorArgs): ProcessAPIErrorResult | void {
if (retryCount > 0) return

if (APICallError.isInstance(error) && error.message.includes('context length exceeded')) {
const messages = messageList.get.all.db()
if (messages.length > 4) {
messageList.removeByIds([messages[1]!.id, messages[2]!.id])
return { retry: true }
}
}
}
}

Mastra includes a built-in PrefillErrorHandler that automatically handles the Anthropic "assistant message prefill" error. This processor is auto-injected and requires no configuration.