Getting Started

To use vNext workflows, first import the necessary functions from the vNext module:

import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod"; // For schema validation

Key Concepts

vNext workflows consist of:

  • Schemas: Type definitions for inputs and outputs using Zod
  • Steps: Individual units of work with defined inputs and outputs
  • Workflows: Orchestrations of steps with defined execution patterns. A workflow is also a step and can be used as such in other workflows.
  • Workflow execution flow: How steps are executed and connected to each other

Schemas are defined with Zod for the inputs and outputs of both steps and workflows. A schema can also define what data a step accepts when it resumes from a suspended state (resumeSchema), and what contextual information is passed along when a step suspends its execution (suspendSchema).

The inputs and outputs of connected steps should match: for instance, a step’s inputSchema should be the same as the outputSchema of the previous step. The same holds when a workflow is used as a step in another workflow: the nested workflow’s inputSchema should match the outputSchema of the step that precedes it.
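To make "matching" concrete, here is a minimal sketch in plain TypeScript, without Zod or Mastra. The Step type and the connect helper are hypothetical stand-ins, not part of the library; they only illustrate how a compiler can reject a chain in which one step's output type differs from the next step's input type.

```typescript
// Hypothetical stand-in for a step: a typed function from input to output.
// (Real vNext steps carry Zod schemas and an async execute function.)
type Step<I, O> = { id: string; run: (input: I) => O };

// connect() only type-checks when the first step's output type (B)
// is also the second step's input type.
function connect<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return {
    id: `${first.id}->${second.id}`,
    run: (input: A) => second.run(first.run(input)),
  };
}

const toLength: Step<{ text: string }, { length: number }> = {
  id: "to-length",
  run: ({ text }) => ({ length: text.length }),
};

const isLong: Step<{ length: number }, { long: boolean }> = {
  id: "is-long",
  run: ({ length }) => ({ long: length > 5 }),
};

const pipeline = connect(toLength, isLong);
const pipelineOutput = pipeline.run({ text: "workflow" });

// connect(isLong, toLength) would be a compile-time error:
// { long: boolean } is not assignable to { text: string }.
```

In this sketch the mismatch is caught at compile time only; the schemas described above also describe the data at runtime.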

Steps are run using an execute function that receives a context object with inputs from the previous step and/or resume data if the step is being resumed from a suspended state. The execute function should return a value that matches its outputSchema.

Primitives such as .then(), .parallel() and .branch() describe the execution flow of a workflow and how the steps within it are connected. When a workflow runs (whether standalone or as a step), its execution is dictated by this execution flow rather than by an execute function. The final result of a workflow is always the result of its last step, which should match the workflow’s outputSchema.
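The sequential case can be sketched with a small, synchronous model. SequentialFlow and its andThen method are hypothetical names that play the role of .then(); this is not the Mastra implementation, and real steps are asynchronous. The sketch only shows that each step receives the previous step's output and that the overall result is the last step's output.

```typescript
// Hypothetical, synchronous model of a sequential flow; not the Mastra API.
type Data = { value: number };
type SimpleStep = { id: string; execute: (inputData: Data) => Data };

class SequentialFlow {
  private readonly ordered: SimpleStep[] = [];

  // Plays the role of .then(): append a step to the execution order.
  andThen(step: SimpleStep): this {
    this.ordered.push(step);
    return this;
  }

  // Each step receives the previous step's output as its input.
  run(inputData: Data): { result: Data; steps: Record<string, Data> } {
    const outputs: Record<string, Data> = {};
    let current = inputData;
    for (const step of this.ordered) {
      current = step.execute(current);
      outputs[step.id] = current; // record every step's output by id
    }
    // The flow's result is the output of the last step.
    return { result: current, steps: outputs };
  }
}

const addOne: SimpleStep = {
  id: "add-one",
  execute: ({ value }) => ({ value: value + 1 }),
};
const double: SimpleStep = {
  id: "double",
  execute: ({ value }) => ({ value: value * 2 }),
};

const flow = new SequentialFlow().andThen(addOne).andThen(double);
const flowRun = flow.run({ value: 3 });
// flowRun.result.value is 8, i.e. (3 + 1) * 2
```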

Creating Workflows

Steps

Steps are the building blocks of workflows. Create a step using createStep:

const myStep = createStep({
  id: "my-step",
  description: "Does something useful",
  inputSchema: z.object({
    inputValue: z.string(),
  }),
  outputSchema: z.object({
    outputValue: z.string(),
  }),
  resumeSchema: z.object({
    resumeValue: z.string(),
  }),
  suspendSchema: z.object({
    suspendValue: z.string(),
  }),
  execute: async ({
    inputData,
    mastra,
    getStepResult,
    getInitData,
    runtimeContext,
  }) => {
    const otherStepOutput = getStepResult(step2);
    const initData = getInitData<typeof workflow>(); // typed as the workflow input schema
    return {
      outputValue: `Processed: ${inputData.inputValue}, ${initData.startValue} (runtimeContextValue: ${runtimeContext.get("runtimeContextValue")})`,
    };
  },
});

A step definition accepts:

  • id: Unique identifier for the step
  • inputSchema: Zod schema defining expected input
  • outputSchema: Zod schema defining output shape
  • resumeSchema: Optional. Zod schema defining resume input
  • suspendSchema: Optional. Zod schema defining the contextual data passed when the step suspends
  • execute: Async function that performs the step’s work

The execute function receives a context object with:

  • inputData: The input data matching the inputSchema
  • resumeData: The resume data matching the resumeSchema, when resuming the step from a suspended state. Only exists if the step is being resumed.
  • mastra: Access to mastra services (agents, tools, etc.)
  • getStepResult: Function to access results from other steps
  • getInitData: Function to access the initial input data of the workflow in any step
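  • suspend: Function to pause workflow execution (for user interaction)
  • runtimeContext: Runtime context for the current run; values are read with runtimeContext.get(key)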
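The interplay between suspend and resumeData can be sketched with a small synchronous model. Everything below (executeApproval, the Suspended marker, the payload shapes) is hypothetical and exists only for illustration; in particular, how a real step signals suspension to the workflow engine is not modeled here.

```typescript
// Hypothetical model of a suspendable step; all names are illustrative only.
type ApprovalInput = { amount: number };
type ApprovalResume = { approved: boolean };
type ApprovalOutput = { status: "approved" | "rejected" };
type Suspended = { suspended: true; payload: { reason: string } };

function executeApproval(context: {
  inputData: ApprovalInput;
  resumeData?: ApprovalResume; // only present when the step is being resumed
  suspend: (payload: { reason: string }) => Suspended;
}): ApprovalOutput | Suspended {
  const { inputData, resumeData, suspend } = context;

  // First pass: no resume data yet, so pause and describe what is needed.
  if (resumeData === undefined) {
    return suspend({ reason: `Approval needed for ${inputData.amount}` });
  }

  // Resumed pass: use the data supplied on resume to produce the output.
  return { status: resumeData.approved ? "approved" : "rejected" };
}

// Stand-in for the suspend function the engine would provide.
const suspend = (payload: { reason: string }): Suspended => ({
  suspended: true,
  payload,
});

const firstPass = executeApproval({ inputData: { amount: 250 }, suspend });
const resumedPass = executeApproval({
  inputData: { amount: 250 },
  resumeData: { approved: true },
  suspend,
});
```

The first call has no resumeData and returns the suspension marker; the second call, which stands for the resumed execution, returns a regular output.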

Workflow Structure

Create a workflow using createWorkflow:

const myWorkflow = createWorkflow({
  id: "my-workflow",
  inputSchema: z.object({
    startValue: z.string(),
  }),
  outputSchema: z.object({
    result: z.string(),
  }),
  steps: [step1, step2, step3], // Declare steps used in this workflow
});

const mastra = new Mastra({
  vnext_workflows: {
    myWorkflow,
  },
});

const run = mastra.vnext_getWorkflow("myWorkflow").createRun();

The steps property in the workflow options gives TypeScript the information it needs to type result.steps: each step you declare appears as a key, and its result is typed according to that step’s outputSchema.

// With steps declared in workflow options
const workflow = createWorkflow({
  id: "my-workflow",
  inputSchema: z.object({}),
  outputSchema: z.object({}),
  steps: [step1, step2], // TypeScript knows these steps exist
});

const result = await workflow.createRun().start({ inputData: {} });

if (result.status === "success") {
  console.log(result.result); // only exists if status is success
} else if (result.status === "failed") {
  console.error(result.error); // only exists if status is failed, this is an instance of Error
  throw result.error;
} else if (result.status === "suspended") {
  console.log(result.suspended); // only exists if status is suspended
}

// TypeScript knows these properties exist and their types
console.log(result.steps.step1.output); // Fully typed
console.log(result.steps.step2.output); // Fully typed

A workflow definition requires:

  • id: Unique identifier for the workflow
  • inputSchema: Zod schema defining workflow input
  • outputSchema: Zod schema defining workflow output
  • steps: Array of steps used in the workflow (optional, but recommended for type safety)

Re-using steps and nested workflows

You can re-use steps and nested workflows by cloning them:

const clonedStep = cloneStep(myStep, { id: "cloned-step" });
const clonedWorkflow = cloneWorkflow(myWorkflow, { id: "cloned-workflow" });

This way you can use the same step or nested workflow in the same workflow multiple times.

import {
  createWorkflow,
  createStep,
  cloneStep,
  cloneWorkflow,
} from "@mastra/core/workflows/vNext";

const myWorkflow = createWorkflow({
  id: "my-workflow",
  steps: [step1, step2, step3],
});
myWorkflow.then(step1).then(step2).then(step3).commit();

const parentWorkflow = createWorkflow({
  id: "parent-workflow",
  steps: [myWorkflow, step4],
});
parentWorkflow
  .then(myWorkflow)
  .then(step4)
  .then(cloneWorkflow(myWorkflow, { id: "cloned-workflow" }))
  .then(cloneStep(step4, { id: "cloned-step-4" }))
  .commit();
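One plausible reason for giving each clone a new id is that step results are stored in a record keyed by step ID, so two uses of the same id would collide. That motivation is an assumption, not something stated here. The sketch below models it with hypothetical helpers (cloneWithId and runAll) that are not part of the library.

```typescript
// Hypothetical model: step results are stored in a record keyed by step id.
type IdStep = { id: string; execute: (n: number) => number };

// Illustrative stand-in for cloneStep: same behavior, different id.
function cloneWithId(step: IdStep, id: string): IdStep {
  return { ...step, id };
}

// Runs steps in order and records each output under the step's id.
function runAll(steps: IdStep[], input: number): Record<string, number> {
  const results: Record<string, number> = {};
  let current = input;
  for (const step of steps) {
    current = step.execute(current);
    results[step.id] = current; // a repeated id overwrites the earlier entry
  }
  return results;
}

const increment: IdStep = { id: "increment", execute: (n) => n + 1 };

// Same id twice: only one entry survives in the results record.
const reused = runAll([increment, increment], 0);

// Cloned with a new id: both executions keep their own entry.
const cloned = runAll([increment, cloneWithId(increment, "increment-again")], 0);
```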

Running Workflows

After defining a workflow, run it with:

// Create a run instance
const run = myWorkflow.createRun();

// Start the workflow with input data
const result = await run.start({
  inputData: {
    startValue: "initial data",
  },
});

// Access the results
console.log(result.steps); // All step results
console.log(result.steps["step-id"].output); // Output from a specific step

if (result.status === "success") {
  // The final result of the workflow, result of the last step
  // (or `.map()` output, if used as last step)
  console.log(result.result);
} else if (result.status === "suspended") {
  const resumeResult = await run.resume({
    // There is always at least one step id in the suspended array;
    // in this case we resume the first suspended execution path
    step: result.suspended[0],
    resumeData: {
      /* user input */
    },
  });
} else if (result.status === "failed") {
  console.error(result.error); // only exists if status is failed, this is an instance of Error
}
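A run may suspend more than once, so a caller sometimes wants to keep resuming until the run settles. The sketch below shows that loop against a hypothetical RunLike interface and a fake run. It is synchronous for brevity (the real start() and resume() return promises), the result type is a reduced version of the real one, and the resumeData shape is invented for the example.

```typescript
// Simplified result shape for this sketch (step results omitted).
type RunResult =
  | { status: "success"; result: string }
  | { status: "failed"; error: Error }
  | { status: "suspended"; suspended: string[][] };

// Hypothetical run interface; the real start() and resume() are async.
interface RunLike {
  start(args: { inputData: { startValue: string } }): RunResult;
  resume(args: { step: string[]; resumeData: { answer: string } }): RunResult;
}

// Start a run, then keep resuming the first suspended path until it settles.
function runToCompletion(run: RunLike, startValue: string, answer: string): RunResult {
  let result = run.start({ inputData: { startValue } });
  while (result.status === "suspended") {
    const path = result.suspended[0];
    if (path === undefined) break; // defensive: nothing to resume
    result = run.resume({ step: path, resumeData: { answer } });
  }
  return result;
}

// Fake run used only to exercise the loop: suspends once, then succeeds.
const fakeRun: RunLike = {
  start: () => ({ status: "suspended", suspended: [["ask-user"]] }),
  resume: ({ step, resumeData }) => ({
    status: "success",
    result: `${step.join(".")} answered: ${resumeData.answer}`,
  }),
};

const settled = runToCompletion(fakeRun, "initial data", "yes");
```

In practice each suspended step usually needs its own resume data, typically collected from a user, so a real loop would gather input between iterations rather than reuse one answer.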

Workflow Execution Result Schema

The result of running a workflow (either from start() or resume()) follows this TypeScript interface:

export type WorkflowResult<...> =
  | {
      status: 'success';
      result: z.infer<TOutput>;
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
    }
  | {
      status: 'failed';
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
      error: Error;
    }
  | {
      status: 'suspended';
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
      suspended: [string[], ...string[][]];
    };

Result Properties Explained

  1. status: Indicates the final state of the workflow execution

    • 'success': Workflow completed successfully
    • 'failed': Workflow encountered an error
    • 'suspended': Workflow is paused waiting for user input
  2. result: Contains the final output of the workflow, typed according to the workflow’s outputSchema. Only present when status is 'success'

  3. suspended: A non-empty array of suspended execution paths, each of which is an array of step IDs. Only present when status is 'suspended'

  4. steps: A record containing the results of all executed steps

    • Keys are step IDs
    • Values are StepResult objects containing the step’s output
    • Type-safe based on each step’s outputSchema
  5. error: An Error instance describing the failure. Only present when status is 'failed'
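Because status acts as a discriminant, a switch on it lets TypeScript narrow the result to the variant that carries result, error, or suspended. The sketch below uses a reduced, hypothetical SimpleResult type (step results omitted) rather than the full WorkflowResult type; describeResult is an illustrative helper.

```typescript
// Reduced version of the result union for this sketch (step results omitted).
type SimpleResult<T> =
  | { status: "success"; result: T }
  | { status: "failed"; error: Error }
  | { status: "suspended"; suspended: string[][] };

// Each case only sees the properties that exist for that status.
function describeResult<T>(outcome: SimpleResult<T>): string {
  switch (outcome.status) {
    case "success":
      return `done: ${JSON.stringify(outcome.result)}`;
    case "failed":
      return `failed: ${outcome.error.message}`;
    case "suspended":
      // Assumed here: each entry is a path of step IDs to a suspended step.
      return `waiting on: ${outcome.suspended.map((path) => path.join(".")).join(", ")}`;
  }
}

const messages = [
  describeResult({ status: "success", result: { total: 3 } }),
  describeResult({ status: "failed", error: new Error("boom") }),
  describeResult({ status: "suspended", suspended: [["approval"]] }),
];
```

Accessing outcome.result inside the "failed" case would be a compile-time error, which is the practical benefit of checking status first.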

Watching Workflow Execution

You can also watch workflow execution:

const run = myWorkflow.createRun();

// Add a watcher to monitor execution
run.watch((event) => {
  // currentStep is absent once the workflow has finished, so guard the access
  console.log('Step completed:', event.payload.currentStep?.id);
});

// Start the workflow
const result = await run.start({ inputData: {...} });

The event object has the following schema:

type WatchEvent = {
  type: "watch";
  payload: {
    currentStep?: {
      id: string;
      status: "running" | "completed" | "failed" | "suspended";
      output?: Record<string, any>;
      payload?: Record<string, any>;
    };
    workflowState: {
      status: "running" | "success" | "failed" | "suspended";
      steps: Record<
        string,
        {
          status: "running" | "completed" | "failed" | "suspended";
          output?: Record<string, any>;
          payload?: Record<string, any>;
        }
      >;
      result?: Record<string, any>;
      error?: Record<string, any>;
      payload?: Record<string, any>;
    };
  };
  eventTimestamp: Date;
};

The currentStep property is only present while the workflow is running. When the workflow finishes, the status on workflowState is updated, the result or error property is set, and currentStep is removed from the payload.
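Since currentStep can be absent, a watcher should check for it before reading its fields. The sketch below uses a reduced, hypothetical WatchEventLike type derived from the WatchEvent schema, and an illustrative formatEvent helper that could be passed to run.watch.

```typescript
// Reduced version of the WatchEvent schema, for this sketch only.
type StepStatus = "running" | "completed" | "failed" | "suspended";
type WatchEventLike = {
  type: "watch";
  payload: {
    currentStep?: { id: string; status: StepStatus };
    workflowState: { status: "running" | "success" | "failed" | "suspended" };
  };
};

// Formats an event, falling back to the workflow state
// when currentStep is missing from the payload.
function formatEvent(event: WatchEventLike): string {
  const { currentStep, workflowState } = event.payload;
  if (currentStep === undefined) {
    return `workflow is now "${workflowState.status}"`;
  }
  return `step "${currentStep.id}" is ${currentStep.status}`;
}

const log = [
  formatEvent({
    type: "watch",
    payload: {
      currentStep: { id: "step1", status: "running" },
      workflowState: { status: "running" },
    },
  }),
  formatEvent({
    type: "watch",
    payload: { workflowState: { status: "success" } },
  }),
];
```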