Getting Started
To use vNext workflows, first import the necessary functions from the vNext module:
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod"; // For schema validation
Key Concepts
vNext workflows consist of:
- Schemas: Type definitions for inputs and outputs using Zod
- Steps: Individual units of work with defined inputs and outputs
- Workflow: A structured sequence of steps designed to accomplish a specific task or process. Workflows define the order, dependencies, and logic for executing steps, and can themselves be reused as steps within other workflows.
- Workflow execution flow: The path and logic followed during a workflow run, including how steps are executed, how data moves between them, and how conditions, branches, and errors are handled.
Schemas are defined with Zod for the inputs and outputs of both steps and workflows. A schema can also define what data a step accepts when it resumes from a suspended state, and what contextual information is passed along when a step suspends execution.
The inputs and outputs of connected steps should match: for instance, a step’s inputSchema should be the same as the outputSchema of the previous step. The same holds when a workflow is used as a step in another workflow: the workflow’s inputSchema and outputSchema should line up with the surrounding steps in the same way.
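The matching rule can be illustrated at the type level with a small plain-TypeScript sketch. It does not use `@mastra/core` or Zod; `SketchStep` and `chain` are hypothetical names invented for this illustration, and the real library enforces compatibility through its own schema-based types.

```typescript
// Minimal stand-in for a step: a typed input, a typed output, and an execute function.
// Illustrative only; these types are not part of @mastra/core.
type SketchStep<In, Out> = {
  id: string;
  execute: (inputData: In) => Promise<Out>;
};

// Chaining only type-checks when the first step's output type
// is the second step's input type.
function chain<A, B, C>(
  first: SketchStep<A, B>,
  second: SketchStep<B, C>,
): SketchStep<A, C> {
  return {
    id: `${first.id}->${second.id}`,
    execute: async (inputData) => second.execute(await first.execute(inputData)),
  };
}

const toLength: SketchStep<{ text: string }, { length: number }> = {
  id: "to-length",
  execute: async ({ text }) => ({ length: text.length }),
};

const describeLength: SketchStep<{ length: number }, { summary: string }> = {
  id: "describe-length",
  execute: async ({ length }) => ({ summary: `Input has ${length} characters` }),
};

// The combined step's result is the result of the last step in the chain.
const combined = chain(toLength, describeLength);

// With `strict` enabled, chain(describeLength, toLength) is rejected by the compiler,
// because { summary: string } is not assignable to { text: string }.
```

Reversing the two steps fails at compile time rather than at run time, which is the same kind of mismatch the schema rule above is meant to prevent.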
Steps are run using an execute function that receives a context object with inputs from the previous step and/or resume data if the step is being resumed from a suspended state. The execute function should return a value that matches its outputSchema.
Primitives such as .then(), .parallel() and .branch() describe the execution flow of workflows and how the steps within them are connected. When a workflow runs (whether standalone or as a step), its execution is dictated by its execution flow instead of an execute function. The final result of a workflow will always be the result of its last step, which should match the workflow’s outputSchema.
Creating Workflows
Steps
Steps are the building blocks of workflows. Create a step using createStep:
// Create a step with defined input/output schemas and execution logic
const inputSchema = z.object({
  inputValue: z.string(),
});
const myStep = createStep({
  id: "my-step",
  description: "Does something useful",
  inputSchema,
  outputSchema: z.object({
    outputValue: z.string(),
  }),
  // Optional: Define the resume schema for step resumption
  resumeSchema: z.object({
    resumeValue: z.string(),
  }),
  // Optional: Define the suspend schema for step suspension
  suspendSchema: z.object({
    suspendValue: z.string(),
  }),
  execute: async ({
    inputData,
    mastra,
    getStepResult,
    getInitData,
    runtimeContext,
  }) => {
    // step2 is assumed to be another step defined elsewhere in the same workflow
    const otherStepOutput = getStepResult(step2);
    const initData = getInitData<typeof inputSchema>(); // typed as the input schema variable (zod schema)
    return {
      outputValue: `Processed: ${inputData.inputValue}, ${initData.inputValue} (runtimeContextValue: ${runtimeContext.get("runtimeContextValue")})`,
    };
  },
});
Each step requires:
- id: Unique identifier for the step
- inputSchema: Zod schema defining expected input
- outputSchema: Zod schema defining output shape
- resumeSchema: Optional. Zod schema defining resume input
- suspendSchema: Optional. Zod schema defining suspend input
- execute: Async function that performs the step’s work
The execute function receives a context object with:
- inputData: The input data matching the inputSchema
- resumeData: The resume data matching the resumeSchema, when resuming the step from a suspended state. Only exists if the step is being resumed.
- mastra: Access to mastra services (agents, tools, etc.)
- getStepResult: Function to access results from other steps
- getInitData: Function to access the initial input data of the workflow in any step
- suspend: Function to pause workflow execution (for user interaction)
Workflow Structure
Create a workflow using createWorkflow:
import { Mastra } from "@mastra/core";

// Create a workflow with defined steps and execution flow
const myWorkflow = createWorkflow({
  id: "my-workflow",
  // Define the expected input structure (should match the first step's inputSchema)
  inputSchema: z.object({
    startValue: z.string(),
  }),
  // Define the expected output structure (should match the last step's outputSchema)
  outputSchema: z.object({
    result: z.string(),
  }),
  steps: [step1, step2, step3], // Declare steps used in this workflow
})
  .then(step1)
  .then(step2)
  .then(step3)
  .commit();
// Register workflow with Mastra instance
const mastra = new Mastra({
  vnext_workflows: {
    myWorkflow,
  },
});
// Create a run instance of the workflow
const run = mastra.vnext_getWorkflow("myWorkflow").createRun();
The steps property in the workflow options provides type safety for accessing step results. When you declare the steps used in your workflow, TypeScript will ensure type safety when accessing result.steps:
// With steps declared in workflow options
const workflow = createWorkflow({
  id: "my-workflow",
  inputSchema: z.object({}),
  outputSchema: z.object({}),
  steps: [step1, step2], // TypeScript knows these steps exist
})
  .then(step1)
  .then(step2)
  .commit();
const result = await workflow.createRun().start({ inputData: {} });
if (result.status === "success") {
  console.log(result.result); // only exists if status is success
} else if (result.status === "failed") {
  console.error(result.error); // only exists if status is failed, this is an instance of Error
  throw result.error;
} else if (result.status === "suspended") {
  console.log(result.suspended); // only exists if status is suspended
}
// TypeScript knows these properties exist and their types
console.log(result.steps.step1.output); // Fully typed
console.log(result.steps.step2.output); // Fully typed
Workflow definition requires:
- id: Unique identifier for the workflow
- inputSchema: Zod schema defining workflow input
- outputSchema: Zod schema defining workflow output
- steps: Array of steps used in the workflow (optional, but recommended for type safety)
Re-using steps and nested workflows
You can re-use steps and nested workflows by cloning them:
const clonedStep = cloneStep(myStep, { id: "cloned-step" });
const clonedWorkflow = cloneWorkflow(myWorkflow, { id: "cloned-workflow" });
This way you can use the same step or nested workflow in the same workflow multiple times.
import {
  createWorkflow,
  createStep,
  cloneStep,
  cloneWorkflow,
} from "@mastra/core/workflows/vNext";
const myWorkflow = createWorkflow({
  id: "my-workflow",
  steps: [step1, step2, step3],
});
myWorkflow.then(step1).then(step2).then(step3).commit();
const parentWorkflow = createWorkflow({
  id: "parent-workflow",
  steps: [myWorkflow, step4],
});
parentWorkflow
  .then(myWorkflow) // nested workflow
  .then(step4)
  .then(cloneWorkflow(myWorkflow, { id: "cloned-workflow" })) // cloned workflow
  .then(cloneStep(step4, { id: "cloned-step-4" })) // cloned step
  .commit();
Running Workflows
After defining a workflow, run it with:
// Create a run instance
const run = myWorkflow.createRun();
// Start the workflow with input data
const result = await run.start({
  inputData: {
    startValue: "initial data",
  },
});
// Access the results
console.log(result.steps); // All step results
console.log(result.steps["step-id"].output); // Output from a specific step
if (result.status === "success") {
  console.log(result.result); // The final result of the workflow, result of the last step (or `.map()` output, if used as last step)
} else if (result.status === "suspended") {
  const resumeResult = await run.resume({
    step: result.suspended[0], // there is always at least one step id in the suspended array, in this case we resume the first suspended execution path
    resumeData: {
      /* user input */
    },
  });
} else if (result.status === "failed") {
  console.error(result.error); // only exists if status is failed, this is an instance of Error
}
Note: Do not destructure the run instance. Neither of the following works:
const { start, resume, ... } = myWorkflow.createRun();
// NOR
const run = myWorkflow.createRun();
const { start, resume, ... } = run;
Destructuring breaks the connection to the workflow and prevents it from running.
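The likely mechanism behind this note is ordinary JavaScript `this` binding, which the following plain-TypeScript sketch demonstrates. `SketchRun` is a hypothetical stand-in, not Mastra's run class; it only assumes that the run's methods read state from `this`.

```typescript
// Hypothetical stand-in for a run object whose methods depend on `this`.
// It is not Mastra's run class; it only demonstrates the JavaScript behavior.
class SketchRun {
  private readonly workflowId = "my-workflow";

  start(): string {
    return `started ${this.workflowId}`;
  }
}

const sketchRun = new SketchRun();

// Calling through the instance keeps `this` bound to the run.
const viaInstance = sketchRun.start();

// A destructured method is a bare function reference: when it is called,
// `this` no longer points at the run, so its state is unreachable.
const { start: detachedStart } = sketchRun;

// Helper that reports what happens when a detached method is invoked.
function callDetached(fn: () => string): string {
  try {
    return fn();
  } catch {
    return "lost `this`";
  }
}

const viaDestructuring = callDetached(detachedStart);

// If a standalone function is really needed, bind it to the instance explicitly.
const boundStart = sketchRun.start.bind(sketchRun);
const viaBound = boundStart();
```

In this sketch `viaInstance` and `viaBound` both refer to the right workflow, while `viaDestructuring` does not. Keeping the run in a single variable and calling `run.start(...)` and `run.resume(...)` on it avoids the problem entirely.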
Workflow Execution Result Schema
The result of running a workflow (either from start() or resume()) follows this TypeScript interface:
export type WorkflowResult<...> =
  | {
      status: 'success';
      result: z.infer<TOutput>;
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
    }
  | {
      status: 'failed';
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
      error: Error;
    }
  | {
      status: 'suspended';
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
      suspended: [string[], ...string[][]];
    };
Result Properties Explained
- status: Indicates the final state of the workflow execution
  - 'success': Workflow completed successfully
  - 'failed': Workflow encountered an error
  - 'suspended': Workflow is paused waiting for user input
- result: Contains the final output of the workflow, typed according to the workflow’s outputSchema
- suspended: Optional array of step IDs that are currently suspended. Only present when status is 'suspended'
- steps: A record containing the results of all executed steps
  - Keys are step IDs
  - Values are StepResult objects containing the step’s output
  - Type-safe based on each step’s outputSchema
- error: Optional error object present when status is 'failed'
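Because `status` discriminates the union, narrowing on it exposes only the fields listed for that state. The sketch below uses a simplified, non-generic local mirror of the result type so that it runs without `@mastra/core`; `SketchWorkflowResult`, `SketchStepResult`, and `summarizeResult` are hypothetical names, and the `{ result: string }` output shape is borrowed from the earlier workflow example.

```typescript
// Simplified local mirror of the result union described above.
// The real WorkflowResult type is generic over steps and schemas; this one is not.
type SketchStepResult = { output?: unknown };

type SketchWorkflowResult =
  | { status: "success"; result: { result: string }; steps: Record<string, SketchStepResult> }
  | { status: "failed"; error: Error; steps: Record<string, SketchStepResult> }
  | {
      status: "suspended";
      suspended: [string[], ...string[][]];
      steps: Record<string, SketchStepResult>;
    };

// Narrowing on `status` makes only the matching fields available in each branch.
function summarizeResult(res: SketchWorkflowResult): string {
  switch (res.status) {
    case "success":
      return `success: ${res.result.result}`;
    case "failed":
      return `failed: ${res.error.message}`;
    case "suspended":
      // Per the type above, each entry of `suspended` is itself an array of step IDs.
      return `suspended at: ${res.suspended.map((ids) => ids.join(".")).join(", ")}`;
  }
}
```

Accessing `res.error` in the `"success"` branch, for example, would be a compile-time error, which matches the "only present when" wording in the list above.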
Watching Workflow Execution
You can also watch workflow execution:
const run = myWorkflow.createRun();
// Add a watcher to monitor execution
run.watch(event => {
  // currentStep is optional, so guard the access
  console.log('Step completed:', event.payload.currentStep?.id);
});
// Start the workflow
const result = await run.start({ inputData: {...} });
The event object has the following schema:
type WatchEvent = {
  type: "watch";
  payload: {
    currentStep?: {
      id: string;
      status: "running" | "completed" | "failed" | "suspended";
      output?: Record<string, any>;
      payload?: Record<string, any>;
    };
    workflowState: {
      status: "running" | "success" | "failed" | "suspended";
      steps: Record<
        string,
        {
          status: "running" | "completed" | "failed" | "suspended";
          output?: Record<string, any>;
          payload?: Record<string, any>;
        }
      >;
      result?: Record<string, any>;
      error?: Record<string, any>;
      payload?: Record<string, any>;
    };
  };
  eventTimestamp: Date;
};
The currentStep property is only present while the workflow is running. When the workflow finishes, the status on workflowState changes, the result and error properties are updated, and the currentStep property is removed.
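Since currentStep can be absent, a watcher should handle both cases. The helper below is a minimal sketch built on a trimmed local copy of the event shape; `SketchWatchEvent` and `formatWatchEvent` are hypothetical names and not part of the library.

```typescript
// Trimmed local copy of the event shape shown above (only the fields used here).
type SketchStepStatus = "running" | "completed" | "failed" | "suspended";

type SketchWatchEvent = {
  type: "watch";
  payload: {
    currentStep?: { id: string; status: SketchStepStatus };
    workflowState: { status: "running" | "success" | "failed" | "suspended" };
  };
  eventTimestamp: Date;
};

// Format one event, falling back to the workflow status when no step is active.
function formatWatchEvent(event: SketchWatchEvent): string {
  const { currentStep, workflowState } = event.payload;
  if (currentStep) {
    return `step ${currentStep.id}: ${currentStep.status}`;
  }
  return `workflow: ${workflowState.status}`;
}
```

Assuming the real event type is structurally compatible with this trimmed copy, the helper could be passed to a watcher as `run.watch((event) => console.log(formatWatchEvent(event)))`.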