# Volcengine ADK > Documents for Volcengine Agent Development Kit VeADK. # Usage documentation 🎉 欢迎报名 2025 年度火山引擎冬季 Force 大会 “AI Agent开发工作坊” \*\*⏰ 时间:\*\*2025 年 12 月 19 日 14:00-18:00 \*\*📌 场地:\*\*上海世博中心 6F-617 \*\*📄 简介:\*\*AI Agent 工作坊,依托全流程的实际动手操作,并融合相关理论与最佳实践总结,助力参会者掌握从零到一构建企业级 AI Agent 的实战能力,使其具备涵盖 Agent 设计、开发、部署、运维以及持续评估与优化的完整技能体系。借助 AI IDE TRAE 和火山的 Agent SDK VeADK,推动 AI Agent快速落地,实现 Vibe 编程,助力参与者完成基础的 Agent 开发并加速进阶;通过集成和使用 Agentkit,促使 AI Agent 实现从 Research 到 Production - Ready 的能力跨越;运用 Agentkit Identity,达成用户到智能体、智能体到智能体、智能体到 MCP 工具的全链路零信任安全;通过Agentkit MCP 网关,实现存量应用向智能体的迁移与复用;利用 AgentKit 罗盘评测工具,对智能体进行持续评估和质量监测;基于面向真实客户场景的最佳实践闭环,加速 AI 从试点应用迈向规模化应用的落地进程。通过样例库介绍和动手实验,构建共创、行动、反馈的迭代机制。 [点击报名](https://www.volcengine.com/contact/force-2512) Volcengine Agent Development Kit 火山引擎智能体开发套件 ______________________________________________________________________ 火山引擎智能体框架 **VeADK(Volcengine Agent Development Kit)**,是由**火山引擎**推出的为 Agent 智能体的应用构建提供开发、部署、观测、评测等全流程云原生解决方案。相较于现有的智能体开发框架,VeADK 具备与火山引擎产品体系深度融合的优势,帮助开发者更高效地构建企业级 AI 智能体应用。 快速开始 通过以下方式安装 VeADK: ```bash pip install veadk-python ``` ```bash pip install git+https://github.com/volcengine/veadk-python.git@main ``` ______________________________________________________________________ 或者您可以使用 VeADK 提供的镜像仓库: ```text veadk-cn-beijing.cr.volces.com/veadk/veadk-python:latest ``` ```text veadk-cn-beijing.cr.volces.com/veadk/veadk-python:preview ``` ```text veadk-cn-beijing.cr.volces.com/veadk/veadk-python:0.2.20 ``` [快速开始](https://volcengine.github.io/veadk-python/quickstart/index.md) ______________________________________________________________________ ## 使用 VeADK 构建 DeepResearch [](https://veadk.tos-cn-beijing.volces.com/doc-assets/veadk_deepresearch.mp4) ______________________________________________________________________ - **多生态与模型兼容** ______________________________________________________________________ VeADK 与 Google ADK 实现完全兼容,支持现有项目无缝迁移;与 LiteLLM 模型推理服务兼容,支持各类主流模型接入。 - **完善的记忆与知识库支持** ______________________________________________________________________ 提供短期记忆与长期记忆的完整解决方案:短期记忆可基于 MySQL 实现持久化存储;长期记忆则依托 Viking DB、云搜索服务构建。VeADK 以 LlamaINdex 作为知识库核心处理入口,同时支持 Viking 知识库后端无缝接入。 - **内置丰富工具和生态集成** ______________________________________________________________________ 内置 Web Search / 图片生成 / 视频生成等多款火山引擎生态工具,满足基础业务场景需求。支持代码沙箱等进阶工具实现复杂业务场景下的定制化功能。 - **可观测性与评估能力** ______________________________________________________________________ 集成 CozeLoop、APMPlus、TLS 等多款工具组件,全面覆盖调用链路观测、日志存储检索及在线评测等核心需求。具备 Tracing 追踪能力,可精准记录智能体(Agent)执行过程中的关键路径与中间状态。构建智能体运行、评测、调优的一站式闭环支撑体系,为智能体的能力迭代与智能化升级提供坚实保障。 - **云原生架构与快速部署** ______________________________________________________________________ 采用云原生架构设计,提供代码打包、镜像构建等多元化部署形式。通过整合火山引擎 VeFaaS、API 网关等核心服务,实现全开发流程的简化与高效流转。依托云部署项目模板,支持基于 CloudEngine 的一键式部署与发布,大幅提升上线效率。 - **企业级安全防护** ______________________________________________________________________ 依托火山引擎 AgentKit Identity 构建一站式身份鉴权体系,并结合权限管理平台提供全流程服务支撑。在身份管理层面,支持用户池管理、企业 IdP(SAML/OIDC)集成及第三方身份联合认证;在工作负载身份管理层面,可为智能体/工具分配唯一数字身份,并维护属性标签实现精准标识。提供第三方凭据托管功能,通过加密方式托管 API Key 与 OAuth 令牌,从根源杜绝明文凭据泄露风险。在权限管控方面,采用基于属性与上下文的动态授权机制,实现细粒度权限控制,保障服务访问安全。 ## VeADK Famliy VeADK 各组件与火山引擎相关产品的结合矩阵: | **组件** | **火山引擎产品** | **说明** | | -------------- | ------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------- | | **大模型** | [**火山方舟**](https://www.volcengine.com/product/ark) | 大模型平台,提供各类语言模型、多模态模型的推理服务 | | **提示词工程** | [**PromptPilot**](https://promptpilot.volcengine.com/) | 提供提示词管理、优化能力 | | **工具** | [**MCP 广场**](https://www.volcengine.com/mcp-marketplace) | 提供各类 MCP Server,丰富工具一键直连 | | | [**Web search**](https://www.volcengine.com/docs/85508/1650263)(融合信息搜索API) | 融合信息搜索,提供公域数据搜索功能 | | | [**VeSearch**](https://www.volcengine.com/docs/85508/1512748)(联网问答Agent) | 提供信息搜索与云端自动整合功能 | | | [**Web Scraper**](https://www.volcengine.com/docs/84296/1545470) | 定制化内容查询(邀测) | | | [**飞书 Lark**](https://open.larkoffice.com/document/uAjLw4CM/ukTMukTMukTM/mcp_integration/mcp_introduction) | 进行飞书相关操作 | | | [**AI 数据湖服务 LAS**](https://www.volcengine.com/product/las) | 提供开放、低成本、高性能的AI数据湖,海量数据存储与查询能力 | | **短期记忆** | [**MySQL**](https://www.volcengine.com/docs/6313) | 提供使用 MySQL 数据库存储短期记忆,提供高性能读写能力,可实现持久化 | | | [**PostgreSQL**](https://www.volcengine.com/product/rds-pg) | 提供使用 PostgreSQL 数据库存储短期记忆,提供高性能读写能力,可实现持久化 | | | [**火山引擎云数据库 MySQL 版**](https://www.volcengine.com/product/rds-mysql) | 记忆存储 | | **长期记忆** | [**云搜索服务**](https://www.volcengine.com/product/es) (OpenSearch) | 兼容 OpenSearch,支持向量搜索等能力 | | | [**Redis**](https://www.volcengine.com/product/redis) | Redis 作为长期记忆存储,支持 Redisearch 功能 | | | [**Viking 记忆库**](https://www.volcengine.com/docs/84313/1860687?lang=zh) | 知识向量化存储和检索 | | **知识库** | [**Viking 知识库**](https://www.volcengine.com/docs/84313/2117716?lang=zh) | 知识向量化存储和检索 | | | [**MySQL**](https://www.volcengine.com/docs/6313) | 提供使用 MySQL 数据库存储短期记忆,提供高性能读写能力,不具备向量搜索功能 | | | [**Redis**](https://www.volcengine.com/product/redis) | Redis 作为长期记忆存储,支持 Redisearch | | | [**云搜索服务**](https://www.volcengine.com/product/es) (OpenSearch) | 知识向量化存储和检索 | | **可观测** | [**应用性能监控全链路版**](https://www.volcengine.com/product/apmplus) (APMPlus) | 调用链路观测 | | | [**扣子罗盘**](https://www.coze.cn/loop)(CozeLoop) | 调用链路观测 | | | [**日志服务**](https://www.volcengine.com/product/tls) (TLS) | 调用链路观测、日志存储与检索 | | **评测** | [**扣子罗盘**](https://www.coze.cn/loop) (CozeLoop) | 在线评测 | | **云部署** | [**火山引擎函数服务**](https://www.volcengine.com/product/vefaas) (VeFaaS) | 提供一键上云能力 | | | [**火山引擎 API 网关**](https://www.volcengine.com/product/apig) | 提供鉴权、路由等能力 | | | [**火山引擎持续交付**](https://www.volcengine.com/product/cp) | 提供用户仓库向 VeFaaS 进行基于镜像的持续交付部署 | | | [**火山引擎镜像仓库**](https://www.volcengine.com/product/cr) | 提供用户代码镜像托管维护 | ## 安装 您可以从 PyPI 中安装最新版的 VeADK: ```bash pip install veadk-python ``` 您可以通过运行如下命令来检测您的 VeADK 是否安装成功: ```bash veadk --version ``` 了解如何安装 Python 请查看[Python 安装指南](https://www.python.org/downloads/)了解如何安装 Python。 了解如何从源码构建 VeADK 1. 下载源码至本地 将VeADK的代码包从Github下载到本地 ```bash git clone https://github.com/volcengine/veadk-python.git cd veadk-python ``` 1. 配置`uv`环境 本项目使用`uv`进行构建,(了解如何[安装`uv`](https://docs.astral.sh/uv/getting-started/installation/)) ```text # 选择 3.10及以上版本 uv venv --python 3.10 ``` 激活`uv`虚拟环境 ```bash source .venv/bin/activate ``` ```bash source .venv/bin/activate ``` ```text .venv\Scripts\activate.bat ``` ```powershell .venv\Scripts\activate.ps1 ``` 安装 VeADK ```bash uv pip install . # 或以本地可编辑模式安装 # uv pip install -e . ``` 了解如何为 VeADK 贡献代码 请查看[贡献指南](https://volcengine.github.io/veadk-python/references/contributing/index.md)了解如何为 VeADK 项目贡献代码。 ## 生成并运行 您可以执行如下命令,来生成一个我们为您预制好的 Agent 项目模板: ```bash veadk create ``` 您将被提示输入如下信息: - 项目名称(这也将会成为您的本地目录名称) - 方舟平台 API Key(您也可以稍后手动填写到配置文件中) 输入完毕后,您本地的项目结构如下: your_project/ ```text your_project ├── __init__.py # 模块导出文件 ├── .env # 环境变量文件,您需要在此处提供您的模型 API Key └── agent.py # Agent 定义文件 ``` 您需要在 `.env` 中填入您的模型 API Key: [获取方舟 API Key](https://console.volcengine.com/ark/region:ark+cn-beijing/apiKey?apikey=%7B%7D) .env ```text MODEL_AGENT_API_KEY = ... ``` 我们为您生成的 Agent 项目中,`agent.py` 提供了一个可以查询模拟天气数据的智能体,结构如下: agent.py ```python from veadk import Agent # 导入 Agent 模块 root_agent = Agent( name="root_agent", # Agent 名称 description="A helpful assistant for user questions.", # Agent 描述 instruction="Answer user questions to the best of your knowledge", # Agent 系统提示词 model_name="doubao-seed-1-6-251015", # 模型名称 ) ``` 之后,您可以通过如下命令来启动一个浏览器页面,与您的 Agent 进行交互: 常用配置 - 改变服务地址:`veadk web --host 0.0.0.0` - 改变服务端口:`veadk web --port 8000` - 改变日志输出级别:`veadk web --log_level DEBUG` (1) 1. VeADK 重载默认等级为 `ERROR` ```bash veadk web ``` 交互界面如图: # 执行引擎 `Runner` 是 ADK Runtime 中的一个核心组件,负责协调你定义的 Agents、Tools、Callbacks,使它们共同响应用户输入,同时管理信息流、状态变化,以及与外部服务(例如 LLM、Tools)或存储的交互。 VeADK 的执行引擎完全兼容 Google ADK Runner,更多`Runner`工作机制说明您可参考 [Google ADK Agent 运行时](https://google.github.io/adk-docs/runtime/)。 ______________________________________________________________________ ## 多租设计 在程序中,若需满足企业级多租户设计需求,可通过 `app_name`、`user_id` 以及 `session_id` 三个核心维度实现数据隔离,各维度对应的隔离范围如下: | 数据 | 隔离维度 | | -------- | --------------------------------- | | 短期记忆 | `app_name` `user_id` `session_id` | | 长期记忆 | `app_name` `user_id` | | 知识库 | `app_name` | ## 最简运行 VeADK 中为您提供了一个简单的运行接口 `Runner.run`,用于直接运行一个 Agent 实例,处理用户输入并返回响应。 使用限制 本运行接口封装度较高,如您需要在生产级别进行更加灵活的控制,建议使用 `Runner.run_async` 接口,通过异步生成器处理 Agent 每个执行步骤所产生的 Event 事件。 下面是一个最简运行示例: ```python import asyncio from veadk import Agent, Runner agent = Agent() runner = Runner(agent=agent) response = asyncio.run(runner.run(messages="北京的天气怎么样?")) print(response) ``` ## 生产级运行 我们以图片理解为例,演示如何使用 `runner.run_async` 来进行 Event 事件处理: ```python from google.genai.types import Blob, Content, Part from veadk import Agent, Runner APP_NAME = "app" USER_ID = "user" SESSION_ID = "session" agent = Agent() runner = Runner(agent=agent, app_name=APP_NAME) user_message = Content( role="user", parts=[ Part( text="请详细描述这张图片的所有内容,包括物体、颜色、布局和文字信息(如有)。" ), Part( inline_data=Blob( display_name=os.path.basename(image_path), data=read_png_to_bytes(image_path), mime_type="image/png", ) ), ], ) async for event in runner.run_async( user_id=runner.user_id, session_id=session_id, new_message=user_message, run_config=RunConfig(max_llm_calls=1), ): # 在这里处理您的 Event 事件 ``` # 内置工具使用指南 本文档旨在说明如何有效利用 veADK 内置工具(BuiltinTools)。这些工具提供了即用型功能(如网页搜索或代码执行器),赋予 Agent 通用能力。例如,需要从网络检索信息的 Agent 可直接使用 **web_search** 工具,无需额外配置。 ## 使用方法 1. **导入**:从 `veadk.tools.builtin_tools` 模块导入所需工具。 1. **配置**:初始化工具并按需提供参数。 1. **注册**:将工具实例添加至 Agent 的 `tools` 列表。 ```python from veadk import Agent from veadk.tools.builtin_tools.web_search import websearch # 在 Agent 初始化时注册工具 # Agent(tools=[websearch], other_params...) ``` 工具注册后,Agent 会根据 **用户提示** 和 **指令** 自主决定是否调用。框架将在调用时自动执行工具。 > **重要提示**:请务必查阅本文档末尾的“限制条件”部分。 ## 工具列表 veADK 集成了以下火山引擎工具: | 工具名称 | 功能说明 | 导入路径 | | ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------- | | `web_search` | 通过[融合信息搜索 API](https://www.volcengine.com/docs/85508/1650263) 进行全网搜索。 | `from veadk.tools.builtin_tools.web_search import web_search` | | `web_scraper` | 聚合搜索(邀测),代码详见[此处](https://github.com/volcengine/mcp-server/tree/main/server)。 | `from veadk.tools.builtin_tools.web_scraper import web_scraper` | | `vesearch` | 调用[联网问答 Agent](https://www.volcengine.com/docs/85508/1512748) 进行搜索,支持头条搜索等。 | `from veadk.tools.builtin_tools.vesearch import vesearch` | | `image_generate` | 根据文本描述[生成图片](https://www.volcengine.com/docs/82379/1541523)。 | `from veadk.tools.builtin_tools.image_generate import image_generate` | | `image_edit` | [编辑图片](https://www.volcengine.com/docs/82379/1541523)(图生图)。 | `from veadk.tools.builtin_tools.image_edit import image_edit` | | `video_generate` | 根据文本描述[生成视频](https://volcengine.github.io/veadk-python/tools/builtin/https.www.volcengine.com/docs/82379/1520757)。 | `from veadk.tools.builtin_tools.video_generate import video_generate` | | `run_code` | 在 [AgentKit 沙箱](https://console.volcengine.com/agentkit-ppe/region:agentkit-ppe+cn-beijing/builtintools)中执行代码。 | `from veadk.tools.builtin_tools.run_code import run_code` | | `lark` | 集成[飞书开放能力](https://open.larkoffice.com/document/uAjLw4CM/ukTMukTMukTM/mcp_integration/mcp_installation),实现文档处理、会话管理等。 | `from veadk.tools.builtin_tools.lark import lark` | | `las` | 基于[火山引擎 AI 多模态数据湖服务 LAS](https://www.volcengine.com/mcp-marketplace) 进行数据管理。 | `from veadk.tools.builtin_tools.las import las` | | `mobile_run` | 手机指令执行 | `from veadk.tools.builtin_tools.mobile_run import create_mobile_use_tool` | ### 公域搜索 (Web Search) `web_search` 工具允许 Agent 通过融合信息搜索的 API 进行搜索。详情请参考[融合信息搜索 API 文档](https://www.volcengine.com/docs/85508/1650263)。 使用 `web_search` 工具的附加要求 1. 需要配置火山引擎 AK、SK 或者使用火山引 IAM 授权的临时 StsToken 1. 需要配置用于 Agent 推理模型的API Key ```python from veadk import Agent, Runner from veadk.tools.builtin_tools.web_search import web_search from veadk.memory.short_term_memory import ShortTermMemory app_name = "veadk_app" user_id = "veadk_user" session_id = "veadk_session" agent = Agent( name="WebSearchAgent", model_name="doubao-seed-1-6-250615", description="An agent that can get result from Web Search", instruction="You are a helpful assistant that can provide information use web search tool.", tools=[web_search], ) short_term_memory = ShortTermMemory() runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response = await runner.run( messages="杭州今天的天气怎么样?", session_id=session_id ) print(response) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` 环境变量列表: - `MODEL_AGENT_API_KEY`: 用于 Agent 推理模型 API Key - `VOLCENGINE_ACCESS_KEY`:用于调用WebSearch的火山引擎的AccessKey - `VOLCENGINE_SECRET_KEY`:用于调用WebSearch的火山引擎的SecretKey 或在 `config.yaml` 中定义: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-250615 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here volcengine: # [optional] for Viking DB and `web_search` tool access_key: you-access-key-here secret_key: you-secret-key-here ``` 运行结果: ### 火山引擎搜索(VeSearch) `vesearch` 工具允许Agent通过调用火山引擎的联网问答Agent来进行搜索,详情请参考[联网问答 Agent](https://www.volcengine.com/docs/85508/1512748)。 使用 `vesearch` 工具的附加要求 1. 需要配置火山引擎 AK、SK 或者使用火山引 IAM 授权的临时 StsToken 1. 需要配置用于 Agent 推理模型的API Key 1. 需要配置联网问答 Agent 的智能体ID,在控制台创建智能体后获取,[控制台地址](https://console.volcengine.com/ask-echo/my-agent)。配置项名称为:`TOOL_VESEARCH_ENDPOINT`。 ```python from veadk import Agent, Runner from veadk.tools.builtin_tools.vesearch import vesearch from veadk.memory.short_term_memory import ShortTermMemory app_name = "veadk_app" user_id = "veadk_user" session_id = "veadk_session" API_KEY = "vesearch_api_key" agent = Agent( name="ve_search_agent", model_name="doubao-seed-1-6-250615", api_key=API_KEY, description="An agent that can get result from veSearch", instruction="You are a helpful assistant that can provide information use vesearch tool.", tools=[vesearch], ) short_term_memory = ShortTermMemory() runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response = await runner.run( messages="杭州今天的天气怎么样?", session_id=session_id ) print(response) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` 环境变量列表: - `MODEL_AGENT_API_KEY`: 用于 Agent 推理模型 API Key - `VOLCENGINE_ACCESS_KEY`:用于调用WebSearch的火山引擎的AccessKey - `VOLCENGINE_SECRET_KEY`:用于调用WebSearch的火山引擎的SecretKey - `TOOL_VESEARCH_ENDPOINT`: 用于 联网问答 Agent 的智能体ID。注:**必须配置在环境变量里面** 或在 `config.yaml` 中定义: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-250615 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here volcengine: # [optional] for Viking DB and `web_search` tool access_key: you-access-key-here secret_key: you-secret-key-here ``` 运行结果: ### 代码执行(Run Code) `run_code` 工具使代理能够执行代码。允许模型执行计算、数据处理或运行小程序等任务。 使用 `run_code` 工具的附加要求 1. 需要配置火山引擎 AK、SK 或者使用火山引 IAM 授权的临时 StsToken 1. 需要配置用于 Agent 推理模型的 API Key 1. 需要配置 AgentKit Tools Id,详见:***AgentKit 沙箱工具(Tools)*** 章节部分 ```python from veadk import Agent, Runner from veadk.tools.builtin_tools.web_search import web_search from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.builtin_tools.run_code import run_code app_name = "veadk_app" user_id = "veadk_user" session_id = "veadk_session" agent: Agent = Agent( name="data_analysis_agent", description="A data analysis for stock marketing", instruction=""" 你是一个资深软件工程师,在沙箱里执行生产的代码, 避免每次安装检查, 可以使用python lib akshare 下载相关的股票数据。可以通过web_search工具搜索相关公司的经营数据。如果缺失了依赖库, 通过python代码为沙箱安装缺失的依赖库。""", tools=[run_code, web_search], ) short_term_memory = ShortTermMemory() runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response = await runner.run(messages="阳光电源?", session_id=session_id) print(response) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` 以下是必须在环境变量里面的配置项: - `AGENTKIT_TOOL_ID`:用于调用火山引擎AgentKit Tools的沙箱环境Id - `AGENTKIT_TOOL_HOST`:用于调用火山引擎AgentKit Tools的EndPoint - `AGENTKIT_TOOL_SERVICE_CODE`:用于调用AgentKit Tools的ServiceCode 环境变量列表: - `MODEL_AGENT_API_KEY`: 用于 Agent 推理模型 API Key - `VOLCENGINE_ACCESS_KEY`:用于调用WebSearch的火山引擎的AccessKey - `VOLCENGINE_SECRET_KEY`:用于调用WebSearch的火山引擎的SecretKey 或在 `config.yaml` 中定义: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-250615 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here volcengine: # [optional] for Viking DB and `web_search` tool access_key: you-access-key-here secret_key: you-secret-key-here ``` 运行结果: ### 图像生成工具(Image Generate) `image_generate` 该工具可以根据用户的描述生成图像 使用 `image_generate` 工具的附加要求 1. 需要配置用于 Agent 推理模型的 API Key 1. 需要配置用于 Agent 推理图像生成的模型名称 ```python from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.builtin_tools.generate_image import image_generate app_name = "veadk_app" user_id = "veadk_user" session_id = "veadk_session" agent = Agent( name="image_generate_agent", description=("根据需求生成图片."), instruction=( """ 你是一个图片生成专家,根据用户的需求生成图片。 你可以使用的工具有: - image_generate 你只能依靠自己和绘图工具来完成任务。 """ ), tools=[ image_generate, ], ) short_term_memory = ShortTermMemory() runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response = await runner.run(messages="生成一个可爱的小猫", session_id=session_id) print(response) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` 环境变量列表: - `MODEL_IMAGE_NAME`: 用于 Agent 推理图像生成的模型名称 - `MODEL_AGENT_API_KEY`: 用于 Agent 推理模型 API Key 或在 `config.yaml` 中定义: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-250615 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here image: name: doubao-seedream-4-0-250828 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here ``` 运行结果: ### 视频生成工具(Video Generate) `video_generate` 该工具可以根据用户的描述生成图像 !!! warning "使用 `video_generate` 工具的附加要求": 1. 需要配置用于 Agent 推理模型的 API Key 1. 需要配置用于 Agent 推理视频生成的模型名称 1. 需要配置用于 Agent 推理图片生成的模型名称(该用例使用了 image_generate 工具,因此需要推理图像生成模型的配置) ```python from veadk import Agent, Runner from veadk.tools.builtin_tools.video_generate import video_generate from veadk.tools.builtin_tools.image_generate import image_generate from veadk.memory.short_term_memory import ShortTermMemory app_name = "veadk_app" user_id = "veadk_user" session_id = "veadk_session" agent = Agent( name="quick_video_create_agent", description=("You are an expert in creating images and video"), instruction="""You can create images and using the images to generate video, first you using the image_generate tool to create an image as the first_frame and next create the last_frame, then you using the generated first_frame and last_frame to invoke video_generate tool to create a video. After the video is created, you need to return the full absolute path of the video avoid the user cannot find the video and give a quick summary of the content of the video. """, tools=[image_generate, video_generate], ) short_term_memory = ShortTermMemory() runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response = await runner.run( messages="首先生成一只小狗图片,然后生成一个小狗飞上天抓老鼠的图片,最终合成一个视频", session_id=session_id, ) print(response) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` 环境变量列表: - `MODEL_VIDEO_NAME`: 用于 Agent 推理视频生成的模型名称 - `MODEL_IMAGE_NAME`: 用于 Agent 推理图像生成的模型名称 - `MODEL_AGENT_API_KEY`: 用于 Agent 推理模型 API Key 或在 `config.yaml` 中定义: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-250615 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here image: name: doubao-seedream-4-0-250828 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here image: name: doubao-seedance-1-0-pro-250528 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here ``` 运行结果: ### 飞书 MCP工具 (Lark MCP Tools) `lark` 该MCP工具集可以帮助你快速实现 AI agent 与飞书开放能力的集成,实现基于 agent 的飞书云文档处理、会话管理、日历安排等自动化场景。 使用 `lark` 工具的附加要求(2、3、4获取方式详见:[**Lark创建应用**](https://open.larkoffice.com/document/develop-an-echo-bot/introduction) ) 1. 需要配置用于 Agent 推理模型的 API Key 1. 需要配置用于 Lark 服务的 Application ID 1. 需要配置用于 Lark 服务的 API Key 1. 需要配置用于 Lark 服务的 OAuthToken ```python from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.builtin_tools.lark import lark_tools app_name = "veadk_app" user_id = "veadk_user" session_id = "veadk_session" agent = Agent( name="lark_agent", description=("飞书机器人"), instruction=( """ 你是一个飞书机器人,通过lark_tools给用户发消息。 """ ), tools=[ lark_tools, ], ) short_term_memory = ShortTermMemory() runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response = await runner.run( messages="给xiangya@bytedance.com发送'你好,我是lark agent'", session_id=session_id, ) print(response) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` 必须配置在环境变量的配置项: - `TOOL_LARK_ENDPOINT`: 用于 Agent 推理视频生成的模型名称 - `TOOL_LARK_API_KEY`: 用于 Agent 推理图像生成的模型名称 - `TOOL_LARK_TOKEN`: 用于 Agent 推理模型 API Key 环境变量列表: - `MODEL_AGENT_API_KEY`: 用于 Agent 推理模型 API Key 或在 `config.yaml` 中定义: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-250615 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here ``` 运行结果: ### LAS MCP工具 (LAS MCP Tools) `LAS` 该MCP工具集基于AI多模态数据湖服务LAS,提供多模态数据集的创建、预览、查询分析、编辑和清洗加工能力。 使用 `las` 工具的附加要求 1. 需要配置用于 Agent 推理模型的 API Key 1. 需要预先[创建LAS通用数据集](https://console.volcengine.com/las/region:las+cn-beijing/next/dataset/common/create),并获得数据集的LAS DatasetId 1. 需要配置用于 Las 服务的 DatasetId 1. 需要配置用于 Las 服务的 URL,[URL获取方式](https://www.volcengine.com/mcp-marketplace/detail?name=LAS%20MCP) ```python from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.builtin_tools.las import las app_name = "veadk_app" user_id = "veadk_user" session_id = "veadk_session" agent = Agent( name="las_agent", description=("use data from las"), instruction=( """ 你是一个诗人,根据用户的需求生成诗词。 你可以使用的MCP工具集有: - las 第一步你需要使用las工具去ds_public数据集检索相关内容,然后基于检索内容作为基础来写一首诗词。 """ ), tools=[ las, ], ) short_term_memory = ShortTermMemory() runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response = await runner.run( messages="写一首国风和木头、爱情相关的诗词", session_id=session_id ) print(response) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` 必须配置在环境变量的配置项: - `TOOL_LAS_URL`: 用于提供 LAS MCP Server 服务的地址 - `TOOL_LAS_DATASET_ID`: 配置使用LAS服务的数据集ID 环境变量列表: - `MODEL_AGENT_API_KEY`: 用于 Agent 推理模型 API Key 或在 `config.yaml` 中定义: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-250615 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: your-api-key-here ``` 运行结果: ### 手机指令执行 (Mobile Run) `mobile_run` 该工具可以让Agent在云手机上完成手机任务。 使用 `mobile_run` 工具的附加要求 1. 需要在火山购买云手机服务,并订购pod 1. 根据自己的需要在云手机上配置环境,必须下载app,登录账号等。 ```python ``` 必须配置在环境变量的配置项: - `TOOL_MOBIL_USE_TOOL_ID`: 用于执行命令的云手机id 或在 `config.yaml` 中定义: config.yaml ```yaml tool: mobile_use: tool_id: - product_id-pod_id - product_id-pod_id volcengine: access_key: xxx secret_key: xxx ``` 获取方式: 1. 登录火山,进入云手机产品控制界面 1. tool_id 格式为:product_id-pod_id 运行结果: ## 系统工具 - `load_knowledgebase`:检索知识库工具,在你给 Agent 传入 `knowledgebase` 参数后,将会自动挂载该工具,Agent 将在运行时自主决定何时查询知识库; - `load_memory`:检索长期记忆工具,在你给 Agent 传入 `long_term_memory` 参数后,将会自动挂载该工具,Agent 将在运行时自主决定何时查询长期记忆。 ## 其它 ### AgentKit 沙箱工具(Tools) AgentKit 沙箱工具提供了多种智能体在执行任务中需要的运行环境工具,支持快速集成与便捷调用,同时veADK提供了更多内置工具。 一体化工具集包含 Browser、Terminal、Code 运行环境,支持自动根据任务切换。 **创建沙箱** 按以下步骤操作: 1. **创建沙箱工具**:自定义沙箱名称"AIO_Sandbox_xxxx",工具集类型选择"一体化工具集",描述"Python代码沙箱" 1. **获取结果**:创建完成后在控制台获取‘AIO_Sandbox_xxxx’对应的沙箱ID:t-ye8dj82xxxxx 本文档介绍如何在智能体系统中创建与使用**自定义工具(Custom Tools)**,以扩展内置能力。\ 自定义工具可以满足特定业务逻辑需求,包括普通函数调用、带上下文的函数、以及长时间运行的任务等。\ 通过组合自定义工具,VeADK 能够支持开发者构建更复杂、更贴合业务场景的智能体应用。 ______________________________________________________________________ ## 普通入参函数 普通入参函数是最基础的自定义工具形式,通过传入参数完成特定计算或逻辑处理。为实现一个普通入参函数,需要完成以下步骤: 1. 定义工具函数,使用简单、易懂的扁平化设计,来规范输入参数及返回类型; 1. 实现函数中的 Docstring,描述函数功能、参数及返回值; 1. 将工具注册到 Agent 的工具列表中。 下面我们将实现一个计算器来说明如何自定义一个普通入参函数,并将这个函数注册到 Agent 中。 ```python import asyncio from typing import Any, Dict from google.adk.tools.tool_context import ToolContext from veadk import Agent, Runner def calculator( a: float, b: float, operation: str, tool_context: ToolContext ) -> Dict[str, Any]: """A simple calculator tool that performs basic arithmetic operations. Args: a (float): The first operand. b (float): The second operand. operation (str): The arithmetic operation to perform. Supported operations are "add", "subtract", "multiply", and "divide". Returns: Dict[str, Any]: A dictionary containing the result of the operation, the operation performed, and the status of the operation ("success" or "error"). """ if operation == "add": return {"result": a + b, "operation": "+", "status": "success"} if operation == "subtract": return {"result": a - b, "operation": "-", "status": "success"} if operation == "multiply": return {"result": a * b, "operation": "*", "status": "success"} if operation == "divide": return { "result": a / b if b != 0 else "Error, divisor cannot be zero", "operation": "/", "status": "success" if b != 0 else "error", } return {"status": "error", "message": "Unsupported operation"} agent = Agent( name="computing_agent", instruction="Please use the `calculator` tool to perform user-required calculations", tools=[calculator], ) runner = Runner(agent=agent) response = asyncio.run(runner.run(messages="Add 2 and 3")) print(response) ``` 运行后,可以看到如下结果: Agent 会根据用户输入调用注册的工具函数,执行相应的计算或逻辑处理。 ## 携带运行时上下文信息的函数 在您的工具定义函数中,通过携带有 `ToolContext` 类型的入参,能够访问智能体运行时上下文信息(如会话 ID、用户身份、环境变量等),从而实现更智能的决策。相比于定义普通入参函数,您需要额外传入 `ToolContext` 类型的入参,来获取智能体运行时上下文信息: 1. 定义函数时接受 `tool_context: ToolContext` 作为参数; 1. 在函数逻辑中处理传入的 `tool_context`。 下面通过一个简单的消息验证工具应用来演示如何使用携带有上下文信息 `ToolContext` 的函数。 ```python import asyncio from google.adk.tools.tool_context import ToolContext from veadk import Agent, Runner def message_checker( user_message: str, tool_context: ToolContext, ) -> str: """A user message checker tool that checks if the user message is valid. Args: user_message (str): The user message to check. Returns: str: The checked message. """ print(f"user_message: {user_message}") print(f"current running agent name: {tool_context._invocation_context.agent.name}") print(f"app_name: {tool_context._invocation_context.app_name}") print(f"user_id: {tool_context._invocation_context.user_id}") print(f"session_id: {tool_context._invocation_context.session.id}") return f"Checked message: {user_message.upper()}" agent = Agent( name="context_agent", tools=[message_checker], instruction="Use message_checker tool to check user message, and show the checked message", ) runner = Runner(agent=agent) response = asyncio.run(runner.run(messages="Hello world!")) print(response) ``` 运行结果如下图所示: 图中可以看到,工具函数中可以访问到智能体运行时上下文信息,例如会话 ID、用户身份、当前所执行的 Agent 信息等。 ## 长时运行任务(Long-running Task) 长时运行任务工具适用于需要耗时处理或异步执行的操作,例如大规模计算、数据分析或批量处理任务。下面,我们将通过构建一个简单的数据处理工具来演示长时运行任务的实现。 **引入必要依赖** ```python import asyncio from typing import Any from google.adk.events import Event from google.adk.tools.long_running_tool import LongRunningFunctionTool from google.genai.types import Content, FunctionCall, FunctionResponse, Part from veadk import Agent, Runner ``` **定义长时任务工具** ```python def big_data_processing(data_url: str) -> dict[str, Any]: """Process the big data from a specific data url. Args: data_url (str): The url of the big data to process. Returns: dict[str, Any]: A dictionary containing the result of the big data processing, the data url processed, and the status of the processing ("pending" or "finish"). """ # create a new task for processing big data. return { "status": "pending", "data-url": data_url, "task-id": "big-data-processing-1", } long_running_tool = LongRunningFunctionTool(func=big_data_processing) ``` **初始化 Agent 及运行时元数据** ```python APP_NAME = "long_running_tool_app" USER_ID = "long_running_tool_user" SESSION_ID = "long_running_tool_session" agent = Agent( name="long_running_tool_agent", tools=[long_running_tool], instruction="Use long_running_tool to process big data", ) runner = Runner(agent=agent, app_name=APP_NAME) # 初始化 Session session = asyncio.run( runner.short_term_memory.create_session( app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID ) ) ``` **定义运行函数并执行** ```python async def call_agent_async(query): def get_long_running_function_call(event: Event) -> FunctionCall | None: # Get the long running function call from the event if ( not event.long_running_tool_ids or not event.content or not event.content.parts ): return for part in event.content.parts: if ( part and part.function_call and event.long_running_tool_ids and part.function_call.id in event.long_running_tool_ids ): return part.function_call def get_function_response( event: Event, function_call_id: str ) -> FunctionResponse | None: # Get the function response for the fuction call with specified id. if not event.content or not event.content.parts: return for part in event.content.parts: if ( part and part.function_response and part.function_response.id == function_call_id ): return part.function_response content = Content(role="user", parts=[Part(text=query)]) print("Running agent...") events_async = runner.run_async( session_id=session.id, user_id=USER_ID, new_message=content ) long_running_function_call, long_running_function_response, task_id = ( None, None, None, ) async for event in events_async: # Use helper to check for the specific auth request event if not long_running_function_call: long_running_function_call = get_long_running_function_call(event) else: _potential_response = get_function_response( event, long_running_function_call.id ) if _potential_response: # Only update if we get a non-None response long_running_function_response = _potential_response task_id = long_running_function_response.response["task-id"] if event.content and event.content.parts: if text := "".join(part.text or "" for part in event.content.parts): print(f"[{event.author}]: {text}") if long_running_function_response: # query the status of the correpsonding ticket via tciket_id # send back an intermediate / final response updated_response = long_running_function_response.model_copy(deep=True) updated_response.response = {"status": "finish"} async for event in runner.run_async( session_id=session.id, user_id=USER_ID, new_message=Content( parts=[Part(function_response=updated_response)], role="user" ), ): if event.content and event.content.parts: if text := "".join(part.text or "" for part in event.content.parts): print(f"[{event.author}]: {text}") asyncio.run(call_agent_async("Process the big data from https://example.com/data.csv")) ``` 执行结果如下图所示: # KnowledgeBase 介绍 ## 主要功能 KnowledgeBase 在 veadk 框架中扮演 Agent 的外部知识库 角色。它像一个可供 Agent 随时查阅的、专门存储静态资料的“图书馆”。其核心功能包括: 1. 知识注入 (Ingestion) :您可以将外部的、非结构化的文本资料(如产品文档、FAQ、文章、知识片段等)添加进知识库。框架会自动处理这些文本,将其转换为 Agent 可以理解和检索的格式。 - 支持从 文件 导入 (kb.add_from_files([...]))。 - 支持从 内存中的文本字符串 导入 (kb.add_from_text([...]))。 1. 后端抽象 (Backend Abstraction) : KnowledgeBase 提供了一个统一的接口,屏蔽了底层向量数据库的实现细节。您只需要在初始化时通过 backend 参数指定使用 viking 还是 opensearch ,而无需关心它们各自的 API 调用方式。 3) 知识检索 (Retrieval) :当 KnowledgeBase 实例被传递给 Agent 后,Agent 会自动获得一个内置的 knowledgebase_search 工具。在回答问题时,Agent 可以自主决定是否使用此工具,将用户的问题转化为关键词去知识库中搜索相关信息,从而给出更精准、更具上下文的回答。 1) 与 Agent 无缝集成 :通过在创建 Agent 时传入 knowledgebase=kb 参数,Agent 就能自动利用这个知识库来增强其回答能力。 ## 使用方法 以下是使用 KnowledgeBase 的典型步骤,并对比了 viking 和 opensearch 的配置差异。 第 1 步:配置 config.yaml 文件 这是两者最主要的区别所在。您需要在项目根目录的 config.yaml 文件中提供对应后端的连接信息 ```yaml # config.yaml volcengine: access_key: "YOUR_VOLCENGINE_AK" # embedding 仍然需要火山引擎的 ak/sk secret_key: "YOUR_VOLCENGINE_SK" database: opensearch: host: "your_opensearch_host" port: 9200 username: "your_username" password: "your_password" ``` 第 2 步:在 Python 脚本中初始化 KnowledgeBase 在代码中,您只需指定 backend 名称即可。 第 3 步:添加知识 添加知识的方法对于两种后端是完全一样的。 第 4 步:集成到 Agent ```python from veadk import Agent, Runner agent = Agent( name="my_knowledgeable_agent", model_name="doubao-seed-1-6-250615", instruction="你是一个知识渊博的助手,请利用知识库回答问题。", knowledgebase=kb, # 在这里传入 # ... 其他参数 ) runner = Runner(agent, app_name="your_app_name") # 之后就可以正常运行 runner # ... ``` # 入门示例 (Getting Started) ## 资源开通 ### vikingDB开通 1. 登录控制台进入vikingDB操作页面 1. 进入控制台后,我们会看到下面的页面。知识库要选中中间红框选择的那个进入。 1. 进入知识库列表页后,按照下图红框提示进入创建页面: 1. 点击创建按钮后弹出创建选项如下(一定要选旗舰版本) 1. 进入创建详情页,按照提示输入关键信息。具体的核心部分见下图红框部分: 1. 点击创建知识库按钮,点完后会有一个弹出问是否导入文档,选择暂不导入。 ### TOS配置 1. 登陆火山控制台进入TOS控制台 1. 创建TOS桶 [火山引擎tos文档](https://www.volcengine.com/docs/6349?lang=zh) ### openSearch 开通 1. 登录火山控制台进入云搜索控制台 1. 进入云搜索控制台后创建opensearch实例 1. 输入实例创建信息 [火山引擎云搜索服务文档:创建实例](https://www.volcengine.com/docs/6465/1117829?lang=zh) ## 代码配置 ### config.yaml 把上面资源开通部分,对应的内容vikingDB、TOS和OpenSearch对应的内容填入。注意agent的api_base api_key可以直接使用例子中的。access_key和secret_key必须使用开通资源账号的。 ```yaml model: agent: provider: openai name: doubao-seed-1-6-251015 api_base: https://ark.cn-beijing.volces.com/api/v3/ api_key: ************************** volcengine: # [optional] for Viking DB and `web_search` tool #替换为自己的ak/sk access_key: *********************************** secret_key: *********************************** database: viking: project: default # user project in Volcengine Viking DB region: cn-beijing tos: endpoint: tos-cn-beijing.volces.com # default Volcengine TOS endpoint region: cn-beijing # default Volcengine TOS region bucket: bucket_name opensearch: host: opensearch.********.com port: 9200 username: admin password: ************* ``` ## 演示场景 ### 知识库的内容 把下面的一段话作为知识库的内容: ```python 西格蒙德·弗洛伊德(Sigmund Freud,1856年5月6日-1939年9月23日)是精神分析的创始人。 精神分析既是一种治疗精神疾病的方法,也是一种解释人类行为的理论。弗洛伊德认为,我们童年时期的经历对我们的成年生活有很大的影响,并且塑造了我们的个性。 例如,源自人们曾经的创伤经历的焦虑感,会隐藏在意识深处,并且可能在成年期间引起精神问题(以神经症的形式)。""", """阿尔弗雷德·阿德勒(Alfred Adler,1870年2月7日-1937年5月28日),奥地利精神病学家,人本主义心理学先驱,个体心理学的创始人。 曾追随弗洛伊德探讨神经症问题,但也是精神分析学派内部第一个反对弗洛伊德的心理学体系的心理学家。 著有《自卑与超越》《人性的研究》《个体心理学的理论与实践》《自卑与生活》等。 ``` ### Agent是角色设定 ```python instruction="""你是一个优秀的助手。当被提问时,请遵循以下步骤: 1. 首先,根据你的内部知识,生成一个初步的回答。 2. 然后,查询你的知识库,寻找与问题相关的信息来验证或丰富你的答案。 3. 最后,结合你的内部知识和知识库中的信息,给出一个全面、准确的最终答案。""", ``` ### 测试问题: 1. 弗洛伊德和阿德勒差了多少岁,多少天? 1. 弗洛伊德和阿德勒差了多少岁,多少天? 他们对后世的影响有多大 ### vikingDB作为知识库存储 #### 知识库初始化(从变量加载): ```python APP_NAME = "viking_demo" mock_data = [ """西格蒙德·弗洛伊德(Sigmund Freud,1856年5月6日-1939年9月23日)是精神分析的创始人。 精神分析既是一种治疗精神疾病的方法,也是一种解释人类行为的理论。弗洛伊德认为,我们童年时期的经历对我们的成年生活有很大的影响,并且塑造了我们的个性。 例如,源自人们曾经的创伤经历的焦虑感,会隐藏在意识深处,并且可能在成年期间引起精神问题(以神经症的形式)。""", """阿尔弗雷德·阿德勒(Alfred Adler,1870年2月7日-1937年5月28日),奥地利精神病学家,人本主义心理学先驱,个体心理学的创始人。 曾追随弗洛伊D探讨神经症问题,但也是精神分析学派内部第一个反对弗洛伊德的心理学体系的心理学家。 著有《自卑与超越》《人性的研究》《个体心理学的理论与实践》《自卑与生活》等。""", ] kb = KnowledgeBase( backend="viking", # 这里设置为viking app_name=APP_NAME, ) res = kb.collection_status() if not res["existed"]: kb.create_collection() # viking需要专门create一下 kb.add_from_text(mock_data) ``` #### Agent代码初始化: ```python agent = Agent( name="chat_agent", model_name="doubao-seed-1-6-250615", description="你是一个优秀的助手,你可以和用户进行对话。", instruction="""你是一个优秀的助手。当被提问时,请遵循以下步骤: 1. 首先,根据你的内部知识,生成一个初步的回答。 2. 然后,查询你的知识库,寻找与问题相关的信息来验证或丰富你的答案。 3. 最后,结合你的内部知识和知识库中的信息,给出一个全面、准确的最终答案。""", knowledgebase=kb, tools=[calculate_date_difference], ) ``` #### #### 完整代码: ```python import os from uuid import uuid4 import yaml os.environ["LITELLM_LOGGING"] = "False" os.environ["LOGGING_LEVEL"] = "DEBUG" import asyncio from datetime import datetime from veadk import Agent, Runner from veadk.knowledgebase import KnowledgeBase # 从config.yaml读取配置 with open('config.yaml', 'r') as f: config = yaml.safe_load(f) # 打印配置结构用于调试 print("Config structure:", config) # 设置环境变量(使用get方法避免KeyError) if config.get('database') and config['database'].get('viking'): os.environ["DATABASE_VIKING_PROJECT"] = config['database']['viking'].get('project', 'default') os.environ["DATABASE_VIKING_REGION"] = config['database']['viking'].get('region', 'cn-beijing') else: print("Warning: Viking database config not found, using defaults") os.environ["DATABASE_VIKING_PROJECT"] = 'default' os.environ["DATABASE_VIKING_REGION"] = 'cn-beijing' if config.get('database') and config['database'].get('tos'): os.environ["DATABASE_TOS_BUCKET"] = config['database']['tos'].get('bucket', 'test-wangyue') else: print("Warning: TOS config not found, using default") os.environ["DATABASE_TOS_BUCKET"] = 'test-wangyue' if config.get('volcengine'): os.environ["VOLCENGINE_ACCESS_KEY"] = config['volcengine'].get('access_key', '') os.environ["VOLCENGINE_SECRET_KEY"] = config['volcengine'].get('secret_key', '') else: print("Warning: Volcengine config not found") os.environ["VOLCENGINE_ACCESS_KEY"] = '' os.environ["VOLCENGINE_SECRET_KEY"] = '' # 验证环境变量 assert os.getenv("DATABASE_VIKING_PROJECT") and os.getenv("DATABASE_VIKING_REGION"), ( "请设置config.yaml里的viking参数" ) assert os.getenv("VOLCENGINE_ACCESS_KEY") and os.getenv("VOLCENGINE_SECRET_KEY"), ( "请在config.yaml里设置火山ak和sk" ) assert os.getenv("DATABASE_TOS_BUCKET"), "请在config.yaml里设置tos相关的参数" APP_NAME = "viking_demo" mock_data = [ """西格蒙德·弗洛伊德(Sigmund Freud,1856年5月6日-1939年9月23日)是精神分析的创始人。 精神分析既是一种治疗精神疾病的方法,也是一种解释人类行为的理论。弗洛伊德认为,我们童年时期的经历对我们的成年生活有很大的影响,并且塑造了我们的个性。 例如,源自人们曾经的创伤经历的焦虑感,会隐藏在意识深处,并且可能在成年期间引起精神问题(以神经症的形式)。""", """阿尔弗雷德·阿德勒(Alfred Adler,1870年2月7日-1937年5月28日),奥地利精神病学家,人本主义心理学先驱,个体心理学的创始人。 曾追随弗洛伊德探讨神经症问题,但也是精神分析学派内部第一个反对弗洛伊德的心理学体系的心理学家。 著有《自卑与超越》《人性的研究》《个体心理学的理论与实践》《自卑与生活》等。""", ] kb = KnowledgeBase( backend="viking", # 这里设置为viking app_name=APP_NAME, ) res = kb.collection_status() if not res["existed"]: kb.create_collection() # viking需要专门create一下 kb.add_from_text(mock_data) #kb.add_from_files(["tmp/demo.txt"]) def calculate_date_difference(date1: str, date2: str) -> int: """ 计算两个日期之间的天数差异 参数: date1: 第一个日期,格式为"YYYY-MM-DD" date2: 第二个日期,格式为"YYYY-MM-DD" 返回: 两个日期之间的天数差异(绝对值) """ # 解析日期字符串为datetime对象 try: d1 = datetime.strptime(date1, "%Y-%m-%d") d2 = datetime.strptime(date2, "%Y-%m-%d") except ValueError as e: raise ValueError(f"日期格式错误,请使用YYYY-MM-DD格式: {e}") # 计算日期差并返回绝对值 delta = d2 - d1 return abs(delta.days) agent = Agent( name="chat_agent", model_name="doubao-seed-1-6-250615", description="你是一个优秀的助手,你可以和用户进行对话。", instruction="你是一个优秀的助手,你可以和用户进行对话。", knowledgebase=kb, tools=[calculate_date_difference], ) runner = Runner( agent, app_name=APP_NAME, ) async def main(): """ 主函数,用于运行agent """ session_id = uuid4().hex while True: try: print(" 您: ", end="") message = input() if message.strip().lower() == "exit": break print(" Agent: ") completion = await runner.run( messages=message, session_id=session_id, ) print(completion) except (KeyboardInterrupt, EOFError): break if __name__ == "__main__": asyncio.run(main()) ``` #### 运行结果: ##### 输入问题: ##### 执行过程: 下图红框里面显示了通过搜索知识库获取相关信息,然后大模型调用calculate_date_difference funcion call。 下图显示Agent先调用knowledge 获取相关知识,然后再通过Function Call调用function calculate_date_difference. ##### 执行结果: ### OpenSearch 作为知识库存储 ##### 知识库初始化(从文件加载变量加载) 在项目目录下新建tmp目录,在tmp目录新建demo.txt。它的内容如下: ````plain text 西格蒙德·弗洛伊德(Sigmund Freud,1856年5月6日-1939年9月23日)是精神分析的创始人。 精神分析既是一种治疗精神疾病的方法,也是一种解释人类行为的理论。弗洛伊德认为,我们童年时期的经历对我们的成年生活有很大的影响,并且塑造了我们的个性。 例如,源自人们曾经的创伤经历的焦虑感,会隐藏在意识深处,并且可能在成年期间引起精神问题(以神经症的形式)。 阿尔弗雷德·阿德勒(Alfred Adler,1870年2月7日-1937年5月28日),奥地利精神病学家,人本主义心理学先驱,个体心理学的创始人。 曾追随弗洛伊德探讨神经症问题,但也是精神分析学派内部第一个反对弗洛伊德的心理学体系的心理学家。 著有《自卑与超越》《人性研究》《个体心理学的理论与实践》《自卑与生活》等。 ```text 知识库初始化代码: ```python APP_NAME = "opensearch_demo" kb = KnowledgeBase( backend="opensearch", # 这里设置为opensearch app_name=APP_NAME, ) # The file path has been corrected to tmp/demo.txt kb.add_from_files(["tmp/demo.txt"]) ```` ##### Agent代码初始化: ```python agent = Agent( name="chat_agent", model_name="doubao-seed-1-6-250615", description="你是一个优秀的助手,你可以和用户进行对话。", instruction="""你是一个优秀的助手。当被提问时,请遵循以下步骤: 1. 首先,根据你的内部知识,生成一个初步的回答。 2. 然后,查询你的知识库,寻找与问题相关的信息来验证或丰富你的答案。 3. 如果知识库信息不足,或用户问题涉及实时、或知识库外的知识,请使用 `web_search` 工具进行网络搜索。 4. 最后,结合你的内部知识、知识库信息以及网络搜索结果,给出一个全面、准确的最终答案。""", knowledgebase=kb, tools=[calculate_date_difference, web_search], ) ``` ##### 完整的代码: ```python import os from uuid import uuid4 import yaml import json from datetime import datetime import asyncio os.environ["LITELLM_LOGGING"] = "False" os.environ["LOGGING_LEVEL"] = "DEBUG" from veadk import Agent, Runner from veadk.knowledgebase import KnowledgeBase # 从config.yaml读取配置 with open('config.yaml', 'r') as f: config = yaml.safe_load(f) # 打印配置结构用于调试 print("Config structure:", config) if config.get('database') and config['database'].get('opensearch'): os.environ["DATABASE_OPENSEARCH_HOST"] = config['database']['opensearch'].get('host', '') os.environ["DATABASE_OPENSEARCH_PORT"] = str(config['database']['opensearch'].get('port', '')) os.environ["DATABASE_OPENSEARCH_USERNAME"] = config['database']['opensearch'].get('username', '') os.environ["DATABASE_OPENSEARCH_PASSWORD"] = config['database']['opensearch'].get('password', '') else: print("Warning: Opensearch config not found") if config.get('volcengine'): os.environ["VOLCENGINE_ACCESS_KEY"] = config['volcengine'].get('access_key', '') os.environ["VOLCENGINE_SECRET_KEY"] = config['volcengine'].get('secret_key', '') else: print("Warning: Volcengine config not found") os.environ["VOLCENGINE_ACCESS_KEY"] = '' os.environ["VOLCENGINE_SECRET_KEY"] = '' # 验证环境变量 assert os.getenv("VOLCENGINE_ACCESS_KEY") and os.getenv("VOLCENGINE_SECRET_KEY"), ( "请在config.yaml里设置火山ak和sk" ) APP_NAME = "opensearch_demo" kb = KnowledgeBase( backend="opensearch", # 这里设置为opensearch app_name=APP_NAME, ) # The file path has been corrected to tmp/demo.txt kb.add_from_files(["tmp/demo.txt"]) def web_search(query: str) -> str: """ 当知识库信息不足时,使用此工具在互联网上搜索实时或外部信息。查询可以是单个字符串,也可以是JSON格式的字符串列表。 """ # This is a placeholder for the actual web search tool call # In a real scenario, this would be implemented by the environment. print(f"Performing web search for: {query}") # The model might send a list of queries directly, or a JSON string of a list. if isinstance(query, list): return f"Web search results for queries: {', '.join(query)}" # If it's a string, it could be a JSON list. if isinstance(query, str): try: queries = json.loads(query) if isinstance(queries, list): return f"Web search results for queries: {', '.join(queries)}" except (json.JSONDecodeError, TypeError): # It's just a plain string. pass # Fallthrough for plain string return "Web search results for '" + query + "'" # Fallback for any other unexpected type return f"Cannot process web search for unexpected query type: {query}" def calculate_date_difference(date1: str, date2: str) -> int: """ 计算两个日期之间的天数差异 参数: date1: 第一个日期,格式为"YYYY-MM-DD" date2: 第二个日期,格式为"YYYY-MM-DD" 返回: 两个日期之间的天数差异(绝对值) """ # 解析日期字符串为datetime对象 try: d1 = datetime.strptime(date1, "%Y-%m-%d") d2 = datetime.strptime(date2, "%Y-%m-%d") except ValueError as e: raise ValueError(f"日期格式错误,请使用YYYY-MM-DD格式: {e}") # 计算日期差并返回绝对值 delta = d2 - d1 return abs(delta.days) agent = Agent( name="chat_agent", model_name="doubao-seed-1-6-250615", description="你是一个优秀的助手,你可以和用户进行对话。", instruction="""你是一个优秀的助手。当被提问时,请遵循以下步骤: 1. 首先,根据你的内部知识,生成一个初步的回答。 2. 然后,查询你的知识库,寻找与问题相关的信息来验证或丰富你的答案。 3. 如果知识库信息不足,或用户问题涉及实时、或知识库外的知识,请使用 `web_search` 工具进行网络搜索。 4. 最后,结合你的内部知识、知识库信息以及网络搜索结果,给出一个全面、准确的最终答案。""", knowledgebase=kb, tools=[calculate_date_difference, web_search], ) runner = Runner( agent, app_name=APP_NAME, ) if __name__ == "__main__": session_id = uuid4().hex print("欢迎使用交互式问答 Agent。输入 'exit' 或 'quit' 来结束对话。") while True: try: message = input("您: ") if message.lower() in ["exit", "quit"]: print("再见!") break completion = asyncio.run( runner.run( messages=message, session_id=session_id, ) ) print(f"Agent: {completion}") except KeyboardInterrupt: print(" 再见!") break ``` ##### 执行过程: ##### 执行结果 # 总结与对比 总而言之, KnowledgeBase 的设计让您可以轻松切换底层向量数据库,而无需更改大部分业务逻辑代码。主要的差异在于前期的配置和 viking 需要显式创建集合这一点。 | 特性 | Viking 后端 | OpenSearch 后端 | | ------------------------ | ---------------------------------------------------------------- | ---------------------------------------------------------- | | **核心优势** | 托管服务,免运维,与火山引擎生态结合紧密。 | 开源、灵活,可私有化部署,社区生态成熟。 | | **配置 (`config.yaml`)** | 需要 `viking.project` 和 `viking.region`。 | 需要 `opensearch.host`, `port`, `username`, `password`。 | | **初始化** | 需要在使用前手动检查并调用 `kb.create_collection()`。 | 通常自动创建索引,无需额外步骤。 | | **代码使用** | **完全一致**。`add_from_files`, `add_from_text` 等方法用法相同。 | **完全一致**。`veadk` 框架的抽象层做得很好。 | | **依赖** | 依赖火山引擎的 `ak/sk` 进行认证和 embedding。 | 同样需要 `ak/sk` 来调用 embedding 模型,但数据库本身独立。 | 本文档将介绍 VeADK 系统的\*\*短期记忆(Short-term Memory)\*\*机制及其应用。短期记忆的核心作用是保存「会话级」的上下文信息,从而提升多轮交互的一致性,让智能体的响应更连贯。 当用户与你的 Agent 开启对话时,ShortTermMemory 或 SessionService会自动创建一个 Session 对象。这个 Session 会全程跟踪并管理对话过程中的所有相关内容。 Note - 更多信息可参考[Google ADK Session](https://google.github.io/adk-docs/sessions/session) ## 使用ShortTermMemory 下面展示了创建和使用短期记忆: ```python import asyncio from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory app_name = "app_short_term_1" user_id = "user_short_term_1" session_id = "session_short_term_1" agent = Agent() short_term_memory = ShortTermMemory( backend="local", # 指定 ShortTermMemory 的存储方式 # 如果是 sqlite,指定数据库路径 # local_database_path="/tmp/d_persistent_short_term_memory.db", ) runner = Runner( agent=agent, short_term_memory=short_term_memory, app_name=app_name, user_id=user_id ) async def main(): response1 = await runner.run( messages="我在 7 月 15 日购买了 20 个冰激凌", session_id=session_id ) print(f"response of round 1: {response1}") response2 = await runner.run( messages="我什么时候买了冰激凌?", session_id=session_id ) print(f"response of round 2: {response2}") if __name__ == "__main__": asyncio.run(main()) ``` **示例输出** ```text response of round 1: 听起来您记录了冰激凌的购买情况!请问您是否需要: 1. 计算相关费用(如果有单价信息) 2. 记录或分析消费习惯 3. 其他与这次购买相关的数据处理或记录需求? response of round 2: 根据您提供的信息,您在 **7 月 15 日** 购买了 20 个冰激凌。 ``` ## 短期记忆的几种实现 VeADK 中,您可以使用如下短期记忆后端服务来初始化您的短期记忆: | 类别 | 说明 | | ------------ | ---------------------------------------------------------------------------------------- | | `local` | 内存短期记忆,程序结束后即清空。生产环境需要使用数据库进行持久化,以符合分布式架构要求。 | | `mysql` | 使用 MySQL 数据库存储短期记忆,可实现持久化 | | `sqlite` | 使用本地 SQLite 数据库存储短期记忆,可实现持久化 | | `postgresql` | 使用 PostgreSQL 数据库存储短期记忆,可实现持久化 | #### 数据库 backend 配置 ```yaml database: mysql: host: user: password: charset: utf8 ``` ```yaml database: postgresql: host: # host or ip user: password: ``` 在火山引擎开通数据库 - [如何开通火山引擎 MySQL 数据库](https://www.volcengine.com/product/rds-mysql) - [如何开通火山引擎 postgresql 数据库](https://www.volcengine.com/product/rds-pg) ## 会话管理 你通常无需直接创建或管理 `Session` 对象,而是通过 `SessionService` 来管理,负责对话会话的完整生命周期。 其核心职责包括: 1. **启动新会话**`create_session()`:当用户发起交互时,创建全新的 Session 对象。 1. **恢复已有会话**`get_session()`:通过 `session_id` 检索特定 Session ,使 Agent 能够接续之前的对话进度。 1. **保存对话进度**`append_event()`:将新的交互内容(Event 对象)追加到会话历史中。这也是会话状态的更新机制。 1. **列出会话列表**`list_sessions()`:查询特定用户及 application 下的活跃会话。 1. **清理会话数据**`delete_session()`:当会话结束或会话不再需要时,删除 Session 对象及其关联数据。 ## 上下文压缩 随着 Agent 运行会话历史会不断增长,从而导致大模型处理数据的增长和响应时间变慢。上下文压缩功能使用滑动窗口方法来汇总会话历史数据,当会话历史超过预定义阈值时,系统会自动压缩旧事件。 ### 配置上下文压缩 添加上下文压缩后,`Runner` 会在每次会话达到间隔时自动压缩会话历史。 ```python from google.adk.apps.app import App from google.adk.apps.app import EventsCompactionConfig app = App( name='my-agent', root_agent=root_agent, events_compaction_config=EventsCompactionConfig( compaction_interval=3, # 每 3 次新调用触发一次压缩。 overlap_size=1 # 包含前一个窗口的最后一次事件重叠。 ), ) ``` ### 定义压缩器 你可以使用`LlmEventSummarizer`自定义使用特定的大模型和压缩结构`PromptTemplate`。 ```python from google.adk.apps.app import App from google.adk.apps.app import EventsCompactionConfig from google.adk.models.lite_llm import LiteLlm from google.adk.apps.llm_event_summarizer import LlmEventSummarizer # 自定义压缩器使用的AI模型 summarization_llm = LiteLlm( model="volcengine/doubao-seed-1-6-lite-251015", api_key=f"{model_api_key}", api_base=f"{model_api_base}", ) my_compactor = LlmEventSummarizer(llm=summarization_llm, # 自定义压缩结构 prompt_template=""" Please summarize the conversation. Compression Requirements: 1. Retain key entities, data points, and timelines 2. Highlight core problems and solutions discussed 3. Maintain logical coherence and contextual relevance 4. Eliminate duplicate expressions and redundant details """) app = App( name='my-agent', root_agent=root_agent, events_compaction_config=EventsCompactionConfig( compactor=my_compactor, compaction_interval=5, overlap_size=1 ), ) ``` 本文档介绍 VeADK 系统中的\*\*长期记忆(Long-term Memory)\*\*概念及应用。长期记忆用于跨会话、跨时间保存重要信息,以增强智能体在连续交互中的一致性和智能性。 **定义**:长期记忆是智能体用于存储超出单次会话范围的重要信息的机制,可以包含用户偏好、任务历史、知识要点或长期状态。 **为什么重要**: - 支持跨会话的连续对话体验; - 允许智能体在多次交互中保留学习成果和用户特定信息; - 减少重复询问,提升用户满意度和效率; - 支持长期策略优化,例如个性化推荐或任务追踪。 智能体用户需要长期记忆来实现更智能、个性化和可持续的交互体验,尤其在多次会话或复杂任务场景中显著提高系统实用性。 ## 支持后端类型 | 类别 | 说明 | | ------------ | ---------------------------------------------------------- | | `local` | 内存跨 Session 记忆,程序结束后即清空 | | `opensearch` | 使用 OpenSearch 作为长期记忆存储,可实现持久化和检索 | | `redis` | 使用 Redis 作为长期记忆存储,Redis 需要支持 Rediseach 功能 | | `viking` | 使用 VikingDB 记忆库产品作为长期记忆存储 | | `viking_mem` | 已废弃,设置后将会自动转为 `viking` | ## 初始化方法 在使用长期记忆之前,需要实例化 LongTermMemory 对象并指定后端类型。以下代码展示了如何初始化基于 VikingDB 的长期记忆模块,并将其绑定到 Agent: ```python from veadk import Agent, Runner from veadk.memory import LongTermMemory # 初始化长期记忆 # backend="viking" 指定使用 VikingDB # app_name 和 user_id 用于数据隔离 long_term_memory = LongTermMemory( backend="viking", app_name="local_memory_demo", user_id="demo_user" ) # 将长期记忆绑定到 Agent root_agent = Agent( name="minimal_agent", instruction="Acknowledge user input and maintain simple conversation.", long_term_memory=long_term_memory, # 长期记忆实例 ) runner = Runner(agent=root_agent) ``` ## 记忆管理 ### 添加会话到长期记忆 在会话(Session)结束或达到特定节点时,需要显式调用 add_session_to_memory 将会话数据持久化。对于 Viking 后端,这一步会触发数据的向量化处理。 ```python # 假设 runner1 已经完成了一次对话 completed_session = await runner.session_service.get_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id ) # 将完整会话归档到长期记忆 root_agent.long_term_memory.add_session_to_memory(completed_session) ``` ### 检索长期记忆 除了 Agent 在运行时自动检索外,开发者也可以调用 search_memory 接口直接进行语义搜索,用于调试或构建自定义的 RAG(检索增强生成)应用。 ```python query = "favorite project" res = await root_agent.long_term_memory.search_memory( app_name=APP_NAME, user_id=USER_ID, query=query ) # 打印检索结果 print(res) ``` ## 使用长期记忆进行会话管理 在单租户场景中,长期记忆可用于管理同一用户的多次会话,确保智能体能够: - 在新会话中记忆上一次交互内容; - 根据历史信息做出个性化响应; - 在多轮任务中累积进度信息或中间结果。 ### 准备工作 - 为每个用户分配唯一标识(user_id 或 session_owner_id); - 设计长期记忆数据结构以支持多会话信息保存; - 配合短期记忆使用,实现会话内上下文快速访问。 ### 示例 以下示例演示了一个完整的流程:Runner1 告诉 Agent 一个信息("My favorite project is Project Alpha"),将会话存入记忆,然后创建一个全新的 Runner2,验证其能否回答相关问题。 ```python # --- 阶段 1: 写入记忆 --- # Runner1 告诉 Agent 信息 runner1_question = "My favorite project is Project Alpha." user_input = types.Content(role="user", parts=[types.Part(text=runner1_question)]) async for event in runner1.run_async(user_id=USER_ID, session_id=session_id, new_message=user_input): pass # 处理 Runner1 的响应 # 关键步骤:将会话归档到 VikingDB completed_session = await runner1.session_service.get_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id) root_agent.long_term_memory.add_session_to_memory(completed_session) # --- 阶段 2: 跨会话读取 --- # 初始化 Runner2 (模拟新的会话或后续交互) runner2 = Runner( agent=root_agent, app_name=APP_NAME, user_id=USER_ID, short_term_memory=short_term_memory ) # Runner2 提问,依赖长期记忆回答 qa_question = "favorite project" qa_content = types.Content(role="user", parts=[types.Part(text=qa_question)]) final_text = None async for event in runner2.run_async(user_id=USER_ID, session_id=session_id, new_message=qa_content): if event.is_final_response(): final_text = event.content.parts[0].text.strip() ``` ### 说明 / 结果展示 - 智能体能够识别并关联同一用户的历史交互; - 提供连续性强、个性化的多会话交互体验; - 为长期任务、学习型应用或持续监控场景提供基础能力。 ```text [Log Output] Runner1 Question: My favorite project is Project Alpha. Runner1 Answer: (Acknowledged) ... [Step 4] Archiving session to Long-Term Memory via memory_service Session archived to Long-Term Memory ... Runner2 Question: favorite project Runner2 Answer: Your favorite project is Project Alpha. ``` # 观测您的智能体 本文档介绍智能体系统中的 \*\*Tracing(跟踪)\*\*概念及其在开发和实践中的作用。 Tracing 是企业级智能体应用中实现可观测性(Observability)的核心组成部分。 ______________________________________________________________________ ## 全链路观测 Tracing 指的是对智能体执行过程的**全链路记录与追踪**。 在 Agent 系统中,这包括: - 用户输入与请求的接收时间和内容; - 智能体内部推理过程,例如模型调用、工具执行、策略决策; - 各工具、服务或外部系统接口的调用记录; - 响应生成及输出结果。 通过 Tracing,可以将智能体执行过程形成可分析、可追溯的日志数据,为开发、调试、监控和优化提供依据。 **核心特点**: - **可视化链路**:每次请求从接入到响应的完整流程可追踪; - **结构化数据**:支持记录上下文、状态、事件类型、耗时等; - **跨组件追踪**:涵盖 Agent、工具、记忆模块、知识库及外部接口。 下面是一段 Tracing 数据的结构示例(一般为 `json` 格式文件): tracing.json ```json { "name": "execute_tool get_city_weather", "span_id": 15420762184471404328, "trace_id": 195590910357197178730434145750344919939, "start_time": 1762770336065962000, "end_time": 1762770336066380000, "attributes": { "gen_ai.operation.name": "execute_tool", "gen_ai.tool.name": "get_city_weather", "gen_ai.tool.input": "{\"name\": \"get_city_weather\"}", "gen_ai.tool.output": "{\"response\": {\"result\": \"Sunny, 25°C\"}}", "cozeloop.input": "{\"name\": \"get_city_weather\"}", "cozeloop.output": "{\"response\": {\"result\": \"Sunny, 25°C\"}}" }, "parent_span_id": 18301436667977577407 } ``` ## Tracing 的主要能力 在智能体系统中,引入 Tracing 有多方面价值: **调试与问题定位** - 智能体在多轮交互或复杂任务中可能出现逻辑错误、工具调用失败或模型输出异常 - 帮助开发者快速定位问题发生环节 **性能分析与优化** - 通过跟踪各模块执行时间,可以发现性能瓶颈; - 优化模型调用策略、工具执行顺序或缓存策略,提高系统响应速度。 **可审计性与合规性**: - 企业级应用需要记录操作链路以满足审计或合规要求; - Tracing 可提供完整的请求、响应和决策过程记录。 **多 Agent 系统协调**: - 在多 Agent 协作场景中,Tracing 可以帮助分析各 Agent 的调用关系和数据流向; - 支持跨 Agent 调度和工具调用的可视化追踪。 **系统可靠性提升**: - 可及时发现异常或失败事件,并触发告警或自动恢复机制; - 为长期运行的 Agent 系统提供稳定性保障。 下面是一段 Tracing 数据的链路示例,展示了一个智能体系统中多个 Agent 之间的调用和事件流程。 tracing.json ```json # Tracing 在多 Agent 系统中调用和事件链路示例 # coding_agent -> python_coder_agent Tracing链路如下: [ { "name": "execute_tool transfer_to_agent", "span_id": 8839983747734433620, "trace_id": 161011438572144016825706884868588853689, "start_time": 1763089073534973000, "end_time": 1763089073535544000, "attributes": { "agent.name": "coding_agent", "gen_ai.operation.name": "execute_tool", "gen_ai.tool.name": "transfer_to_agent", }, "parent_span_id": 5680387922600458423 }, { "name": "call_llm", "span_id": 6376035117066938110, "trace_id": 161011438572144016825706884868588853689, "start_time": 1763089073539770000, "end_time": 1763089112820657000, "attributes": { "agent.name": "python_coder", "gen_ai.request.model": "openai/doubao-seed-1-6-251015", "gen_ai.request.type": "chat", "gen_ai.span.kind": "llm", "gen_ai.prompt.0.role": "user", "gen_ai.prompt.0.content": "使用 Python 帮我写一段快速排序的代码。" }, "parent_span_id": 11028930176666491606 }, { "name": "invoke_agent python_coder", "span_id": 11028930176666491606, "trace_id": 161011438572144016825706884868588853689, "start_time": 1763089073538867000, "end_time": 1763089112820868000, "attributes": { "gen_ai.span.kind": "agent", "gen_ai.operation.name": "invoke_agent", "gen_ai.agent.description": "擅长使用 Python 编程语言来解决问题。", "gen_ai.system": "openai", "agent.name": "python_coder" }, "parent_span_id": 5680387922600458423 }, { "name": "call_llm", "span_id": 5680387922600458423, "trace_id": 161011438572144016825706884868588853689, "start_time": 1763089069195992000, "end_time": 1763089112820933000, "attributes": { "agent.name": "coding_agent", "gen_ai.request.model": "openai/doubao-seed-1-6-251015", "gen_ai.request.type": "chat", "gen_ai.span.kind": "llm", "gen_ai.prompt.0.role": "user", "gen_ai.prompt.0.content": "使用 Python 帮我写一段快速排序的代码。", }, "parent_span_id": 987150150265236585 }, { "name": "invoke_agent coding_agent", "span_id": 987150150265236585, "trace_id": 161011438572144016825706884868588853689, "start_time": 1763089069192617000, "end_time": 1763089112821024000, "attributes": { "gen_ai.span.kind": "agent", "gen_ai.operation.name": "invoke_agent", "gen_ai.agent.description": "可以调用适合的智能体来解决用户问题。", "gen_ai.system": "openai", "agent.name": "coding_agent", }, "parent_span_id": 11431457420615823859 }, { "name": "invocation", "span_id": 11431457420615823859, "trace_id": 161011438572144016825706884868588853689, "start_time": 1763089069192376000, "end_time": 1763089112821085000, "attributes": { "gen_ai.operation.name": "chain", "gen_ai.span.kind": "workflow", "gen_ai.system": "openai", "agent.name": "python_coder", }, "parent_span_id": null } ] ``` 通过 Tracing,开发者可以对智能体系统进行**可观察性增强**,从而实现高效开发、可靠运行以及持续优化。 ## 在 VeADK 中开启观测 您可以在 VeADK 中开启火山引擎 Agent 执行全链路观测能力: ```python import asyncio from veadk import Agent, Runner from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer # init tracing exporter exporters = [ APMPlusExporter(), ] tracer = OpentelemetryTracer(exporters=exporters) # Define the Agent agent = Agent( tracers=[tracer], ) runner = Runner(agent=agent) response = asyncio.run(runner.run(messages="hi!")) print(response) ``` 环境变量列表: - `OBSERVABILITY_OPENTELEMETRY_APMPLUS_API_KEY`:APM 服务的 API Key - `OBSERVABILITY_OPENTELEMETRY_APMPLUS_ENDPOINT`:APM 服务的 Endpoint, 例如 `http://apmplus-cn-beijing.volces.com:4317` - `OBSERVABILITY_OPENTELEMETRY_APMPLUS_SERVICE_NAME`:APM 的 Service Name, 例如 `python_coder_agent` 或在 `config.yaml` 中定义: config.yaml ```yaml observability: opentelemetry: apmplus: endpoint: ... api_key: ... service_name: ... ``` 开启后,您可以看到日志中可以打印出相关 Tracing 数据的 ID: 与 OpenTelemetry 的兼容性 VeADK 相关字段命名遵循 OpenTelemetry 生成式 AI 规范,您可以直接将 Tracing 数据导入到 OpenTelemetry 兼容的系统中进行分析和可视化。 ## CozeLoop 平台 通过 VeADK 开发的火山智能体接入到扣子罗盘之后,可以通过扣子罗盘的评测功能进行 Agent 评测,或者通过 Trace 功能实现调用链路观测。火山智能体的 Trace 数据可以直接上报至扣子罗盘,实现调用链路观测;在扣子罗盘中注册的火山智能体,也可以通过观测功能进行 Agent 评测。 ### 准备工作 在 VeADK 配置文件 config.yaml 的 observability 字段中填写 cozeloop 的属性。关于配置文件的详细说明及示例可参考配置文件。 - endpoint:固定设置为 `https://api.coze.cn/v1/loop/opentelemetry/v1/traces` - api_key:扣子罗盘访问密钥,支持个人访问令牌、OAuth 访问令牌和服务访问令牌。获取方式可参考[配置个人访问令牌](https://loop.coze.cn/open/docs/cozeloop/authentication-for-sdk#05d27a86)。 - service_name:扣子罗盘工作空间的 ID。你可以在登录扣子罗盘之后,左上角切换到想要存放火山智能体数据的工作空间,并在 URL 的 space 关键词之后获取工作空间 ID,例如 `https://loop.coze.cn/console/enterprise/personal/space/739XXXXXXXX092/pe/prompts` 中,**739XXXXXXXX092**为工作空间 ID。 config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-flash-250828 api_base: https://ark.cn-beijing.volces.com/api/v3 api_key: your api_key volcengine: access_key: your ak secret_key: your sk observability: opentelemetry: cozeloop: endpoint: https://api.coze.cn/v1/loop/opentelemetry/v1/traces api_key: your your api_key service_name: your cozeloop space id ``` ### 部署运行 #### Cozeloop exporter接入代码 agent.py ```python import asyncio from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.demo_tools import get_city_weather from veadk.tracing.telemetry.exporters.cozeloop_exporter import CozeloopExporter from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer from veadk.tracing.telemetry.exporters.cozeloop_exporter import CozeloopExporterConfig cozeloop_exporter = CozeloopExporter() exporters = [cozeloop_exporter] tracer = OpentelemetryTracer(exporters=exporters) agent = Agent(tools=[get_city_weather], tracers=[tracer]) session_id = "session_id_pj" runner = Runner(agent=agent, short_term_memory=ShortTermMemory()) prompt = "How is the weather like in Beijing? Besides, tell me which tool you invoked." asyncio.run(runner.run(messages=prompt, session_id=session_id)) ``` #### 效果展示 ```bash python agent.py ``` ## APMPlus 平台 通过 VeADK 开发的火山智能体可以通过定义 APMPlus 数据导出器接入到火山引擎 APMPlus 平台,实现调用链路观测。 ### 准备工作 - endpoint:指定APMPlus的接入点为 http://apmplus-cn-beijing.volces.com:4317 - api_key:需填入有效应用程序密钥。 - service_name:指定服务名称,可根据实际需求修改。 初始化 APMPlusExporter:利用APMPlusExporterConfig配置端点、应用程序密钥和服务名称,创建APMPlusExporter实例,配置从环境变量获取。示例代码如下: config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-flash-250828 api_base: https://ark.cn-beijing.volces.com/api/v3 api_key: your api_key volcengine: access_key: your ak secret_key: your sk observability: opentelemetry: apmplus: endpoint: http://apmplus-cn-beijing.volces.com:4317 api_key: your api_key service_name: apmplus_veadk_pj ``` agent.py ```python import asyncio from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.demo_tools import get_city_weather from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporterConfig from os import getenv apmplus_exporter = APMPlusExporter() exporters = [apmplus_exporter] tracer = OpentelemetryTracer(exporters=exporters) agent = Agent(tools=[get_city_weather], tracers=[tracer]) session_id = "session_id_pj" runner = Runner(agent=agent, short_term_memory=ShortTermMemory()) prompt = "How is the weather like in Beijing? Besides, tell me which tool you invoked." asyncio.run(runner.run(messages=prompt, session_id=session_id)) ``` ### 部署运行 本地运行上述agent.py代码,触发APMPlus追踪器记录Agent运行的各个节点的调用,以及Metrics信息上传云端存储: ```bash python agent.py ``` #### 会话信息 #### trace信息 #### 模型指标信息 ## TLS 平台 通过 VeADK 开发的火山智能体可以通过定义 TLS 数据导出器来接入到火山引擎日志服务 TLS,并在 TLS 的观测功能模块中进行 Agent 执行链路观测。 ### 准备工作 #### veADK代码中创建tracing project和实例 config.yaml ```yaml model: agent: provider: openai name: doubao-seed-1-6-flash-250828 api_base: https://ark.cn-beijing.volces.com/api/v3 api_key: your api_key volcengine: access_key: your ak secret_key: your sk observability: opentelemetry: tls: endpoint: https://tls-cn-beijing.volces.com:4318/v1/traces service_name: tp_pj region: cn-beijing ``` agent.py ```python import asyncio from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.demo_tools import get_city_weather from veadk.tracing.telemetry.exporters.tls_exporter import TLSExporter from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer from veadk.tracing.telemetry.exporters.tls_exporter import TLSExporterConfig from veadk.integrations.ve_tls.ve_tls import VeTLS from os import getenv # 初始化VeTLS客户端用于创建日志项目和追踪实例 ve_tls_client = VeTLS() # 创建日志项目 project_name = "veadk_pj" log_project_id = ve_tls_client.create_log_project(project_name) print(f"Created log project with ID: {log_project_id}") # 创建追踪实例 trace_instance_name = getenv("OBSERVABILITY_OPENTELEMETRY_TLS_SERVICE_NAME") trace_instance = ve_tls_client.create_tracing_instance(log_project_id, trace_instance_name) print(f"Created trace instance with ID: {trace_instance['TraceInstanceId']}") ``` #### TLS Exporter接入代码示例 agent.py ```python import asyncio from veadk import Agent, Runner from veadk.memory.short_term_memory import ShortTermMemory from veadk.tools.demo_tools import get_city_weather from veadk.tracing.telemetry.exporters.tls_exporter import TLSExporter from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer from veadk.tracing.telemetry.exporters.tls_exporter import TLSExporterConfig from veadk.integrations.ve_tls.ve_tls import VeTLS from os import getenv # 初始化TLSExporter用于上报 tracing 数据 tls_exporter = TLSExporter( config=TLSExporterConfig( topic_id=trace_instance.get('TraceTopicId', trace_instance_name), ) ) exporters = [tls_exporter] tracer = OpentelemetryTracer(exporters=exporters) agent = Agent(tools=[get_city_weather], tracers=[tracer]) session_id = "session_id_pj" runner = Runner(agent=agent, short_term_memory=ShortTermMemory()) prompt = "How is the weather like in Beijing? Besides, tell me which tool you invoked." asyncio.run(runner.run(messages=prompt, session_id=session_id)) ``` ### 部署运行 本地运行上述agent.py代码,触发TLS Project、Topic的创建,并且通过追踪器记录Agent运行的各个节点的调用: ```bash python agent.py ```