Skip to Content

MCPClient

MCPClientクラスは、Mastraアプリケーションで複数のMCPサーバー接続とそのツールを管理する方法を提供します。接続のライフサイクル、ツールの名前空間管理を処理し、設定されたすべてのサーバーにわたるツールへのアクセスを提供します。

このクラスは非推奨のMastraMCPClientに代わるものです。

コンストラクタ

MCPClientクラスの新しいインスタンスを作成します。

constructor({ id?: string; servers: Record<string, MastraMCPServerDefinition>; timeout?: number; }: MCPClientOptions)

MCPClientOptions


id?:

string
設定インスタンスのオプションの一意識別子。同一の設定で複数のインスタンスを作成する際にメモリリークを防ぐために使用します。

servers:

Record<string, MastraMCPServerDefinition>
サーバー設定のマップ。各キーは一意のサーバー識別子であり、値はサーバー設定です。

timeout?:

number
= 60000
個々のサーバー設定で上書きされない限り、すべてのサーバーに適用されるグローバルタイムアウト値(ミリ秒単位)。

MastraMCPServerDefinition

serversマップ内の各サーバーはMastraMCPServerDefinitionタイプを使用して設定されます。トランスポートタイプは提供されたパラメータに基づいて検出されます:

  • commandが提供されている場合、Stdioトランスポートを使用します。
  • urlが提供されている場合、最初にStreamable HTTPトランスポートを試み、初期接続が失敗した場合はレガシーSSEトランスポートにフォールバックします。

command?:

string
Stdioサーバーの場合:実行するコマンド。

args?:

string[]
Stdioサーバーの場合:コマンドに渡す引数。

env?:

Record<string, string>
Stdioサーバーの場合:コマンドに設定する環境変数。

url?:

URL
HTTPサーバー(Streamable HTTPまたはSSE)の場合:サーバーのURL。

requestInit?:

RequestInit
HTTPサーバーの場合:fetch APIのリクエスト設定。

eventSourceInit?:

EventSourceInit
SSEフォールバックの場合:SSE接続用のカスタムフェッチ設定。SSEでカスタムヘッダーを使用する場合に必要です。

logger?:

LogHandler
ロギング用のオプションの追加ハンドラー。

timeout?:

number
サーバー固有のタイムアウト(ミリ秒単位)。

capabilities?:

ClientCapabilities
サーバー固有の機能設定。

enableServerLogs?:

boolean
= true
このサーバーのロギングを有効にするかどうか。

メソッド

getTools()

設定されたすべてのサーバーからすべてのツールを取得し、競合を防ぐためにツール名をサーバー名で名前空間化(serverName_toolNameの形式)します。 エージェント定義に渡すことを目的としています。

new Agent({ tools: await mcp.getTools() });

getToolsets()

名前空間化されたツール名(serverName.toolNameの形式)をそのツール実装にマッピングするオブジェクトを返します。 generate または stream メソッドに動的に渡すことを目的としています。

const res = await agent.stream(prompt, { toolsets: await mcp.getToolsets(), });

disconnect()

すべてのMCPサーバーから切断し、リソースをクリーンアップします。

async disconnect(): Promise<void>

resources プロパティ

MCPClientインスタンスには、リソース関連の操作へのアクセスを提供するresourcesプロパティがあります。

const mcpClient = new MCPClient({ /* ...servers configuration... */ }); // mcpClient.resourcesを介してリソースメソッドにアクセス const allResourcesByServer = await mcpClient.resources.list(); const templatesByServer = await mcpClient.resources.templates(); // ... その他のリソースメソッドも同様

resources.list()

接続されたすべてのMCPサーバーから利用可能なすべてのリソースを取得し、サーバー名でグループ化します。

async list(): Promise<Record<string, Resource[]>>

例:

const resourcesByServer = await mcpClient.resources.list(); for (const serverName in resourcesByServer) { console.log(`Resources from ${serverName}:`, resourcesByServer[serverName]); }

resources.templates()

接続されたすべてのMCPサーバーから利用可能なすべてのリソーステンプレートを取得し、サーバー名でグループ化します。

async templates(): Promise<Record<string, ResourceTemplate[]>>

例:

const templatesByServer = await mcpClient.resources.templates(); for (const serverName in templatesByServer) { console.log(`Templates from ${serverName}:`, templatesByServer[serverName]); }

resources.read(serverName: string, uri: string)

指定されたサーバーから特定のリソースの内容を読み取ります。

async read(serverName: string, uri: string): Promise<ReadResourceResult>
  • serverName: サーバーの識別子(serversコンストラクタオプションで使用されるキー)。
  • uri: 読み取るリソースのURI。

例:

const content = await mcpClient.resources.read("myWeatherServer", "weather://current"); console.log("Current weather:", content.contents[0].text);

resources.subscribe(serverName: string, uri: string)

指定されたサーバー上の特定のリソースの更新をサブスクライブします。

async subscribe(serverName: string, uri: string): Promise<object>

例:

await mcpClient.resources.subscribe("myWeatherServer", "weather://current");

resources.unsubscribe(serverName: string, uri: string)

指定されたサーバー上の特定のリソースの更新からサブスクリプションを解除します。

async unsubscribe(serverName: string, uri: string): Promise<object>

例:

await mcpClient.resources.unsubscribe("myWeatherServer", "weather://current");

resources.onUpdated(serverName: string, handler: (params: { uri: string }) => void)

特定のサーバー上でサブスクライブされたリソースが更新されたときに呼び出される通知ハンドラを設定します。

async onUpdated(serverName: string, handler: (params: { uri: string }) => void): Promise<void>

例:

mcpClient.resources.onUpdated("myWeatherServer", (params) => { console.log(`Resource updated on myWeatherServer: ${params.uri}`); // ここでリソースの内容を再取得したい場合 // await mcpClient.resources.read("myWeatherServer", params.uri); });

resources.onListChanged(serverName: string, handler: () => void)

特定のサーバー上で利用可能なリソースの全体リストが変更されたときに呼び出される通知ハンドラを設定します。

async onListChanged(serverName: string, handler: () => void): Promise<void>

例:

mcpClient.resources.onListChanged("myWeatherServer", () => { console.log("Resource list changed on myWeatherServer."); // リソースのリストを再取得する必要があります // await mcpClient.resources.list(); });

静的ツール設定

アプリ全体で単一のMCPサーバー接続を持つツールの場合、getTools()を使用してツールをエージェントに渡します:

import { MCPClient } from "@mastra/mcp"; import { Agent } from "@mastra/core/agent"; import { openai } from "@ai-sdk/openai"; const mcp = new MCPClient({ servers: { stockPrice: { command: "npx", args: ["tsx", "stock-price.ts"], env: { API_KEY: "your-api-key", }, log: (logMessage) => { console.log(`[${logMessage.level}] ${logMessage.message}`); }, }, weather: { url: new URL("http://localhost:8080/sse"), }, }, timeout: 30000, // グローバルな30秒タイムアウト }); // すべてのツールにアクセスできるエージェントを作成 const agent = new Agent({ name: "Multi-tool Agent", instructions: "You have access to multiple tool servers.", model: openai("gpt-4"), tools: await mcp.getTools(), }); // リソースメソッドの使用例 async function checkWeatherResource() { try { const weatherResources = await mcp.resources.list(); if (weatherResources.weather && weatherResources.weather.length > 0) { const currentWeatherURI = weatherResources.weather[0].uri; const weatherData = await mcp.resources.read('weather', currentWeatherURI); console.log('Weather data:', weatherData.contents[0].text); } } catch (error) { console.error("Error fetching weather resource:", error); } } checkWeatherResource();

動的ツールセット

各ユーザーに新しいMCP接続が必要な場合は、getToolsets()を使用し、streamまたはgenerate呼び出し時にツールを追加します:

import { Agent } from "@mastra/core/agent"; import { MCPClient } from "@mastra/mcp"; import { openai } from "@ai-sdk/openai"; // まずツールなしでエージェントを作成 const agent = new Agent({ name: "Multi-tool Agent", instructions: "You help users check stocks and weather.", model: openai("gpt-4"), }); // 後で、ユーザー固有の設定でMCPを構成 const mcp = new MCPClient({ servers: { stockPrice: { command: "npx", args: ["tsx", "stock-price.ts"], env: { API_KEY: "user-123-api-key", }, timeout: 20000, // サーバー固有のタイムアウト }, weather: { url: new URL("http://localhost:8080/sse"), requestInit: { headers: { Authorization: `Bearer user-123-token`, }, }, }, }, }); // すべてのツールセットをstream()またはgenerate()に渡す const response = await agent.stream( "How is AAPL doing and what is the weather?", { toolsets: await mcp.getToolsets(), }, );

インスタンス管理

MCPClientクラスには、複数のインスタンスを管理するためのメモリリーク防止機能が組み込まれています:

  1. idなしで同一の構成で複数のインスタンスを作成すると、メモリリークを防ぐためにエラーがスローされます
  2. 同一の構成で複数のインスタンスが必要な場合は、各インスタンスに一意のidを提供してください
  3. 同じ構成でインスタンスを再作成する前に、await configuration.disconnect()を呼び出してください
  4. 1つのインスタンスだけが必要な場合は、再作成を避けるために構成をより高いスコープに移動することを検討してください

例えば、idなしで同じ構成の複数のインスタンスを作成しようとすると:

// 最初のインスタンス - OK const mcp1 = new MCPClient({ servers: { /* ... */ }, }); // 同じ構成の2番目のインスタンス - エラーがスローされます const mcp2 = new MCPClient({ servers: { /* ... */ }, }); // 修正するには、以下のいずれかを行います: // 1. 一意のIDを追加する const mcp3 = new MCPClient({ id: "instance-1", servers: { /* ... */ }, }); // 2. または再作成する前に切断する await mcp1.disconnect(); const mcp4 = new MCPClient({ servers: { /* ... */ }, });

サーバーライフサイクル

MCPClientはサーバー接続を適切に処理します:

  1. 複数のサーバーへの自動接続管理
  2. 開発中にエラーメッセージが表示されないようにするための適切なサーバーシャットダウン
  3. 切断時のリソースの適切なクリーンアップ

SSEリクエストヘッダーの使用

レガシーSSE MCPトランスポートを使用する場合、MCP SDKのバグにより、requestIniteventSourceInitの両方を設定する必要があります:

const sseClient = new MCPClient({ servers: { exampleServer: { url: new URL("https://your-mcp-server.com/sse"), // 注意:requestInitだけではSSEには不十分です requestInit: { headers: { Authorization: "Bearer your-token", }, }, // これもカスタムヘッダーを持つSSE接続には必要です eventSourceInit: { fetch(input: Request | URL | string, init?: RequestInit) { const headers = new Headers(init?.headers || {}); headers.set("Authorization", "Bearer your-token"); return fetch(input, { ...init, headers, }); }, }, }, }, });

関連情報