Skip to Content
リファレンスレガシーワークフローイベント駆動ワークフロー

イベント駆動型ワークフロー

Mastra は、afterEventresumeWithEvent メソッドを通じて、イベント駆動型ワークフローを標準でサポートしています。これらのメソッドを使用することで、特定のイベントが発生するのを待ってワークフローの実行を一時停止し、イベントデータが利用可能になった時点で再開するワークフローを作成できます。

概要

イベント駆動型ワークフローは、次のようなシナリオで役立ちます。

  • 外部システムの処理完了を待つ必要がある場合
  • 特定のタイミングでユーザーの承認や入力が必要な場合
  • 非同期処理を調整する必要がある場合
  • 長時間実行されるプロセスを複数のサービスに分割して実行する必要がある場合

イベントの定義

イベント駆動型の手法を使用する前に、ワークフロー構成でワークフローがリッスンするイベントを定義する必要があります。

import { LegacyWorkflow } from "@mastra/core/workflows/legacy"; import { z } from "zod"; const workflow = new LegacyWorkflow({ name: "approval-workflow", triggerSchema: z.object({ requestId: z.string() }), events: { // Define events with their validation schemas approvalReceived: { schema: z.object({ approved: z.boolean(), approverName: z.string(), comment: z.string().optional(), }), }, documentUploaded: { schema: z.object({ documentId: z.string(), documentType: z.enum(["invoice", "receipt", "contract"]), metadata: z.record(z.string()).optional(), }), }, }, });

各イベントには名前と、そのイベントが発生した際に期待されるデータの構造を定義するスキーマが必要です。

afterEvent()

afterEvent メソッドは、ワークフロー内に特定のイベントを自動的に待機するサスペンションポイントを作成します。

構文

workflow.afterEvent(eventName: string): LegacyWorkflow

パラメーター

  • eventName: 待機するイベントの名前(ワークフローの events 設定で定義されている必要があります)

戻り値

メソッドチェーンのためのワークフローインスタンスを返します。

動作概要

afterEvent が呼び出されると、Mastra は以下を行います:

  1. ID が __eventName_event の特別なステップを作成します
  2. このステップをワークフロー実行を自動的にサスペンドするように設定します
  3. イベント受信後の継続ポイントを設定します

使用例

workflow .step(initialProcessStep) .afterEvent("approvalReceived") // ここでワークフローがサスペンドされます .step(postApprovalStep) // この処理はイベント受信後に実行されます .then(finalStep) .commit();

resumeWithEvent()

resumeWithEvent メソッドは、特定のイベントに対するデータを提供することで、一時停止中のワークフローを再開します。

構文

run.resumeWithEvent(eventName: string, data: any): Promise<LegacyWorkflowRunResult>

パラメーター

  • eventName: トリガーされるイベントの名前
  • data: イベントデータ(このイベントのために定義されたスキーマに準拠している必要があります)

戻り値

再開後のワークフロー実行結果を解決する Promise を返します。

動作の仕組み

resumeWithEvent が呼び出されると、Mastra は以下を行います:

  1. イベントデータがそのイベントのために定義されたスキーマに合致しているか検証します
  2. ワークフローのスナップショットを読み込みます
  3. イベントデータでコンテキストを更新します
  4. イベントステップから実行を再開します
  5. その後のステップでワークフローの実行を継続します

使用例

// ワークフローランを作成 const run = workflow.createRun(); // ワークフローを開始 await run.start({ triggerData: { requestId: "req-123" } }); // 後で、イベントが発生したとき: const result = await run.resumeWithEvent("approvalReceived", { approved: true, approverName: "John Doe", comment: "Looks good to me!", }); console.log(result.results);

イベントデータへのアクセス

ワークフローがイベントデータで再開されると、そのデータはステップコンテキスト内の context.inputData.resumedEvent で利用できます。

const processApprovalStep = new LegacyStep({ id: "processApproval", execute: async ({ context }) => { // Access the event data const eventData = context.inputData.resumedEvent; return { processingResult: `Processed approval from ${eventData.approverName}`, wasApproved: eventData.approved, }; }, });

複数のイベント

さまざまなタイミングで複数の異なるイベントを待機するワークフローを作成できます。

workflow .step(createRequest) .afterEvent("approvalReceived") .step(processApproval) .afterEvent("documentUploaded") .step(processDocument) .commit();

複数のイベント停止ポイントがあるワークフローを再開する場合、現在の停止ポイントに対応する正しいイベント名とデータを指定する必要があります。

実践例

この例では、承認とドキュメントのアップロードの両方が必要な完全なワークフローを示します。

import { LegacyWorkflow, LegacyStep } from "@mastra/core/workflows/legacy"; import { z } from "zod"; // Define steps const createRequest = new LegacyStep({ id: "createRequest", execute: async () => ({ requestId: `req-${Date.now()}` }), }); const processApproval = new LegacyStep({ id: "processApproval", execute: async ({ context }) => { const approvalData = context.inputData.resumedEvent; return { approved: approvalData.approved, approver: approvalData.approverName, }; }, }); const processDocument = new LegacyStep({ id: "processDocument", execute: async ({ context }) => { const documentData = context.inputData.resumedEvent; return { documentId: documentData.documentId, processed: true, type: documentData.documentType, }; }, }); const finalizeRequest = new LegacyStep({ id: "finalizeRequest", execute: async ({ context }) => { const requestId = context.steps.createRequest.output.requestId; const approved = context.steps.processApproval.output.approved; const documentId = context.steps.processDocument.output.documentId; return { finalized: true, summary: `Request ${requestId} was ${approved ? "approved" : "rejected"} with document ${documentId}`, }; }, }); // Create workflow const requestWorkflow = new LegacyWorkflow({ name: "document-request-workflow", events: { approvalReceived: { schema: z.object({ approved: z.boolean(), approverName: z.string(), }), }, documentUploaded: { schema: z.object({ documentId: z.string(), documentType: z.enum(["invoice", "receipt", "contract"]), }), }, }, }); // Build workflow requestWorkflow .step(createRequest) .afterEvent("approvalReceived") .step(processApproval) .afterEvent("documentUploaded") .step(processDocument) .then(finalizeRequest) .commit(); // Export workflow export { requestWorkflow };

サンプルワークフローの実行

import { requestWorkflow } from "./workflows"; import { mastra } from "./mastra"; async function runWorkflow() { // Get the workflow const workflow = mastra.legacy_getWorkflow("document-request-workflow"); const run = workflow.createRun(); // Start the workflow const initialResult = await run.start(); console.log("Workflow started:", initialResult.results); // Simulate receiving approval const afterApprovalResult = await run.resumeWithEvent("approvalReceived", { approved: true, approverName: "Jane Smith", }); console.log("After approval:", afterApprovalResult.results); // Simulate document upload const finalResult = await run.resumeWithEvent("documentUploaded", { documentId: "doc-456", documentType: "invoice", }); console.log("Final result:", finalResult.results); } runWorkflow().catch(console.error);

ベストプラクティス

  1. 明確なイベントスキーマを定義する: Zod を使ってイベントデータのバリデーション用に正確なスキーマを作成しましょう
  2. 分かりやすいイベント名を使う: イベントの目的が明確に伝わる名前を選びましょう
  3. イベントの未発生を処理する: イベントが発生しない場合やタイムアウトする場合にもワークフローが対応できるようにしましょう
  4. モニタリングを含める: watch メソッドを使って、イベント待ちでサスペンドされているワークフローを監視しましょう
  5. タイムアウトを考慮する: 発生しない可能性のあるイベントに対してタイムアウト機構を実装しましょう
  6. イベントをドキュメント化する: 他の開発者のために、ワークフローが依存するイベントを明確にドキュメント化しましょう

関連