Skip to Content
ドキュメントWorkflows Vnext複雑なLLM操作の取り扱い | ワークフロー(vNext) | Mastra

はじめに

vNextワークフローを使用するには、まずvNextモジュールから必要な関数をインポートします:

import { createWorkflow, createStep } from "@mastra/core/workflows/vNext"; import { z } from "zod"; // For schema validation

主要な概念

vNextワークフローは以下で構成されています:

  • スキーマ:Zodを使用した入力と出力の型定義
  • ステップ:定義された入力と出力を持つ個々の作業単位
  • ワークフロー:定義された実行パターンを持つステップのオーケストレーション。ワークフローもステップであり、他のワークフローでステップとして使用できます。
  • ワークフロー実行フロー:ステップがどのように実行され、相互に接続されるか

スキーマはZodを使用して、ステップとワークフローの入力と出力の両方に対して定義されます。スキーマはまた、ステップが一時停止状態から再開する際に取得するデータや、ステップの実行を一時停止する際に渡すべき文脈情報を指定することもできます。

接続されているステップの入力と出力は一致する必要があります:例えば、あるステップのinputSchemaは前のステップのoutputSchemaと同じであるべきです。同様に、ワークフローを他のワークフローでステップとして使用する場合、ワークフローのinputSchemaは、それが使用されるステップのoutputSchemaと一致する必要があります。

ステップは、前のステップからの入力や、ステップが一時停止状態から再開される場合は再開データを含むコンテキストオブジェクトを受け取るexecute関数を使用して実行されます。execute関数はそのoutputSchemaに一致する値を返す必要があります。

.then().parallel().branch()などのプリミティブは、ワークフローの実行フローと、その中のステップがどのように接続されるかを記述します。ワークフロー(単独でもステップとしても)を実行する場合、その実行はexecute関数ではなく、実行フローによって決定されます。ワークフローの最終結果は常に最後のステップの結果となり、これはワークフローのoutputSchemaと一致する必要があります。

ワークフローの作成

ステップ

ステップはワークフローの構成要素です。createStepを使用してステップを作成します:

const myStep = createStep({ id: "my-step", description: "Does something useful", inputSchema: z.object({ inputValue: z.string(), }), outputSchema: z.object({ outputValue: z.string(), }), resumeSchema: z.object({ resumeValue: z.string(), }), suspendSchema: z.object({ suspendValue: z.string(), }), execute: async ({ inputData, mastra, getStepResult, getInitData, runtimeContext, }) => { const otherStepOutput = getStepResult(step2); const initData = getInitData<typeof workflow>(); // typed as the workflow input schema return { outputValue: `Processed: ${inputData.inputValue}, ${initData.startValue} (runtimeContextValue: ${runtimeContext.get("runtimeContextValue")})`, }; }, });

各ステップには以下が必要です:

  • id: ステップの一意の識別子
  • inputSchema: 予想される入力を定義するZodスキーマ
  • outputSchema: 出力の形を定義するZodスキーマ
  • resumeSchema: オプション。再開入力を定義するZodスキーマ
  • suspendSchema: オプション。一時停止入力を定義するZodスキーマ
  • execute: ステップの作業を実行する非同期関数

execute関数は以下を含むコンテキストオブジェクトを受け取ります:

  • inputData: inputSchemaに一致する入力データ
  • resumeData: 一時停止状態からステップを再開する際に、resumeSchemaに一致する再開データ。ステップが再開される場合にのみ存在します。
  • mastra: mastraサービス(エージェント、ツールなど)へのアクセス
  • getStepResult: 他のステップの結果にアクセスするための関数
  • getInitData: どのステップでもワークフローの初期入力データにアクセスするための関数
  • suspend: ワークフロー実行を一時停止するための関数(ユーザーとのインタラクション用)

ワークフロー構造

createWorkflowを使用してワークフローを作成します:

const myWorkflow = createWorkflow({ id: "my-workflow", inputSchema: z.object({ startValue: z.string(), }), outputSchema: z.object({ result: z.string(), }), steps: [step1, step2, step3], // Declare steps used in this workflow }) .then(step1) .then(step2) .then(step3) .commit(); const mastra = new Mastra({ vnext_workflows: { myWorkflow, }, }); const run = mastra.vnext_getWorkflow("myWorkflow").createRun();

ワークフローオプションのstepsプロパティは、ステップ結果へのアクセスに対する型安全性を提供します。ワークフローで使用するステップを宣言すると、TypeScriptはresult.stepsへのアクセス時に型安全性を確保します:

// With steps declared in workflow options const workflow = createWorkflow({ id: "my-workflow", inputSchema: z.object({}), outputSchema: z.object({}), steps: [step1, step2], // TypeScript knows these steps exist }) .then(step1) .then(step2) .commit(); const result = await workflow.createRun().start({ inputData: {} }); if (result.status === "success") { console.log(result.result); // only exists if status is success } else if (result.status === "failed") { console.error(result.error); // only exists if status is failed, this is an instance of Error throw result.error; } else if (result.status === "suspended") { console.log(result.suspended); // only exists if status is suspended } // TypeScript knows these properties exist and their types console.log(result.steps.step1.output); // Fully typed console.log(result.steps.step2.output); // Fully typed

ワークフロー定義には以下が必要です:

  • id: ワークフローの一意の識別子
  • inputSchema: ワークフロー入力を定義するZodスキーマ
  • outputSchema: ワークフロー出力を定義するZodスキーマ
  • steps: ワークフローで使用されるステップの配列(オプションですが、型安全性のために推奨)

ステップとネストされたワークフローの再利用

ステップとネストされたワークフローをクローンして再利用できます:

const clonedStep = cloneStep(myStep, { id: "cloned-step" }); const clonedWorkflow = cloneWorkflow(myWorkflow, { id: "cloned-workflow" });

このようにして、同じステップやネストされたワークフローを同じワークフロー内で複数回使用できます。

import { createWorkflow, createStep, cloneStep, cloneWorkflow, } from "@mastra/core/workflows/vNext"; const myWorkflow = createWorkflow({ id: "my-workflow", steps: [step1, step2, step3], }); myWorkflow.then(step1).then(step2).then(step3).commit(); const parentWorkflow = createWorkflow({ id: "parent-workflow", steps: [myWorkflow, step4], }); parentWorkflow .then(myWorkflow) .then(step4) .then(cloneWorkflow(myWorkflow, { id: "cloned-workflow" })) .then(cloneStep(step4, { id: "cloned-step-4" })) .commit();

ワークフローの実行

ワークフローを定義した後、以下のように実行します:

// 実行インスタンスを作成 const run = myWorkflow.createRun(); // 入力データでワークフローを開始 const result = await run.start({ inputData: { startValue: "initial data", }, }); // 結果にアクセス console.log(result.steps); // すべてのステップの結果 console.log(result.steps["step-id"].output); // 特定のステップからの出力 if (result.status === "success") { console.log(result.result); // ワークフローの最終結果、最後のステップの結果(または最後のステップとして`.map()`が使用された場合はその出力) } else if (result.status === "suspended") { const resumeResult = await run.resume({ step: result.suspended[0], // 一時停止された実行パスには常に少なくとも1つのステップIDがあります。この場合、最初の一時停止された実行パスを再開します resumeData: { /* ユーザー入力 */ }, }); } else if (result.status === "failed") { console.error(result.error); // ステータスが失敗の場合にのみ存在し、これはErrorのインスタンスです }

ワークフロー実行結果のスキーマ

ワークフローの実行結果(start()またはresume()からの)は、次のTypeScriptインターフェースに従います:

export type WorkflowResult<...> = | { status: 'success'; result: z.infer<TOutput>; steps: { [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined ? StepResult<unknown> : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>; }; } | { status: 'failed'; steps: { [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined ? StepResult<unknown> : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>; }; error: Error; } | { status: 'suspended'; steps: { [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined ? StepResult<unknown> : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>; }; suspended: [string[], ...string[][]]; };

結果プロパティの説明

  1. status: ワークフロー実行の最終状態を示します

    • 'success': ワークフローが正常に完了
    • 'failed': ワークフローでエラーが発生
    • 'suspended': ワークフローがユーザー入力を待機して一時停止中
  2. result: ワークフローの最終出力を含み、ワークフローのoutputSchemaに従って型付けされます

  3. suspended: 現在一時停止中のステップIDの配列(オプション)。status'suspended'の場合のみ存在します

  4. steps: 実行されたすべてのステップの結果を含むレコード

    • キーはステップID
    • 値はステップの出力を含むStepResultオブジェクト
    • 各ステップのoutputSchemaに基づいて型安全
  5. error: status'failed'の場合に存在するエラーオブジェクト(オプション)

ワークフロー実行の監視

ワークフロー実行を監視することもできます:

const run = myWorkflow.createRun(); // 実行を監視するウォッチャーを追加 run.watch(event => { console.log('ステップ完了:', event.payload.currentStep.id); }); // ワークフローを開始 const result = await run.start({ inputData: {...} });

eventオブジェクトは以下のスキーマを持っています:

type WatchEvent = { type: "watch"; payload: { currentStep?: { id: string; status: "running" | "completed" | "failed" | "suspended"; output?: Record<string, any>; payload?: Record<string, any>; }; workflowState: { status: "running" | "success" | "failed" | "suspended"; steps: Record< string, { status: "running" | "completed" | "failed" | "suspended"; output?: Record<string, any>; payload?: Record<string, any>; } >; result?: Record<string, any>; error?: Record<string, any>; payload?: Record<string, any>; }; }; eventTimestamp: Date; };

currentStepプロパティはワークフローの実行中にのみ存在します。ワークフローが終了すると、workflowStateのステータスが変更され、resultおよびerrorプロパティも更新されます。同時にcurrentStepプロパティは削除されます。