Inngest ワークフロー
Inngest は、インフラストラクチャを管理することなく、バックグラウンドワークフローを構築・実行するための開発者プラットフォームです。
Mastraにおけるinngestの仕組み
InngestとMastraは、ワークフローモデルを整合させることで統合されています:Inngestはロジックをステップで構成される関数に整理し、createWorkflow
とcreateStep
を使用して定義されるMastraのワークフローはこのパラダイムに直接マッピングされます。各Mastraワークフローは一意の識別子を持つInngest関数となり、ワークフロー内の各ステップはInngestのステップにマッピングされます。
serve
関数は、MastraワークフローをInngest関数として登録し、実行とモニタリングに必要なイベントハンドラーを設定することで、両システムを橋渡しします。
イベントがワークフローをトリガーすると、Inngestはステップごとに実行し、各ステップの結果をメモ化します。これにより、ワークフローが再試行または再開された場合、完了したステップはスキップされ、効率的で信頼性の高い実行が保証されます。Mastraのループ、条件分岐、ネストされたワークフローなどの制御フロープリミティブは、Inngestの関数/ステップモデルにシームレスに変換され、コンポジション、分岐、一時停止などの高度なワークフロー機能が保持されます。
リアルタイムモニタリング、一時停止/再開、ステップレベルの可観測性は、Inngestのパブリッシュ-サブスクライブシステムとダッシュボードを通じて実現されます。各ステップが実行されると、その状態と出力はMastraストレージを使用して追跡され、必要に応じて再開することができます。
セットアップ
npm install @mastra/inngest @mastra/core @mastra/deployer
Inngest Workflowの構築
このガイドでは、InngestとMastraを使用してワークフローを作成する方法を説明し、値を10に達するまでインクリメントするカウンターアプリケーションのデモンストレーションを行います。
Inngestの初期化
Inngest統合を初期化して、Mastra互換のワークフローヘルパーを取得します。createWorkflowとcreateStep関数は、MastraとInngestに互換性のあるワークフローとステップオブジェクトを作成するために使用されます。
開発環境では
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime";
export const inngest = new Inngest({
id: "mastra",
baseUrl:"http://localhost:8288",
isDev: true,
middleware: [realtimeMiddleware()],
});
本番環境では
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime";
export const inngest = new Inngest({
id: "mastra",
middleware: [realtimeMiddleware()],
});
ステップの作成
ワークフローを構成する個々のステップを定義します:
import { z } from "zod";
import { inngest } from "../inngest";
import { init } from "@mastra/inngest";
// ローカルのInngestサーバーを指すように、MastraでInngestを初期化
const { createWorkflow, createStep } = init(inngest);
// ステップ: カウンター値をインクリメント
const incrementStep = createStep({
id: "increment",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
return { value: inputData.value + 1 };
},
});
ワークフローの作成
dountil
ループパターンを使用してステップをワークフローに構成します。createWorkflow関数は、Inngestサーバー上で呼び出し可能な関数を作成します。
// Inngestサーバー上で関数として登録されるワークフロー
const workflow = createWorkflow({
id: "increment-workflow",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
}).then(incrementStep);
workflow.commit();
export { workflow as incrementWorkflow };
Mastraインスタンスの設定とワークフローの実行
ワークフローをMastraに登録し、Inngest APIエンドポイントを設定します:
import { Mastra } from "@mastra/core/mastra";
import { serve as inngestServe } from "@mastra/inngest";
import { incrementWorkflow } from "./workflows";
import { inngest } from "./inngest";
import { PinoLogger } from "@mastra/loggers";
// ワークフローとInngest APIエンドポイントでMastraを設定
export const mastra = new Mastra({
workflows: {
incrementWorkflow,
},
server: {
// ローカルDockerコンテナがMastraサーバーに接続できるようにするためのサーバー設定が必要
host: "0.0.0.0",
apiRoutes: [
// このAPIルートは、InngestサーバーにMastraワークフロー(Inngest関数)を登録するために使用される
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
// inngestServe関数は以下によってMastraワークフローをInngestと統合します:
// 1. 一意のID(workflow.${workflowId})を持つ各ワークフローのInngest関数を作成
// 2. 以下を行うイベントハンドラーを設定:
// - 各ワークフロー実行に対して一意の実行IDを生成
// - ステップ実行を管理するInngestExecutionEngineを作成
// - ワークフロー状態の永続化とリアルタイム更新を処理
// 3. workflow:${workflowId}:${runId}チャンネルを通じて
// リアルタイム監視のためのパブリッシュ・サブスクライブシステムを確立
},
],
},
logger: new PinoLogger({
name: "Mastra",
level: "info",
}),
});
ワークフローのローカル実行
前提条件:
- Dockerがインストールされ、実行されている
- Mastraプロジェクトがセットアップされている
- 依存関係がインストールされている(
npm install
)
npx mastra dev
を実行して、ポート4111でサーバーを提供するためにローカルでMastraサーバーを起動します。- Inngest Dev Server(Docker経由)を開始 新しいターミナルで以下を実行:
docker run --rm -p 8288:8288 \
inngest/inngest \
inngest dev -u http://host.docker.internal:4111/inngest/api
注意:
-u
の後のURLは、Inngest dev serverにMastraの/api/inngest
エンドポイントの場所を伝えます。
- Inngest Dashboardを開く
- ブラウザで http://localhost:8288 にアクセスします。
- サイドバーの Apps セクションに移動します。
- Mastraワークフローが登録されているのが確認できるはずです。
- ワークフローを実行する
- サイドバーの Functions セクションに移動します。
- Mastraワークフローを選択します。
- Invoke をクリックして、以下の入力を使用します:
{
"data": {
"inputData": {
"value": 5
}
}
}
- ワークフローの実行を監視する
- サイドバーの Runs タブに移動します。
- 最新の実行をクリックして、ステップバイステップの実行進捗を確認します。
本番環境でのワークフロー実行
前提条件:
- Vercelアカウントとインストール済みのVercel CLI (
npm i -g vercel
)- Inngestアカウント
- Vercelトークン(推奨:環境変数として設定)
- MastraインスタンスにVercel Deployerを追加
import { VercelDeployer } from "@mastra/deployer-vercel";
export const mastra = new Mastra({
// ...その他の設定
deployer: new VercelDeployer({
teamSlug: "your_team_slug",
projectName: "your_project_name",
// vercelトークンは、右上のユーザーアイコンをクリックし、
// 「Account Settings」をクリックして、左サイドバーの「Tokens」をクリックすることで
// vercelダッシュボードから取得できます。
token: "your_vercel_token",
}),
});
注意: Vercelトークンを環境に設定してください:
export VERCEL_TOKEN=your_vercel_token
- mastraインスタンスをビルド
npx mastra build
- Vercelにデプロイ
cd .mastra/output
vercel --prod
ヒント: まだの場合は、
vercel login
でVercel CLIにログインしてください。
- Inngest Dashboardと同期
- Inngest dashboard に移動します。
- Sync new app with Vercel をクリックして、指示に従います。
- Mastraワークフローがアプリとして登録されているのが確認できるはずです。
- ワークフローを実行
- Functions セクションで、
workflow.increment-workflow
を選択します。 - All actions(右上)> Invoke をクリックします。
- 以下の入力を提供します:
{
"data": {
"inputData": {
"value": 5
}
}
}
- 実行を監視
- Runs タブに移動します。
- 最新の実行をクリックして、ステップバイステップの実行進捗を確認します。