MCP (Model Context Protocol) 详解

🔗 MCP (Model Context Protocol)

Claude提出的革命性AI协议标准

🎯 什么是MCP?

MCP (Model Context Protocol) 是由Anthropic(Claude的开发公司)在2024年11月提出的一个开放标准协议,旨在实现AI模型与外部数据源和工具的安全、标准化连接。

MCP解决了AI应用开发中的一个核心痛点:如何让AI模型安全、高效地访问和使用外部资源。在MCP之前,每个AI应用都需要自行处理与外部系统的集成,导致开发复杂、安全风险高、重复劳动多。

🔒 安全性

提供标准化的身份验证和授权机制,确保AI只能访问被明确授权的资源

🔧 标准化

统一的接口规范,让不同的AI模型和外部系统能够无缝对接

🚀 可扩展性

模块化设计,支持插件式扩展,轻松添加新的数据源和工具

⚡ 高效性

优化的通信协议,减少延迟,提升AI与外部系统的交互效率

🏗️ MCP架构概览

graph TB User[用户] --> Client[MCP客户端/AI模型] Client --> Server[MCP服务器] Server --> DB[(数据库)] Server --> API[外部API] Server --> Files[文件系统] Server --> Tools[工具集] Client -.->|JSON-RPC协议| Server Server -.->|标准化接口| DB Server -.->|RESTful/GraphQL| API Server -.->|文件I/O| Files Server -.->|函数调用| Tools

🛠️ 如何开发MCP

开发MCP主要涉及两个组件:
1. MCP Server:提供资源和工具的服务端
2. MCP Client:使用资源和工具的客户端(通常是AI应用)

📋 开发前准备

🔧 开发MCP Server

1

安装MCP SDK

Python安装
pip install mcp
Node.js安装
npm install @modelcontextprotocol/sdk
2

创建基础MCP Server

Python示例 - 基础服务器
#!/usr/bin/env python3
import asyncio
import json
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import Resource, Tool, TextContent

# 创建MCP服务器实例
server = Server("my-data-server")

@server.list_resources()
async def handle_list_resources() -> list[Resource]:
    """列出可用的资源"""
    return [
        Resource(
            uri="file:///data/users.json",
            name="用户数据",
            description="系统中的用户信息",
            mimeType="application/json"
        ),
        Resource(
            uri="db://localhost/products",
            name="产品数据库", 
            description="产品信息数据库连接"
        )
    ]

@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    """读取指定资源的内容"""
    if uri == "file:///data/users.json":
        # 模拟读取用户数据
        users = [
            {"id": 1, "name": "张三", "email": "[email protected]"},
            {"id": 2, "name": "李四", "email": "[email protected]"}
        ]
        return json.dumps(users, ensure_ascii=False, indent=2)
    
    elif uri == "db://localhost/products":
        # 模拟数据库查询
        products = [
            {"id": 1, "name": "笔记本电脑", "price": 5999},
            {"id": 2, "name": "智能手机", "price": 2999}
        ]
        return json.dumps(products, ensure_ascii=False, indent=2)
    
    else:
        raise ValueError(f"未知的资源URI: {uri}")

@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """列出可用的工具"""
    return [
        Tool(
            name="send_email",
            description="发送电子邮件",
            inputSchema={
                "type": "object",
                "properties": {
                    "to": {"type": "string", "description": "收件人邮箱"},
                    "subject": {"type": "string", "description": "邮件主题"},
                    "body": {"type": "string", "description": "邮件正文"}
                },
                "required": ["to", "subject", "body"]
            }
        ),
        Tool(
            name="calculate",
            description="执行数学计算",
            inputSchema={
                "type": "object", 
                "properties": {
                    "expression": {"type": "string", "description": "数学表达式"}
                },
                "required": ["expression"]
            }
        )
    ]

@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
    """执行工具调用"""
    if name == "send_email":
        to = arguments.get("to")
        subject = arguments.get("subject") 
        body = arguments.get("body")
        
        # 模拟发送邮件
        result = f"✅ 邮件已发送到 {to}\n主题: {subject}\n内容: {body}"
        return [TextContent(type="text", text=result)]
    
    elif name == "calculate":
        expression = arguments.get("expression")
        try:
            # 安全的数学计算(生产环境需要更严格的验证)
            result = eval(expression, {"__builtins__": {}}, {})
            return [TextContent(type="text", text=f"计算结果: {expression} = {result}")]
        except Exception as e:
            return [TextContent(type="text", text=f"计算错误: {str(e)}")]
    
    else:
        raise ValueError(f"未知的工具: {name}")

async def main():
    # 启动服务器
    async with server.run_stdio() as context:
        await context.wait_for_completion()

if __name__ == "__main__":
    asyncio.run(main())
3

高级功能开发

添加数据库连接
import sqlite3
import json # 确保json被导入
from mcp.server import Server
from mcp.types import TextContent # 确保TextContent被导入

server = Server("database-server")

class DatabaseManager:
    def __init__(self, db_path: str):
        self.db_path = db_path
    
    async def query(self, sql: str, params: tuple = ()) -> list:
        """执行数据库查询"""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # 返回字典格式
        cursor = conn.cursor()
        
        try:
            cursor.execute(sql, params)
            if sql.strip().upper().startswith('SELECT'):
                results = [dict(row) for row in cursor.fetchall()]
                return results
            else:
                conn.commit()
                return [{"affected_rows": cursor.rowcount}]
        finally:
            conn.close()

# 创建数据库管理器
db_manager = DatabaseManager("./app.db")

@server.call_tool()
async def handle_database_query(name: str, arguments: dict):
    """处理数据库查询工具"""
    if name == "db_query":
        sql = arguments.get("sql")
        params = arguments.get("params", [])
        
        try:
            results = await db_manager.query(sql, tuple(params))
            return [TextContent(
                type="text", 
                text=json.dumps(results, ensure_ascii=False, indent=2)
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"数据库错误: {str(e)}")]
    else:
        raise ValueError(f"未知的工具: {name}") # 添加未知工具的处理
4

配置文件设置

mcp_config.json
{
  "server": {
    "name": "my-data-server",
    "version": "1.0.0",
    "description": "我的数据服务器",
    "capabilities": {
      "resources": true,
      "tools": true,
      "prompts": false
    }
  },
  "security": {
    "authentication": {
      "type": "api_key",
      "required": true
    },
    "rate_limiting": {
      "requests_per_minute": 100,
      "burst_size": 10
    }
  },
  "logging": {
    "level": "INFO",
    "file": "./logs/mcp_server.log"
  },
  "database": {
    "type": "sqlite",
    "path": "./data/app.db"
  }
}

📱 开发MCP Client

Python客户端示例
import asyncio
from mcp.client import ClientSession, StdioServerParameters
from mcp.types import TextContent # 确保TextContent被导入

async def main():
    # 连接到MCP服务器 (请确保 my_mcp_server.py 文件存在或路径正确)
    server_params = StdioServerParameters(
        command="python",
        args=["my_mcp_server.py"] # 这里的路径应指向你的MCP服务器脚本
    )
    
    try:
        async with ClientSession(server_params) as session:
            # 初始化连接
            await session.initialize()
            print("MCP客户端已连接.")
            
            # 列出可用资源
            resources = await session.list_resources()
            print("可用资源:")
            for resource in resources.resources:
                print(f"  - {resource.name}: {resource.description}")
            
            # 读取资源内容
            print("\n尝试读取资源: file:///data/users.json")
            try:
                content_response = await session.read_resource("file:///data/users.json")
                if content_response and content_response.contents:
                    for content_item in content_response.contents:
                        if isinstance(content_item, TextContent):
                            print(f"资源内容:\n{content_item.text}")
                else:
                    print("未获取到资源内容或内容为空。")
            except Exception as e:
                print(f"读取资源失败: {e}")

            # 列出可用工具
            tools = await session.list_tools()
            print("\n可用工具:")
            for tool in tools.tools:
                print(f"  - {tool.name}: {tool.description}")
            
            # 调用工具
            print("\n尝试调用工具: calculate")
            try:
                result = await session.call_tool(
                    "calculate",
                    {"expression": "2 + 3 * 4"}
                )
                if result and result.content:
                    for content_item in result.content:
                        if isinstance(content_item, TextContent):
                            print(f"计算结果: {content_item.text}")
                else:
                    print("未获取到计算结果或结果为空。")
            except Exception as e:
                print(f"调用计算工具失败: {e}")

            # 发送邮件 (如果服务器端有实现)
            print("\n尝试调用工具: send_email")
            try:
                email_result = await session.call_tool(
                    "send_email",
                    {
                        "to": "[email protected]",
                        "subject": "测试邮件",
                        "body": "这是一封通过MCP发送的测试邮件"
                    }
                )
                if email_result and email_result.content:
                    for content_item in email_result.content:
                        if isinstance(content_item, TextContent):
                            print(f"邮件发送结果: {content_item.text}")
                else:
                    print("未获取到邮件发送结果或结果为空。")
            except Exception as e:
                print(f"调用发送邮件工具失败: {e}")
                
    except Exception as e:
        print(f"MCP客户端连接或操作失败: {e}")

if __name__ == "__main__":
    asyncio.run(main())

🚀 如何使用MCP

💼 企业级应用场景

📊 数据分析AI

连接企业数据库、数据湖,让AI直接分析业务数据,生成报告和洞察

🔧 自动化运维

集成监控系统、CI/CD工具,AI可自动处理告警、部署应用

📝 文档处理

连接文档管理系统,AI可搜索、总结、生成各类企业文档

🛒 电商助手

集成订单系统、库存管理,AI可处理客户询问、管理订单

🔧 集成常见系统

1

连接Claude Desktop

在Claude Desktop中配置MCP服务器,让Claude直接使用你的数据和工具

Claude Desktop配置文件
{
  "mcpServers": {
    "my-data-server": {
      "command": "python",
      "args": ["path/to/my_mcp_server.py"],
      "env": {
        "DATABASE_URL": "sqlite:///app.db"
      }
    },
    "file-server": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/documents"]
    }
  }
}
2

Web应用集成

JavaScript/TypeScript集成
import { ClientSession } from '@modelcontextprotocol/sdk/client/index.js'; // 修正导入路径
import { StdioServerParameters } from '@modelcontextprotocol/sdk/server/stdio.js'; // 修正导入路径

class MCPService {
    constructor() {
        this.client = null; // 这个属性可能用不到,可以移除,或作为通用客户端实例
        this.session = null;
    }
    
    async connect() {
        const serverParams = new StdioServerParameters({
            command: 'python',
            args: ['./mcp_server.py'] // 确保路径正确
        });
        
        this.session = new ClientSession(serverParams);
        await this.session.initialize();
        console.log("MCP JS客户端已连接并初始化.");
    }
    
    async queryDatabase(sql, params = []) {
        if (!this.session) {
            console.error("MCP session not connected.");
            throw new Error("MCP session not connected.");
        }
        console.log(`执行数据库查询: ${sql}`);
        const result = await this.session.call_tool('db_query', {
            sql: sql,
            params: params
        });
        
        // 假设结果的第一个content是text类型
        if (result && result.content && result.content.length > 0 && result.content[0].type === 'text') {
            return JSON.parse(result.content[0].text);
        }
        throw new Error("No database query result or invalid format.");
    }
    
    async getCustomerInfo(customerId) {
        return await this.queryDatabase(
            'SELECT * FROM customers WHERE id = ?',
            [customerId]
        );
    }
    
    async sendNotification(message, channel) {
        if (!this.session) {
            console.error("MCP session not connected.");
            throw new Error("MCP session not connected.");
        }
        console.log(`发送通知: ${message} 到 ${channel}`);
        const result = await this.session.call_tool('send_notification', {
            message: message,
            channel: channel
        });
        if (result && result.content && result.content.length > 0 && result.content[0].type === 'text') {
            return result.content[0].text;
        }
        throw new Error("No notification result or invalid format.");
    }
}

// 立即执行函数,模拟在HTML加载后使用
(async () => {
    const mcpService = new MCPService();
    try {
        await mcpService.connect();

        // 查询客户信息
        const customer = await mcpService.getCustomerInfo(123);
        console.log('客户信息:', customer);

        // 发送通知
        await mcpService.sendNotification('订单已处理', 'slack');
    } catch (error) {
        console.error("MCP服务初始化或调用失败:", error);
    }
})();
3

生产环境部署

Docker部署
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt

# 复制应用代码
COPY . .

# 创建非root用户
RUN useradd -m mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser

# 健康检查
# 注意:此处的健康检查假设您的mcp_server.py会启动一个HTTP服务在8000端口
# 实际MCP服务器通常通过STDIO通信,此处可能需要调整为更适合MCP的健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import socket; sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM); sock.settimeout(1); try: sock.connect(('localhost', 8000)); sock.close(); exit(0) except: exit(1)"

# 启动服务
# MCP服务器通常通过STDIO运行,而不是监听端口。
# 如果您的MCP服务器同时提供HTTP API,则可以使用以下EXPOSE和CMD
EXPOSE 8000
CMD ["python", "mcp_server.py"] # MCP server通过STDIO与客户端通信,不需要端口unless you add an HTTP layer
docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    # MCP服务器通常通过STDIO运行,不需要直接暴露端口给外部
    # 如果您有HTTP管理接口或特定服务接口,再暴露
    # ports:
    #  - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/mcpdb
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
    restart: unless-stopped
    command: ["python", "mcp_server.py"] # 指定MCP服务器启动命令
    
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: mcpdb
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

📈 监控和调试

监控和日志记录
import logging
import time
import asyncio # 确保导入asyncio
import json # 确保导入json
from functools import wraps
from mcp.server import Server
from mcp.types import TextContent # 确保TextContent被导入

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mcp_server.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def monitor_performance(func):
    """性能监控装饰器"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            duration = time.time() - start_time
            logger.info(f"{func.__name__} 执行完成,耗时: {duration:.3f}秒")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"{func.__name__} 执行失败,耗时: {duration:.3f}秒,错误: {str(e)}")
            raise
    return wrapper

server = Server("monitored-server")

@server.call_tool()
@monitor_performance
async def handle_monitored_tool(name: str, arguments: dict) -> list[TextContent]: # 明确返回类型
    """被监控的工具调用"""
    logger.info(f"工具调用: {name}, 参数: {arguments}")
    
    # 这里是你的工具逻辑
    if name == "complex_calculation":
        # 模拟复杂计算
        await asyncio.sleep(1)
        result = "计算完成"
        logger.info(f"工具 {name} 返回结果: {result}")
        return [TextContent(type="text", text=result)]
    else:
        raise ValueError(f"未知的工具: {name}") # 添加未知工具的处理
⚠️ 安全提醒
• 在生产环境中,务必实施严格的身份验证和授权
• 对所有输入进行验证和清理,防止注入攻击
• 使用HTTPS加密所有通信
• 定期更新依赖包,修复安全漏洞
• 实施访问控制和审计日志

🎯 最佳实践与建议

🔧 开发建议

🔒 安全实践

📊 性能优化

💡 发展趋势
MCP作为AI生态系统的重要基础设施,预计将在以下方面持续发展:
• 更丰富的标准化工具和连接器
• 更完善的安全和治理框架
• 与主流云平台的深度集成
• 支持更多编程语言和框架
• 企业级功能增强(如多租户、高可用等)

互动区域

登录后可以点赞此内容

参与互动

登录后可以点赞和评论此内容,与作者互动交流