Inngest ワークフロー
Inngest は、インフラストラクチャを管理することなく、バックグラウンドワークフローを構築・実行するための開発者プラットフォームです。
セットアップ
npm install @mastra/inngest @mastra/core @mastra/deployer @hono/node-server
ローカル開発環境
Inngestはローカル開発のために2つの方法を提供しています:
オプション1:Dockerを使用する
Dockerを使用してInngestをポート8288でローカルに実行し、ポート3000でイベントをリッスンするように設定します:
docker run --rm -p 8288:8288 \
inngest/inngest \
inngest dev -u http://host.docker.internal:3000/inngest/api
オプション2:Inngest CLI
あるいは、公式のInngest Dev Serverガイド に従ってInngest CLIをローカル開発に使用することもできます。
ヒント:Inngestが実行されると、http://localhost:8288 でInngestダッシュボードにアクセスして、ワークフローの実行をリアルタイムで監視およびデバッグできます。
Inngestワークフローの構築
このガイドでは、InngestとMastraを使用してワークフローを作成する方法を説明します。値が10に達するまでカウンターをインクリメントするアプリケーションを例として示します。
Inngestの初期化
Inngest統合を初期化して、Mastra互換のワークフローヘルパーを取得します:
src/mastra/workflows/inngest-workflow.ts
import { init } from "@mastra/inngest";
import { Inngest } from "inngest";
// Initialize Inngest with Mastra, pointing to your local Inngest server
const { createWorkflow, createStep } = init(
new Inngest({
id: "mastra",
baseUrl: `http://localhost:8288`,
}),
);
ステップの作成
ワークフローを構成する個々のステップを定義します:
src/mastra/workflows/inngest-workflow.ts
import { z } from "zod";
// Step 1: Increment the counter value
const incrementStep = createStep({
id: "increment",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
return { value: inputData.value + 1 };
},
});
// Step 2: Log the current value (side effect)
const sideEffectStep = createStep({
id: "side-effect",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
console.log("Current value:", inputData.value);
return { value: inputData.value };
},
});
// Step 3: Final step after loop completion
const finalStep = createStep({
id: "final",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
return { value: inputData.value };
},
});
ワークフローの作成
dountil
ループパターンを使用してステップをワークフローに構成します:
src/mastra/workflows/inngest-workflow.ts
// Create the main workflow that uses a do-until loop
const workflow = createWorkflow({
id: "increment-workflow",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
})
// Loop until the condition is met (value reaches 10)
.dountil(
createWorkflow({
id: "increment-subworkflow",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
steps: [incrementStep, sideEffectStep],
})
.then(incrementStep)
.then(sideEffectStep)
.commit(),
async ({ inputData }) => inputData.value >= 10,
)
.then(finalStep);
workflow.commit();
export { workflow as incrementWorkflow };
Mastraインスタンスの設定とワークフローの実行
ワークフローをMastraに登録し、Inngest APIエンドポイントを設定します:
src/mastra/index.ts
import { Mastra } from "@mastra/core/mastra";
import { serve as inngestServe } from "@mastra/inngest";
import { PinoLogger } from "@mastra/loggers";
import { Inngest } from "inngest";
import { incrementWorkflow } from "./workflows/inngest-workflow";
import { realtimeMiddleware } from "@inngest/realtime";
import { serve } from "@hono/node-server";
import { createHonoServer } from "@mastra/deployer/server";
// Create an Inngest instance with realtime middleware for development
const inngest = new Inngest({
id: "mastra",
baseUrl: `http://localhost:8288`,
isDev: true,
middleware: [realtimeMiddleware()],
});
// Configure Mastra with the workflow and Inngest API endpoint
export const mastra = new Mastra({
vnext_workflows: {
incrementWorkflow,
},
server: {
host: "0.0.0.0",
apiRoutes: [
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
},
],
},
logger: new PinoLogger({
name: "Mastra",
level: "info",
}),
});
// Create and start the Hono server
const app = await createHonoServer(mastra);
const srv = serve({
fetch: app.fetch,
port: 3000,
});
// Get the workflow, create a run, and start it with an initial value
const workflow = mastra.vnext_getWorkflow("incrementWorkflow");
const run = workflow.createRun({});
const result = await run.start({ inputData: { value: 5 } });
console.dir(result, { depth: null });
// Close the server when done
srv.close();
ワークフローを開始した後、Inngestダッシュボード にアクセスして実行の進捗状況を監視し、ステップの出力を確認し、問題をデバッグすることができます。
本番環境での実行
Inngest CloudとVercelでMastraワークフローをデプロイするには、以下の手順に従ってください:
- Inngest初期化から
baseUrl
を削除します。 - MastraアプリケーションをVercelにデプロイします。公式のMastraデプロイメントガイドに従ってください:MastraをVercelにデプロイする
- Inngest Cloud にアクセスします。
- 公式のVercel統合を使用してVercelプロジェクトを接続します:Inngest Cloud Vercel統合ガイド
- これにより、サーバーレス関数が自動的に同期され、ワークフローエンドポイントが登録されます。
- Inngest Cloudダッシュボードを使用して、イベントをトリガーし、ワークフローの実行、ログ、ステップの出力を監視します。