ダイナミックワークフロー
このガイドでは、ワークフローステップ内でダイナミックなワークフローを作成する方法を説明します。この高度なパターンを使用すると、実行時の条件に基づいてワークフローを動的に作成および実行できます。
概要
動的ワークフローは、実行時データに基づいてワークフローを作成する必要がある場合に役立ちます。
実装
動的ワークフローを作成するための鍵は、ステップの execute
関数内から Mastra インスタンスにアクセスし、それを使って新しいワークフローを作成・実行することです。
基本例
import { Mastra, Step, Workflow } from "@mastra/core";
import { z } from "zod";
const isMastra = (mastra: any): mastra is Mastra => {
return mastra && typeof mastra === "object" && mastra instanceof Mastra;
};
// Step that creates and runs a dynamic workflow
const createDynamicWorkflow = new Step({
id: "createDynamicWorkflow",
outputSchema: z.object({
dynamicWorkflowResult: z.any(),
}),
execute: async ({ context, mastra }) => {
if (!mastra) {
throw new Error("Mastra instance not available");
}
if (!isMastra(mastra)) {
throw new Error("Invalid Mastra instance");
}
const inputData = context.triggerData.inputData;
// Create a new dynamic workflow
const dynamicWorkflow = new Workflow({
name: "dynamic-workflow",
mastra, // Pass the mastra instance to the new workflow
triggerSchema: z.object({
dynamicInput: z.string(),
}),
});
// Define steps for the dynamic workflow
const dynamicStep = new Step({
id: "dynamicStep",
execute: async ({ context }) => {
const dynamicInput = context.triggerData.dynamicInput;
return {
processedValue: `Processed: ${dynamicInput}`,
};
},
});
// Build and commit the dynamic workflow
dynamicWorkflow.step(dynamicStep).commit();
// Create a run and execute the dynamic workflow
const run = dynamicWorkflow.createRun();
const result = await run.start({
triggerData: {
dynamicInput: inputData,
},
});
let dynamicWorkflowResult;
if (result.results["dynamicStep"]?.status === "success") {
dynamicWorkflowResult =
result.results["dynamicStep"]?.output.processedValue;
} else {
throw new Error("Dynamic workflow failed");
}
// Return the result from the dynamic workflow
return {
dynamicWorkflowResult,
};
},
});
// Main workflow that uses the dynamic workflow creator
const mainWorkflow = new Workflow({
name: "main-workflow",
triggerSchema: z.object({
inputData: z.string(),
}),
mastra: new Mastra(),
});
mainWorkflow.step(createDynamicWorkflow).commit();
// Register the workflow with Mastra
export const mastra = new Mastra({
workflows: { mainWorkflow },
});
const run = mainWorkflow.createRun();
const result = await run.start({
triggerData: {
inputData: "test",
},
});
上級例:ワークフローファクトリー
入力パラメータに基づいて異なるワークフローを生成するワークフローファクトリーを作成できます。
const isMastra = (mastra: any): mastra is Mastra => {
return mastra && typeof mastra === "object" && mastra instanceof Mastra;
};
const workflowFactory = new Step({
id: "workflowFactory",
inputSchema: z.object({
workflowType: z.enum(["simple", "complex"]),
inputData: z.string(),
}),
outputSchema: z.object({
result: z.any(),
}),
execute: async ({ context, mastra }) => {
if (!mastra) {
throw new Error("Mastra instance not available");
}
if (!isMastra(mastra)) {
throw new Error("Invalid Mastra instance");
}
// Create a new dynamic workflow based on the type
const dynamicWorkflow = new Workflow({
name: `dynamic-${context.workflowType}-workflow`,
mastra,
triggerSchema: z.object({
input: z.string(),
}),
});
if (context.workflowType === "simple") {
// Simple workflow with a single step
const simpleStep = new Step({
id: "simpleStep",
execute: async ({ context }) => {
return {
result: `Simple processing: ${context.triggerData.input}`,
};
},
});
dynamicWorkflow.step(simpleStep).commit();
} else {
// Complex workflow with multiple steps
const step1 = new Step({
id: "step1",
outputSchema: z.object({
intermediateResult: z.string(),
}),
execute: async ({ context }) => {
return {
intermediateResult: `First processing: ${context.triggerData.input}`,
};
},
});
const step2 = new Step({
id: "step2",
execute: async ({ context }) => {
const intermediate = context.getStepResult(step1).intermediateResult;
return {
finalResult: `Second processing: ${intermediate}`,
};
},
});
dynamicWorkflow.step(step1).then(step2).commit();
}
// Execute the dynamic workflow
const run = dynamicWorkflow.createRun();
const result = await run.start({
triggerData: {
input: context.inputData,
},
});
// Return the appropriate result based on workflow type
if (context.workflowType === "simple") {
return {
// @ts-ignore
result: result.results["simpleStep"]?.output,
};
} else {
return {
// @ts-ignore
result: result.results["step2"]?.output,
};
}
},
});
重要な考慮事項
-
Mastraインスタンス:
execute
関数のmastra
パラメータは、動的ワークフローを作成するために不可欠なMastraインスタンスへのアクセスを提供します。 -
エラー処理: 動的ワークフローを作成する前に、常にMastraインスタンスが利用可能かどうかを確認してください。
-
リソース管理: 動的ワークフローはリソースを消費するため、単一の実行で多くのワークフローを作成することには注意が必要です。
-
ワークフローのライフサイクル: 動的ワークフローは自動的にメインのMastraインスタンスに登録されません。明示的に登録しない限り、ステップ実行の期間中のみ存在します。
-
デバッグ: 動的ワークフローのデバッグは難しい場合があります。作成と実行を追跡するための詳細なログ記録を追加することを検討してください。
ユースケース
- 条件付きワークフロー選択: 入力データに基づいて異なるワークフローパターンを選択する
- パラメータ化されたワークフロー: 動的な設定でワークフローを作成する
- ワークフローテンプレート: テンプレートを使って特化したワークフローを生成する
- マルチテナントアプリケーション: 異なるテナントごとに分離されたワークフローを作成する
結論
動的ワークフローは、柔軟で適応性の高いワークフローシステムを構築するための強力な方法です。ステップ実行内でMastraインスタンスを活用することで、実行時の状況や要件に応じて反応するワークフローを作成できます。