Workflows概要
Workflowsを使用すると、データフローで接続された型付きステップとして複雑なタスクシーケンスを定義し、オーケストレーションできます。各ステップには、Zodスキーマによって検証される明確に定義された入力と出力があります。
ワークフローは実行順序、依存関係、分岐、並列処理、エラーハンドリングを管理し、堅牢で再利用可能なプロセスを構築できるようにします。ステップはネストまたはクローンして、より大きなワークフローを構成できます。
ワークフローは以下の方法で作成します:
createStep
でステップを定義し、入力/出力スキーマとビジネスロジックを指定する。createWorkflow
でステップを構成し、実行フローを定義する。- ワークフローを実行してシーケンス全体を実行し、中断、再開、結果のストリーミングの組み込みサポートを利用する。
この構造により、完全な型安全性とランタイム検証が提供され、ワークフロー全体でデータの整合性が保証されます。
はじめに
ワークフローを使用するには、まずワークフローモジュールから必要な関数をインポートします:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
ステップの作成
ステップはワークフローの構成要素です。createStep
を使用してステップを作成します:
const cityCoordinatesStep = createStep({
id: "city-step",
description: "Gets coordinates for city",
inputSchema: z.object({
city: z.string()
}),
outputSchema: z.object({
city_name: z.string(),
city_latitude: z.number(),
city_longitude: z.number()
}),
execute: async ({ inputData }) => {
const { city } = inputData;
const geocodingResponse = await fetch(`https://geocoding-api.open-meteo.com/v1/search?name=${city}`);
const geocodingData = await geocodingResponse.json();
const { name, latitude, longitude } = geocodingData.results[0];
return {
city_name: name,
city_latitude: latitude,
city_longitude: longitude
};
}
});
詳細についてはcreateStepを参照してください。
ワークフローの作成
createWorkflow
を使用してワークフローを作成します。.commit()
を使用してワークフローを終了します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const cityCoordinatesStep = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
city: z.string()
}),
outputSchema: z.object({
outcome: z.string()
})
})
.then(cityCoordinatesStep)
.commit();
ステップの組み合わせ
ワークフローステップは.then()
を使用して組み合わせ、順次実行することができます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const cityCoordinatesStep = createStep({...});
const cityWeatherStep = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
city: z.string()
}),
outputSchema: z.object({
outcome: z.string()
})
})
.then(cityCoordinatesStep)
.then(cityWeatherStep)
.commit();
ステップは様々な方法で組み合わせることができます。詳細については制御フローを参照してください。
ステップのクローン
ワークフローステップはcloneStep()
を使用してクローンでき、任意のワークフローメソッドで使用できます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const cityCoordinatesStep = createStep({...});
const clonedStep = cloneStep(cityCoordinatesStep, { id: "cloned-step" });
const cityWeatherStep = createStep({...});
export const testWorkflow = createWorkflow({
id: "test-workflow",
description: 'Test workflow',
inputSchema: z.object({
city: z.string()
}),
outputSchema: z.object({
outcome: z.string()
})
})
.then(cityCoordinatesStep)
.then(clonedStep)
.then(cityWeatherStep)
.commit();
ワークフローの登録
メインのMastraインスタンスでworkflows
を使用してワークフローを登録します:
import { Mastra } from "@mastra/core/mastra";
import { PinoLogger } from "@mastra/loggers";
import { LibSQLStore } from "@mastra/libsql";
import { testWorkflow } from "./workflows/test-workflow";
export const mastra = new Mastra({
workflows: { testWorkflow },
storage: new LibSQLStore({
// stores telemetry, evals, ... into memory storage, if it needs to persist, change to file:../mastra.db
url: ":memory:"
}),
logger: new PinoLogger({
name: "Mastra",
level: "info"
})
});
ワークフローの実行
ワークフローを実行してテストする方法は2つあります。
Mastra Playground
Mastra Dev Serverが実行されている状態で、ブラウザでhttp://localhost:4111/workflows にアクセスしてMastra Playgroundからワークフローを実行できます。
コマンドライン
createRun
とstart
を使用して、任意のMastraワークフローの実行インスタンスを作成します:
import { mastra } from "./mastra";
const run = mastra.getWorkflow("testWorkflow").createRun();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
このワークフローをトリガーするには、以下を実行します:
npx tsx src/test-workflow.ts
ワークフローのストリーミング
上記で示した実行方法と同様に、ワークフローはストリーミングすることもできます:
import { mastra } from "./mastra";
const run = mastra.getWorkflow("testWorkflow").createRun();
const result = await run.stream({
inputData: {
city: "London"
}
});
for await (const chunk of result.stream) {
console.log(chunk);
}
ワークフローの監視
ワークフローを監視することもでき、発行される各イベントを検査できます。
import { mastra } from "./mastra";
const run = mastra.getWorkflow("testWorkflow").createRun();
run.watch((event) => {
console.log(event);
});
const result = await run.start({
inputData: {
city: "London"
}
});
詳細についてはwatchを参照してください。
その他のリソース
- GuidesセクションのWorkflow Guideは、主要な概念をカバーするチュートリアルです。
- Parallel Stepsワークフローの例
- Conditional Branchingワークフローの例
- Inngestワークフローの例
- Suspend and Resumeワークフローの例