Claude提出的革命性AI协议标准
MCP解决了AI应用开发中的一个核心痛点:如何让AI模型安全、高效地访问和使用外部资源。在MCP之前,每个AI应用都需要自行处理与外部系统的集成,导致开发复杂、安全风险高、重复劳动多。
提供标准化的身份验证和授权机制,确保AI只能访问被明确授权的资源
统一的接口规范,让不同的AI模型和外部系统能够无缝对接
模块化设计,支持插件式扩展,轻松添加新的数据源和工具
优化的通信协议,减少延迟,提升AI与外部系统的交互效率
pip install mcp
npm install @modelcontextprotocol/sdk
#!/usr/bin/env python3
import asyncio
import json
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import Resource, Tool, TextContent
# 创建MCP服务器实例
server = Server("my-data-server")
@server.list_resources()
async def handle_list_resources() -> list[Resource]:
"""列出可用的资源"""
return [
Resource(
uri="file:///data/users.json",
name="用户数据",
description="系统中的用户信息",
mimeType="application/json"
),
Resource(
uri="db://localhost/products",
name="产品数据库",
description="产品信息数据库连接"
)
]
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
"""读取指定资源的内容"""
if uri == "file:///data/users.json":
# 模拟读取用户数据
users = [
{"id": 1, "name": "张三", "email": "[email protected]"},
{"id": 2, "name": "李四", "email": "[email protected]"}
]
return json.dumps(users, ensure_ascii=False, indent=2)
elif uri == "db://localhost/products":
# 模拟数据库查询
products = [
{"id": 1, "name": "笔记本电脑", "price": 5999},
{"id": 2, "name": "智能手机", "price": 2999}
]
return json.dumps(products, ensure_ascii=False, indent=2)
else:
raise ValueError(f"未知的资源URI: {uri}")
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
"""列出可用的工具"""
return [
Tool(
name="send_email",
description="发送电子邮件",
inputSchema={
"type": "object",
"properties": {
"to": {"type": "string", "description": "收件人邮箱"},
"subject": {"type": "string", "description": "邮件主题"},
"body": {"type": "string", "description": "邮件正文"}
},
"required": ["to", "subject", "body"]
}
),
Tool(
name="calculate",
description="执行数学计算",
inputSchema={
"type": "object",
"properties": {
"expression": {"type": "string", "description": "数学表达式"}
},
"required": ["expression"]
}
)
]
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
"""执行工具调用"""
if name == "send_email":
to = arguments.get("to")
subject = arguments.get("subject")
body = arguments.get("body")
# 模拟发送邮件
result = f"✅ 邮件已发送到 {to}\n主题: {subject}\n内容: {body}"
return [TextContent(type="text", text=result)]
elif name == "calculate":
expression = arguments.get("expression")
try:
# 安全的数学计算(生产环境需要更严格的验证)
result = eval(expression, {"__builtins__": {}}, {})
return [TextContent(type="text", text=f"计算结果: {expression} = {result}")]
except Exception as e:
return [TextContent(type="text", text=f"计算错误: {str(e)}")]
else:
raise ValueError(f"未知的工具: {name}")
async def main():
# 启动服务器
async with server.run_stdio() as context:
await context.wait_for_completion()
if __name__ == "__main__":
asyncio.run(main())
import sqlite3
import json # 确保json被导入
from mcp.server import Server
from mcp.types import TextContent # 确保TextContent被导入
server = Server("database-server")
class DatabaseManager:
def __init__(self, db_path: str):
self.db_path = db_path
async def query(self, sql: str, params: tuple = ()) -> list:
"""执行数据库查询"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row # 返回字典格式
cursor = conn.cursor()
try:
cursor.execute(sql, params)
if sql.strip().upper().startswith('SELECT'):
results = [dict(row) for row in cursor.fetchall()]
return results
else:
conn.commit()
return [{"affected_rows": cursor.rowcount}]
finally:
conn.close()
# 创建数据库管理器
db_manager = DatabaseManager("./app.db")
@server.call_tool()
async def handle_database_query(name: str, arguments: dict):
"""处理数据库查询工具"""
if name == "db_query":
sql = arguments.get("sql")
params = arguments.get("params", [])
try:
results = await db_manager.query(sql, tuple(params))
return [TextContent(
type="text",
text=json.dumps(results, ensure_ascii=False, indent=2)
)]
except Exception as e:
return [TextContent(type="text", text=f"数据库错误: {str(e)}")]
else:
raise ValueError(f"未知的工具: {name}") # 添加未知工具的处理
{
"server": {
"name": "my-data-server",
"version": "1.0.0",
"description": "我的数据服务器",
"capabilities": {
"resources": true,
"tools": true,
"prompts": false
}
},
"security": {
"authentication": {
"type": "api_key",
"required": true
},
"rate_limiting": {
"requests_per_minute": 100,
"burst_size": 10
}
},
"logging": {
"level": "INFO",
"file": "./logs/mcp_server.log"
},
"database": {
"type": "sqlite",
"path": "./data/app.db"
}
}
import asyncio
from mcp.client import ClientSession, StdioServerParameters
from mcp.types import TextContent # 确保TextContent被导入
async def main():
# 连接到MCP服务器 (请确保 my_mcp_server.py 文件存在或路径正确)
server_params = StdioServerParameters(
command="python",
args=["my_mcp_server.py"] # 这里的路径应指向你的MCP服务器脚本
)
try:
async with ClientSession(server_params) as session:
# 初始化连接
await session.initialize()
print("MCP客户端已连接.")
# 列出可用资源
resources = await session.list_resources()
print("可用资源:")
for resource in resources.resources:
print(f" - {resource.name}: {resource.description}")
# 读取资源内容
print("\n尝试读取资源: file:///data/users.json")
try:
content_response = await session.read_resource("file:///data/users.json")
if content_response and content_response.contents:
for content_item in content_response.contents:
if isinstance(content_item, TextContent):
print(f"资源内容:\n{content_item.text}")
else:
print("未获取到资源内容或内容为空。")
except Exception as e:
print(f"读取资源失败: {e}")
# 列出可用工具
tools = await session.list_tools()
print("\n可用工具:")
for tool in tools.tools:
print(f" - {tool.name}: {tool.description}")
# 调用工具
print("\n尝试调用工具: calculate")
try:
result = await session.call_tool(
"calculate",
{"expression": "2 + 3 * 4"}
)
if result and result.content:
for content_item in result.content:
if isinstance(content_item, TextContent):
print(f"计算结果: {content_item.text}")
else:
print("未获取到计算结果或结果为空。")
except Exception as e:
print(f"调用计算工具失败: {e}")
# 发送邮件 (如果服务器端有实现)
print("\n尝试调用工具: send_email")
try:
email_result = await session.call_tool(
"send_email",
{
"to": "[email protected]",
"subject": "测试邮件",
"body": "这是一封通过MCP发送的测试邮件"
}
)
if email_result and email_result.content:
for content_item in email_result.content:
if isinstance(content_item, TextContent):
print(f"邮件发送结果: {content_item.text}")
else:
print("未获取到邮件发送结果或结果为空。")
except Exception as e:
print(f"调用发送邮件工具失败: {e}")
except Exception as e:
print(f"MCP客户端连接或操作失败: {e}")
if __name__ == "__main__":
asyncio.run(main())
连接企业数据库、数据湖,让AI直接分析业务数据,生成报告和洞察
集成监控系统、CI/CD工具,AI可自动处理告警、部署应用
连接文档管理系统,AI可搜索、总结、生成各类企业文档
集成订单系统、库存管理,AI可处理客户询问、管理订单
在Claude Desktop中配置MCP服务器,让Claude直接使用你的数据和工具
{
"mcpServers": {
"my-data-server": {
"command": "python",
"args": ["path/to/my_mcp_server.py"],
"env": {
"DATABASE_URL": "sqlite:///app.db"
}
},
"file-server": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/documents"]
}
}
}
import { ClientSession } from '@modelcontextprotocol/sdk/client/index.js'; // 修正导入路径
import { StdioServerParameters } from '@modelcontextprotocol/sdk/server/stdio.js'; // 修正导入路径
class MCPService {
constructor() {
this.client = null; // 这个属性可能用不到,可以移除,或作为通用客户端实例
this.session = null;
}
async connect() {
const serverParams = new StdioServerParameters({
command: 'python',
args: ['./mcp_server.py'] // 确保路径正确
});
this.session = new ClientSession(serverParams);
await this.session.initialize();
console.log("MCP JS客户端已连接并初始化.");
}
async queryDatabase(sql, params = []) {
if (!this.session) {
console.error("MCP session not connected.");
throw new Error("MCP session not connected.");
}
console.log(`执行数据库查询: ${sql}`);
const result = await this.session.call_tool('db_query', {
sql: sql,
params: params
});
// 假设结果的第一个content是text类型
if (result && result.content && result.content.length > 0 && result.content[0].type === 'text') {
return JSON.parse(result.content[0].text);
}
throw new Error("No database query result or invalid format.");
}
async getCustomerInfo(customerId) {
return await this.queryDatabase(
'SELECT * FROM customers WHERE id = ?',
[customerId]
);
}
async sendNotification(message, channel) {
if (!this.session) {
console.error("MCP session not connected.");
throw new Error("MCP session not connected.");
}
console.log(`发送通知: ${message} 到 ${channel}`);
const result = await this.session.call_tool('send_notification', {
message: message,
channel: channel
});
if (result && result.content && result.content.length > 0 && result.content[0].type === 'text') {
return result.content[0].text;
}
throw new Error("No notification result or invalid format.");
}
}
// 立即执行函数,模拟在HTML加载后使用
(async () => {
const mcpService = new MCPService();
try {
await mcpService.connect();
// 查询客户信息
const customer = await mcpService.getCustomerInfo(123);
console.log('客户信息:', customer);
// 发送通知
await mcpService.sendNotification('订单已处理', 'slack');
} catch (error) {
console.error("MCP服务初始化或调用失败:", error);
}
})();
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd -m mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser
# 健康检查
# 注意:此处的健康检查假设您的mcp_server.py会启动一个HTTP服务在8000端口
# 实际MCP服务器通常通过STDIO通信,此处可能需要调整为更适合MCP的健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import socket; sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM); sock.settimeout(1); try: sock.connect(('localhost', 8000)); sock.close(); exit(0) except: exit(1)"
# 启动服务
# MCP服务器通常通过STDIO运行,而不是监听端口。
# 如果您的MCP服务器同时提供HTTP API,则可以使用以下EXPOSE和CMD
EXPOSE 8000
CMD ["python", "mcp_server.py"] # MCP server通过STDIO与客户端通信,不需要端口unless you add an HTTP layer
version: '3.8'
services:
mcp-server:
build: .
# MCP服务器通常通过STDIO运行,不需要直接暴露端口给外部
# 如果您有HTTP管理接口或特定服务接口,再暴露
# ports:
# - "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/mcpdb
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
volumes:
- ./logs:/app/logs
- ./data:/app/data
restart: unless-stopped
command: ["python", "mcp_server.py"] # 指定MCP服务器启动命令
db:
image: postgres:15
environment:
POSTGRES_DB: mcpdb
POSTGRES_USER: user
POSTGRES_PASSWORD: password
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:
import logging
import time
import asyncio # 确保导入asyncio
import json # 确保导入json
from functools import wraps
from mcp.server import Server
from mcp.types import TextContent # 确保TextContent被导入
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp_server.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def monitor_performance(func):
"""性能监控装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
logger.info(f"{func.__name__} 执行完成,耗时: {duration:.3f}秒")
return result
except Exception as e:
duration = time.time() - start_time
logger.error(f"{func.__name__} 执行失败,耗时: {duration:.3f}秒,错误: {str(e)}")
raise
return wrapper
server = Server("monitored-server")
@server.call_tool()
@monitor_performance
async def handle_monitored_tool(name: str, arguments: dict) -> list[TextContent]: # 明确返回类型
"""被监控的工具调用"""
logger.info(f"工具调用: {name}, 参数: {arguments}")
# 这里是你的工具逻辑
if name == "complex_calculation":
# 模拟复杂计算
await asyncio.sleep(1)
result = "计算完成"
logger.info(f"工具 {name} 返回结果: {result}")
return [TextContent(type="text", text=result)]
else:
raise ValueError(f"未知的工具: {name}") # 添加未知工具的处理