返回文章列表
Agent
服务模板
MCP
Function Calling
LangGraph

Agent 服务模板抽取实用手册:从 API 到 Agent 可复用服务

2026年3月6日 07:3054 分钟加载中作者: Lhy099

概述

本文档提供 Agent 服务化过程中常用的模板抽取方案,包括:

  1. API → Function Schema 自动抽取
  2. Agent 微服务标准模板
  3. LangGraph 可复用 Agent 模板
  4. MCP (Model Context Protocol) 服务模板

模板一:API 到 Function Calling Schema 抽取

1.1 从 OpenAPI/Swagger 自动转换

"""
API Schema 抽取器:将 REST API 转换为 LLM Function Calling Schema
"""
from typing import Dict, Any, List, Optional
import json
from pydantic import BaseModel, Field

class ParameterSchema(BaseModel):
    """参数定义"""
    name: str
    type: str
    description: str
    required: bool = True
    enum: Optional[List[str]] = None
    default: Optional[Any] = None

class FunctionSchema(BaseModel):
    """OpenAI Function Calling 标准格式"""
    name: str
    description: str
    parameters: Dict[str, Any]
    
class APISchemaExtractor:
    """从 REST API 提取 Function Schema"""
    
    TYPE_MAPPING = {
        "string": "string",
        "integer": "number",
        "number": "number",
        "boolean": "boolean",
        "array": "array",
        "object": "object"
    }
    
    def extract_from_openapi(self, openapi_spec: Dict, path: str, method: str) -> FunctionSchema:
        """从 OpenAPI 规范提取 Function Schema"""
        endpoint = openapi_spec["paths"][path][method]
        
        # 构建函数名:如 GET /users/{id} → get_user_by_id
        operation_id = endpoint.get("operationId", self._generate_operation_id(path, method))
        description = endpoint.get("summary", endpoint.get("description", ""))
        
        # 提取参数
        properties = {}
        required = []
        
        # 处理路径参数和查询参数
        if "parameters" in endpoint:
            for param in endpoint["parameters"]:
                param_schema = self._convert_parameter(param)
                properties[param["name"]] = param_schema
                if param.get("required", False):
                    required.append(param["name"])
        
        # 处理请求体
        if "requestBody" in endpoint:
            body_schema = endpoint["requestBody"]["content"]["application/json"]["schema"]
            body_properties = self._convert_schema(body_schema)
            properties["body"] = {
                "type": "object",
                "description": "请求体参数",
                "properties": body_properties.get("properties", {}),
                "required": body_properties.get("required", [])
            }
            required.append("body")
        
        return FunctionSchema(
            name=operation_id,
            description=description,
            parameters={
                "type": "object",
                "properties": properties,
                "required": required
            }
        )
    
    def _convert_parameter(self, param: Dict) -> Dict:
        """转换单个参数"""
        schema = param.get("schema", {})
        result = {
            "type": self.TYPE_MAPPING.get(schema.get("type", "string"), "string"),
            "description": param.get("description", "")
        }
        if "enum" in schema:
            result["enum"] = schema["enum"]
        if "default" in schema:
            result["default"] = schema["default"]
        return result
    
    def _generate_operation_id(self, path: str, method: str) -> str:
        """生成操作 ID"""
        parts = path.strip("/").replace("{", "").replace("}", "").split("/")
        return f"{method.lower()}_{'_'.join(parts)}"

# 使用示例
"""
extractor = APISchemaExtractor()
schema = extractor.extract_from_openapi(openapi_spec, "/users/{id}", "get")
print(schema.json(indent=2))
"""

1.2 转换后标准格式示例

{
  "name": "get_weather",
  "description": "获取指定城市的天气信息",
  "parameters": {
    "type": "object",
    "properties": {
      "city": {
        "type": "string",
        "description": "城市名称,如 Beijing、Shanghai"
      },
      "unit": {
        "type": "string",
        "enum": ["celsius", "fahrenheit"],
        "default": "celsius",
        "description": "温度单位"
      }
    },
    "required": ["city"]
  }
}

模板二:Agent 微服务标准模板

2.1 项目结构模板

agent-service-template/
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── pyproject.toml
├── README.md
├── .env.example
├── src/
│   ├── __init__.py
│   ├── main.py              # FastAPI 入口
│   ├── config.py            # 配置管理
│   ├── agent/
│   │   ├── __init__.py
│   │   ├── core.py          # Agent 核心逻辑
│   │   ├── tools.py         # 工具定义
│   │   └── memory.py        # 记忆管理
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes.py        # API 路由
│   │   ├── schemas.py       # Pydantic 模型
│   │   └── dependencies.py  # 依赖注入
│   └── utils/
│       ├── __init__.py
│       └── logger.py
├── tests/
│   ├── __init__.py
│   ├── test_agent.py
│   └── test_api.py
└── k8s/
    ├── deployment.yaml
    ├── service.yaml
    └── configmap.yaml

2.2 FastAPI 服务封装模板

# src/main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from typing import Dict, Any, AsyncGenerator

from src.agent.core import AgentService
from src.api.schemas import AgentRequest, AgentResponse, HealthCheck
from src.config import Settings, get_settings

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
    """应用生命周期管理"""
    # 启动:初始化 Agent
    app.state.agent = AgentService()
    await app.state.agent.initialize()
    yield
    # 关闭:清理资源
    await app.state.agent.cleanup()

app = FastAPI(
    title="Agent Service API",
    description="可复用的 AI Agent 微服务模板",
    version="1.0.0",
    lifespan=lifespan
)

# CORS 配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 依赖注入:获取 Agent 实例
async def get_agent() -> AgentService:
    from fastapi import Request
    return Request.app.state.agent

@app.get("/health", response_model=HealthCheck)
async def health_check():
    """健康检查端点"""
    return HealthCheck(
        status="healthy",
        version="1.0.0",
        agent_status="ready"
    )

@app.post("/agent/run", response_model=AgentResponse)
async def run_agent(
    request: AgentRequest,
    agent: AgentService = Depends(get_agent)
):
    """
    运行 Agent 任务
    
    - **query**: 用户查询
    - **context**: 上下文信息(可选)
    - **session_id**: 会话 ID(用于保持上下文)
    """
    try:
        result = await agent.run(
            query=request.query,
            context=request.context,
            session_id=request.session_id
        )
        return AgentResponse(
            success=True,
            data=result,
            session_id=request.session_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/agent/stream")
async def stream_agent(
    request: AgentRequest,
    agent: AgentService = Depends(get_agent)
):
    """流式响应 Agent 任务"""
    from fastapi.responses import StreamingResponse
    
    async def event_generator():
        async for chunk in agent.run_stream(
            query=request.query,
            session_id=request.session_id
        ):
            yield f"data: {chunk}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

@app.get("/agent/tools")
async def list_tools(agent: AgentService = Depends(get_agent)):
    """列出 Agent 可用的工具"""
    return {
        "tools": [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.args_schema.schema() if tool.args_schema else None
            }
            for tool in agent.tools
        ]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2.3 核心 Agent 服务类模板

# src/agent/core.py
from typing import List, Dict, Any, Optional, AsyncIterator
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import BaseTool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

from src.agent.tools import get_default_tools
from src.agent.memory import ConversationMemory

class AgentService:
    """可复用的 Agent 服务类"""
    
    def __init__(
        self,
        model_name: str = "gpt-4",
        temperature: float = 0.0,
        max_iterations: int = 10
    ):
        self.model_name = model_name
        self.temperature = temperature
        self.max_iterations = max_iterations
        self.llm: Optional[ChatOpenAI] = None
        self.agent_executor: Optional[AgentExecutor] = None
        self.tools: List[BaseTool] = []
        self.memory_store: Dict[str, ConversationMemory] = {}
    
    async def initialize(self):
        """初始化 Agent"""
        self.llm = ChatOpenAI(
            model=self.model_name,
            temperature=self.temperature
        )
        
        # 加载工具
        self.tools = get_default_tools()
        
        # 构建 Agent
        prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个有用的 AI 助手,可以使用工具帮助用户解决问题。"),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        agent = create_openai_tools_agent(self.llm, self.tools, prompt)
        
        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=self.tools,
            max_iterations=self.max_iterations,
            verbose=True
        )
    
    async def run(
        self,
        query: str,
        context: Optional[Dict[str, Any]] = None,
        session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """运行 Agent 任务"""
        # 获取或创建记忆
        memory = self._get_memory(session_id)
        
        # 执行 Agent
        result = await self.agent_executor.ainvoke({
            "input": query,
            "chat_history": memory.get_messages()
        })
        
        # 更新记忆
        memory.add_message(HumanMessage(content=query))
        memory.add_message(AIMessage(content=result["output"]))
        
        return {
            "answer": result["output"],
            "intermediate_steps": result.get("intermediate_steps", []),
            "session_id": session_id
        }
    
    async def run_stream(
        self,
        query: str,
        session_id: Optional[str] = None
    ) -> AsyncIterator[str]:
        """流式运行 Agent"""
        memory = self._get_memory(session_id)
        
        async for chunk in self.agent_executor.astream({
            "input": query,
            "chat_history": memory.get_messages()
        }):
            if "output" in chunk:
                yield chunk["output"]
    
    def _get_memory(self, session_id: Optional[str]) -> ConversationMemory:
        """获取会话记忆"""
        if session_id not in self.memory_store:
            self.memory_store[session_id] = ConversationMemory()
        return self.memory_store[session_id]
    
    async def cleanup(self):
        """清理资源"""
        self.memory_store.clear()

模板三:LangGraph Agent 可复用模板

3.1 基础 ReAct Agent 模板

"""
LangGraph ReAct Agent 可复用模板
支持:工具调用、记忆、人机协作、错误重试
"""
from typing import TypedDict, Annotated, Sequence, List, Optional
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import ToolNode
import operator

class AgentState(TypedDict):
    """Agent 状态定义"""
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next_step: Optional[str]
    error_count: int
    metadata: dict

class ReActAgentTemplate:
    """
    可复用的 ReAct Agent 模板
    
    使用示例:
        agent = ReActAgentTemplate(tools=[search_tool, calculator_tool])
        graph = agent.build()
        result = graph.invoke({
            "messages": [HumanMessage(content="查询北京天气")]
        })
    """
    
    def __init__(
        self,
        tools: List[BaseTool],
        model_name: str = "gpt-4",
        temperature: float = 0,
        max_error_retries: int = 3
    ):
        self.tools = tools
        self.tool_node = ToolNode(tools)
        self.model = ChatOpenAI(
            model=model_name,
            temperature=temperature
        ).bind_tools(tools)
        self.max_error_retries = max_error_retries
    
    def _should_continue(self, state: AgentState) -> str:
        """决定下一步动作"""
        messages = state["messages"]
        last_message = messages[-1]
        
        # 检查错误重试次数
        if state.get("error_count", 0) >= self.max_error_retries:
            return "end"
        
        # 检查是否需要调用工具
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "continue"
        
        return "end"
    
    def _call_model(self, state: AgentState) -> dict:
        """调用 LLM"""
        messages = state["messages"]
        response = self.model.invoke(messages)
        return {"messages": [response]}
    
    def _handle_error(self, state: AgentState) -> dict:
        """错误处理节点"""
        error_count = state.get("error_count", 0) + 1
        return {
            "error_count": error_count,
            "messages": [AIMessage(content=f"执行出错,正在重试 ({error_count}/{self.max_error_retries})...")]
        }
    
    def build(self) -> StateGraph:
        """构建 Agent 图"""
        workflow = StateGraph(AgentState)
        
        # 定义节点
        workflow.add_node("agent", self._call_model)
        workflow.add_node("tools", self.tool_node)
        workflow.add_node("error_handler", self._handle_error)
        
        # 定义边
        workflow.set_entry_point("agent")
        
        workflow.add_conditional_edges(
            "agent",
            self._should_continue,
            {
                "continue": "tools",
                "end": END
            }
        )
        
        workflow.add_edge("tools", "agent")
        
        # 错误处理边(工具执行失败时)
        workflow.add_conditional_edges(
            "tools",
            lambda state: "error" if state.get("error") else "success",
            {
                "error": "error_handler",
                "success": "agent"
            }
        )
        
        workflow.add_edge("error_handler", "agent")
        
        # 添加记忆持久化
        checkpointer = MemorySaver()
        return workflow.compile(checkpointer=checkpointer)

模板四:MCP (Model Context Protocol) 服务模板

4.1 MCP Server 基础模板

"""
MCP Server 标准模板
将 Agent 能力封装为 MCP 服务,供其他 Agent 调用
"""
from typing import Any
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    LoggingLevel
)
import mcp.types as types

class MCPServerTemplate:
    """
    MCP Server 模板类
    
    使用示例:
        server = MCPServerTemplate("my-service")
        server.add_tool(
            name="search",
            description="搜索信息",
            input_schema={...},
            handler=search_handler
        )
        server.run()
    """
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.server = Server(service_name)
        self.tools: Dict[str, Tool] = {}
        self.handlers: Dict[str, callable] = {}
        self._setup_handlers()
    
    def _setup_handlers(self):
        """设置 MCP 协议处理器"""
        
        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """列出可用资源"""
            return [
                Resource(
                    uri="template://docs",
                    name="服务文档",
                    description="API 使用文档",
                    mimeType="text/markdown"
                )
            ]
        
        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            """读取资源"""
            if uri == "template://docs":
                return f"# {self.service_name} 服务文档\n\n这是一个 MCP 服务。"
            raise ValueError(f"未知资源: {uri}")
        
        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """列出可用工具"""
            return list(self.tools.values())
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str,
            arguments: Dict[str, Any] | None
        ) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            """调用工具"""
            if name not in self.handlers:
                raise ValueError(f"未知工具: {name}")
            
            try:
                result = await self.handlers[name](arguments or {})
                return [types.TextContent(type="text", text=str(result))]
            except Exception as e:
                return [types.TextContent(type="text", text=f"错误: {str(e)}")]
    
    def add_tool(
        self,
        name: str,
        description: str,
        input_schema: Dict[str, Any],
        handler: callable
    ):
        """
        添加工具
        
        Args:
            name: 工具名称
            description: 工具描述(LLM 用)
            input_schema: JSON Schema 格式的参数定义
            handler: 处理函数
        """
        self.tools[name] = Tool(
            name=name,
            description=description,
            inputSchema=input_schema
        )
        self.handlers[name] = handler
    
    def run(self):
        """启动 MCP Server"""
        async def main():
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name=self.service_name,
                        server_version="1.0.0",
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities={}
                        )
                    )
                )
        
        import asyncio
        asyncio.run(main())

# 使用示例
"""
server = MCPServerTemplate("weather-service")

async def get_weather_handler(args):
    city = args.get("city")
    return f"{city} 今天天气晴朗,25°C"

server.add_tool(
    name="get_weather",
    description="获取指定城市的天气信息",
    input_schema={
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "城市名称"}
        },
        "required": ["city"]
    },
    handler=get_weather_handler
)

if __name__ == "__main__":
    server.run()
"""

模板五:Docker & K8s 部署模板

5.1 Dockerfile 模板

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY pyproject.toml .

RUN useradd -m -u 1000 agent && chown -R agent:agent /app
USER agent

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health')"

EXPOSE 8000

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

5.2 Kubernetes 部署模板

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-service
  template:
    metadata:
      labels:
        app: agent-service
    spec:
      containers:
      - name: agent
        image: agent-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: agent-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: agent-service
spec:
  selector:
    app: agent-service
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP

快速使用指南

场景 1:将现有 API 封装为 Agent 工具

from api_schema_extractor import APISchemaExtractor
import yaml

# 加载 OpenAPI 规范
with open("api.yaml") as f:
    openapi_spec = yaml.safe_load(f)

# 提取 Function Schema
extractor = APISchemaExtractor()
schema = extractor.extract_from_openapi(openapi_spec, "/orders", "post")

# 使用提取的 schema 创建 Agent 工具

场景 2:部署 LangGraph Agent 为微服务

# 1. 使用模板创建项目
cp -r agent-service-template my-agent-service
cd my-agent-agent-service

# 2. 修改 src/agent/core.py 实现业务逻辑

# 3. 配置环境变量
cp .env.example .env
# 编辑 .env 添加 OPENAI_API_KEY

# 4. 本地运行
docker-compose up -d

# 5. 部署到 K8s
kubectl apply -f k8s/

场景 3:创建 MCP Server

from mcp_template import MCPServerTemplate

server = MCPServerTemplate("my-tools")

# 添加工具
server.add_tool(
    name="calculate",
    description="计算表达式",
    input_schema={...},
    handler=calc_handler
)

server.run()

手册整理时间:2026-03-06
参考:LangChain、OpenAI、MCP 官方文档及生产实践

相关文章