Mastra Syncs

Syncs in Mastra are async functions that handle data synchronization and background tasks. Unlike full job systems (e.g., Inngest or Trigger.dev), Mastra syncs are executed synchronously in the current process and do not include built-in retry, scheduling, or job queue capabilities.

Creating a Sync

Create a sync using the createSync function:

import { createSync } from "@mastra/core";
import { z } from "zod";
 
export const mySync = createSync({
  id: "my-sync",
  description: "Synchronize user data",
  inputSchema: z.object({
    userId: z.string(),
    data: z.record(z.any()),
  }),
  outputSchema: z.object({
    success: z.boolean(),
    message: z.string(),
  }),
  execute: async ({ context, engine }) => {
    try {
      await engine.syncRecords({
        connectionId: "users",
        name: "user_data",
        records: [
          {
            externalId: context.userId,
            data: context.data,
          },
        ],
      });
 
      return {
        success: true,
        message: "Data synchronized successfully",
      };
    } catch (error) {
      return {
        success: false,
        message: error.message,
      };
    }
  },
});

Registering Syncs

Register syncs when initializing Mastra:

import { Mastra } from "@mastra/core";
import { PostgresEngine } from "@mastra/engine";
 
export const mastra = new Mastra({
  engine: new PostgresEngine({
    url: process.env.DATABASE_URL,
  }),
  syncs: {
    mySync,
    otherSync,
  },
});

Executing Syncs

Execute a sync using the sync method:

const result = await mastra.sync("mySync", {
  userId: "123",
  data: {
    name: "John Doe",
    email: "john@example.com",
  },
});

Sync Context

Syncs receive a context object with:

  • context: The input parameters (validated by inputSchema)
  • engine: The Mastra engine instance
  • agents: Available Mastra agents
  • vectors: Vector storage capabilities
  • llm: LLM functionality
  • runId: Optional execution ID for tracking

Production Usage

For production environments, it’s recommended to wrap Mastra syncs in a proper job system. For example:

import { Queue } from "your-job-system";
 
const queue = new Queue("data-sync");
 
queue.process("sync-user-data", async (job) => {
  const result = await mastra.sync("mySync", job.data);
  return result;
});

Example Use Cases

  1. Data Synchronization:
export const mySync = createSync({
  label: "My Sync",
  description: "This is a test sync",
  schema: z.object({
    name: z.string(),
    connectionId: z.string(),
    records: z.array(
      z.object({
        data: z.record(z.any()),
        externalId: z.string(),
      }),
    ),
  }),
  outputShema: z.object({
    message: z.string(),
  }),
  execute: async ({ context, engine }) => {
    await engine.syncRecords({
      name: context.name,
      connectionId: context.connectionId,
      records: context.records,
    });
 
    return {
      message: "Hello",
    };
  },
});
  1. Web Crawling:
examples/openapi-spec-writer/src/mastra/syncs/index.ts
export const siteCrawlSync = createSync({
  id: "site-crawl-sync",
  label: "Site Crawl Sync",
  inputSchema: z.object({
    url: z.string(),
    pathRegex: z.string(),
    limit: z.number(),
  }),
  outputSchema: z.object({
    success: z.boolean(),
    crawlData: z.array(
      z.object({
        markdown: z.string(),
        metadata: z.object({
          sourceURL: z.string(),
        }),
      })
    ),
    entityType: z.string(),
  }),
  description:
    "Crawl a website and extract the markdown content and sync it to the database",
  execute: async ({ context, engine, runId }) => {
    const toolResult = await tools.siteCrawlTool.execute({
      context,
      runId,
    });
 
    const { crawlData, entityType } = toolResult;
 
    if (!crawlData) {
      return {
        success: false,
        crawlData: [],
        entityType: "",
      };
    }
 
    const recordsToPersist = await Promise.all(
      crawlData?.flatMap(async ({ markdown, metadata }) => {
        const doc = MDocument.fromMarkdown(markdown, metadata);
 
        await doc.chunk({
          strategy: "markdown",
          options: {
            maxSize: 8190,
          },
        });
 
        const chunks = doc.getDocs();
 
        return chunks.map((c, i) => {
          return {
            externalId: `${c.metadata?.sourceURL}_chunk_${i}`,
            data: { markdown: c.text },
          };
        });
      })
    );
 
    await engine?.syncRecords({
      connectionId: "SYSTEM",
      records: recordsToPersist.flatMap((r) => r),
      name: entityType,
    });
 
    return {
      success: true,
      crawlData,
      entityType,
    };
  },

Testing Syncs

You can test syncs using the mock engine:

packages/core/src/sync/sync.test.ts
const testSync = createSync({
  id: "test",
  description: "test",
  inputSchema: z.object({
    name: z.string(),
  }),
  outputSchema: z.object({
    message: z.string(),
  }),
  execute: async ({ context }) => {
    return {
      message: `Hello, ${context.name}`,
    };
  },
});

Remember that syncs are meant for data synchronization tasks rather than full background job processing. For production applications requiring robust job scheduling, retries, and distributed execution, wrap Mastra syncs in your preferred job system (or use the job system of the platform you are deploying to).


MIT 2025 © Nextra.