注解使用指南
AgentKit SDK 提供了一套简洁的装饰器(注解),用于快速构建不同类型的 Agent 应用。本文档详细介绍各种注解的使用方法和最佳实践。
Simple Agent 注解
AgentkitSimpleApp 是最常用的应用框架,提供标准的 HTTP 服务端点。
@app.entrypoint
定义 Agent 的主入口函数,处理来自 Platform 的调用请求。
函数签名
python
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
pass参数说明
payload (dict): 请求体,包含用户输入和配置
prompt(str): 用户输入的提示词- 其他自定义字段
headers (dict): 请求头,包含上下文信息
user_id(str): 用户 IDsession_id(str): 会话 IDrequest_id(str): 请求追踪 ID- 其他自定义头部
返回值 (str): Agent 的响应结果
完整示例
python
from agentkit.apps import AgentkitSimpleApp
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather
app = AgentkitSimpleApp()
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
"""Agent 主入口函数"""
# 1. 提取输入参数
prompt = payload.get("prompt", "")
user_id = headers.get("user_id", "anonymous")
session_id = headers.get("session_id", "default")
# 2. 调用 Agent 运行
response = await runner.run(
messages=prompt,
user_id=user_id,
session_id=session_id
)
# 3. 返回结果
return response注意事项
- 必须是异步函数:使用
async def定义 - 参数顺序固定:第一个参数是 payload,第二个是 headers
- 返回字符串:返回值会被自动封装为 JSON 响应
- 错误处理:建议在函数内部处理异常
错误处理示例
python
import logging
logger = logging.getLogger(__name__)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
try:
prompt = payload["prompt"]
response = await runner.run(messages=prompt)
return response
except KeyError as e:
logger.error(f"Missing required field: {e}")
return f"Error: Missing required field {e}"
except Exception as e:
logger.error(f"Agent execution failed: {e}")
return f"Error: {str(e)}"@app.ping
定义健康检查函数,用于 Platform 和 Kubernetes 的健康探测。
函数签名
python
@app.ping
def ping() -> str:
pass参数说明
- 无参数:健康检查函数不接收任何参数
- 返回值 (str): 健康状态信息,通常返回 "pong" 或 "ok"
基本示例
python
@app.ping
def ping() -> str:
"""健康检查"""
return "pong!"高级示例:包含依赖检查
python
import redis
from sqlalchemy import create_engine
# 初始化依赖
redis_client = redis.Redis(host='localhost', port=6379)
db_engine = create_engine('postgresql://...')
@app.ping
def ping() -> str:
"""健康检查,包含依赖服务检测"""
try:
# 检查 Redis 连接
redis_client.ping()
# 检查数据库连接
with db_engine.connect() as conn:
conn.execute("SELECT 1")
return "ok - all services healthy"
except Exception as e:
logger.error(f"Health check failed: {e}")
return f"degraded - {str(e)}"注意事项
- 无参数:函数签名必须为
def ping() -> str: - 同步函数:使用
def而非async def - 快速响应:应在 1 秒内返回,避免超时
- 轻量级检查:避免执行耗时操作
可用端点
健康检查函数会自动注册到以下端点:
GET /ping- 基础健康检查GET /health- 健康状态GET /readiness- Kubernetes 就绪探针GET /liveness- Kubernetes 存活探针
@app.async_task(规划中)
用于定义异步任务,支持长时间运行的操作。
python
@app.async_task
async def process_long_task(task_id: str, data: dict) -> dict:
"""处理长时间运行的任务"""
# 长时间运行的逻辑
result = await heavy_computation(data)
return result注意:此功能正在规划中,暂未实现。
MCP Agent 注解
AgentkitMCPApp 用于构建 MCP (Model Context Protocol) 服务,将函数封装为标准的 MCP 工具。
@app.tool
将函数注册为 MCP 工具,供 LLM 调用。
函数签名
python
@app.tool
def tool_name(param1: str, param2: int) -> dict:
pass参数说明
- 函数参数:工具的输入参数,必须有类型注解
- 返回值:工具的执行结果,建议返回 dict 类型
- 文档字符串:会作为工具描述,供 LLM 理解工具用途
基本示例
python
from agentkit.apps import AgentkitMCPApp
mcp_app = AgentkitMCPApp()
@mcp_app.tool
def get_city_weather(city: str) -> dict[str, str]:
"""获取城市天气信息
Args:
city: 城市名称(必须为英文)
Returns:
包含天气状况和温度的字典
"""
weather_data = {
"beijing": {"condition": "Sunny", "temperature": 25},
"shanghai": {"condition": "Cloudy", "temperature": 22},
}
city = city.lower().strip()
if city in weather_data:
info = weather_data[city]
return {"result": f"{info['condition']}, {info['temperature']}°C"}
else:
return {"result": f"Weather information not found for {city}"}异步工具示例
python
import aiohttp
@mcp_app.tool
async def fetch_web_content(url: str) -> dict:
"""获取网页内容
Args:
url: 网页 URL
Returns:
包含网页内容的字典
"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return {
"status": response.status,
"content": content[:1000] # 截取前 1000 字符
}注意事项
- 类型注解必需:所有参数必须有类型注解,MCP 需要此信息生成工具模式
- 详细的文档字符串:文档字符串会作为工具描述,帮助 LLM 理解工具用途
- 同步和异步均可:支持
def和async def - 返回结构化数据:建议返回 dict 类型,便于 LLM 解析
- 错误处理:在函数内部处理异常,避免工具调用失败
复杂参数示例
python
from typing import List, Optional
from pydantic import BaseModel
class SearchQuery(BaseModel):
keywords: List[str]
max_results: int = 10
filters: Optional[dict] = None
@mcp_app.tool
def search_documents(query: SearchQuery) -> dict:
"""搜索文档
Args:
query: 搜索查询对象,包含关键词、结果数量和过滤条件
Returns:
搜索结果列表
"""
# 实现搜索逻辑
results = perform_search(
keywords=query.keywords,
max_results=query.max_results,
filters=query.filters
)
return {"results": results, "count": len(results)}@app.agent_as_a_tool
将整个 Agent 封装为 MCP 工具,实现 Agent 的组合和协作。
函数签名
python
@app.agent_as_a_tool
async def agent_tool(prompt: str) -> str:
pass使用示例
python
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather
# 创建一个专门的天气 Agent
weather_agent = Agent(tools=[get_city_weather])
weather_runner = Runner(agent=weather_agent)
mcp_app = AgentkitMCPApp()
@mcp_app.agent_as_a_tool
async def weather_assistant(prompt: str) -> str:
"""天气助手 Agent
专门处理天气相关查询的 Agent,可以查询全球城市的天气信息。
Args:
prompt: 用户的天气查询
Returns:
天气查询结果
"""
response = await weather_runner.run(messages=prompt)
return response多 Agent 协作示例
python
# 创建多个专门的 Agent
weather_agent = Agent(tools=[get_city_weather])
news_agent = Agent(tools=[get_latest_news])
calendar_agent = Agent(tools=[check_schedule])
@mcp_app.agent_as_a_tool
async def weather_assistant(query: str) -> str:
"""天气助手"""
return await Runner(agent=weather_agent).run(messages=query)
@mcp_app.agent_as_a_tool
async def news_assistant(query: str) -> str:
"""新闻助手"""
return await Runner(agent=news_agent).run(messages=query)
@mcp_app.agent_as_a_tool
async def calendar_assistant(query: str) -> str:
"""日程助手"""
return await Runner(agent=calendar_agent).run(messages=query)注意事项
- Agent 职责单一:每个 Agent 应专注于特定领域
- 清晰的描述:文档字符串要明确 Agent 的能力边界
- 异步执行:通常是异步函数
- 合理的超时:设置合理的执行超时时间
A2A Agent 注解
AgentkitA2aApp 用于构建 A2A (Agent-to-Agent) 应用,支持 Agent 之间的通信和协作。
@app.agent_executor
注册 Agent 执行器,定义 Agent 的执行逻辑。
函数签名
python
@app.agent_executor(runner=runner, **kwargs)
class MyAgentExecutor(A2aAgentExecutor):
pass参数说明
- runner: Agent 的运行器实例
- kwargs: 其他配置参数
基本示例
python
from agentkit.apps import AgentkitA2aApp
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor
from veadk import Agent, Runner
from veadk.a2a.agent_card import get_agent_card
from veadk.tools.demo_tools import get_city_weather
# 创建 A2A 应用
a2a_app = AgentkitA2aApp()
# 创建 Agent
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)
# 注册执行器
@a2a_app.agent_executor(runner=runner)
class WeatherAgentExecutor(A2aAgentExecutor):
"""天气查询 Agent 执行器"""
pass
# 运行应用
if __name__ == "__main__":
a2a_app.run(
agent_card=get_agent_card(agent=agent, url="http://127.0.0.1:8000"),
host="127.0.0.1",
port=8000,
)自定义执行器示例
python
from a2a.server.agent_execution import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
@a2a_app.agent_executor(runner=runner)
class CustomAgentExecutor(AgentExecutor):
"""自定义 Agent 执行器"""
async def execute(
self,
context: RequestContext,
event_queue: EventQueue
) -> str:
"""执行 Agent 逻辑
Args:
context: 请求上下文,包含输入消息和历史
event_queue: 事件队列,用于发送中间结果
Returns:
Agent 的响应结果
"""
# 从上下文提取输入
user_message = context.current_input
# 发送中间事件(可选)
await event_queue.put({
"type": "thinking",
"content": "正在思考..."
})
# 执行 Agent
response = await self.runner.run(messages=user_message)
return response注意事项
- 继承 AgentExecutor:必须继承自
AgentExecutor或其子类 - 提供 runner:必须通过参数传入 runner 实例
- 实现 execute 方法:如需自定义逻辑,重写
execute方法 - 使用 event_queue:通过事件队列发送中间状态
@app.task_store
注册任务存储,用于持久化 A2A 任务状态。
函数签名
python
@app.task_store(**kwargs)
class MyTaskStore(TaskStore):
pass使用默认存储
python
# 不指定 task_store,会使用内存存储(InMemoryTaskStore)
a2a_app = AgentkitA2aApp()
@a2a_app.agent_executor(runner=runner)
class MyExecutor(A2aAgentExecutor):
pass自定义任务存储示例
python
from a2a.server.tasks.task_store import TaskStore
from a2a.types import Task
import redis
@a2a_app.task_store(redis_url="redis://localhost:6379")
class RedisTaskStore(TaskStore):
"""基于 Redis 的任务存储"""
def __init__(self, redis_url: str):
self.redis_client = redis.from_url(redis_url)
async def save_task(self, task: Task) -> None:
"""保存任务"""
task_data = task.model_dump_json()
self.redis_client.set(f"task:{task.id}", task_data)
async def get_task(self, task_id: str) -> Task | None:
"""获取任务"""
task_data = self.redis_client.get(f"task:{task_id}")
if task_data:
return Task.model_validate_json(task_data)
return None
async def delete_task(self, task_id: str) -> None:
"""删除任务"""
self.redis_client.delete(f"task:{task_id}")注意事项
- 可选装饰器:如不指定,使用内存存储
- 继承 TaskStore:必须继承自
TaskStore - 实现必需方法:实现
save_task、get_task、delete_task等方法 - 持久化存储:生产环境建议使用持久化存储(Redis、数据库等)
最佳实践
1. 错误处理
所有装饰的函数都应该有良好的错误处理:
python
import logging
logger = logging.getLogger(__name__)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
try:
# 业务逻辑
result = await process(payload)
return result
except ValueError as e:
logger.warning(f"Invalid input: {e}")
return f"Invalid input: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return "An error occurred. Please try again later."2. 日志记录
添加适当的日志以便追踪问题:
python
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
request_id = headers.get("request_id", "unknown")
logger.info(f"[{request_id}] Processing request: {payload}")
try:
response = await runner.run(messages=payload["prompt"])
logger.info(f"[{request_id}] Request completed successfully")
return response
except Exception as e:
logger.error(f"[{request_id}] Request failed: {e}")
raise3. 类型注解
使用完整的类型注解提高代码质量:
python
from typing import Dict, Any
@app.entrypoint
async def run(payload: Dict[str, Any], headers: Dict[str, str]) -> str:
prompt: str = payload["prompt"]
user_id: str = headers.get("user_id", "anonymous")
# ...4. 文档字符串
为所有函数添加详细的文档字符串:
python
@mcp_app.tool
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> dict:
"""计算两个地理坐标之间的距离
使用 Haversine 公式计算地球表面两点间的大圆距离。
Args:
lat1: 起点纬度(度)
lon1: 起点经度(度)
lat2: 终点纬度(度)
lon2: 终点经度(度)
Returns:
包含距离信息的字典,距离单位为公里
Example:
>>> calculate_distance(39.9, 116.4, 31.2, 121.5)
{"distance_km": 1067.5}
"""
# 实现逻辑5. 性能优化
避免在装饰的函数中执行耗时初始化:
python
# ❌ 不好的做法
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
agent = Agent(tools=[...]) # 每次请求都创建
runner = Runner(agent=agent)
return await runner.run(messages=payload["prompt"])
# ✅ 好的做法
agent = Agent(tools=[...]) # 只创建一次
runner = Runner(agent=agent)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
return await runner.run(messages=payload["prompt"])下一步
- 查看
samples/目录下的完整示例代码 - 阅读 SDK 概览 了解整体架构
- 部署应用到 AgentKit Platform
