ツールのストリーミング
Mastra のツール・ストリーミングでは、実行が完了するのを待たずに、実行中のツールから増分的な結果を送信できます。これにより、部分的な進捗や中間状態、段階的なデータを、ユーザーや上流のエージェント/ワークフローに直接提示できます。
ストリームへの書き込み方法は主に次の2通りです:
- ツール内から: すべてのツールは
writer
引数を受け取ります。これは書き込み可能なストリームで、実行の進行に応じて更新をプッシュできます。 - エージェントのストリームから: エージェントの
streamVNext
の出力をツールのwriter
に直接パイプでき、余分なグルーコードなしにエージェントの応答をツールの結果へと自然に連結できます。
書き込み可能なツール・ストリームとエージェントのストリーミングを組み合わせることで、中間結果がシステム内を通ってユーザー体験へと流れる過程をきめ細かく制御できます。
ツールを使うエージェント
エージェントのストリーミングはツール呼び出しと組み合わせられ、ツールの出力をエージェントのストリーミング応答に直接書き込めます。これにより、全体のやり取りの一部としてツールの動作を可視化できます。
import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { testTool } from "../tools/test-tool";
export const testAgent = new Agent({
name: "test-agent",
instructions: "You are a weather agent.",
model: openai("gpt-4o-mini"),
tools: { testTool }
});
writer
引数の使用
writer
引数はツールの execute
関数に渡され、アクティブなストリームへカスタムイベント、データ、値などを書き出すために使えます。これにより、実行中でもツールが中間結果やステータス更新を提供できます。
writer.write(...)
の呼び出しは必ず await
してください。そうしないとストリームがロックされ、WritableStream is locked
エラーが発生します。
import { createTool } from "@mastra/core/tools";
export const testTool = createTool({
// ...
execute: async ({ context, writer }) => {
const { value } = context;
await writer?.write({
type: "custom-event",
status: "pending"
});
const response = await fetch(...);
await writer?.write({
type: "custom-event",
status: "success"
});
return {
value: ""
};
}
});
ストリームのペイロードを検査する
ストリームに書き込まれたイベントは、送出されるチャンクに含まれます。これらのチャンクを検査すると、イベントタイプ、中間値、ツール固有のデータなど、任意のカスタムフィールドにアクセスできます。
const stream = await testAgent.streamVNext([
"What is the weather in London?",
"Use the testTool"
]);
for await (const chunk of stream) {
if (chunk.payload.output?.type === "custom-event") {
console.log(JSON.stringify(chunk, null, 2));
}
}
エージェントを使うツール
エージェントの textStream
をツールの writer
にパイプします。これにより部分的な出力がストリーミングされ、Mastra はエージェントの利用状況をツール実行に自動的に集計します。
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
export const testTool = createTool({
// ...
execute: async ({ context, mastra, writer }) => {
const { city } = context;
const testAgent = mastra?.getAgent("testAgent");
const stream = await testAgent?.streamVNext(`What is the weather in ${city}?`);
await stream!.textStream.pipeTo(writer!);
return {
value: await stream!.text
};
}
});