Inngest ワークフロー
Inngest は、インフラの管理を必要とせずにバックグラウンドワークフローを構築・実行できる、開発者向けプラットフォームです。
Inngest と Mastra の連携方法
Inngest と Mastra はワークフローのモデルを揃えることで統合されています。Inngest はロジックをステップで構成された関数として整理し、Mastra の createWorkflow
と createStep
で定義されたワークフローはこのパラダイムに直接対応します。各 Mastra ワークフローは一意の識別子を持つ Inngest の関数となり、ワークフロー内の各ステップは Inngest のステップにマッピングされます。
serve
関数は、Mastra のワークフローを Inngest の関数として登録し、実行と監視に必要なイベントハンドラーを設定することで、両システムを橋渡しします。
イベントによってワークフローがトリガーされると、Inngest はステップごとに実行し、各ステップの結果をメモ化します。つまり、ワークフローが再試行または再開された場合は、完了済みのステップがスキップされ、効率的で信頼性の高い実行が確保されます。Mastra のループ、条件分岐、ネストされたワークフローといった制御フローのプリミティブは、同じ Inngest の関数/ステップモデルへシームレスに変換され、合成、分岐、一時停止といった高度なワークフロー機能が保たれます。
リアルタイム監視、一時停止/再開、ステップ単位の可観測性は、Inngest の publish-subscribe システムとダッシュボードによって有効化されます。各ステップの実行時には、その状態と出力が Mastra のストレージで追跡され、必要に応じて再開できます。
セットアップ
npm install @mastra/inngest @mastra/core @mastra/deployer
Inngest ワークフローの構築
このガイドでは、Inngest と Mastra を使ってワークフローを作成する方法を解説し、値が 10 に達するまでカウントアップするカウンターアプリを例として紹介します。
Inngest の初期化
Inngest の連携を初期化して、Mastra 互換のワークフロー用ヘルパーを取得します。createWorkflow と createStep 関数は、Mastra と Inngest に対応するワークフローおよびステップのオブジェクトを作成するために使用されます。
開発環境
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime";
export const inngest = new Inngest({
id: "mastra",
baseUrl:"http://localhost:8288",
isDev: true,
middleware: [realtimeMiddleware()],
});
本番環境
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime";
export const inngest = new Inngest({
id: "mastra",
middleware: [realtimeMiddleware()],
});
ステップの作成
ワークフローを構成する各ステップを定義します:
import { z } from "zod";
import { inngest } from "../inngest";
import { init } from "@mastra/inngest";
// Mastra で Inngest を初期化し、ローカルの Inngest サーバーを指す
const { createWorkflow, createStep } = init(inngest);
// ステップ: カウンター値を1増やす
const incrementStep = createStep({
id: "increment",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
return { value: inputData.value + 1 };
},
});
ワークフローの作成
dountil
ループパターンを使って、各ステップをワークフローに組み立てます。createWorkflow 関数は、inngest サーバー上で呼び出し可能な関数を作成します。
// workflow that is registered as a function on inngest server
const workflow = createWorkflow({
id: "increment-workflow",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
}).then(incrementStep);
workflow.commit();
export { workflow as incrementWorkflow };
Mastra インスタンスの設定とワークフローの実行
Mastra にワークフローを登録し、Inngest の API エンドポイントを設定します。
import { Mastra } from "@mastra/core/mastra";
import { serve as inngestServe } from "@mastra/inngest";
import { incrementWorkflow } from "./workflows";
import { inngest } from "./inngest";
import { PinoLogger } from "@mastra/loggers";
// ワークフローと Inngest の API エンドポイントで Mastra を構成
export const mastra = new Mastra({
workflows: {
incrementWorkflow,
},
server: {
// ローカルの Docker コンテナが Mastra サーバーに接続できるようにするために必要なサーバー設定
host: "0.0.0.0",
apiRoutes: [
// この API ルートは Inngest サーバー上で Mastra のワークフロー(Inngest 関数)を登録するために使用されます
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
// inngestServe は次のようにして Mastra のワークフローを Inngest と統合します:
// 1. 各ワークフローに対して一意の ID(workflow.${workflowId})で Inngest 関数を作成
// 2. 次を行うイベントハンドラを設定:
// - 各ワークフロー実行ごとに一意の run ID を生成
// - ステップ実行を管理する InngestExecutionEngine を作成
// - ワークフロー状態の永続化とリアルタイム更新を処理
// 3. workflow:${workflowId}:${runId} チャンネル経由でのリアルタイム監視のための Pub/Sub を構築
//
// 任意: ワークフローとあわせて提供する追加の Inngest 関数を渡すことも可能です:
// createHandler: async ({ mastra }) => inngestServe({
// mastra,
// inngest,
// functions: [customFunction1, customFunction2] // ユーザー定義の Inngest 関数
// }),
},
],
},
logger: new PinoLogger({
name: "Mastra",
level: "info",
}),
});
ワークフローをローカルで実行する
前提条件:
- Docker がインストールされ、起動していること
- Mastra プロジェクトがセットアップ済み
- 依存関係がインストール済み(
npm install
)
npx mastra dev
を実行して、ローカルで Mastra サーバーを起動します(ポート 4111 で待ち受けます)。- Inngest の開発サーバーを起動(Docker 経由) 新しいターミナルで次を実行します:
docker run --rm -p 8288:8288 \
inngest/inngest \
inngest dev -u http://host.docker.internal:4111/api/inngest
注:
-u
の後の URL は、Inngest の開発サーバーに Mastra の/api/inngest
エンドポイントの場所を指定します。
- Inngest ダッシュボードを開く
- ブラウザで http://localhost:8288 にアクセスします。
- サイドバーの Apps セクションに移動します。
- Mastra のワークフローが登録されているはずです。
- ワークフローを起動する
- サイドバーの Functions セクションに移動します。
- Mastra のワークフローを選択します。
- Invoke をクリックし、次の入力を使用します:
{
"data": {
"inputData": {
"value": 5
}
}
}
- ワークフローの実行状況を監視する
- サイドバーの Runs タブに移動します。
- 最新の実行をクリックして、ステップごとの進行状況を確認します。
本番環境でワークフローを実行する
前提条件:
- Vercel アカウントと Vercel CLI がインストールされていること(
npm i -g vercel
)- Inngest アカウント
- Vercel トークン(推奨: 環境変数として設定)
- Mastra インスタンスに Vercel Deployer を追加する
import { VercelDeployer } from "@mastra/deployer-vercel";
export const mastra = new Mastra({
// ...other config
deployer: new VercelDeployer({
teamSlug: "your_team_slug",
projectName: "your_project_name",
// you can get your vercel token from the vercel dashboard by clicking on the user icon in the top right corner
// and then clicking on "Account Settings" and then clicking on "Tokens" on the left sidebar.
token: "your_vercel_token",
}),
});
注意: Vercel のトークンを環境変数に設定してください:
export VERCEL_TOKEN=your_vercel_token
- Mastra インスタンスをビルドする
npx mastra build
- Vercel にデプロイする
cd .mastra/output
vercel --prod
ヒント: まだの場合は、
vercel login
で Vercel CLI にログインしてください。
- Inngest ダッシュボードと同期する
- Inngest ダッシュボード にアクセスします。
- Sync new app with Vercel をクリックし、指示に従います。
- Mastra のワークフローがアプリとして登録されていることを確認します。
- ワークフローを呼び出す
- Functions セクションで
workflow.increment-workflow
を選択します。 - 右上の All actions > Invoke をクリックします。
- 次の入力を指定します:
{
"data": {
"inputData": {
"value": 5
}
}
}
- 実行を監視する
- Runs タブに移動します。
- 最新の実行をクリックし、ステップごとの進行状況を確認します。
上級者向け: カスタム Inngest 関数の追加
inngestServe
の任意パラメーター functions
を使用すると、Mastra のワークフローと併せて追加の Inngest 関数を提供できます。
カスタム関数の作成
まず、カスタム Inngest 関数を作成します。
import { inngest } from "./inngest";
// カスタム Inngest 関数を定義
export const customEmailFunction = inngest.createFunction(
{ id: 'send-welcome-email' },
{ event: 'user/registered' },
async ({ event }) => {
// ここにカスタムのメール処理ロジック
console.log(`ウェルカムメールを ${event.data.email} に送信します`);
return { status: 'email_sent' };
}
);
export const customWebhookFunction = inngest.createFunction(
{ id: 'process-webhook' },
{ event: 'webhook/received' },
async ({ event }) => {
// カスタムの Webhook 処理
console.log(`Webhook を処理中: ${event.data.type}`);
return { processed: true };
}
);
ワークフローでカスタム関数を提供する
Mastra の設定を更新して、カスタム関数を含めます:
import { Mastra } from "@mastra/core/mastra";
import { serve as inngestServe } from "@mastra/inngest";
import { incrementWorkflow } from "./workflows";
import { inngest } from "./inngest";
import { customEmailFunction, customWebhookFunction } from "./inngest/custom-functions";
export const mastra = new Mastra({
workflows: {
incrementWorkflow,
},
server: {
host: "0.0.0.0",
apiRoutes: [
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => inngestServe({
mastra,
inngest,
functions: [customEmailFunction, customWebhookFunction] // カスタム関数を追加
}),
},
],
},
});
関数の登録
カスタム関数を含めると、次のことが行われます。
- Mastra のワークフローは、自動的に
workflow.${workflowId}
のような ID を持つ Inngest 関数に変換されます - カスタム関数は、指定した ID(例:
send-welcome-email
、process-webhook
)を保持します - すべての関数は同じ
/api/inngest
エンドポイントでまとめて提供されます
これにより、Mastra のワークフローオーケストレーションを既存の Inngest 関数とシームレスに統合できます。