🚀 WebSockets 模型上下文协议(MCP)实现
本项目聚焦于在 Cloudflare Workers 环境中,借助 WebSocket 协议对 Model Context Protocol (MCP) 进行扩展。通过此实现,能够很好地支持高频交易、实时协作环境以及需要快速响应的交互式代理等实时通信场景。
🚀 快速开始
本参考实现介绍了如何在 Cloudflare Workers 环境中使用 WebSocket 协议来扩展 Model Context Protocol (MCP),以支持实时通信场景。
✨ 主要特性
- 请求/响应协议:每个 WebSocket 消息包含
type
(消息类型)、id
(唯一请求标识符)、method
(MCP 方法名称)、params
(请求参数)和timestamp
(时间戳)等字段。
- 流式数据处理:支持大文件的分片传输,实现流式结果反馈机制,确保数据完整性和顺序性。
- 心跳检测:定期发送心跳包维持连接,监测连接状态变化,处理断线重连逻辑。
📦 安装指南
文档未提及安装步骤,此处跳过。
💻 使用示例
基础用法
import { MCPClient } from '@modelcontextprotocol/typescript-sdk';
class WebSocketTransport implements MCPTransport {
private ws: WebSocket;
private pendingRequests: Map<string, { resolve: (result: any) => void, reject: (error: any) => void}>;
constructor(serverUrl: string, agentId: string) {
this.ws = new WebSocket(`${serverUrl}/agent/${agentId}/websocket`);
this.pendingRequests = new Map();
this.ws.addEventListener('message', this.handleMessage.bind(this));
}
async send(method: string, params: any): Promise<any> {
return new Promise((resolve, reject) => {
const requestId = crypto.randomUUID();
this.pendingRequests.set(requestId, { resolve, reject });
this.ws.send(JSON.stringify({
type: 'mcp_request',
request: { method, params },
requestId
}));
});
}
private handleMessage(event: MessageEvent) {
const message = JSON.parse(event.data);
if (message.type === 'mcp_response' && message.requestId) {
const pending = this.pendingRequests.get(message.requestId);
if (pending) {
pending.resolve(message.result);
this.pendingRequests.delete(message.requestId);
}
}
}
}
const transport = new WebSocketTransport('wss://example.com', 'agent-123');
const client = new MCPClient({ transport });
const result = await client.invoke('add', { a: 5, b: 3 });
高级用法
class WebSocketHandler {
private ws: WebSocket;
constructor(private readonly agentId: string) {}
async handleRequest(request: Request): Promise<Response> {
const response = new Response(null, {
status: 101,
webSocket: await this.upgradeToWebSocket(request)
});
return response;
}
private upgradeToWebSocket(request: Request): Promise<WebSocket> {
}
}
📚 详细文档
项目结构
.
├── src/
│ ├── mcp-websocket.ts # WebSocket 运输层实现
│ └── mcp-client.ts # MCP 客户端扩展
└── README.md # 项目文档
实现细节
核心组件
- WebSocket 连接管理:使用 Cloudflare Workers 的原生 WebSocket 支持,并集成到现有的 MCP 客户端架构中。
- 双向通信协议:具备请求/响应消息格式、流式数据处理机制以及心跳检测与连接状态监控功能。
- 状态管理:使用 Durable Objects 维护 WebSocket 连接状态,关联会话历史记录与上下文。
优势分析
对比传统 HTTP 请求的优势
- 实时性:保持长期连接,减少请求延迟。
- 带宽效率:减少来回次数,提高数据传输效率。
- 可扩展性:支持大规模并发连接,适合高吞吐量场景。
挑战与解决方案
主要挑战
- 状态管理复杂度:解决方案是使用 Durable Objects 维护会话状态。
- 消息可靠性:实施确认机制和重传策略。
- 错误处理:定义统一的错误编码和反馈机制。
🔧 技术细节
核心组件
-
WebSocket 连接管理
- 使用 Cloudflare Workers 的原生 WebSocket 支持
- 集成到现有的 MCP 客户端架构中
-
双向通信协议
- 请求/响应消息格式
- 流式数据处理机制
- 心跳检测与连接状态监控
-
状态管理
- 使用 Durable Objects 维护 WebSocket 连接状态
- 会话历史记录与上下文关联
关键代码示例
客户端侧 WebSocket 使用
import { MCPClient } from '@modelcontextprotocol/typescript-sdk';
class WebSocketTransport implements MCPTransport {
private ws: WebSocket;
private pendingRequests: Map<string, { resolve: (result: any) => void, reject: (error: any) => void}>;
constructor(serverUrl: string, agentId: string) {
this.ws = new WebSocket(`${serverUrl}/agent/${agentId}/websocket`);
this.pendingRequests = new Map();
this.ws.addEventListener('message', this.handleMessage.bind(this));
}
async send(method: string, params: any): Promise<any> {
return new Promise((resolve, reject) => {
const requestId = crypto.randomUUID();
this.pendingRequests.set(requestId, { resolve, reject });
this.ws.send(JSON.stringify({
type: 'mcp_request',
request: { method, params },
requestId
}));
});
}
private handleMessage(event: MessageEvent) {
const message = JSON.parse(event.data);
if (message.type === 'mcp_response' && message.requestId) {
const pending = this.pendingRequests.get(message.requestId);
if (pending) {
pending.resolve(message.result);
this.pendingRequests.delete(message.requestId);
}
}
}
}
const transport = new WebSocketTransport('wss://example.com', 'agent-123');
const client = new MCPClient({ transport });
const result = await client.invoke('add', { a: 5, b: 3 });
服务器端 WebSocket 处理
class WebSocketHandler {
private ws: WebSocket;
constructor(private readonly agentId: string) {}
async handleRequest(request: Request): Promise<Response> {
const response = new Response(null, {
status: 101,
webSocket: await this.upgradeToWebSocket(request)
});
return response;
}
private upgradeToWebSocket(request: Request): Promise<WebSocket> {
}
}
📄 许可证
本项目遵循 MIT 协议。
Copyright (c) 2023 Your Name.
📞 联系方式