本指南将帮助您基于VeAIOps的服务端架构方便地通过二次开发的方式进行功能扩展,包括实现新的消息通道(Channel)、监控数据源(DataSource)和告警事件接入等。
VeAIOps可以方便地通过二次开发的方式进行功能扩展,包括实现新的消息通道(Channel)、监控数据源(DataSource)和告警事件接入等。
核心思想: 实现
BaseChannel接口, 并通过@register_channel()装饰器进行自动注册。
定义Channel枚举: 在 veaiops/schema/types.py 的 ChannelType 枚举中添加新的Channel类型。
创建实现文件: 在 veaiops/channel/ 目录下创建对应的 new_channel.py 文件。
实现Channel类: 在 new_channel.py 中创建 NewChannel 类,继承自 BaseChannel 并实现所有抽象方法。
注册Channel: 使用 @register_channel() 装饰器将新实现的Channel类注册到系统中。
导出模块: 在 veaiops/channel/__init__.py 文件中导出新的Channel类,使其对系统可见。
添加测试: 在 tests/channel/ 目录下为新的Channel添加单元测试和集成测试。
首先,在 veaiops/schema/types.py 文件中的 ChannelType 枚举里添加您的新Channel名称。
class ChannelType(str, Enum):
"""Channel types supported by the system."""
Lark = "Lark"
DingTalk = "DingTalk"
WeChat = "WeChat"
Webhook = "Webhook"
NewChannel = "NewChannel" # 添加您的新Channel
在 veaiops/channel/ 目录下创建一个新文件(例如 new_channel.py)。在这个文件中,定义一个继承自 veaiops.channel.base.channel.BaseChannel 的类。
您必须实现 BaseChannel 中定义的所有抽象方法。这些方法定义了Channel与VeAIOps系统核心交互的统一接口。
from typing import Any, Dict, List, Optional
from fastapi.responses import JSONResponse
from starlette.requests import Request
from veaiops.channel.base.channel import BaseChannel
from veaiops.channel.registry import register_channel
from veaiops.schema.chatops import Message, Chat, Part
from veaiops.schema.types import ChannelType, AgentType
@register_channel()
class NewChannel(BaseChannel):
channel = ChannelType.NewChannel
async def event_handle(self, request: Request) -> Any:
"""处理平台事件回调的主入口。"""
# 1. 验证请求的合法性(如签名校验)
# 2. 解析payload,判断事件类型(如消息、按钮点击等)
# 3. 调用相应的处理方法
pass
async def payload_to_msg(self, payload: Dict[str, Any]) -> Optional[Message]:
"""将平台的消息负载转换为系统内部的Message对象。"""
# 解析payload,提取消息ID、内容、发送者等信息
pass
async def msg_to_llm_compatible(self, *args, **kwargs) -> List[Part]:
"""将消息转换为LLM兼容的格式。"""
# 将消息内容转换为LLM能理解的格式
pass
async def payload_to_chat(self, payload: Dict[str, Any]) -> Optional[Chat]:
"""将平台的聊天负载转换为系统内部的Chat对象。"""
# 解析payload,提取聊天ID、用户ID等信息
pass
async def payload_response(self, payload: Dict[str, Any]) -> JSONResponse:
"""处理平台消息负载并立即返回响应,以避免超时。"""
# 通常返回一个HTTP 200 OK响应
return JSONResponse(content={})
async def recreate_chat_from_payload(self, payload: dict) -> None:
"""根据消息负载重新创建或更新聊天会话记录。"""
# 用于确保会话状态的一致性
pass
async def send_message(self, content: dict, agent_type: AgentType, *args, **kwargs) -> str:
"""调用平台API发送消息。"""
# 1. 从kwargs获取chat_id、user_id等目标信息
# 2. 构建符合平台API要求的消息体
# 3. 调用平台API发送
pass
@register_channel() 装饰器。这将自动将其注册到Channel工厂中。veaiops/channel/__init__.py 文件中导出您新创建的Channel类。from .lark.lark import LarkChannel
from .webhook import WebhookChannel
from .new_channel import NewChannel # 导出您的新Channel
from .registry import REGISTRY
__all__ = ["LarkChannel", "WebhookChannel", "NewChannel", "REGISTRY"]
VeAIOps的核心API路由位于
veaiops/handler/routers/apis/v1/webhooks.py。它通过访问payload["header"]["event_type"]来接收和处理所有Channel的事件回调。您无需修改此文件,因为Channel注册机制会自动将请求分发到您实现的event_handle方法。
您的主要工作是在 event_handle 方法中正确解析和处理来自新Channel平台的回调请求。
在 tests/channel/ 目录下为您的的新Channel添加测试用例,以确保其所有功能(如消息解析、发送、事件处理等)都能正常工作。
- 参考现有实现:
veaiops/channel/lark/lark.py是功能最完整的实现,强烈建议作为主要参考。- 错误处理: 妥善处理平台API调用可能出现的各种错误,并提供清晰的日志。
- 日志记录: 在关键步骤添加详细的日志,以便于调试和问题排查。
- 幂等性: 对于可能重复接收的事件,使用基类提供的
check_idempotence方法来确保消息处理的幂等性。- 异步处理: 充分利用
asyncio和await来提高并发性能,尤其是在处理网络I/O时。
VeAIOps支持通过插件化的方式扩展不同的监控数据源(DataSource)。本节将详细介绍如何实现一个新的监控数据源并将其集成到现有系统中。
核心思想: 继承
DataSource基类,在DataSourceFactory中注册,并实现数据获取和告警同步的逻辑。
定义数据源模型: 在 veaiops/schema/documents/datasource/base.py 中定义新数据源的配置和连接模型。
实现数据源逻辑: 在 veaiops/metrics/ 目录下创建新的数据源实现文件,包括客户端和数据源类。
集成数据源工厂: 在 veaiops/metrics/datasource_factory.py 中更新 DataSourceFactory 以支持新数据源。
创建API路由: 在 veaiops/handler/routers/apis/v1/datasource/ 目录下添加新的API路由文件。
更新服务层: 在 veaiops/handler/services/datasource/ 中添加新数据源的业务逻辑。
在 veaiops/schema/documents/datasource/base.py 中,您需要:
Connect 模型: 添加新数据源所需的连接凭证字段,如API密钥、用户名/密码等。确保在 validate_update_fields 方法中包含这些新字段。BaseDataSourceConfig 的新配置模型,用于存储新数据源的特定配置,如区域、命名空间等。DataSource 模型: 将新的配置模型添加到 DataSource 模型中,并更新 validate_update_fields 方法。示例: 为名为 NewMonitor 的新数据源添加配置
class NewMonitorConfig(BaseDataSourceConfig):
"""NewMonitor data source configuration."""
region: Optional[str] = None
# 添加其他特定于NewMonitor的字段
class Connect(BaseConfigDocument):
# ...
new_monitor_api_key: Optional[EncryptedSecretStr] = None
new_monitor_secret_key: Optional[EncryptedSecretStr] = None
def validate_update_fields(self, update_data: Dict[str, Any]) -> Dict[str, Any]:
allowed_fields = {
# ...
"new_monitor_api_key",
"new_monitor_secret_key",
}
# ...
class DataSource(BaseConfigDocument):
# ...
new_monitor_config: Optional[NewMonitorConfig] = None
def validate_update_fields(self, update_data: Dict[str, Any]) -> Dict[str, Any]:
allowed_fields = {
# ...
"new_monitor_config",
}
# ...
在 veaiops/metrics/ 目录下创建一个新文件,例如 new_monitor.py,并实现以下类:
NewMonitorClient: 封装与新数据源API的交互,包括身份验证、数据获取和告警规则管理。NewMonitorDataSource: 继承自 DataSource 基类,并实现其抽象方法:
_fetch_one_slot: 获取指定时间范围内的数据。sync_rules_for_intelligent_threshold_task: 同步智能阈值告警规则。concurrency_group: 定义并发组标识。get_concurrency_quota: 定义并发配额。fetch_partial_data: 获取部分数据。NewMonitorRuleConfig: 继承自 BaseRuleConfig,定义新数据源的告警规则配置。RuleSynchronizer: 继承自 BaseRuleSynchronizer,实现告警规则的同步逻辑。示例: new_monitor.py 的基本结构
from veaiops.metrics.base import DataSource, BaseRuleConfig, BaseRuleSynchronizer, rate_limit
class NewMonitorClient:
# ... 实现API客户端 ...
class NewMonitorDataSource(DataSource):
# ... 实现数据源逻辑 ...
@dataclass
class NewMonitorRuleConfig(BaseRuleConfig):
# ... 定义规则配置 ...
class RuleSynchronizer(BaseRuleSynchronizer):
# ... 实现规则同步逻辑 ...
在 veaiops/metrics/datasource_factory.py 中,更新 DataSourceFactory 以支持新数据源:
create_datasource 方法中添加对新数据源类型的处理逻辑。_create_new_monitor_datasource 私有方法,用于创建 NewMonitorDataSource 实例。validate_config 和 get_config_summary 方法中添加对新数据源的支持。示例:
class DataSourceFactory:
@staticmethod
def create_datasource(doc: DataSourceDocument) -> DataSource:
# ...
elif doc.type == "NewMonitor":
return DataSourceFactory._create_new_monitor_datasource(doc, common_params)
# ...
@staticmethod
def _create_new_monitor_datasource(doc: DataSourceDocument, common_params: Dict[str, Any]) -> NewMonitorDataSource:
# ... 实现创建逻辑 ...
在 veaiops/handler/routers/apis/v1/datasource/ 目录下创建一个新文件,例如 new_monitor.py,并实现与新数据源相关的API端点。
示例:
from fastapi import APIRouter
router = APIRouter()
@router.post("/new-monitor/some-action")
async def some_action():
# ... 实现API逻辑 ...
最后,在 veaiops/handler/routers/apis/v1/datasource/__init__.py 中包含新的路由。
在 veaiops/handler/services/datasource/ 目录中,根据需要更新或创建服务文件,以包含新数据源的业务逻辑。
- 通过遵循以上步骤,您可以将一个新的监控系统无缝集成到VeAIOps中。在整个过程中,请务必参考
aliyun.py、volcengine.py和zabbix.py的现有实现,以确保代码风格和架构的一致性。
VeAIOps的事件中心能够接收、处理和投递来自不同监控系统的告警事件。本节将指导您如何将一个新的告警系统集成到事件中心。
核心思想: 定义告警的数据结构(Payload),实现从Payload到内部
Event对象的转换逻辑,并创建一个API端点来接收Webhook请求。
定义告警Payload模型: 在 veaiops/schema/base/intelligent_threshold.py 中为新告警系统定义一个Pydantic模型。
实现事件转换逻辑: 在 veaiops/handler/services/event/converter/intelligent_threshold.py 中创建一个转换函数,将告警Payload转换为标准的 Event 对象。
创建API接收端点: 在 veaiops/handler/routers/apis/v1/event_center/event.py 中添加一个新的API端点,用于接收来自新告警系统的Webhook请求。
在 veaiops/schema/base/intelligent_threshold.py 文件中,创建一个继承自 BaseAlarmPayload 的新类,用于描述新告警系统的JSON数据结构。
示例: 为名为 NewAlarmSystem 的新系统添加Payload模型。
class NewAlarmSystemPayload(BaseAlarmPayload):
"""Payload model for NewAlarmSystem alarm events."""
alarm_id: str = Field(..., description="Unique ID of alarm")
severity: str = Field(..., description="Alarm severity (e.g., 'P0', 'P1')")
resource_name: str = Field(..., description="Name of affected resource")
details: Dict[str, Any] = Field(..., description="Detailed alarm information")
# ... 添加其他字段
在 veaiops/handler/services/event/converter/intelligent_threshold.py 中,您需要:
async 函数,如 convert_new_alarm_to_event,它接收 NewAlarmSystemPayload 对象并返回一个 Event 对象列表。Event 模型的字段上,例如 event_level, project, raw_data 等。handle_..._resource_event_with_merge 函数。convert_intelligent_threshold_alarm_to_event 函数中,添加一个处理新数据源类型的分支。示例:
async def convert_new_alarm_to_event(alarm: NewAlarmSystemPayload) -> Optional[List[Event]]:
"""Convert NewAlarmSystem alarm payload to Event."""
# 1. 映射告警级别
event_level = EventLevel.P2
if alarm.severity == "P0":
event_level = EventLevel.P0
# 2. 创建Event对象
event = Event(
agent_type=AgentType.INTELLIGENT_THRESHOLD,
datasource_type=DataSourceType.NewAlarmSystem, # 确保在DataSourceType枚举中已定义
event_level=event_level,
raw_data=alarm.model_dump(),
# ... 映射其他字段
)
await event.save()
return [event]
async def convert_intelligent_threshold_alarm_to_event(
source: DataSourceType, alarm: BaseAlarmPayload
) -> Optional[List[Event]]:
# ...
elif source == DataSourceType.NewAlarmSystem:
if isinstance(alarm, NewAlarmSystemPayload):
return await convert_new_alarm_to_event(alarm)
# ...
最后,在 veaiops/handler/routers/apis/v1/event_center/event.py 中添加一个新的API端点来接收Webhook。
示例:
from veaiops.schema.base.intelligent_threshold import NewAlarmSystemPayload
# ...
@router.post("/intelligent_threshold/new_alarm_system/", response_model=APIResponse[str], status_code=status.HTTP_200_OK)
async def create_new_alarm_system_intelligent_threshold_event(
background_tasks: BackgroundTasks,
raw_data: NewAlarmSystemPayload = Body(...),
) -> APIResponse[str]:
"""Create an intelligent threshold event from NewAlarmSystem alarm payload."""
logger.info(f"Creating intelligent threshold event from NewAlarmSystem with data: {raw_data}")
events = await convert_intelligent_threshold_alarm_to_event(DataSourceType.NewAlarmSystem, raw_data)
if events is None:
logger.info("No event created due to conversion failure")
raise BadRequestError(message="Failed to convert alarm to event")
for event in events:
# 在后台触发事件消费流程
background_tasks.add_task(consume_event, event=event)
logger.info(f"Intelligent threshold event {event.id} created or updated successfully")
return APIResponse(data=",".join(str(event.id) for event in events))