Skip to content

Annotation Usage Guide

AgentKit SDK provides a concise set of decorators (annotations) for rapidly building different types of Agent applications. This document details the usage and best practices for various annotations.

Simple Agent Annotations

AgentkitSimpleApp is the most commonly used application framework, providing standard HTTP service endpoints.

@app.entrypoint

Defines the main entry function for the Agent, handling invocation requests from the Platform.

Function Signature

python
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    pass

Parameter Description

  • payload (dict): Request body, containing user input and configuration

    • prompt (str): User input prompt
    • Other custom fields
  • headers (dict): Request headers, containing context information

    • user_id (str): User ID
    • session_id (str): Session ID
    • request_id (str): Request trace ID
    • Other custom headers
  • Return Value (str): Agent's response result

Complete Example

python
from agentkit.apps import AgentkitSimpleApp
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather

app = AgentkitSimpleApp()
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    """Agent main entry function"""
    # 1. Extract input parameters
    prompt = payload.get("prompt", "")
    user_id = headers.get("user_id", "anonymous")
    session_id = headers.get("session_id", "default")

    # 2. Invoke Agent execution
    response = await runner.run(
        messages=prompt,
        user_id=user_id,
        session_id=session_id
    )

    # 3. Return result
    return response

Notes

  1. Must be async function: Use async def to define
  2. Fixed parameter order: First parameter is payload, second is headers
  3. Return string: Return value is automatically encapsulated as JSON response
  4. Error handling: Recommended to handle exceptions within the function

Error Handling Example

python
import logging

logger = logging.getLogger(__name__)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    try:
        prompt = payload["prompt"]
        response = await runner.run(messages=prompt)
        return response
    except KeyError as e:
        logger.error(f"Missing required field: {e}")
        return f"Error: Missing required field {e}"
    except Exception as e:
        logger.error(f"Agent execution failed: {e}")
        return f"Error: {str(e)}"

@app.ping

Defines the health check function, used for Platform and Kubernetes health probes.

Function Signature

python
@app.ping
def ping() -> str:
    pass

Parameter Description

  • No parameters: Health check function does not accept any parameters
  • Return Value (str): Health status information, usually returns "pong" or "ok"

Basic Example

python
@app.ping
def ping() -> str:
    """Health check"""
    return "pong!"

Advanced Example: Including Dependency Checks

python
import redis
from sqlalchemy import create_engine

# Initialize dependencies
redis_client = redis.Redis(host='localhost', port=6379)
db_engine = create_engine('postgresql://...')

@app.ping
def ping() -> str:
    """Health check, including dependency service detection"""
    try:
        # Check Redis connection
        redis_client.ping()

        # Check database connection
        with db_engine.connect() as conn:
            conn.execute("SELECT 1")

        return "ok - all services healthy"
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return f"degraded - {str(e)}"

Notes

  1. No parameters: Function signature must be def ping() -> str:
  2. Synchronous function: Use def rather than async def
  3. Quick response: Should return within 1 second to avoid timeout
  4. Lightweight check: Avoid performing time-consuming operations

Available Endpoints

Health check functions are automatically registered to the following endpoints:

  • GET /ping - Basic health check
  • GET /health - Health status
  • GET /readiness - Kubernetes readiness probe
  • GET /liveness - Kubernetes liveness probe

@app.async_task (Planned)

Used to define asynchronous tasks, supporting long-running operations.

python
@app.async_task
async def process_long_task(task_id: str, data: dict) -> dict:
    """Process long-running tasks"""
    # Long-running logic
    result = await heavy_computation(data)
    return result

Note: This feature is planned and not yet implemented.

MCP Agent Annotations

AgentkitMCPApp is used to build MCP (Model Context Protocol) services, encapsulating functions as standard MCP tools.

@app.tool

Registers a function as an MCP tool for LLM invocation.

Function Signature

python
@app.tool
def tool_name(param1: str, param2: int) -> dict:
    pass

Parameter Description

  • Function parameters: Tool input parameters, must have type annotations
  • Return Value: Tool execution result, recommended to return dict type
  • Docstring: Used as tool description for LLM to understand tool purpose

Basic Example

python
from agentkit.apps import AgentkitMCPApp

mcp_app = AgentkitMCPApp()

@mcp_app.tool
def get_city_weather(city: str) -> dict[str, str]:
    """Get city weather information

    Args:
        city: City name (must be in English)

    Returns:
        Dictionary containing weather conditions and temperature
    """
    weather_data = {
        "beijing": {"condition": "Sunny", "temperature": 25},
        "shanghai": {"condition": "Cloudy", "temperature": 22},
    }

    city = city.lower().strip()
    if city in weather_data:
        info = weather_data[city]
        return {"result": f"{info['condition']}, {info['temperature']}°C"}
    else:
        return {"result": f"Weather information not found for {city}"}

Async Tool Example

python
import aiohttp

@mcp_app.tool
async def fetch_web_content(url: str) -> dict:
    """Fetch web page content

    Args:
        url: Web page URL

    Returns:
        Dictionary containing web page content
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            return {
                "status": response.status,
                "content": content[:1000]  # Extract first 1000 characters
            }

Notes

  1. Type annotations required: All parameters must have type annotations; MCP needs this information to generate tool schema
  2. Detailed docstrings: Docstrings serve as tool descriptions to help LLM understand tool purpose
  3. Sync and async both supported: Supports def and async def
  4. Return structured data: Recommended to return dict type for easy LLM parsing
  5. Error handling: Handle exceptions within the function to avoid tool call failures

Complex Parameter Example

python
from typing import List, Optional
from pydantic import BaseModel

class SearchQuery(BaseModel):
    keywords: List[str]
    max_results: int = 10
    filters: Optional[dict] = None

@mcp_app.tool
def search_documents(query: SearchQuery) -> dict:
    """Search documents

    Args:
        query: Search query object, containing keywords, result count, and filter conditions

    Returns:
        Search results list
    """
    # Implement search logic
    results = perform_search(
        keywords=query.keywords,
        max_results=query.max_results,
        filters=query.filters
    )
    return {"results": results, "count": len(results)}

@app.agent_as_a_tool

Encapsulates the entire Agent as an MCP tool, enabling Agent composition and collaboration.

Function Signature

python
@app.agent_as_a_tool
async def agent_tool(prompt: str) -> str:
    pass

Usage Example

python
from veadk import Agent, Runner
from veadk.tools.demo_tools import get_city_weather

# Create a dedicated weather Agent
weather_agent = Agent(tools=[get_city_weather])
weather_runner = Runner(agent=weather_agent)

mcp_app = AgentkitMCPApp()

@mcp_app.agent_as_a_tool
async def weather_assistant(prompt: str) -> str:
    """Weather assistant Agent

    An Agent specifically designed to handle weather-related queries, capable of querying weather information for cities worldwide.

    Args:
        prompt: User's weather query

    Returns:
        Weather query result
    """
    response = await weather_runner.run(messages=prompt)
    return response

Multi-Agent Collaboration Example

python
# Create multiple dedicated Agents
weather_agent = Agent(tools=[get_city_weather])
news_agent = Agent(tools=[get_latest_news])
calendar_agent = Agent(tools=[check_schedule])

@mcp_app.agent_as_a_tool
async def weather_assistant(query: str) -> str:
    """Weather assistant"""
    return await Runner(agent=weather_agent).run(messages=query)

@mcp_app.agent_as_a_tool
async def news_assistant(query: str) -> str:
    """News assistant"""
    return await Runner(agent=news_agent).run(messages=query)

@mcp_app.agent_as_a_tool
async def calendar_assistant(query: str) -> str:
    """Calendar assistant"""
    return await Runner(agent=calendar_agent).run(messages=query)

Notes

  1. Single Agent responsibility: Each Agent should focus on a specific domain
  2. Clear descriptions: Docstrings should clearly define the Agent's capability boundaries
  3. Async execution: Usually an async function
  4. Reasonable timeout: Set a reasonable execution timeout

A2A Agent Annotations

AgentkitA2aApp is used to build A2A (Agent-to-Agent) applications, supporting communication and collaboration between Agents.

@app.agent_executor

Registers an Agent executor, defining the Agent's execution logic.

Function Signature

python
@app.agent_executor(runner=runner, **kwargs)
class MyAgentExecutor(A2aAgentExecutor):
    pass

Parameter Description

  • runner: Agent runner instance
  • kwargs: Other configuration parameters

Basic Example

python
from agentkit.apps import AgentkitA2aApp
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor
from veadk import Agent, Runner
from veadk.a2a.agent_card import get_agent_card
from veadk.tools.demo_tools import get_city_weather

# Create A2A application
a2a_app = AgentkitA2aApp()

# Create Agent
agent = Agent(tools=[get_city_weather])
runner = Runner(agent=agent)

# Register executor
@a2a_app.agent_executor(runner=runner)
class WeatherAgentExecutor(A2aAgentExecutor):
    """Weather query Agent executor"""
    pass

# Run application
if __name__ == "__main__":
    a2a_app.run(
        agent_card=get_agent_card(agent=agent, url="http://127.0.0.1:8000"),
        host="127.0.0.1",
        port=8000,
    )

Custom Executor Example

python
from a2a.server.agent_execution import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue

@a2a_app.agent_executor(runner=runner)
class CustomAgentExecutor(AgentExecutor):
    """Custom Agent executor"""

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue
    ) -> str:
        """Execute Agent logic

        Args:
            context: Request context, containing input messages and history
            event_queue: Event queue, used for sending intermediate results

        Returns:
            Agent's response result
        """
        # Extract input from context
        user_message = context.current_input

        # Send intermediate events (optional)
        await event_queue.put({
            "type": "thinking",
            "content": "Thinking..."
        })

        # Execute Agent
        response = await self.runner.run(messages=user_message)

        return response

Notes

  1. Inherit AgentExecutor: Must inherit from AgentExecutor or its subclasses
  2. Provide runner: Must pass runner instance through parameters
  3. Implement execute method: To customize logic, override the execute method
  4. Use event_queue: Send intermediate states through the event queue

@app.task_store

Registers task storage for persisting A2A task states.

Function Signature

python
@app.task_store(**kwargs)
class MyTaskStore(TaskStore):
    pass

Using Default Storage

python
# Without specifying task_store, in-memory storage (InMemoryTaskStore) will be used
a2a_app = AgentkitA2aApp()

@a2a_app.agent_executor(runner=runner)
class MyExecutor(A2aAgentExecutor):
    pass

Custom Task Storage Example

python
from a2a.server.tasks.task_store import TaskStore
from a2a.types import Task
import redis

@a2a_app.task_store(redis_url="redis://localhost:6379")
class RedisTaskStore(TaskStore):
    """Redis-based task storage"""

    def __init__(self, redis_url: str):
        self.redis_client = redis.from_url(redis_url)

    async def save_task(self, task: Task) -> None:
        """Save task"""
        task_data = task.model_dump_json()
        self.redis_client.set(f"task:{task.id}", task_data)

    async def get_task(self, task_id: str) -> Task | None:
        """Get task"""
        task_data = self.redis_client.get(f"task:{task_id}")
        if task_data:
            return Task.model_validate_json(task_data)
        return None

    async def delete_task(self, task_id: str) -> None:
        """Delete task"""
        self.redis_client.delete(f"task:{task_id}")

Notes

  1. Optional decorator: If not specified, in-memory storage is used
  2. Inherit TaskStore: Must inherit from TaskStore
  3. Implement required methods: Implement save_task, get_task, delete_task, and other methods
  4. Persistent storage: For production environments, persistent storage (Redis, database, etc.) is recommended

Best Practices

1. Error Handling

All decorated functions should have good error handling:

python
import logging

logger = logging.getLogger(__name__)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    try:
        # Business logic
        result = await process(payload)
        return result
    except ValueError as e:
        logger.warning(f"Invalid input: {e}")
        return f"Invalid input: {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return "An error occurred. Please try again later."

2. Logging

Add appropriate logs for problem tracking:

python
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    request_id = headers.get("request_id", "unknown")
    logger.info(f"[{request_id}] Processing request: {payload}")

    try:
        response = await runner.run(messages=payload["prompt"])
        logger.info(f"[{request_id}] Request completed successfully")
        return response
    except Exception as e:
        logger.error(f"[{request_id}] Request failed: {e}")
        raise

3. Type Annotations

Use complete type annotations to improve code quality:

python
from typing import Dict, Any

@app.entrypoint
async def run(payload: Dict[str, Any], headers: Dict[str, str]) -> str:
    prompt: str = payload["prompt"]
    user_id: str = headers.get("user_id", "anonymous")
    # ...

4. Docstrings

Add detailed docstrings for all functions:

python
@mcp_app.tool
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> dict:
    """Calculate the distance between two geographic coordinates

    Uses the Haversine formula to calculate the great-circle distance between two points on the Earth's surface.

    Args:
        lat1: Starting latitude (degrees)
        lon1: Starting longitude (degrees)
        lat2: Ending latitude (degrees)
        lon2: Ending longitude (degrees)

    Returns:
        Dictionary containing distance information, distance unit is kilometers

    Example:
        >>> calculate_distance(39.9, 116.4, 31.2, 121.5)
        {"distance_km": 1067.5}
    """
    # Implementation logic

5. Performance Optimization

Avoid performing time-consuming initialization in decorated functions:

python
# ❌ Bad practice
@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    agent = Agent(tools=[...])  # Created on every request
    runner = Runner(agent=agent)
    return await runner.run(messages=payload["prompt"])

# ✅ Good practice
agent = Agent(tools=[...])  # Created only once
runner = Runner(agent=agent)

@app.entrypoint
async def run(payload: dict, headers: dict) -> str:
    return await runner.run(messages=payload["prompt"])

Next Steps

  • Check the complete example code in the samples/ directory
  • Read the SDK Overview to understand the overall architecture
  • Deploy the application to AgentKit Platform

Released under the Apache-2.0 License.