Skip to content

注解使用指南

AgentKit SDK 提供了一套简洁的装饰器(注解),用于快速构建不同类型的 Agent 应用。本文档详细介绍各种注解的使用方法和最佳实践。

Simple Agent 注解

AgentkitSimpleApp 是最常用的应用框架,提供标准的 HTTP 服务端点。

@app.entrypoint

定义 Agent 的主入口函数,处理来自 Platform 的调用请求。

函数签名

python
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    pass

参数说明

  • payload (dict): 请求体,包含用户输入和配置

    • prompt (str): 用户输入的提示词
    • 其他自定义字段
  • headers (dict): 请求头,包含上下文信息

    • user_id (str): 用户 ID
    • session_id (str): 会话 ID
    • request_id (str): 请求追踪 ID
    • 其他自定义头部
  • 返回值 (str): Agent 的响应结果

完整示例

python
from agentkit.apps import AgentkitSimpleApp
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather

app = AgentkitSimpleApp()
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    """Agent 主入口函数"""
    # 1. 提取输入参数
    prompt = payload.get("prompt", "")
    user_id = headers.get("user_id", "anonymous")
    session_id = headers.get("session_id", "default")
    
    # 2. 调用 Agent 运行
    response = await runner.run(
        messages=prompt,
        user_id=user_id,
        session_id=session_id
    )
    
    # 3. 返回结果
    return response

注意事项

  1. 必须是异步函数:使用 async def 定义
  2. 参数顺序固定:第一个参数是 payload,第二个是 headers
  3. 返回字符串:返回值会被自动封装为 JSON 响应
  4. 错误处理:建议在函数内部处理异常

错误处理示例

python
import logging

logger = logging.getLogger(__name__)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    try:
        prompt = payload["prompt"]
        response = await runner.run(messages=prompt)
        return response
    except KeyError as e:
        logger.error(f"Missing required field: {e}")
        return f"Error: Missing required field {e}"
    except Exception as e:
        logger.error(f"Agent execution failed: {e}")
        return f"Error: {str(e)}"

@app.ping

定义健康检查函数,用于 Platform 和 Kubernetes 的健康探测。

函数签名

python
@app.ping
def ping() -> str:
    pass

参数说明

  • 无参数:健康检查函数不接收任何参数
  • 返回值 (str): 健康状态信息,通常返回 "pong" 或 "ok"

基本示例

python
@app.ping
def ping() -> str:
    """健康检查"""
    return "pong!"

高级示例:包含依赖检查

python
import redis
from sqlalchemy import create_engine

# 初始化依赖
redis_client = redis.Redis(host='localhost', port=6379)
db_engine = create_engine('postgresql://...')

@app.ping
def ping() -> str:
    """健康检查,包含依赖服务检测"""
    try:
        # 检查 Redis 连接
        redis_client.ping()
        
        # 检查数据库连接
        with db_engine.connect() as conn:
            conn.execute("SELECT 1")
        
        return "ok - all services healthy"
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return f"degraded - {str(e)}"

注意事项

  1. 无参数:函数签名必须为 def ping() -> str:
  2. 同步函数:使用 def 而非 async def
  3. 快速响应:应在 1 秒内返回,避免超时
  4. 轻量级检查:避免执行耗时操作

可用端点

健康检查函数会自动注册到以下端点:

  • GET /ping - 基础健康检查
  • GET /health - 健康状态
  • GET /readiness - Kubernetes 就绪探针
  • GET /liveness - Kubernetes 存活探针

@app.async_task(规划中)

用于定义异步任务,支持长时间运行的操作。

python
@app.async_task
async def process_long_task(task_id: str, data: dict) -> dict:
    """处理长时间运行的任务"""
    # 长时间运行的逻辑
    result = await heavy_computation(data)
    return result

注意:此功能正在规划中,暂未实现。

MCP Agent 注解

AgentkitMCPApp 用于构建 MCP (Model Context Protocol) 服务,将函数封装为标准的 MCP 工具。

@app.tool

将函数注册为 MCP 工具,供 LLM 调用。

函数签名

python
@app.tool
def tool_name(param1: str, param2: int) -> dict:
    pass

参数说明

  • 函数参数:工具的输入参数,必须有类型注解
  • 返回值:工具的执行结果,建议返回 dict 类型
  • 文档字符串:会作为工具描述,供 LLM 理解工具用途

基本示例

python
from agentkit.apps import AgentkitMCPApp

mcp_app = AgentkitMCPApp()

@mcp_app.tool
def get_city_weather(city: str) -> dict[str, str]:
    """获取城市天气信息
    
    Args:
        city: 城市名称(必须为英文)
        
    Returns:
        包含天气状况和温度的字典
    """
    weather_data = {
        "beijing": {"condition": "Sunny", "temperature": 25},
        "shanghai": {"condition": "Cloudy", "temperature": 22},
    }
    
    city = city.lower().strip()
    if city in weather_data:
        info = weather_data[city]
        return {"result": f"{info['condition']}, {info['temperature']}°C"}
    else:
        return {"result": f"Weather information not found for {city}"}

异步工具示例

python
import aiohttp

@mcp_app.tool
async def fetch_web_content(url: str) -> dict:
    """获取网页内容
    
    Args:
        url: 网页 URL
        
    Returns:
        包含网页内容的字典
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            return {
                "status": response.status,
                "content": content[:1000]  # 截取前 1000 字符
            }

注意事项

  1. 类型注解必需:所有参数必须有类型注解,MCP 需要此信息生成工具模式
  2. 详细的文档字符串:文档字符串会作为工具描述,帮助 LLM 理解工具用途
  3. 同步和异步均可:支持 defasync def
  4. 返回结构化数据:建议返回 dict 类型,便于 LLM 解析
  5. 错误处理:在函数内部处理异常,避免工具调用失败

复杂参数示例

python
from typing import List, Optional
from pydantic import BaseModel

class SearchQuery(BaseModel):
    keywords: List[str]
    max_results: int = 10
    filters: Optional[dict] = None

@mcp_app.tool
def search_documents(query: SearchQuery) -> dict:
    """搜索文档
    
    Args:
        query: 搜索查询对象,包含关键词、结果数量和过滤条件
        
    Returns:
        搜索结果列表
    """
    # 实现搜索逻辑
    results = perform_search(
        keywords=query.keywords,
        max_results=query.max_results,
        filters=query.filters
    )
    return {"results": results, "count": len(results)}

@app.agent_as_a_tool

将整个 Agent 封装为 MCP 工具,实现 Agent 的组合和协作。

函数签名

python
@app.agent_as_a_tool
async def agent_tool(prompt: str) -> str:
    pass

使用示例

python
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather

# 创建一个专门的天气 Agent
weather_agent = Agent(tools=[get_city_weather])
weather_runner = Runner(agent=weather_agent)

mcp_app = AgentkitMCPApp()

@mcp_app.agent_as_a_tool
async def weather_assistant(prompt: str) -> str:
    """天气助手 Agent
    
    专门处理天气相关查询的 Agent,可以查询全球城市的天气信息。
    
    Args:
        prompt: 用户的天气查询
        
    Returns:
        天气查询结果
    """
    response = await weather_runner.run(messages=prompt)
    return response

多 Agent 协作示例

python
# 创建多个专门的 Agent
weather_agent = Agent(tools=[get_city_weather])
news_agent = Agent(tools=[get_latest_news])
calendar_agent = Agent(tools=[check_schedule])

@mcp_app.agent_as_a_tool
async def weather_assistant(query: str) -> str:
    """天气助手"""
    return await Runner(agent=weather_agent).run(messages=query)

@mcp_app.agent_as_a_tool
async def news_assistant(query: str) -> str:
    """新闻助手"""
    return await Runner(agent=news_agent).run(messages=query)

@mcp_app.agent_as_a_tool
async def calendar_assistant(query: str) -> str:
    """日程助手"""
    return await Runner(agent=calendar_agent).run(messages=query)

注意事项

  1. Agent 职责单一:每个 Agent 应专注于特定领域
  2. 清晰的描述:文档字符串要明确 Agent 的能力边界
  3. 异步执行:通常是异步函数
  4. 合理的超时:设置合理的执行超时时间

A2A Agent 注解

AgentkitA2aApp 用于构建 A2A (Agent-to-Agent) 应用,支持 Agent 之间的通信和协作。

@app.agent_executor

注册 Agent 执行器,定义 Agent 的执行逻辑。

函数签名

python
@app.agent_executor(runner=runner, **kwargs)
class MyAgentExecutor(A2aAgentExecutor):
    pass

参数说明

  • runner: Agent 的运行器实例
  • kwargs: 其他配置参数

基本示例

python
from agentkit.apps import AgentkitA2aApp
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor
from veadk import Agent, Runner
from veadk.a2a.agent_card import get_agent_card
from veadk.tools.demo_tools import get_city_weather

# 创建 A2A 应用
a2a_app = AgentkitA2aApp()

# 创建 Agent
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)

# 注册执行器
@a2a_app.agent_executor(runner=runner)
class WeatherAgentExecutor(A2aAgentExecutor):
    """天气查询 Agent 执行器"""
    pass

# 运行应用
if __name__ == "__main__":
    a2a_app.run(
        agent_card=get_agent_card(agent=agent, url="http://127.0.0.1:8000"),
        host="127.0.0.1",
        port=8000,
    )

自定义执行器示例

python
from a2a.server.agent_execution import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue

@a2a_app.agent_executor(runner=runner)
class CustomAgentExecutor(AgentExecutor):
    """自定义 Agent 执行器"""
    
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue
    ) -> str:
        """执行 Agent 逻辑
        
        Args:
            context: 请求上下文,包含输入消息和历史
            event_queue: 事件队列,用于发送中间结果
            
        Returns:
            Agent 的响应结果
        """
        # 从上下文提取输入
        user_message = context.current_input
        
        # 发送中间事件(可选)
        await event_queue.put({
            "type": "thinking",
            "content": "正在思考..."
        })
        
        # 执行 Agent
        response = await self.runner.run(messages=user_message)
        
        return response

注意事项

  1. 继承 AgentExecutor:必须继承自 AgentExecutor 或其子类
  2. 提供 runner:必须通过参数传入 runner 实例
  3. 实现 execute 方法:如需自定义逻辑,重写 execute 方法
  4. 使用 event_queue:通过事件队列发送中间状态

@app.task_store

注册任务存储,用于持久化 A2A 任务状态。

函数签名

python
@app.task_store(**kwargs)
class MyTaskStore(TaskStore):
    pass

使用默认存储

python
# 不指定 task_store,会使用内存存储(InMemoryTaskStore)
a2a_app = AgentkitA2aApp()

@a2a_app.agent_executor(runner=runner)
class MyExecutor(A2aAgentExecutor):
    pass

自定义任务存储示例

python
from a2a.server.tasks.task_store import TaskStore
from a2a.types import Task
import redis

@a2a_app.task_store(redis_url="redis://localhost:6379")
class RedisTaskStore(TaskStore):
    """基于 Redis 的任务存储"""
    
    def __init__(self, redis_url: str):
        self.redis_client = redis.from_url(redis_url)
    
    async def save_task(self, task: Task) -> None:
        """保存任务"""
        task_data = task.model_dump_json()
        self.redis_client.set(f"task:{task.id}", task_data)
    
    async def get_task(self, task_id: str) -> Task | None:
        """获取任务"""
        task_data = self.redis_client.get(f"task:{task_id}")
        if task_data:
            return Task.model_validate_json(task_data)
        return None
    
    async def delete_task(self, task_id: str) -> None:
        """删除任务"""
        self.redis_client.delete(f"task:{task_id}")

注意事项

  1. 可选装饰器:如不指定,使用内存存储
  2. 继承 TaskStore:必须继承自 TaskStore
  3. 实现必需方法:实现 save_taskget_taskdelete_task 等方法
  4. 持久化存储:生产环境建议使用持久化存储(Redis、数据库等)

最佳实践

1. 错误处理

所有装饰的函数都应该有良好的错误处理:

python
import logging

logger = logging.getLogger(__name__)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    try:
        # 业务逻辑
        result = await process(payload)
        return result
    except ValueError as e:
        logger.warning(f"Invalid input: {e}")
        return f"Invalid input: {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return "An error occurred. Please try again later."

2. 日志记录

添加适当的日志以便追踪问题:

python
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    request_id = headers.get("request_id", "unknown")
    logger.info(f"[{request_id}] Processing request: {payload}")
    
    try:
        response = await runner.run(messages=payload["prompt"])
        logger.info(f"[{request_id}] Request completed successfully")
        return response
    except Exception as e:
        logger.error(f"[{request_id}] Request failed: {e}")
        raise

3. 类型注解

使用完整的类型注解提高代码质量:

python
from typing import Dict, Any

@app.entrypoint
async def run(payload: Dict[str, Any], headers: Dict[str, str]) -> str:
    prompt: str = payload["prompt"]
    user_id: str = headers.get("user_id", "anonymous")
    # ...

4. 文档字符串

为所有函数添加详细的文档字符串:

python
@mcp_app.tool
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> dict:
    """计算两个地理坐标之间的距离
    
    使用 Haversine 公式计算地球表面两点间的大圆距离。
    
    Args:
        lat1: 起点纬度(度)
        lon1: 起点经度(度)
        lat2: 终点纬度(度)
        lon2: 终点经度(度)
        
    Returns:
        包含距离信息的字典,距离单位为公里
        
    Example:
        >>> calculate_distance(39.9, 116.4, 31.2, 121.5)
        {"distance_km": 1067.5}
    """
    # 实现逻辑

5. 性能优化

避免在装饰的函数中执行耗时初始化:

python
# ❌ 不好的做法
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    agent = Agent(tools=[...])  # 每次请求都创建
    runner = Runner(agent=agent)
    return await runner.run(messages=payload["prompt"])

# ✅ 好的做法
agent = Agent(tools=[...])  # 只创建一次
runner = Runner(agent=agent)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    return await runner.run(messages=payload["prompt"])

下一步

  • 查看 samples/ 目录下的完整示例代码
  • 阅读 SDK 概览 了解整体架构
  • 部署应用到 AgentKit Platform

Released under the Apache-2.0 License.