Mastra Syncs
Syncs in Mastra are async functions that handle data synchronization and background tasks. Unlike full job systems (e.g., Inngest or Trigger.dev), a Mastra sync runs directly in the current process when you call it, and the caller awaits the result; there is no built-in retry, scheduling, or job queue.
Creating a Sync
Create a sync using the createSync function:
import { createSync } from "@mastra/core";
import { z } from "zod";
export const mySync = createSync({
id: "my-sync",
description: "Synchronize user data",
inputSchema: z.object({
userId: z.string(),
data: z.record(z.any()),
}),
outputSchema: z.object({
success: z.boolean(),
message: z.string(),
}),
execute: async ({ context, engine }) => {
try {
await engine.syncRecords({
connectionId: "users",
name: "user_data",
records: [
{
externalId: context.userId,
data: context.data,
},
],
});
return {
success: true,
message: "Data synchronized successfully",
};
} catch (error) {
return {
success: false,
message: error instanceof Error ? error.message : String(error),
};
}
},
});
Registering Syncs
Register syncs when initializing Mastra:
import { Mastra } from "@mastra/core";
import { PostgresEngine } from "@mastra/engine";
export const mastra = new Mastra({
engine: new PostgresEngine({
url: process.env.DATABASE_URL,
}),
syncs: {
mySync,
otherSync,
},
});
Executing Syncs
Execute a sync using the sync method:
const result = await mastra.sync("mySync", {
userId: "123",
data: {
name: "John Doe",
email: "john@example.com",
},
});
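Because the first example sync catches its own errors and returns `success: false` instead of throwing, the caller has to inspect the result. The following is a minimal sketch of that check. `SyncResult` mirrors the `outputSchema` of the first example, and `unwrapSyncResult` is a hypothetical helper, not part of Mastra:

```typescript
// Result shape taken from the outputSchema of the first example.
interface SyncResult {
  success: boolean;
  message: string;
}

// Hypothetical helper (not a Mastra API): turn a failed result into an
// exception so the caller cannot silently ignore a failed sync.
function unwrapSyncResult(result: SyncResult): string {
  if (!result.success) {
    throw new Error(`Sync failed: ${result.message}`);
  }
  return result.message;
}
```

A call site could then read `unwrapSyncResult(await mastra.sync("mySync", input))`, assuming the sync returns the shape above.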
Sync Context
Syncs receive a context object with:
- context: The input parameters (validated by inputSchema)
- engine: The Mastra engine instance
- agents: Available Mastra agents
- vectors: Vector storage capabilities
- llm: LLM functionality
- runId: Optional execution ID for tracking
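The fields above can be pictured as a single object that is destructured in `execute`. The interface below is an assumed shape inferred from this list, not Mastra's exported type; fields the source does not describe in detail are typed as `unknown`, and marking them optional is also an assumption:

```typescript
// Assumed shape of the argument passed to `execute` (illustrative only).
interface SyncExecuteArgs<TInput> {
  context: TInput;                   // input parameters, validated by inputSchema
  engine?: unknown;                  // the Mastra engine instance
  agents?: Record<string, unknown>;  // available Mastra agents
  vectors?: Record<string, unknown>; // vector storage capabilities
  llm?: unknown;                     // LLM functionality
  runId?: string;                    // optional execution ID for tracking
}

// An execute function only destructures the fields it needs.
async function greet({ context, runId }: SyncExecuteArgs<{ name: string }>) {
  const tag = runId ? ` [run ${runId}]` : "";
  return { message: `Hello, ${context.name}${tag}` };
}
```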
Production Usage
For production environments, it’s recommended to wrap Mastra syncs in a proper job system. For example:
import { Queue } from "your-job-system";
const queue = new Queue("data-sync");
queue.process("sync-user-data", async (job) => {
const result = await mastra.sync("mySync", job.data);
return result;
});
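To make concrete what a job system adds on top of a bare sync call, here is a small retry wrapper with exponential backoff. It is a sketch only: `withRetry`, its parameters, and the injectable `sleep` are hypothetical and do not come from Mastra or from any particular queue library, and a real job system also provides persistence, scheduling, and distribution that this does not.

```typescript
// Hypothetical helper: retry an async task with exponential backoff.
async function withRetry<T>(
  task: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 200,
  // Injectable so tests can skip real waiting.
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Delay grows as baseDelayMs, 2x, 4x, ...
        await sleep(baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }
  // All attempts failed: surface the last error to the caller.
  throw lastError;
}
```

Retries only trigger when the wrapped call throws. A sync that reports failure by returning `success: false` would need that result converted into an exception before this wrapper can react to it.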
Example Use Cases
- Data Synchronization:
export const mySync = createSync({
id: "my-sync",
label: "My Sync",
description: "This is a test sync",
inputSchema: z.object({
name: z.string(),
connectionId: z.string(),
records: z.array(
z.object({
data: z.record(z.any()),
externalId: z.string(),
}),
),
}),
outputSchema: z.object({
message: z.string(),
}),
execute: async ({ context, engine }) => {
await engine.syncRecords({
name: context.name,
connectionId: context.connectionId,
records: context.records,
});
return {
message: "Hello",
};
},
});
- Web Crawling:
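// Assumes `tools` (exposing `siteCrawlTool`) and `MDocument` are imported elsewhere in the project.
export const siteCrawlSync = createSync({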
id: "site-crawl-sync",
label: "Site Crawl Sync",
inputSchema: z.object({
url: z.string(),
pathRegex: z.string(),
limit: z.number(),
}),
outputSchema: z.object({
success: z.boolean(),
crawlData: z.array(
z.object({
markdown: z.string(),
metadata: z.object({
sourceURL: z.string(),
}),
})
),
entityType: z.string(),
}),
description:
"Crawl a website and extract the markdown content and sync it to the database",
execute: async ({ context, engine, runId }) => {
const toolResult = await tools.siteCrawlTool.execute({
context,
runId,
});
const { crawlData, entityType } = toolResult;
if (!crawlData) {
return {
success: false,
crawlData: [],
entityType: "",
};
}
const recordsToPersist = await Promise.all(
// map, not flatMap: an async callback returns a promise, which flatMap cannot flatten
crawlData.map(async ({ markdown, metadata }) => {
const doc = MDocument.fromMarkdown(markdown, metadata);
await doc.chunk({
strategy: "markdown",
options: {
maxSize: 8190,
},
});
const chunks = doc.getDocs();
return chunks.map((c, i) => {
return {
externalId: `${c.metadata?.sourceURL}_chunk_${i}`,
data: { markdown: c.text },
};
});
})
);
await engine?.syncRecords({
connectionId: "SYSTEM",
records: recordsToPersist.flatMap((r) => r),
name: entityType,
});
return {
success: true,
crawlData,
entityType,
};
},
});
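The record-building step of the crawl example combines per-page async chunking with flattening. Since `flatMap` neither awaits nor flattens promises, the pattern is to map first, await with `Promise.all`, and flatten afterwards. The sketch below isolates that pattern; `chunkText` is a hypothetical fixed-size splitter standing in for `MDocument` chunking, and `Page`, `SyncRecord`, and `buildRecords` are illustrative names:

```typescript
interface Page {
  markdown: string;
  metadata: { sourceURL: string };
}

interface SyncRecord {
  externalId: string;
  data: { markdown: string };
}

// Stand-in for document chunking: splits text into fixed-size pieces.
// (The crawl example uses a markdown-aware strategy instead.)
async function chunkText(text: string, maxSize: number): Promise<string[]> {
  const chunks: string[] = [];
  for (let i = 0; i < text.length; i += maxSize) {
    chunks.push(text.slice(i, i + maxSize));
  }
  return chunks;
}

async function buildRecords(pages: Page[], maxSize: number): Promise<SyncRecord[]> {
  // One promise per page, each resolving to that page's records.
  const perPage = await Promise.all(
    pages.map(async ({ markdown, metadata }) => {
      const chunks = await chunkText(markdown, maxSize);
      return chunks.map((text, i) => ({
        externalId: `${metadata.sourceURL}_chunk_${i}`,
        data: { markdown: text },
      }));
    }),
  );
  // Flatten SyncRecord[][] into SyncRecord[] once every page has resolved.
  return ([] as SyncRecord[]).concat(...perPage);
}
```

Each external ID is derived from the source URL and the chunk index, so re-running the crawl over the same pages produces the same IDs as long as the chunk boundaries do not change.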
Testing Syncs
You can test syncs using the mock engine. For example, define a minimal sync to exercise in tests:
const testSync = createSync({
id: "test",
description: "test",
inputSchema: z.object({
name: z.string(),
}),
outputSchema: z.object({
message: z.string(),
}),
execute: async ({ context }) => {
return {
message: `Hello, ${context.name}`,
};
},
});
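For syncs that do call the engine, one option is to test the `execute` logic against a hand-rolled stub that records calls. The sketch below is an assumption-heavy illustration: `EngineLike`, `createStubEngine`, and `executeUserSync` are hypothetical names, the stub is not Mastra's mock engine, and the engine surface is reduced to the single `syncRecords` call used in this document.

```typescript
interface SyncRecord {
  externalId: string;
  data: Record<string, unknown>;
}

interface SyncRecordsArgs {
  connectionId: string;
  name: string;
  records: SyncRecord[];
}

// Only the part of the engine that the syncs in this document use.
interface EngineLike {
  syncRecords(args: SyncRecordsArgs): Promise<void>;
}

// Hand-rolled stub: records every call, or fails on demand.
function createStubEngine(shouldFail = false) {
  const calls: SyncRecordsArgs[] = [];
  const engine: EngineLike = {
    async syncRecords(args) {
      if (shouldFail) throw new Error("engine unavailable");
      calls.push(args);
    },
  };
  return { engine, calls };
}

// Same logic as the `execute` function of the first example, extracted
// into a plain function so it can be called without a Mastra instance.
async function executeUserSync(args: {
  context: { userId: string; data: Record<string, unknown> };
  engine: EngineLike;
}) {
  const { context, engine } = args;
  try {
    await engine.syncRecords({
      connectionId: "users",
      name: "user_data",
      records: [{ externalId: context.userId, data: context.data }],
    });
    return { success: true, message: "Data synchronized successfully" };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { success: false, message };
  }
}
```

A test can then assert both on the returned result and on what was passed to `syncRecords`, covering the success path and the failure path without a database.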
Remember that syncs are meant for data synchronization tasks rather than full background job processing. For production applications requiring robust job scheduling, retries, and distributed execution, wrap Mastra syncs in your preferred job system (or use the job system of the platform you are deploying to).