
Workflow

The Workflow class enables you to create state machines for complex sequences of operations with conditional branching and data validation.

import { Workflow } from '@mastra/core';
 
const workflow = new Workflow('my-workflow');

API Reference

Constructor

name: string
Identifier for the workflow

logger?: Logger<WorkflowLogMessage>
Optional logger instance for workflow execution details

Core Methods

addStep()

Adds a Step to the workflow, including its transitions to other steps. Returns the workflow instance so that calls can be chained.

setTriggerSchema()

Sets a schema for validating initial workflow data.

commit()

Validates and finalizes the workflow configuration. Must be called after adding all steps.

executeWorkflow()

Executes the workflow with optional trigger data.

Trigger Schemas

Trigger schemas validate the initial data passed to a workflow using Zod.

import { z } from 'zod';

const workflow = new Workflow('order-process')
  .setTriggerSchema(
    z.object({
      orderId: z.string(),
      customer: z.object({
        id: z.string(),
        email: z.string().email()
      })
    })
  );

The schema:

  • Validates data passed to executeWorkflow()
  • Provides TypeScript types for your workflow input
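
To make the two roles of the schema concrete, here is a minimal, dependency-free sketch of the checks the order schema above encodes. It is not how Zod or the Workflow class implements validation; `OrderTrigger`, `checkOrderTrigger`, and `isOrderTrigger` are hypothetical names introduced only for illustration.

```typescript
// Hypothetical stand-in for the type a schema would give you for the trigger data.
interface OrderTrigger {
  orderId: string;
  customer: { id: string; email: string };
}

// Returns a list of human-readable issues; an empty list means the data is valid.
function checkOrderTrigger(input: unknown): string[] {
  if (typeof input !== 'object' || input === null) {
    return ['trigger data must be an object'];
  }
  const issues: string[] = [];
  const data = input as Record<string, unknown>;
  if (typeof data['orderId'] !== 'string') issues.push('orderId must be a string');

  const customer = data['customer'];
  if (typeof customer !== 'object' || customer === null) {
    issues.push('customer must be an object');
    return issues;
  }
  const c = customer as Record<string, unknown>;
  if (typeof c['id'] !== 'string') issues.push('customer.id must be a string');
  // Deliberately loose email check; a schema library would be more thorough.
  const email = c['email'];
  if (typeof email !== 'string' || !/^[^@\s]+@[^@\s]+\.[^@\s]+$/.test(email)) {
    issues.push('customer.email must be an email address');
  }
  return issues;
}

// Type guard: narrows unknown input to OrderTrigger when no issues are found.
function isOrderTrigger(input: unknown): input is OrderTrigger {
  return checkOrderTrigger(input).length === 0;
}

const good: unknown = { orderId: '123', customer: { id: 'cust_123', email: 'a@example.com' } };
const bad: unknown = { orderId: '123', customer: { id: 'cust_123', email: 'invalid-email' } };

console.log(checkOrderTrigger(good));
console.log(checkOrderTrigger(bad));
if (isOrderTrigger(good)) {
  console.log(good.customer.email); // typed access after narrowing
}
```

The type guard mirrors the second bullet: once the data has passed validation, the rest of the code can work with a typed value instead of `unknown`.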

Variables & Data Flow

Variables allow steps to access data from:

  • Previous steps’ outputs
  • Trigger data
  • Static payloads

workflow
  .addStep('createOrder', {
    // Access trigger data
    variables: {
      orderId: { stepId: 'trigger', path: 'orderId' }
    },
    action: async (data) => ({
      status: 'created',
      total: 100
    })
  })
  .addStep('processPayment', {
    variables: {
      // Access previous step's data
      orderStatus: { stepId: 'createOrder', path: 'status' },
      amount: { stepId: 'createOrder', path: 'total' }
    },
    // Add static values
    payload: {
      currency: 'USD'
    },
    action: async (data) => {
      // data contains:
      // - orderStatus and amount from createOrder step
      // - currency from payload
      return { status: 'paid' };
    }
  });

Variable Resolution

  • Variables are resolved in order of step execution
  • Each step can access outputs of all previous steps
  • Paths use dot notation for nested data
  • Missing or invalid paths throw errors during execution
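
The rules above can be sketched as a small resolver. This is an illustration of the described behaviour, not the library's implementation: `StepOutputs`, `resolveRef`, and `buildInput` are hypothetical names, and treating a path of '.' as "the whole output" is an assumption.

```typescript
// Outputs recorded so far, keyed by step id; 'trigger' holds the initial data.
type StepOutputs = Record<string, unknown>;

interface VariableRef {
  stepId: string;
  path: string; // dot notation, e.g. 'customer.email'; '.' is assumed to mean the whole output
}

// Walks a dot-notation path through a recorded output, throwing if any segment is missing.
function resolveRef(outputs: StepOutputs, ref: VariableRef): unknown {
  if (!(ref.stepId in outputs)) {
    throw new Error(`No output recorded for step '${ref.stepId}'`);
  }
  let current: unknown = outputs[ref.stepId];
  if (ref.path === '.') return current;

  for (const key of ref.path.split('.')) {
    if (typeof current !== 'object' || current === null || !(key in current)) {
      throw new Error(`Path '${ref.path}' not found in output of '${ref.stepId}'`);
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Builds a step's input from static payload values plus resolved variables.
function buildInput(
  outputs: StepOutputs,
  variables: Record<string, VariableRef>,
  payload: Record<string, unknown> = {},
): Record<string, unknown> {
  const input: Record<string, unknown> = { ...payload };
  for (const [name, ref] of Object.entries(variables)) {
    input[name] = resolveRef(outputs, ref);
  }
  return input;
}

const outputs: StepOutputs = {
  trigger: { orderId: '123', customer: { email: 'a@example.com' } },
  createOrder: { status: 'created', total: 100 },
};

const paymentInput = buildInput(
  outputs,
  {
    amount: { stepId: 'createOrder', path: 'total' },
    email: { stepId: 'trigger', path: 'customer.email' },
  },
  { currency: 'USD' },
);
console.log(paymentInput);
```

Because outputs are looked up by step id, a step can only reference steps that have already run, which is why resolution follows execution order.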

Example

import { Workflow } from '@mastra/core';
import { z } from 'zod';

const workflow = new Workflow('process-data')
  .setTriggerSchema(z.object({
    items: z.array(z.object({
      id: z.number(),
      value: z.number()
    }))
  }))
  .addStep('filter', {
    action: async (data) => ({
      filtered: data.items.filter(item => item.value > 50)
    }),
    variables: {
      items: { stepId: 'trigger', path: 'items' }
    },
    transitions: {
      process: {
        condition: {
          ref: { stepId: 'filter', path: 'filtered' },
          query: { $where: (value) => value.length > 0 }
        }
      }
    }
  })
  .addStep('process', {
    action: async ({ items }) => ({
      processed: items.map(item => ({
        id: item.id,
        doubled: item.value * 2
      }))
    }),
    variables: {
      items: { stepId: 'filter', path: 'filtered' }
    }
  })
  .commit();
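
The transitions block in the example attaches a condition to the move from filter to process. As a rough illustration of how such a condition could be evaluated, the sketch below looks up the referenced value and passes it to the $where predicate. It is not the library's evaluator: `selectTransitions` and the surrounding types are hypothetical, only single-key paths are handled, and treating a transition without a condition as always taken is an assumption.

```typescript
type Outputs = Record<string, Record<string, unknown>>;

interface Condition {
  ref: { stepId: string; path: string }; // single-key paths only in this sketch
  query: { $where: (value: unknown) => boolean };
}

type Transitions = Record<string, { condition?: Condition }>;

// Returns the ids of target steps whose condition holds (or that have no condition).
function selectTransitions(transitions: Transitions, outputs: Outputs): string[] {
  const selected: string[] = [];
  for (const [target, { condition }] of Object.entries(transitions)) {
    if (!condition) {
      selected.push(target);
      continue;
    }
    const value = outputs[condition.ref.stepId]?.[condition.ref.path];
    if (condition.query.$where(value)) selected.push(target);
  }
  return selected;
}

const transitions: Transitions = {
  process: {
    condition: {
      ref: { stepId: 'filter', path: 'filtered' },
      query: { $where: (value) => Array.isArray(value) && value.length > 0 },
    },
  },
};

const withMatches = selectTransitions(transitions, {
  filter: { filtered: [{ id: 1, value: 80 }] },
});
const withoutMatches = selectTransitions(transitions, { filter: { filtered: [] } });

console.log(withMatches);    // [ 'process' ]
console.log(withoutMatches); // []
```

In this sketch, when the filter step produces an empty list no transition is selected, so the process step would not run.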

Validation

Workflow validation happens at two key times:

1. At Commit Time

When you call .commit(), the workflow checks for:

  • Circular dependencies between steps
  • Paths that never terminate (every path must end)
  • Unreachable steps
  • Variable references to non-existent steps
  • Duplicate step IDs

workflow
  .addStep('step1', {...})
  .addStep('step2', {...})
  .commit(); // Validates workflow structure
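
Three of these structural checks (duplicates, cycles, unreachable steps) can be sketched with a depth-first traversal of the step graph. This is a simplified illustration under assumed data shapes, not the check that commit() performs: `StepNode`, `Problem`, `checkGraph`, and the 'duplicate_step' label are hypothetical.

```typescript
interface StepNode {
  id: string;
  next: string[]; // ids of steps this step can transition to
}

interface Problem {
  type: 'duplicate_step' | 'circular_dependency' | 'unreachable_step';
  stepId: string;
}

function checkGraph(steps: StepNode[], entry: string): Problem[] {
  const problems: Problem[] = [];
  const graph = new Map<string, string[]>();

  // Duplicate ids: keep the first definition, report later ones.
  for (const step of steps) {
    if (graph.has(step.id)) problems.push({ type: 'duplicate_step', stepId: step.id });
    else graph.set(step.id, step.next);
  }

  // Depth-first search; meeting a step that is still being visited means a cycle.
  const state = new Map<string, 'visiting' | 'done'>();
  const visit = (id: string): void => {
    const seen = state.get(id);
    if (seen === 'done') return;
    if (seen === 'visiting') {
      problems.push({ type: 'circular_dependency', stepId: id });
      return;
    }
    state.set(id, 'visiting');
    for (const next of graph.get(id) ?? []) visit(next);
    state.set(id, 'done');
  };
  visit(entry);

  // Any defined step the search never touched cannot be reached from the entry step.
  for (const id of graph.keys()) {
    if (!state.has(id)) problems.push({ type: 'unreachable_step', stepId: id });
  }
  return problems;
}

const problems = checkGraph(
  [
    { id: 'validate', next: ['charge'] },
    { id: 'charge', next: ['validate'] }, // loops back to 'validate'
    { id: 'audit', next: [] },            // nothing transitions here
    { id: 'charge', next: [] },           // duplicate id
  ],
  'validate',
);
console.log(problems);
```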

2. During Execution

When you call executeWorkflow(), it validates:

  • Trigger data against the trigger schema
  • Each step’s input data against its inputSchema
  • That variable paths exist in the referenced step outputs
  • That required variables are present

// Validates trigger data against schema
await workflow.executeWorkflow({
  orderId: '123',
  customer: {
    id: 'cust_123',
    email: 'invalid-email' // Will fail validation
  }
});

Error Handling

try {
  await workflow.executeWorkflow(data);
} catch (error) {
  if (error instanceof ValidationError) {
    // Handle validation errors
    console.log(error.type); // 'circular_dependency' | 'no_terminal_path' | 'unreachable_step'
    console.log(error.details); // { stepId?: string, path?: string[] }
  }
}
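
One way to act on the error type is an exhaustive switch, so that a newly added type becomes a compile-time error instead of a silent fall-through. The sketch below uses a local stand-in class, `SketchValidationError`, because the constructor and import path of the library's own ValidationError are not shown here; only the type and details fields mirror the snippet above, and `describeError` is a hypothetical helper.

```typescript
type ValidationErrorType = 'circular_dependency' | 'no_terminal_path' | 'unreachable_step';

interface ValidationDetails {
  stepId?: string;
  path?: string[];
}

// Hypothetical stand-in; the library's own ValidationError may differ.
class SketchValidationError extends Error {
  readonly type: ValidationErrorType;
  readonly details: ValidationDetails;

  constructor(type: ValidationErrorType, details: ValidationDetails = {}) {
    super(`Workflow validation failed: ${type}`);
    // Keeps instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = 'SketchValidationError';
    this.type = type;
    this.details = details;
  }
}

// Maps an error to a message; the switch covers every member of ValidationErrorType.
function describeError(error: unknown): string {
  if (!(error instanceof SketchValidationError)) return 'Unexpected error';
  const where = error.details.stepId ? ` (step '${error.details.stepId}')` : '';
  switch (error.type) {
    case 'circular_dependency':
      return `Steps depend on each other in a cycle${where}`;
    case 'no_terminal_path':
      return `A path never reaches a final step${where}`;
    case 'unreachable_step':
      return `A step can never run${where}`;
  }
}

try {
  throw new SketchValidationError('unreachable_step', { stepId: 'audit' });
} catch (error) {
  console.log(describeError(error));
}
```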