イベント駆動型ワークフロー
MastraはafterEvent
およびresumeWithEvent
メソッドを通じてイベント駆動型ワークフローの組み込みサポートを提供しています。これらのメソッドを使用すると、特定のイベントが発生するのを待機している間に実行を一時停止し、イベントデータが利用可能になったときにそのデータでワークフローを再開することができます。
概要
イベント駆動型ワークフローは以下のようなシナリオで役立ちます:
- 外部システムの処理完了を待つ必要がある場合
- 特定のポイントでユーザーの承認や入力が必要な場合
- 非同期操作の調整が必要な場合
- 長時間実行されるプロセスを異なるサービス間で実行を分割する必要がある場合
イベントの定義
イベント駆動型の手法を使用する前に、ワークフロー構成でワークフローがリッスンするイベントを定義する必要があります。
import { Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const workflow = new Workflow({
name: "approval-workflow",
triggerSchema: z.object({ requestId: z.string() }),
events: {
// Define events with their validation schemas
approvalReceived: {
schema: z.object({
approved: z.boolean(),
approverName: z.string(),
comment: z.string().optional(),
}),
},
documentUploaded: {
schema: z.object({
documentId: z.string(),
documentType: z.enum(["invoice", "receipt", "contract"]),
metadata: z.record(z.string()).optional(),
}),
},
},
});
各イベントには名前と、そのイベントが発生した際に期待されるデータの構造を定義するスキーマが必要です。
afterEvent()
afterEvent
メソッドは、ワークフロー内に特定のイベントを自動的に待機する中断ポイントを作成します。
構文
workflow.afterEvent(eventName: string): Workflow
パラメータ
eventName
: 待機するイベントの名前(ワークフローのevents
設定で定義されている必要があります)
戻り値
メソッドチェーン用のワークフローインスタンスを返します。
動作の仕組み
afterEvent
が呼び出されると、Mastraは以下を行います:
__eventName_event
というIDを持つ特別なステップを作成します- このステップを設定して、ワークフローの実行を自動的に一時停止します
- イベントが受信された後の継続ポイントを設定します
使用例
workflow
.step(initialProcessStep)
.afterEvent("approvalReceived") // ワークフローはここで一時停止します
.step(postApprovalStep) // これはイベントが受信された後に実行されます
.then(finalStep)
.commit();
resumeWithEvent()
resumeWithEvent
メソッドは、特定のイベントにデータを提供することで、一時停止されたワークフローを再開します。
構文
run.resumeWithEvent(eventName: string, data: any): Promise<WorkflowRunResult>
パラメータ
eventName
: トリガーされるイベントの名前data
: イベントデータ(このイベントに定義されたスキーマに準拠する必要があります)
戻り値
再開後のワークフロー実行結果に解決するPromiseを返します。
動作の仕組み
resumeWithEvent
が呼び出されると、Mastraは以下を実行します:
- イベントデータをそのイベントに定義されたスキーマに対して検証します
- ワークフローのスナップショットを読み込みます
- イベントデータでコンテキストを更新します
- イベントステップから実行を再開します
- 後続のステップでワークフロー実行を継続します
使用例
// ワークフロー実行を作成
const run = workflow.createRun();
// ワークフローを開始
await run.start({ triggerData: { requestId: "req-123" } });
// 後で、イベントが発生した時:
const result = await run.resumeWithEvent("approvalReceived", {
approved: true,
approverName: "John Doe",
comment: "Looks good to me!",
});
console.log(result.results);
イベントデータへのアクセス
ワークフローがイベントデータで再開されると、そのデータはステップコンテキスト内で context.inputData.resumedEvent
として利用可能になります:
const processApprovalStep = new Step({
id: "processApproval",
execute: async ({ context }) => {
// Access the event data
const eventData = context.inputData.resumedEvent;
return {
processingResult: `Processed approval from ${eventData.approverName}`,
wasApproved: eventData.approved,
};
},
});
複数のイベント
異なる時点で複数の異なるイベントを待機するワークフローを作成できます:
workflow
.step(createRequest)
.afterEvent("approvalReceived")
.step(processApproval)
.afterEvent("documentUploaded")
.step(processDocument)
.commit();
複数のイベント中断ポイントを持つワークフローを再開する場合、現在の中断ポイントに対して正しいイベント名とデータを提供する必要があります。
実践例
この例では、承認とドキュメントのアップロードの両方が必要な完全なワークフローを示します。
import { Workflow, Step } from "@mastra/core/workflows";
import { z } from "zod";
// Define steps
const createRequest = new Step({
id: "createRequest",
execute: async () => ({ requestId: `req-${Date.now()}` }),
});
const processApproval = new Step({
id: "processApproval",
execute: async ({ context }) => {
const approvalData = context.inputData.resumedEvent;
return {
approved: approvalData.approved,
approver: approvalData.approverName,
};
},
});
const processDocument = new Step({
id: "processDocument",
execute: async ({ context }) => {
const documentData = context.inputData.resumedEvent;
return {
documentId: documentData.documentId,
processed: true,
type: documentData.documentType,
};
},
});
const finalizeRequest = new Step({
id: "finalizeRequest",
execute: async ({ context }) => {
const requestId = context.steps.createRequest.output.requestId;
const approved = context.steps.processApproval.output.approved;
const documentId = context.steps.processDocument.output.documentId;
return {
finalized: true,
summary: `Request ${requestId} was ${approved ? "approved" : "rejected"} with document ${documentId}`,
};
},
});
// Create workflow
const requestWorkflow = new Workflow({
name: "document-request-workflow",
events: {
approvalReceived: {
schema: z.object({
approved: z.boolean(),
approverName: z.string(),
}),
},
documentUploaded: {
schema: z.object({
documentId: z.string(),
documentType: z.enum(["invoice", "receipt", "contract"]),
}),
},
},
});
// Build workflow
requestWorkflow
.step(createRequest)
.afterEvent("approvalReceived")
.step(processApproval)
.afterEvent("documentUploaded")
.step(processDocument)
.then(finalizeRequest)
.commit();
// Export workflow
export { requestWorkflow };
サンプルワークフローの実行
import { requestWorkflow } from "./workflows";
import { mastra } from "./mastra";
async function runWorkflow() {
// Get the workflow
const workflow = mastra.getWorkflow("document-request-workflow");
const run = workflow.createRun();
// Start the workflow
const initialResult = await run.start();
console.log("Workflow started:", initialResult.results);
// Simulate receiving approval
const afterApprovalResult = await run.resumeWithEvent("approvalReceived", {
approved: true,
approverName: "Jane Smith",
});
console.log("After approval:", afterApprovalResult.results);
// Simulate document upload
const finalResult = await run.resumeWithEvent("documentUploaded", {
documentId: "doc-456",
documentType: "invoice",
});
console.log("Final result:", finalResult.results);
}
runWorkflow().catch(console.error);
ベストプラクティス
- 明確なイベントスキーマを定義する: Zod を使ってイベントデータのバリデーション用に正確なスキーマを作成しましょう
- 分かりやすいイベント名を使用する: イベントの目的が明確に伝わる名前を選びましょう
- イベントの未発生を処理する: イベントが発生しない場合やタイムアウトする場合にもワークフローが対応できるようにしましょう
- モニタリングを含める:
watch
メソッドを使って、イベント待ちで一時停止しているワークフローを監視しましょう - タイムアウトを考慮する: 発生しない可能性のあるイベントに対してタイムアウト機構を実装しましょう
- イベントをドキュメント化する: 他の開発者のために、ワークフローが依存するイベントを明確にドキュメント化しましょう