ワークフローのストリーミング
Mastra のワークフロー・ストリーミングでは、完了を待たずに実行中の増分結果を送信できます。これにより、部分的な進捗、中間状態、または段階的なデータを、ユーザーや上流のエージェント/ワークフローに直接提示できます。
ストリームへの書き込み方法は主に2つあります:
- ワークフローステップ内から: 各ワークフローステップは
writer
引数を受け取ります。これは、実行の進行に応じて更新をプッシュできる書き込み可能なストリームです。 - エージェントのストリームから: エージェントの
streamVNext
の出力をワークフローステップの writer に直接パイプすることもでき、余計なグルーコードなしでエージェントの応答をワークフローの結果へと容易に連結できます。
書き込み可能なワークフローストリームとエージェントのストリーミングを組み合わせることで、中間結果がシステム内をどのように流れ、ユーザー体験へ届くかをきめ細かく制御できます。
writer
引数の使用
writer
引数はワークフローステップの execute
関数に渡され、アクティブなストリームへカスタムイベント、データ、値を送出するのに使用できます。これにより、実行中でもワークフローステップが中間結果やステータス更新を提供できます。
writer.write(...)
の呼び出しは必ず await
してください。そうしないとストリームがロックされ、WritableStream is locked
エラーが発生します。
import { createStep } from "@mastra/core/workflows";
export const testStep = createStep({
// ...
execute: async ({ inputData, writer }) => {
const { value } = inputData;
await writer?.write({
type: "custom-event",
status: "pending"
});
const response = await fetch(...);
await writer?.write({
type: "custom-event",
status: "success"
});
return {
value: ""
};
},
});
ワークフローストリームのペイロードを検査する
ストリームに書き込まれたイベントは、出力されるチャンクに含まれます。これらのチャンクを検査して、イベントタイプ、中間値、ステップ固有のデータなどの任意のカスタムフィールドにアクセスできます。
const testWorkflow = mastra.getWorkflow("testWorkflow");
const run = await testWorkflow.createRunAsync();
const stream = await run.streamVNext({
inputData: {
value: "initial data"
}
});
for await (const chunk of stream) {
console.log(chunk);
}
エージェントを使ったワークフロー
エージェントの textStream
をワークフローステップの writer
にパイプします。これにより部分的な出力がストリーミングされ、Mastra はエージェントの使用状況をワークフロー実行に自動的に集約します。
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
export const testStep = createStep({
// ...
execute: async ({ inputData, mastra, writer }) => {
const { city } = inputData
const testAgent = mastra?.getAgent("testAgent");
const stream = await testAgent?.streamVNext(`What is the weather in ${city}$?`);
await stream!.textStream.pipeTo(writer!);
return {
value: await stream!.text,
};
},
});