Annotation Usage Guide
AgentKit SDK provides a concise set of decorators (annotations) for rapidly building different types of Agent applications. This document details the usage and best practices for various annotations.
Simple Agent Annotations
AgentkitSimpleApp is the most commonly used application framework, providing standard HTTP service endpoints.
@app.entrypoint
Defines the main entry function for the Agent, handling invocation requests from the Platform.
Function Signature
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
passParameter Description
payload (dict): Request body, containing user input and configuration
prompt(str): User input prompt- Other custom fields
headers (dict): Request headers, containing context information
user_id(str): User IDsession_id(str): Session IDrequest_id(str): Request trace ID- Other custom headers
Return Value (str): Agent's response result
Complete Example
from agentkit.apps import AgentkitSimpleApp
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather
app = AgentkitSimpleApp()
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
"""Agent main entry function"""
# 1. Extract input parameters
prompt = payload.get("prompt", "")
user_id = headers.get("user_id", "anonymous")
session_id = headers.get("session_id", "default")
# 2. Invoke Agent execution
response = await runner.run(
messages=prompt,
user_id=user_id,
session_id=session_id
)
# 3. Return result
return responseNotes
- Must be async function: Use
async defto define - Fixed parameter order: First parameter is payload, second is headers
- Return string: Return value is automatically encapsulated as JSON response
- Error handling: Recommended to handle exceptions within the function
Error Handling Example
import logging
logger = logging.getLogger(__name__)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
try:
prompt = payload["prompt"]
response = await runner.run(messages=prompt)
return response
except KeyError as e:
logger.error(f"Missing required field: {e}")
return f"Error: Missing required field {e}"
except Exception as e:
logger.error(f"Agent execution failed: {e}")
return f"Error: {str(e)}"@app.ping
Defines the health check function, used for Platform and Kubernetes health probes.
Function Signature
@app.ping
def ping() -> str:
passParameter Description
- No parameters: Health check function does not accept any parameters
- Return Value (str): Health status information, usually returns "pong" or "ok"
Basic Example
@app.ping
def ping() -> str:
"""Health check"""
return "pong!"Advanced Example: Including Dependency Checks
import redis
from sqlalchemy import create_engine
# Initialize dependencies
redis_client = redis.Redis(host='localhost', port=6379)
db_engine = create_engine('postgresql://...')
@app.ping
def ping() -> str:
"""Health check, including dependency service detection"""
try:
# Check Redis connection
redis_client.ping()
# Check database connection
with db_engine.connect() as conn:
conn.execute("SELECT 1")
return "ok - all services healthy"
except Exception as e:
logger.error(f"Health check failed: {e}")
return f"degraded - {str(e)}"Notes
- No parameters: Function signature must be
def ping() -> str: - Synchronous function: Use
defrather thanasync def - Quick response: Should return within 1 second to avoid timeout
- Lightweight check: Avoid performing time-consuming operations
Available Endpoints
Health check functions are automatically registered to the following endpoints:
GET /ping- Basic health checkGET /health- Health statusGET /readiness- Kubernetes readiness probeGET /liveness- Kubernetes liveness probe
@app.async_task (Planned)
Used to define asynchronous tasks, supporting long-running operations.
@app.async_task
async def process_long_task(task_id: str, data: dict) -> dict:
"""Process long-running tasks"""
# Long-running logic
result = await heavy_computation(data)
return resultNote: This feature is planned and not yet implemented.
MCP Agent Annotations
AgentkitMCPApp is used to build MCP (Model Context Protocol) services, encapsulating functions as standard MCP tools.
@app.tool
Registers a function as an MCP tool for LLM invocation.
Function Signature
@app.tool
def tool_name(param1: str, param2: int) -> dict:
passParameter Description
- Function parameters: Tool input parameters, must have type annotations
- Return Value: Tool execution result, recommended to return dict type
- Docstring: Used as tool description for LLM to understand tool purpose
Basic Example
from agentkit.apps import AgentkitMCPApp
mcp_app = AgentkitMCPApp()
@mcp_app.tool
def get_city_weather(city: str) -> dict[str, str]:
"""Get city weather information
Args:
city: City name (must be in English)
Returns:
Dictionary containing weather conditions and temperature
"""
weather_data = {
"beijing": {"condition": "Sunny", "temperature": 25},
"shanghai": {"condition": "Cloudy", "temperature": 22},
}
city = city.lower().strip()
if city in weather_data:
info = weather_data[city]
return {"result": f"{info['condition']}, {info['temperature']}°C"}
else:
return {"result": f"Weather information not found for {city}"}Async Tool Example
import aiohttp
@mcp_app.tool
async def fetch_web_content(url: str) -> dict:
"""Fetch web page content
Args:
url: Web page URL
Returns:
Dictionary containing web page content
"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return {
"status": response.status,
"content": content[:1000] # Extract first 1000 characters
}Notes
- Type annotations required: All parameters must have type annotations; MCP needs this information to generate tool schema
- Detailed docstrings: Docstrings serve as tool descriptions to help LLM understand tool purpose
- Sync and async both supported: Supports
defandasync def - Return structured data: Recommended to return dict type for easy LLM parsing
- Error handling: Handle exceptions within the function to avoid tool call failures
Complex Parameter Example
from typing import List, Optional
from pydantic import BaseModel
class SearchQuery(BaseModel):
keywords: List[str]
max_results: int = 10
filters: Optional[dict] = None
@mcp_app.tool
def search_documents(query: SearchQuery) -> dict:
"""Search documents
Args:
query: Search query object, containing keywords, result count, and filter conditions
Returns:
Search results list
"""
# Implement search logic
results = perform_search(
keywords=query.keywords,
max_results=query.max_results,
filters=query.filters
)
return {"results": results, "count": len(results)}@app.agent_as_a_tool
Encapsulates the entire Agent as an MCP tool, enabling Agent composition and collaboration.
Function Signature
@app.agent_as_a_tool
async def agent_tool(prompt: str) -> str:
passUsage Example
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather
# Create a dedicated weather Agent
weather_agent = Agent(tools=[get_city_weather])
weather_runner = Runner(agent=weather_agent)
mcp_app = AgentkitMCPApp()
@mcp_app.agent_as_a_tool
async def weather_assistant(prompt: str) -> str:
"""Weather assistant Agent
An Agent specifically designed to handle weather-related queries, capable of querying weather information for cities worldwide.
Args:
prompt: User's weather query
Returns:
Weather query result
"""
response = await weather_runner.run(messages=prompt)
return responseMulti-Agent Collaboration Example
# Create multiple dedicated Agents
weather_agent = Agent(tools=[get_city_weather])
news_agent = Agent(tools=[get_latest_news])
calendar_agent = Agent(tools=[check_schedule])
@mcp_app.agent_as_a_tool
async def weather_assistant(query: str) -> str:
"""Weather assistant"""
return await Runner(agent=weather_agent).run(messages=query)
@mcp_app.agent_as_a_tool
async def news_assistant(query: str) -> str:
"""News assistant"""
return await Runner(agent=news_agent).run(messages=query)
@mcp_app.agent_as_a_tool
async def calendar_assistant(query: str) -> str:
"""Calendar assistant"""
return await Runner(agent=calendar_agent).run(messages=query)Notes
- Single Agent responsibility: Each Agent should focus on a specific domain
- Clear descriptions: Docstrings should clearly define the Agent's capability boundaries
- Async execution: Usually an async function
- Reasonable timeout: Set a reasonable execution timeout
A2A Agent Annotations
AgentkitA2aApp is used to build A2A (Agent-to-Agent) applications, supporting communication and collaboration between Agents.
@app.agent_executor
Registers an Agent executor, defining the Agent's execution logic.
Function Signature
@app.agent_executor(runner=runner, **kwargs)
class MyAgentExecutor(A2aAgentExecutor):
passParameter Description
- runner: Agent runner instance
- kwargs: Other configuration parameters
Basic Example
from agentkit.apps import AgentkitA2aApp
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor
from veadk import Agent, Runner
from veadk.a2a.agent_card import get_agent_card
from veadk.tools.demo_tools import get_city_weather
# Create A2A application
a2a_app = AgentkitA2aApp()
# Create Agent
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)
# Register executor
@a2a_app.agent_executor(runner=runner)
class WeatherAgentExecutor(A2aAgentExecutor):
"""Weather query Agent executor"""
pass
# Run application
if __name__ == "__main__":
a2a_app.run(
agent_card=get_agent_card(agent=agent, url="http://127.0.0.1:8000"),
host="127.0.0.1",
port=8000,
)Custom Executor Example
from a2a.server.agent_execution import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
@a2a_app.agent_executor(runner=runner)
class CustomAgentExecutor(AgentExecutor):
"""Custom Agent executor"""
async def execute(
self,
context: RequestContext,
event_queue: EventQueue
) -> str:
"""Execute Agent logic
Args:
context: Request context, containing input messages and history
event_queue: Event queue, used for sending intermediate results
Returns:
Agent's response result
"""
# Extract input from context
user_message = context.current_input
# Send intermediate events (optional)
await event_queue.put({
"type": "thinking",
"content": "Thinking..."
})
# Execute Agent
response = await self.runner.run(messages=user_message)
return responseNotes
- Inherit AgentExecutor: Must inherit from
AgentExecutoror its subclasses - Provide runner: Must pass runner instance through parameters
- Implement execute method: To customize logic, override the
executemethod - Use event_queue: Send intermediate states through the event queue
@app.task_store
Registers task storage for persisting A2A task states.
Function Signature
@app.task_store(**kwargs)
class MyTaskStore(TaskStore):
passUsing Default Storage
# Without specifying task_store, in-memory storage (InMemoryTaskStore) will be used
a2a_app = AgentkitA2aApp()
@a2a_app.agent_executor(runner=runner)
class MyExecutor(A2aAgentExecutor):
passCustom Task Storage Example
from a2a.server.tasks.task_store import TaskStore
from a2a.types import Task
import redis
@a2a_app.task_store(redis_url="redis://localhost:6379")
class RedisTaskStore(TaskStore):
"""Redis-based task storage"""
def __init__(self, redis_url: str):
self.redis_client = redis.from_url(redis_url)
async def save_task(self, task: Task) -> None:
"""Save task"""
task_data = task.model_dump_json()
self.redis_client.set(f"task:{task.id}", task_data)
async def get_task(self, task_id: str) -> Task | None:
"""Get task"""
task_data = self.redis_client.get(f"task:{task_id}")
if task_data:
return Task.model_validate_json(task_data)
return None
async def delete_task(self, task_id: str) -> None:
"""Delete task"""
self.redis_client.delete(f"task:{task_id}")Notes
- Optional decorator: If not specified, in-memory storage is used
- Inherit TaskStore: Must inherit from
TaskStore - Implement required methods: Implement
save_task,get_task,delete_task, and other methods - Persistent storage: For production environments, persistent storage (Redis, database, etc.) is recommended
Best Practices
1. Error Handling
All decorated functions should have good error handling:
import logging
logger = logging.getLogger(__name__)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
try:
# Business logic
result = await process(payload)
return result
except ValueError as e:
logger.warning(f"Invalid input: {e}")
return f"Invalid input: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return "An error occurred. Please try again later."2. Logging
Add appropriate logs for problem tracking:
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
request_id = headers.get("request_id", "unknown")
logger.info(f"[{request_id}] Processing request: {payload}")
try:
response = await runner.run(messages=payload["prompt"])
logger.info(f"[{request_id}] Request completed successfully")
return response
except Exception as e:
logger.error(f"[{request_id}] Request failed: {e}")
raise3. Type Annotations
Use complete type annotations to improve code quality:
from typing import Dict, Any
@app.entrypoint
async def run(payload: Dict[str, Any], headers: Dict[str, str]) -> str:
prompt: str = payload["prompt"]
user_id: str = headers.get("user_id", "anonymous")
# ...4. Docstrings
Add detailed docstrings for all functions:
@mcp_app.tool
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> dict:
"""Calculate the distance between two geographic coordinates
Uses the Haversine formula to calculate the great-circle distance between two points on the Earth's surface.
Args:
lat1: Starting latitude (degrees)
lon1: Starting longitude (degrees)
lat2: Ending latitude (degrees)
lon2: Ending longitude (degrees)
Returns:
Dictionary containing distance information, distance unit is kilometers
Example:
>>> calculate_distance(39.9, 116.4, 31.2, 121.5)
{"distance_km": 1067.5}
"""
# Implementation logic5. Performance Optimization
Avoid performing time-consuming initialization in decorated functions:
# ❌ Bad practice
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
agent = Agent(tools=[...]) # Created on every request
runner = Runner(agent=agent)
return await runner.run(messages=payload["prompt"])
# ✅ Good practice
agent = Agent(tools=[...]) # Created only once
runner = Runner(agent=agent)
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
return await runner.run(messages=payload["prompt"])Next Steps
- Check the complete example code in the
samples/directory - Read the SDK Overview to understand the overall architecture
- Deploy the application to AgentKit Platform
