ワークフロー概要
ワークフローを使用すると、データフローで接続された型付きステップとして複雑なタスクシーケンスを定義し、オーケストレーションできます。各ステップには、Zodスキーマによって検証される明確に定義された入力と出力があります。
ワークフローは実行順序、依存関係、分岐、並列処理、エラーハンドリングを管理し、堅牢で再利用可能なプロセスを構築できます。ステップはネストまたはクローンして、より大きなワークフローを構成できます。
ワークフローは以下の方法で作成します:
createStep
でステップを定義し、入力/出力スキーマとビジネスロジックを指定するcreateWorkflow
でステップを構成し、実行フローを定義する- ワークフローを実行して全体のシーケンスを実行し、中断、再開、ストリーミング結果の組み込みサポートを利用する
この構造により、完全な型安全性とランタイム検証が提供され、ワークフロー全体でデータの整合性が保証されます。
📹 視聴: → ワークフローの紹介とエージェントとの比較 YouTube(7分)
はじめに
ワークフローを使用するには、必要な依存関係をインストールします。
npm install @mastra/core
workflows
サブパスから必要な関数をインポートします。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
ステップを作成する
ステップはワークフローの基本構成要素です。createStep
を使ってステップを作成します。
const step1 = createStep({...});
詳細は createStep を参照してください。
ワークフローを作成する
createWorkflow
でワークフローを作成し、.commit()
で確定します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.commit();
詳細は workflow を参照してください。
ステップの合成
ワークフローのステップは .then()
を使って合成し、順次実行できます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.then(step2)
.commit();
ステップはさまざまな方法で合成できます。詳しくは Control Flow を参照してください。
ステップのクローン
ワークフローのステップは cloneStep()
で複製し、任意のワークフロー用メソッドで利用できます。
import { createWorkflow, createStep, cloneStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const clonedStep = cloneStep(step1, { id: "cloned-step" });
const step2 = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
})
})
.then(step1)
.then(clonedStep)
.then(step2)
.commit();
ワークフローの登録
メインの Mastra インスタンスで workflows
を使ってワークフローを登録します:
import { Mastra } from "@mastra/core/mastra";
import { PinoLogger } from "@mastra/loggers";
import { LibSQLStore } from "@mastra/libsql";
import { testWorkflow } from "./workflows/test-workflow";
export const mastra = new Mastra({
workflows: { testWorkflow },
storage: new LibSQLStore({
// テレメトリや評価などをメモリストレージに保存します。永続化が必要な場合は file:../mastra.db に変更してください
url: ":memory:"
}),
logger: new PinoLogger({
name: "Mastra",
level: "info"
})
});
ワークフローをローカルでテストする
ワークフローを実行してテストする方法は2つあります。
Mastra Playground
Mastra Dev Server を起動している状態で、ブラウザで http://localhost:4111/workflows にアクセスすると、Mastra Playground からワークフローを実行できます。
詳細は Local Dev Playground のドキュメントをご覧ください。
コマンドライン
createRunAsync
と start
を使ってワークフロー実行インスタンスを作成します:
import "dotenv/config";
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(result);
if (result.status === 'success') {
console.log(result.result.output);
}
詳しくは createRunAsync および start を参照してください。
このワークフローを起動するには、次を実行します:
npx tsx src/test-workflow.ts
ワークフロー実行結果
start()
または resume()
でワークフローを実行すると、結果に応じて次のいずれかの形になります。
ステータス success
{
"status": "success",
"steps": {
// ...
"step-1": {
// ...
"status": "success",
}
},
"result": {
"output": "London + step-1"
}
}
- status: ワークフロー実行の最終状態を示します(
success
、suspended
、error
のいずれか) - steps: 入力と出力を含む、ワークフロー内の各ステップの一覧
- status: 各ステップの結果を示します
- result:
outputSchema
に従って型付けされたワークフローの最終出力
ステータス suspended
{
"status": "suspended",
"steps": {
// ...
"step-1": {
// ...
"status": "suspended",
}
},
"suspended": [
[
"step-1"
]
]
}
- suspended: 続行前に入力待ちとなっているステップを列挙する任意の配列
ステータス failed
{
"status": "failed",
"steps": {
// ...
"step-1": {
// ...
"status": "failed",
"error": "Test error",
}
},
"error": "Test error"
}
- error: ワークフローが失敗した場合のエラーメッセージを含む任意のフィールド
ストリームワークフロー
上で示した run メソッドと同様に、ワークフローもストリーミングできます。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.stream({
inputData: {
city: "London"
}
});
for await (const chunk of result.stream) {
console.log(chunk);
}
ワークフローの監視
ワークフローは監視でき、発行される各イベントを確認できます。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
run.watch((event) => {
console.log(event);
});
const result = await run.start({
inputData: {
city: "London"
}
});
詳細は watch を参照してください。
関連
- ガイドセクションのWorkflow Guideは、主要な概念を解説するチュートリアルです。
- Parallel Steps のワークフロー例
- Conditional Branching のワークフロー例
- Inngest のワークフロー例
- Suspend and Resume のワークフロー例
ワークフロー(レガシー)
レガシーワークフローのドキュメントについては、ワークフロー(レガシー)をご覧ください。