ワークフローの概要
Workflows は、データフローでつながる型付きステップとして、複雑なタスクの一連処理を定義・オーケストレーションできます。各ステップには、Zod スキーマで検証される明確に定義された入力と出力があります。
ワークフローは、実行順序、依存関係、分岐、並列実行、エラー処理を管理し、堅牢で再利用可能なプロセスの構築を可能にします。ステップはネストやクローンにより、より大規模なワークフローを組み立てられます。
ワークフローは次の手順で作成します:
createStep
でステップを定義し、入出力スキーマとビジネスロジックを指定します。createWorkflow
でステップを組み合わせて、実行フローを定義します。- ワークフローを実行して全体のシーケンスを動かします。サスペンド、レジューム、結果のストリーミングを標準でサポートします。
この構成により、完全な型安全性とランタイム検証が担保され、ワークフロー全体でデータの整合性が保証されます。
📹 視聴: → ワークフローの紹介とエージェントとの比較 YouTube(7 分)
はじめに
ワークフローを使用するには、必要な依存関係をインストールします:
npm install @mastra/core
workflows
サブパスから必要な関数をインポートします:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
ステップを作成する
ステップはワークフローの基本要素です。createStep
を使ってステップを作成します:
const step1 = createStep({...});
詳細は createStep を参照してください。
ワークフローの作成
createWorkflow
を使ってワークフローを作成し、.commit()
で確定します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.commit();
詳細は workflow を参照してください。
ステップの構成
ワークフローのステップは、.then()
を使って構成し、順次実行できます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.then(step2)
.commit();
ステップは複数の方法で構成できます。詳しくは Control Flow を参照してください。
ステップのクローン
ワークフローのステップは cloneStep()
を使ってクローンでき、任意のワークフローのメソッドで使用できます。
import { createWorkflow, createStep, cloneStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const clonedStep = cloneStep(step1, { id: "cloned-step" });
const step2 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.then(clonedStep)
.then(step2)
.commit();
ワークフローの登録
メインの Mastra インスタンスで workflows
を使ってワークフローを登録します:
import { Mastra } from "@mastra/core/mastra";
import { PinoLogger } from "@mastra/loggers";
import { LibSQLStore } from "@mastra/libsql";
import { testWorkflow } from "./workflows/test-workflow";
export const mastra = new Mastra({
workflows: { testWorkflow },
storage: new LibSQLStore({
// テレメトリや評価などをメモリストレージに保存します。永続化が必要な場合は file:../mastra.db に変更してください
url: ":memory:"
}),
logger: new PinoLogger({
name: "Mastra",
level: "info"
})
});
ローカルでのワークフローのテスト
ワークフローを実行してテストする方法は2つあります。
Mastra Playground
Mastra Dev Server を起動した状態で、ブラウザで http://localhost:4111/workflows にアクセスすると、Mastra Playground からワークフローを実行できます。
詳細は Local Dev Playground のドキュメントを参照してください。
コマンドライン
createRunAsync
と start
を使用してワークフローの実行インスタンスを作成します:
import "dotenv/config";
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(result);
if (result.status === 'success') {
console.log(result.result.output);
}
詳細は createRunAsync および start を参照してください。
このワークフローを実行するには、次を実行します:
npx tsx src/test-workflow.ts
ワークフローの実行結果
start()
または resume()
を使ってワークフローを実行した場合の結果は、状況に応じて次のいずれかになります。
ステータス: success
{
"status": "success",
"steps": {
// ...
"step-1": {
// ...
"status": "success",
}
},
"result": {
"output": "London + step-1"
}
}
- status: ワークフロー実行の最終状態を示します。
success
、suspended
、error
のいずれか - steps: 入力と出力を含めて、ワークフロー内の各ステップを一覧します
- status: 各ステップの結果を示します
- result:
outputSchema
に従って型付けされたワークフローの最終出力を含みます
ステータス: suspended
{
"status": "suspended",
"steps": {
// ...
"step-1": {
// ...
"status": "suspended",
}
},
"suspended": [
[
"step-1"
]
]
}
- suspended: 続行する前に入力待ちのステップを列挙する任意の配列
ステータス failed
{
"status": "failed",
"steps": {
// ...
"step-1": {
// ...
"status": "failed",
"error": "Test error",
}
},
"error": "Test error"
}
- error: ワークフローが失敗した場合にエラーメッセージを含む省略可能なフィールド
ストリームワークフロー
上で示した run メソッドと同様に、ワークフローもストリーミングできます:
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.stream({
inputData: {
city: "London"
}
});
for await (const chunk of result.stream) {
console.log(chunk);
}
詳細は stream を参照してください。
ワークフローの監視
ワークフローは監視でき、発行される各イベントを確認できます。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
run.watch((event) => {
console.log(event);
});
const result = await run.start({
inputData: {
city: "London"
}
});
詳細は watch を参照してください。
関連
- ガイドセクションの Workflow Guide は、主要な概念を扱うチュートリアルです。
- Parallel Steps ワークフローの例
- Conditional Branching ワークフローの例
- Inngest ワークフローの例
- Suspend and Resume ワークフローの例
ワークフロー(レガシー)
レガシー版ワークフローのドキュメントについては、Workflows (Legacy) を参照してください。