Workflows概要
Workflowsを使用すると、データフローで接続された型付きステップとして複雑なタスクシーケンスを定義し、オーケストレーションできます。各ステップには、Zodスキーマによって検証される明確に定義された入力と出力があります。
ワークフローは実行順序、依存関係、分岐、並列処理、エラーハンドリングを管理し、堅牢で再利用可能なプロセスを構築できるようにします。ステップはネストまたはクローンして、より大きなワークフローを構成できます。
ワークフローは以下の方法で作成します:
createStep
でステップを定義し、入力/出力スキーマとビジネスロジックを指定する。createWorkflow
でステップを構成し、実行フローを定義する。- ワークフローを実行してシーケンス全体を実行し、中断、再開、結果のストリーミングの組み込みサポートを利用する。
この構造により、完全な型安全性とランタイム検証が提供され、ワークフロー全体でデータの整合性が保証されます。
はじめに
ワークフローを使用するには、まずワークフローモジュールから必要な関数をインポートします:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
ステップの作成
ステップはワークフローの構成要素です。createStep
を使用してステップを作成します:
const step1 = createStep({...});
詳細についてはcreateStepを参照してください。
ワークフローの作成
createWorkflow
を使用してワークフローを作成し、.commit()
で完了させます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.commit();
詳細についてはworkflowを参照してください。
ステップの組み合わせ
ワークフローのステップは.then()
を使用して組み合わせ、順次実行できます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.then(step2)
.commit();
ステップは様々な方法で組み合わせることができます。詳細についてはControl Flowを参照してください。
ステップのクローン
ワークフローのステップはcloneStep()
を使用してクローンでき、任意のワークフローメソッドで使用できます。
import { createWorkflow, createStep, cloneStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const clonedStep = cloneStep(step1, { id: "cloned-step" });
const step2 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.then(clonedStep)
.then(step2)
.commit();
ワークフローの登録
メインのMastraインスタンスでworkflows
を使用してワークフローを登録します:
import { Mastra } from "@mastra/core/mastra";
import { PinoLogger } from "@mastra/loggers";
import { LibSQLStore } from "@mastra/libsql";
import { testWorkflow } from "./workflows/test-workflow";
export const mastra = new Mastra({
workflows: { testWorkflow },
storage: new LibSQLStore({
// stores telemetry, evals, ... into memory storage, if it needs to persist, change to file:../mastra.db
url: ":memory:"
}),
logger: new PinoLogger({
name: "Mastra",
level: "info"
})
});
ワークフローの実行
ワークフローを実行してテストする方法は2つあります。
Mastra Playground
Mastra Dev Serverが実行されている状態で、ブラウザでhttp://localhost:4111/workflows にアクセスしてMastra PlaygroundからワークフローをRUNできます。
コマンドライン
createRunAsync
とstart
を使用して、任意のMastraワークフローの実行インスタンスを作成します:
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
詳細についてはcreateRunAsyncとstartを参照してください。
このワークフローをトリガーするには、以下を実行します:
npx tsx src/test-workflow.ts
ワークフロー実行結果
start()
またはresume()
を使用してワークフローを実行した結果は、結果に応じて以下のいずれかのようになります。
ステータス success
{
"status": "success",
"steps": {
// ...
"step-1": {
// ...
"status": "success",
}
},
"result": {
"output": "London + step-1"
}
}
- status: ワークフロー実行の最終状態を示します。
success
、suspended
、またはerror
のいずれか - steps: ワークフロー内の各ステップを、入力と出力を含めてリストします
- status: 各個別ステップの結果を示します
- result:
outputSchema
に従って型付けされた、ワークフローの最終出力を含みます
ステータス suspended
{
"status": "suspended",
"steps": {
// ...
"step-1": {
// ...
"status": "suspended",
}
},
"suspended": [
[
"step-1"
]
]
}
- suspended: 継続前に入力を待機している任意のステップをリストするオプションの配列
ステータス failed
{
"status": "failed",
"steps": {
// ...
"step-1": {
// ...
"status": "failed",
"error": "Test error",
}
},
"error": "Test error"
}
- error: ワークフローが失敗した場合のエラーメッセージを含むオプションのフィールド
ワークフローのストリーミング
上記で示したrunメソッドと同様に、ワークフローはストリーミングすることもできます:
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.stream({
inputData: {
city: "London"
}
});
for await (const chunk of result.stream) {
console.log(chunk);
}
ワークフローの監視
ワークフローを監視することもでき、発行される各イベントを検査できます。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
run.watch((event) => {
console.log(event);
});
const result = await run.start({
inputData: {
city: "London"
}
});
詳細についてはwatchを参照してください。
その他のリソース
- GuidesセクションのWorkflow Guideは、主要な概念をカバーするチュートリアルです。
- Parallel Stepsワークフローの例
- Conditional Branchingワークフローの例
- Inngestワークフローの例
- Suspend and Resumeワークフローの例