出力プロセッサ
出力プロセッサを使用すると、言語モデルが生成したAIの応答を、ユーザーに返す前の段階で傍受・変更・検証・フィルタリングできます。これは、応答の検証、コンテンツモデレーション、応答の変換、AI生成コンテンツに対するセーフティ制御の実装に有用です。
プロセッサは、会話スレッド内のAI応答メッセージに対して動作します。コンテンツの変更、フィルタリング、検証ができ、条件に合致した場合は応答を丸ごと中止することも可能です。
組み込みプロセッサ
Mastra には一般的なユースケース向けに、いくつかの組み込み出力プロセッサが用意されています。
ModerationProcessor
このプロセッサは、LLM を用いて複数のカテゴリにわたる不適切コンテンツを検出し、モデレーションを行います。
import { ModerationProcessor } from "@mastra/core/processors";
const agent = new Agent({
outputProcessors: [
new ModerationProcessor({
model: openai("gpt-4.1-nano"), // 高速かつコスト効率の高いモデルを使用
threshold: 0.7, // フラグ付けの信頼度しきい値
strategy: 'block', // フラグ付けされたコンテンツをブロック
categories: ['hate', 'harassment', 'violence'], // カスタムカテゴリ
}),
],
});
利用可能なオプション:
model
: モデレーション分析用の言語モデル(必須)categories
: チェックするカテゴリの配列(デフォルト: [‘hate’,‘hate/threatening’,‘harassment’,‘harassment/threatening’,‘self-harm’,‘self-harm/intent’,‘self-harm/instructions’,‘sexual’,‘sexual/minors’,‘violence’,‘violence/graphic’])threshold
: フラグ付けの信頼度しきい値(0〜1、デフォルト: 0.5)strategy
: コンテンツがフラグ付けされた場合のアクション(デフォルト: ‘block’)customInstructions
: モデレーションエージェント向けのカスタム指示
利用可能な戦略:
block
: エラーとしてレスポンスを拒否(デフォルト)warn
: 警告を記録するがコンテンツは通すfilter
: フラグ付けされたメッセージを削除して処理を継続
PIIDetector
このプロセッサは、AI のレスポンスから個人を特定できる情報(PII)を検出し、必要に応じてマスキング・置換を行います。
import { PIIDetector } from "@mastra/core/processors";
const agent = new Agent({
outputProcessors: [
new PIIDetector({
model: openai("gpt-4.1-nano"),
threshold: 0.6,
strategy: 'redact', // 検出された PII を自動的にマスキング
detectionTypes: ['email', 'phone', 'credit-card', 'ssn', 'api-key', 'crypto-wallet', 'iban'],
redactionMethod: 'mask', // 形式を保ちながらマスキング
preserveFormat: true, // マスキング後も元の構造を維持
includeDetections: true, // コンプライアンス監査用に詳細を記録
}),
],
});
利用可能なオプション:
model
: PII 検出用の言語モデル(必須)detectionTypes
: 検出する PII タイプの配列(デフォルト: [‘email’, ‘phone’, ‘credit-card’, ‘ssn’, ‘api-key’, ‘ip-address’, ‘name’, ‘address’, ‘date-of-birth’, ‘url’, ‘uuid’, ‘crypto-wallet’, ‘iban’])threshold
: フラグ付けの信頼度しきい値(0〜1、デフォルト: 0.6)strategy
: PII が検出された場合のアクション(デフォルト: ‘block’)redactionMethod
: PII の編集方法(‘mask’, ‘hash’, ‘remove’, ‘placeholder’、デフォルト: ‘mask’)preserveFormat
: 編集時に PII の構造を維持(デフォルト: true)includeDetections
: コンプライアンス目的でログに検出詳細を含める(デフォルト: false)instructions
: エージェント向けのカスタム検出指示
利用可能な戦略:
block
: PII を含むレスポンスを拒否(デフォルト)warn
: 警告を記録するが通すfilter
: PII を含むメッセージを削除redact
: PII をプレースホルダー値に置き換える
BatchPartsProcessor
このプロセッサは複数のストリームパーツをまとめてバッチ化し、送出頻度を下げます。ネットワーク負荷の軽減やユーザー体験の向上に有用です。
import { BatchPartsProcessor } from "@mastra/core/processors";
const agent = new Agent({
outputProcessors: [
new BatchPartsProcessor({
maxBatchSize: 5, // 一度にまとめる最大パーツ数
maxWaitTime: 100, // 送出前に待機する最大時間(ms)
emitOnNonText: true, // 非テキストパーツは即時送出
}),
],
});
利用可能なオプション:
maxBatchSize
: まとめるパーツ数の上限(デフォルト: 3)maxWaitTime
: バッチ送出前の最大待機時間(ms、デフォルト: 50)emitOnNonText
: 非テキストパーツ受信時に即時送出するか(デフォルト: true)
TokenLimiterProcessor
このプロセッサは AI のレスポンスのトークン数を制限し、上限超過時に切り詰めるか中断します。
import { TokenLimiterProcessor } from "@mastra/core/processors";
const agent = new Agent({
outputProcessors: [
new TokenLimiterProcessor({
maxTokens: 1000, // 許可されるトークン数の上限
strategy: 'truncate', // 上限超過時に切り詰める
includePromptTokens: false, // レスポンス側のトークンのみをカウント
}),
],
});
利用可能なオプション:
maxTokens
: 許可されるトークン数の上限(必須)strategy
: トークン上限超過時の動作(‘truncate’ | ‘abort’、デフォルト: ‘truncate’)includePromptTokens
: カウントにプロンプト側のトークンを含めるか(デフォルト: false)
SystemPromptScrubber
このプロセッサは、セキュリティ上の脆弱性につながり得るシステムプロンプトやその他の機密情報を検出してマスキングします。
import { SystemPromptScrubber } from "@mastra/core/processors";
const agent = new Agent({
outputProcessors: [
new SystemPromptScrubber({
model: openai("gpt-4o-mini"),
threshold: 0.7, // 検出の信頼度しきい値
strategy: 'redact', // 検出したシステムプロンプトをマスキング
instructions: 'システムプロンプト、指示、または機密性の高い情報を検出する',
}),
],
});
利用可能なオプション:
model
: 検出に使用する言語モデル(必須)threshold
: 検出の信頼度しきい値(0〜1、デフォルト: 0.6)strategy
: システムプロンプト検出時の動作(‘block’ | ‘warn’ | ‘redact’、デフォルト: ‘redact’)instructions
: エージェント向けのカスタム検出指示
複数のプロセッサの適用
複数の出力プロセッサを連結できます。これらは outputProcessors
配列に記載された順に順次実行されます。あるプロセッサの出力が次のプロセッサの入力になります。
順序が重要です! 一般的には、テキスト正規化を最初に、セキュリティチェックを次に、コンテンツの変更を最後に配置するのがベストプラクティスです。
import { Agent } from "@mastra/core/agent";
import {
ModerationProcessor,
PIIDetector
} from "@mastra/core/processors";
const secureAgent = new Agent({
outputProcessors: [
// 1. セキュリティ上の脅威をチェック
new ModerationProcessor({ model: openai("gpt-4.1-nano") }),
// 2. PII を処理
new PIIDetector({ model: openai("gpt-4.1-nano"), strategy: 'redact' }),
],
});
カスタム出力プロセッサの作成
Processor
インターフェースを実装することで、カスタム出力プロセッサを作成できます。Processor
は、processOutputStream
(ストリーミング用)または processOutputResult
(最終結果用)、もしくはその両方を実装していれば、出力処理に使用できます。
ストリーミング出力プロセッサ
import type { Processor, MastraMessageV2 } from "@mastra/core/processors";
import type { ChunkType } from "@mastra/core/stream";
class ResponseLengthLimiter implements Processor {
readonly name = 'response-length-limiter';
constructor(private maxLength: number = 1000) {}
async processOutputStream({ part, streamParts, state, abort }: {
part: ChunkType;
streamParts: ChunkType[];
state: Record<string, any>;
abort: (reason?: string) => never;
}): Promise<ChunkType | null | undefined> {
// 累積長を state で追跡。各プロセッサは自身の state を持つ
if (!state.cumulativeLength) {
state.cumulativeLength = 0;
}
if (part.type === 'text-delta') {
state.cumulativeLength += part.payload.text.length;
if (state.cumulativeLength > this.maxLength) {
abort(`Response too long: ${state.cumulativeLength} characters (max: ${this.maxLength})`);
}
}
return part; // このパートを出力
}
}
最終結果プロセッサ
import type { Processor, MastraMessageV2 } from "@mastra/core/processors";
class ResponseValidator implements Processor {
readonly name = 'response-validator';
constructor(private requiredKeywords: string[] = []) {}
processOutputResult({ messages, abort }: {
messages: MastraMessageV2[];
abort: (reason?: string) => never
}): MastraMessageV2[] {
const responseText = messages
.map(msg => msg.content.parts
.filter(part => part.type === 'text')
.map(part => (part as any).text)
.join('')
)
.join('');
// 必須キーワードをチェック
for (const keyword of this.requiredKeywords) {
if (!responseText.toLowerCase().includes(keyword.toLowerCase())) {
abort(`Response missing required keyword: ${keyword}`);
}
}
return messages;
}
}
カスタム出力プロセッサを作成する際のポイント:
- 処理後のデータ(parts または messages)を必ず返す
abort(reason)
を使用して処理を早期終了する。abort はレスポンスのブロックを模擬するために用いられ、abort
によって投げられるエラーは TripWire のインスタンスとなる- ストリーミングプロセッサでは、パートの出力をスキップするには
null
またはundefined
を返す - プロセッサは単一責務に絞る
- プロセッサ内でエージェントを使用する場合は、高速なモデルを使い、可能な限りレスポンスサイズを抑え、システムプロンプトはできるだけ簡潔にする
エージェントメソッドとの統合
出力プロセッサは generate()
と streamVNext()
の両方で機能します。プロセッサのパイプラインは、エージェントが応答を生成した後、ユーザーに返される前に完了します。
// generate() の後、結果を返す前にプロセッサが実行される
const result = await agent.generate('Hello');
console.log(result.text); // 処理済みテキスト
console.log(result.object); // 該当する場合の構造化データ
// streamVNext() では各パートごとにプロセッサが実行される
const stream = await agent.streamVNext('Hello');
for await (const part of stream) {
console.log(part); // 処理済みパート
}
呼び出しごとの上書き
出力プロセッサは呼び出し単位で上書きできます:
// この呼び出しに限り出力プロセッサを上書き
const result = await agent.generate('Hello', {
outputProcessors: [
new ModerationProcessor({ model: openai("gpt-4.1-nano") }),
],
});
// ストリーミングでも同様
const stream = await agent.streamVNext('Hello', {
outputProcessors: [
new TokenLimiterProcessor({ maxTokens: 500 }),
],
});
Structured Output Processor
StructuredOutputProcessor を使うには、structuredOutput
オプションを指定します:
import { z } from "zod";
const result = await agent.generate('Analyze this text', {
structuredOutput: {
schema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
confidence: z.number(),
}),
model: openai("gpt-4o-mini"),
errorStrategy: 'warn',
},
});
console.log(result.text); // 元のテキスト
console.log(result.object); // 型付き構造化データ: { sentiment: 'positive', confidence: 0.8 }
いずれかのプロセッサが abort()
を呼び出すと、リクエストは即時に終了し、以降のプロセッサは実行されません。エージェントは、ブロック理由の詳細(result.tripwireReason
)とともに 200 応答を返します。
入力プロセッサと出力プロセッサ
- 入力プロセッサ: ユーザーのメッセージが言語モデルに届く前に処理する
- 出力プロセッサ: 生成後、ユーザーに返される前の LLM の応答を処理する
入力プロセッサはユーザー入力の検証とセキュリティ対策に、出力プロセッサは LLM 生成コンテンツの応答検証と安全管理に利用します。
詳細は、Input Processors のドキュメントをご覧ください。ユーザーメッセージの処理について解説しています。