API References

Agent

Bases: LlmAgent

LLM-based Agent with Volcengine capabilities.

This class represents an intelligent agent powered by LLMs (Large Language Models), integrated with Volcengine's AI framework. It supports memory modules, sub-agents, tracers, knowledge bases, and other advanced features for A2A (Agent-to-Agent) or user-facing scenarios.

Attributes:

Name Type Description
name str

The name of the agent.

description str

A description of the agent, useful in A2A scenarios.

instruction Union[str, InstructionProvider]

The instruction or instruction provider.

model_name str

Name of the model used by the agent.

model_provider str

Provider of the model (e.g., openai).

model_api_base str

The base URL of the model API.

model_api_key str

The API key for accessing the model.

model_extra_config dict

Extra configurations to include in model requests.

tools list[ToolUnion]

Tools available to the agent.

sub_agents list[BaseAgent]

Sub-agents managed by this agent.

knowledgebase Optional[KnowledgeBase]

Knowledge base attached to the agent.

short_term_memory Optional[ShortTermMemory]

Session-based memory for temporary context.

long_term_memory Optional[LongTermMemory]

Cross-session memory for persistent user context.

tracers list[BaseTracer]

List of tracers used for telemetry and monitoring.
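Tracer exporters can also be switched on through environment variables. The `_prepare_tracers` method in the class source reads `ENABLE_APMPLUS`, `ENABLE_COZELOOP`, and `ENABLE_TLS`, and treats only the case-insensitive string `true` as enabled. The following is a minimal standalone sketch of that parsing rule; the helper name `enabled_exporters` and the short exporter labels are illustrative and not part of VeADK.

```python
import os
from typing import Mapping

# Environment variables read by Agent._prepare_tracers (taken from the class source).
# The short labels on the left are illustrative only.
EXPORTER_FLAGS = {
    "apmplus": "ENABLE_APMPLUS",
    "cozeloop": "ENABLE_COZELOOP",
    "tls": "ENABLE_TLS",
}


def enabled_exporters(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the labels of exporters whose flag is set to 'true' (any case)."""
    return [
        label
        for label, var in EXPORTER_FLAGS.items()
        if env.get(var, "false").lower() == "true"
    ]


# Only the exact word "true" (case-insensitive) enables an exporter.
result = enabled_exporters({"ENABLE_APMPLUS": "TRUE", "ENABLE_TLS": "yes"})
print(result)
```

Values such as `1` or `yes` do not enable an exporter under this rule, which may be worth checking when a tracer appears to be silently skipped.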

Source code in veadk/agent.py
class Agent(LlmAgent):
    """LLM-based Agent with Volcengine capabilities.

    This class represents an intelligent agent powered by LLMs (Large Language Models),
    integrated with Volcengine's AI framework. It supports memory modules, sub-agents,
    tracers, knowledge bases, and other advanced features for A2A (Agent-to-Agent)
    or user-facing scenarios.

    Attributes:
        name (str): The name of the agent.
        description (str): A description of the agent, useful in A2A scenarios.
        instruction (Union[str, InstructionProvider]): The instruction or instruction provider.
        model_name (str): Name of the model used by the agent.
        model_provider (str): Provider of the model (e.g., openai).
        model_api_base (str): The base URL of the model API.
        model_api_key (str): The API key for accessing the model.
        model_extra_config (dict): Extra configurations to include in model requests.
        tools (list[ToolUnion]): Tools available to the agent.
        sub_agents (list[BaseAgent]): Sub-agents managed by this agent.
        knowledgebase (Optional[KnowledgeBase]): Knowledge base attached to the agent.
        short_term_memory (Optional[ShortTermMemory]): Session-based memory for temporary context.
        long_term_memory (Optional[LongTermMemory]): Cross-session memory for persistent user context.
        tracers (list[BaseTracer]): List of tracers used for telemetry and monitoring.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")

    name: str = DEFAULT_AGENT_NAME
    description: str = DEFAULT_DESCRIPTION
    instruction: Union[str, InstructionProvider] = DEFAULT_INSTRUCTION

    model_name: str = Field(default_factory=lambda: settings.model.name)
    model_provider: str = Field(default_factory=lambda: settings.model.provider)
    model_api_base: str = Field(default_factory=lambda: settings.model.api_base)
    model_api_key: str = Field(default_factory=lambda: settings.model.api_key)
    model_extra_config: dict = Field(default_factory=dict)

    tools: list[ToolUnion] = []

    sub_agents: list[BaseAgent] = Field(default_factory=list, exclude=True)

    knowledgebase: Optional[KnowledgeBase] = None

    short_term_memory: Optional[ShortTermMemory] = None
    long_term_memory: Optional[LongTermMemory] = None

    tracers: list[BaseTracer] = []

    run_processor: Optional[BaseRunProcessor] = Field(default=None, exclude=True)
    """Optional run processor for intercepting and processing agent execution flows.

    The run processor can be used to implement cross-cutting concerns such as:
    - Authentication flows (e.g., OAuth2 via VeIdentity)
    - Request/response logging
    - Error handling and retry logic
    - Performance monitoring

    If not provided, a NoOpRunProcessor will be used by default.

    Example:
        from veadk.integrations.ve_identity import AuthRequestProcessor

        agent = Agent(
            name="my-agent",
            run_processor=AuthRequestProcessor()
        )
    """

    enable_authz: bool = False

    def model_post_init(self, __context: Any) -> None:
        super().model_post_init(None)  # for sub_agents init

        # Initialize run_processor if not provided
        if self.run_processor is None:
            self.run_processor = NoOpRunProcessor()

        # combine user model config with VeADK defaults
        headers = DEFAULT_MODEL_EXTRA_CONFIG["extra_headers"].copy()
        body = DEFAULT_MODEL_EXTRA_CONFIG["extra_body"].copy()

        if self.model_extra_config:
            user_headers = self.model_extra_config.get("extra_headers", {})
            user_body = self.model_extra_config.get("extra_body", {})

            headers |= user_headers
            body |= user_body

        self.model_extra_config |= {
            "extra_headers": headers,
            "extra_body": body,
        }

        logger.info(f"Model extra config: {self.model_extra_config}")

        if not self.model:
            self.model = LiteLlm(
                model=f"{self.model_provider}/{self.model_name}",
                api_key=self.model_api_key,
                api_base=self.model_api_base,
                **self.model_extra_config,
            )
            logger.debug(
                f"LiteLLM client created with config: {self.model_extra_config}"
            )
        else:
            logger.warning(
                "You are trying to use your own LiteLLM client, some default request headers may be missing."
            )

        self._prepare_tracers()

        if self.knowledgebase:
            from veadk.tools.builtin_tools.load_knowledgebase import (
                LoadKnowledgebaseTool,
            )

            load_knowledgebase_tool = LoadKnowledgebaseTool(
                knowledgebase=self.knowledgebase
            )
            self.tools.append(load_knowledgebase_tool)

        if self.long_term_memory is not None:
            from google.adk.tools import load_memory

            if hasattr(load_memory, "custom_metadata"):
                if not load_memory.custom_metadata:
                    load_memory.custom_metadata = {}
                load_memory.custom_metadata["backend"] = self.long_term_memory.backend
            self.tools.append(load_memory)

        if self.enable_authz:
            from veadk.tools.builtin_tools.agent_authorization import (
                check_agent_authorization,
            )

            if self.before_agent_callback:
                if isinstance(self.before_agent_callback, list):
                    self.before_agent_callback.append(check_agent_authorization)
                else:
                    self.before_agent_callback = [
                        self.before_agent_callback,
                        check_agent_authorization,
                    ]
            else:
                self.before_agent_callback = check_agent_authorization

        logger.info(f"VeADK version: {VERSION}")

        logger.info(f"{self.__class__.__name__} `{self.name}` init done.")
        logger.debug(
            f"Agent: {self.model_dump(include={'name', 'model_name', 'model_api_base', 'tools'})}"
        )

    async def _run(
        self,
        runner,
        user_id: str,
        session_id: str,
        message: types.Content,
        stream: bool,
        run_processor: Optional[BaseRunProcessor] = None,
    ):
        """Internal run method with run processor support.

        Args:
            runner: The Runner instance.
            user_id: User ID for the session.
            session_id: Session ID.
            message: The message to send.
            stream: Whether to stream the output.
            run_processor: Optional run processor to use. If not provided, uses self.run_processor.

        Returns:
            The final output string.
        """
        stream_mode = StreamingMode.SSE if stream else StreamingMode.NONE

        # Use provided run_processor or fall back to instance's run_processor
        processor = run_processor or self.run_processor

        @processor.process_run(runner=runner, message=message)
        async def event_generator():
            async for event in runner.run_async(
                user_id=user_id,
                session_id=session_id,
                new_message=message,
                run_config=RunConfig(streaming_mode=stream_mode),
            ):
                if event.get_function_calls():
                    for function_call in event.get_function_calls():
                        logger.debug(f"Function call: {function_call}")
                elif (
                    event.content is not None
                    and event.content.parts[0].text is not None
                    and len(event.content.parts[0].text.strip()) > 0
                ):
                    yield event.content.parts[0].text

        final_output = ""
        async for chunk in event_generator():
            if stream:
                print(chunk, end="", flush=True)
            final_output += chunk
        if stream:
            print()  # end with a new line

        return final_output

    def _prepare_tracers(self):
        enable_apmplus_tracer = os.getenv("ENABLE_APMPLUS", "false").lower() == "true"
        enable_cozeloop_tracer = os.getenv("ENABLE_COZELOOP", "false").lower() == "true"
        enable_tls_tracer = os.getenv("ENABLE_TLS", "false").lower() == "true"

        if not (enable_apmplus_tracer or enable_cozeloop_tracer or enable_tls_tracer):
            logger.info("No exporter enabled by env, skip prepare tracers.")
            return

        if not self.tracers:
            from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer

            self.tracers.append(OpentelemetryTracer())

        exporters = self.tracers[0].exporters  # type: ignore

        from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter
        from veadk.tracing.telemetry.exporters.cozeloop_exporter import CozeloopExporter
        from veadk.tracing.telemetry.exporters.tls_exporter import TLSExporter

        if enable_apmplus_tracer and not any(
            isinstance(e, APMPlusExporter) for e in exporters
        ):
            self.tracers[0].exporters.append(APMPlusExporter())  # type: ignore
            logger.info("Enable APMPlus exporter by env.")

        if enable_cozeloop_tracer and not any(
            isinstance(e, CozeloopExporter) for e in exporters
        ):
            self.tracers[0].exporters.append(CozeloopExporter())  # type: ignore
            logger.info("Enable CozeLoop exporter by env.")

        if enable_tls_tracer and not any(isinstance(e, TLSExporter) for e in exporters):
            self.tracers[0].exporters.append(TLSExporter())  # type: ignore
            logger.info("Enable TLS exporter by env.")

        logger.debug(
            f"Opentelemetry Tracer init {len(self.tracers[0].exporters)} exporters"  # type: ignore
        )

    async def run(
        self,
        prompt: str | list[str],
        stream: bool = False,
        app_name: str = "veadk_app",
        user_id: str = "veadk_user",
        session_id="veadk_session",
        load_history_sessions_from_db: bool = False,
        db_url: str = "",
        collect_runtime_data: bool = False,
        eval_set_id: str = "",
        save_session_to_memory: bool = False,
        run_processor: Optional[BaseRunProcessor] = None,
    ):
        """Running the agent. The runner and session service will be created automatically.

        For production, consider using Google-ADK runner to run agent, rather than invoking this method.

        Args:
            prompt (str | list[str]): The prompt to run the agent.
            stream (bool, optional): Whether to stream the output. Defaults to False.
            app_name (str, optional): The name of the application. Defaults to "veadk_app".
            user_id (str, optional): The id of the user. Defaults to "veadk_user".
            session_id (str, optional): The id of the session. Defaults to "veadk_session".
            load_history_sessions_from_db (bool, optional): Whether to load history sessions from database. Defaults to False.
            db_url (str, optional): The url of the database. Defaults to "".
            collect_runtime_data (bool, optional): Whether to collect runtime data. Defaults to False.
            eval_set_id (str, optional): The id of the eval set. Defaults to "".
            save_session_to_memory (bool, optional): Whether to save this turn session to memory. Defaults to False.
            run_processor (Optional[BaseRunProcessor], optional): Optional run processor to use for this run.
                If not provided, uses the agent's default run_processor. Defaults to None.
        """

        logger.warning(
            "Running agent in this function is only for development and testing, do not use this function in production. For production, consider using `Google ADK Runner` to run agent, rather than invoking this method."
        )
        logger.info(
            f"Run agent {self.name}: app_name: {app_name}, user_id: {user_id}, session_id: {session_id}."
        )
        prompt = [prompt] if isinstance(prompt, str) else prompt

        # memory service
        short_term_memory = ShortTermMemory(
            backend="database" if load_history_sessions_from_db else "local",
            db_url=db_url,
        )
        session_service = short_term_memory.session_service
        await short_term_memory.create_session(
            app_name=app_name, user_id=user_id, session_id=session_id
        )

        # runner
        runner = Runner(
            agent=self,
            app_name=app_name,
            session_service=session_service,
            memory_service=self.long_term_memory,
        )

        logger.info(f"Begin to process prompt {prompt}")
        # run
        final_output = ""
        for _prompt in prompt:
            message = types.Content(role="user", parts=[types.Part(text=_prompt)])
            final_output = await self._run(
                runner, user_id, session_id, message, stream, run_processor
            )

        # VeADK features
        if save_session_to_memory:
            assert self.long_term_memory is not None, (
                "Long-term memory is not initialized in agent"
            )
            session = await session_service.get_session(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
            )
            if session:
                await self.long_term_memory.add_session_to_memory(session)
                logger.info(f"Add session `{session.id}` to your long-term memory.")
            else:
                logger.error(
                    f"Session {session_id} not found in session service, cannot save to long-term memory."
                )

        if collect_runtime_data:
            eval_set_recorder = EvalSetRecorder(session_service, eval_set_id)
            dump_path = await eval_set_recorder.dump(app_name, user_id, session_id)
            self._dump_path = dump_path  # just for test/debug/instrumentation

        if self.tracers:
            for tracer in self.tracers:
                tracer.dump(user_id=user_id, session_id=session_id)

        return final_output
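The way `model_post_init` combines a user-supplied `model_extra_config` with the VeADK defaults can be illustrated with plain dictionaries. This is a sketch only: the contents of `DEFAULT_MODEL_EXTRA_CONFIG` are not shown on this page, so the default header below is a placeholder, and `merge_extra_config` is a hypothetical helper that mirrors the merge steps rather than a VeADK function.

```python
# Placeholder for veadk's DEFAULT_MODEL_EXTRA_CONFIG; the real values are an assumption.
DEFAULT_MODEL_EXTRA_CONFIG = {
    "extra_headers": {"x-example-default": "veadk"},
    "extra_body": {},
}


def merge_extra_config(user_config: dict) -> dict:
    """Mirror the merge in Agent.model_post_init: user values override defaults."""
    headers = DEFAULT_MODEL_EXTRA_CONFIG["extra_headers"].copy()
    body = DEFAULT_MODEL_EXTRA_CONFIG["extra_body"].copy()

    if user_config:
        headers |= user_config.get("extra_headers", {})
        body |= user_config.get("extra_body", {})

    # Other user keys are kept; only the two nested dicts are replaced.
    merged = dict(user_config)
    merged |= {"extra_headers": headers, "extra_body": body}
    return merged


merged = merge_extra_config(
    {"extra_headers": {"x-trace": "1"}, "temperature": 0.2}
)
print(merged)
```

Because the defaults are copied first and the user dictionaries are merged on top, a user header with the same key as a default replaces the default value.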

run_processor = Field(default=None, exclude=True) class-attribute instance-attribute

Optional run processor for intercepting and processing agent execution flows.

The run processor can be used to implement cross-cutting concerns such as:

- Authentication flows (e.g., OAuth2 via VeIdentity)
- Request/response logging
- Error handling and retry logic
- Performance monitoring

If not provided, a NoOpRunProcessor will be used by default.

Example

from veadk.agent import Agent
from veadk.integrations.ve_identity import AuthRequestProcessor

# Attach a processor that handles authentication around each run.
agent = Agent(
    name="my-agent",
    run_processor=AuthRequestProcessor(),
)
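To make the interception mechanism more concrete, here is a standalone sketch of the call shape that `Agent._run` relies on: `process_run(runner=..., message=...)` returns a decorator, and the decorated function is an async generator that yields text chunks. The class `CountingRunProcessor` is hypothetical and deliberately does not subclass `BaseRunProcessor`, whose full interface is not shown on this page; only the shape inferred from `_run` is assumed.

```python
import asyncio
from typing import AsyncIterator, Callable


class CountingRunProcessor:
    """Hypothetical processor that counts the chunks passing through it."""

    def __init__(self) -> None:
        self.chunks_seen = 0

    def process_run(self, runner, message):
        # Returns a decorator, matching `@processor.process_run(...)` in Agent._run.
        def decorator(event_generator: Callable[[], AsyncIterator[str]]):
            async def wrapper():
                async for chunk in event_generator():
                    self.chunks_seen += 1  # cross-cutting logic goes here
                    yield chunk

            return wrapper

        return decorator


async def demo() -> tuple[str, int]:
    processor = CountingRunProcessor()

    @processor.process_run(runner=None, message="hi")
    async def event_generator():
        # Stand-in for the events produced by runner.run_async.
        for part in ("Hello", ", ", "world"):
            yield part

    output = ""
    async for chunk in event_generator():
        output += chunk
    return output, processor.chunks_seen


result = asyncio.run(demo())
print(result)
```

A real processor would presumably derive from `BaseRunProcessor`; the sketch only shows where logic such as logging or retries could sit relative to the event stream.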

run(prompt, stream=False, app_name='veadk_app', user_id='veadk_user', session_id='veadk_session', load_history_sessions_from_db=False, db_url='', collect_runtime_data=False, eval_set_id='', save_session_to_memory=False, run_processor=None) async

Run the agent. The runner and session service are created automatically.

For production, consider using the Google ADK runner to run the agent rather than invoking this method.

Parameters:

Name Type Description Default
prompt str | list[str]

The prompt to run the agent.

required
stream bool

Whether to stream the output. Defaults to False.

False
app_name str

The name of the application. Defaults to "veadk_app".

'veadk_app'
user_id str

The id of the user. Defaults to "veadk_user".

'veadk_user'
session_id str

The id of the session. Defaults to "veadk_session".

'veadk_session'
load_history_sessions_from_db bool

Whether to load history sessions from database. Defaults to False.

False
db_url str

The url of the database. Defaults to "".

''
collect_runtime_data bool

Whether to collect runtime data. Defaults to False.

False
eval_set_id str

The id of the eval set. Defaults to "".

''
save_session_to_memory bool

Whether to save the session to long-term memory after the run. Defaults to False.

False
run_processor Optional[BaseRunProcessor]

Optional run processor to use for this run. If not provided, uses the agent's default run_processor. Defaults to None.

None
Source code in veadk/agent.py
async def run(
    self,
    prompt: str | list[str],
    stream: bool = False,
    app_name: str = "veadk_app",
    user_id: str = "veadk_user",
    session_id="veadk_session",
    load_history_sessions_from_db: bool = False,
    db_url: str = "",
    collect_runtime_data: bool = False,
    eval_set_id: str = "",
    save_session_to_memory: bool = False,
    run_processor: Optional[BaseRunProcessor] = None,
):
    """Running the agent. The runner and session service will be created automatically.

    For production, consider using Google-ADK runner to run agent, rather than invoking this method.

    Args:
        prompt (str | list[str]): The prompt to run the agent.
        stream (bool, optional): Whether to stream the output. Defaults to False.
        app_name (str, optional): The name of the application. Defaults to "veadk_app".
        user_id (str, optional): The id of the user. Defaults to "veadk_user".
        session_id (str, optional): The id of the session. Defaults to "veadk_session".
        load_history_sessions_from_db (bool, optional): Whether to load history sessions from database. Defaults to False.
        db_url (str, optional): The url of the database. Defaults to "".
        collect_runtime_data (bool, optional): Whether to collect runtime data. Defaults to False.
        eval_set_id (str, optional): The id of the eval set. Defaults to "".
        save_session_to_memory (bool, optional): Whether to save this turn session to memory. Defaults to False.
        run_processor (Optional[BaseRunProcessor], optional): Optional run processor to use for this run.
            If not provided, uses the agent's default run_processor. Defaults to None.
    """

    logger.warning(
        "Running agent in this function is only for development and testing, do not use this function in production. For production, consider using `Google ADK Runner` to run agent, rather than invoking this method."
    )
    logger.info(
        f"Run agent {self.name}: app_name: {app_name}, user_id: {user_id}, session_id: {session_id}."
    )
    prompt = [prompt] if isinstance(prompt, str) else prompt

    # memory service
    short_term_memory = ShortTermMemory(
        backend="database" if load_history_sessions_from_db else "local",
        db_url=db_url,
    )
    session_service = short_term_memory.session_service
    await short_term_memory.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )

    # runner
    runner = Runner(
        agent=self,
        app_name=app_name,
        session_service=session_service,
        memory_service=self.long_term_memory,
    )

    logger.info(f"Begin to process prompt {prompt}")
    # run
    final_output = ""
    for _prompt in prompt:
        message = types.Content(role="user", parts=[types.Part(text=_prompt)])
        final_output = await self._run(
            runner, user_id, session_id, message, stream, run_processor
        )

    # VeADK features
    if save_session_to_memory:
        assert self.long_term_memory is not None, (
            "Long-term memory is not initialized in agent"
        )
        session = await session_service.get_session(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
        )
        if session:
            await self.long_term_memory.add_session_to_memory(session)
            logger.info(f"Add session `{session.id}` to your long-term memory.")
        else:
            logger.error(
                f"Session {session_id} not found in session service, cannot save to long-term memory."
            )

    if collect_runtime_data:
        eval_set_recorder = EvalSetRecorder(session_service, eval_set_id)
        dump_path = await eval_set_recorder.dump(app_name, user_id, session_id)
        self._dump_path = dump_path  # just for test/debug/instrumentation

    if self.tracers:
        for tracer in self.tracers:
            tracer.dump(user_id=user_id, session_id=session_id)

    return final_output

Runner

Bases: Runner

VeADK Runner that augments ADK with session, memory, tracing, and media upload.

This class builds on Google ADK's Runner and adds:

- Integration with short-term memory (ShortTermMemory) for auto session management.
- Optional long-term memory integration and session persistence.
- New message interception and media upload to TOS.
- Tracing dump and Trace ID logging.
- A simplified run entry that supports multi-turn text/multimodal inputs.

Attributes:

Name Type Description
user_id str

Default user ID.

long_term_memory

Long-term memory service instance, or None if not set.

short_term_memory ShortTermMemory | None

Short-term memory instance used to auto-create/manage sessions.

upload_inline_data_to_tos bool

Whether to upload inline media to TOS while running.

session_service

Session service instance (may come from short-term memory).

memory_service

Memory service instance (may come from agent's long-term memory).

app_name str

Application name used in session management and object pathing.

Note

This class wraps the parent run_async at initialization to insert media upload and post-run handling. If you override the underlying run_async, ensure it remains compatible with this interception logic.
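The wrapping described in the note can be sketched with `types.MethodType` and a small decorator. Everything below is a simplified stand-in: `BaseRunner`, `InterceptingRunner`, `tag_message`, and this version of `intercept_new_message` are illustrative, and the real VeADK helpers may take different arguments.

```python
import asyncio
from types import MethodType


def intercept_new_message(hook):
    """Simplified stand-in: run `hook` on the message, then delegate."""

    def decorator(func):
        async def wrapper(self, *, new_message):
            new_message = hook(self, new_message)
            # `func` is already bound to the instance, so `self` is not passed again.
            async for event in func(new_message=new_message):
                yield event

        return wrapper

    return decorator


def tag_message(runner, message: str) -> str:
    # Hypothetical hook; the real one uploads inline media to TOS.
    return f"[{runner.app_name}] {message}"


class BaseRunner:
    def __init__(self, app_name: str) -> None:
        self.app_name = app_name

    async def run_async(self, *, new_message: str):
        yield f"echo: {new_message}"


class InterceptingRunner(BaseRunner):
    def __init__(self, app_name: str) -> None:
        super().__init__(app_name)
        # Replace run_async on this instance with the intercepted version.
        self.run_async = MethodType(
            intercept_new_message(tag_message)(super().run_async), self
        )


async def collect() -> list[str]:
    runner = InterceptingRunner("demo_app")
    return [event async for event in runner.run_async(new_message="hi")]


events = asyncio.run(collect())
print(events)
```

Because the replacement is stored as an instance attribute, a subclass that defines its own `run_async` method would be shadowed by it, which is one reason the note asks overrides to stay compatible with the interception logic.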

Source code in veadk/runner.py
class Runner(ADKRunner):
    """VeADK Runner that augments ADK with session, memory, tracing, and media upload.

    This class builds on Google ADK's ``Runner`` and adds:
    - Integration with short-term memory (ShortTermMemory) for auto session management.
    - Optional long-term memory integration and session persistence.
    - New message interception and media upload to TOS.
    - Tracing dump and Trace ID logging.
    - A simplified ``run`` entry that supports multi-turn text/multimodal inputs.

    Attributes:
        user_id (str): Default user ID.
        long_term_memory: Long-term memory service instance, or ``None`` if not set.
        short_term_memory (veadk.memory.short_term_memory.ShortTermMemory | None):
            Short-term memory instance used to auto-create/manage sessions.
        upload_inline_data_to_tos (bool): Whether to upload inline media to TOS while running.
        session_service: Session service instance (may come from short-term memory).
        memory_service: Memory service instance (may come from agent's long-term memory).
        app_name (str): Application name used in session management and object pathing.

    Note:
        This class wraps the parent ``run_async`` at initialization to insert media
        upload and post-run handling. If you override the underlying ``run_async``,
        ensure it remains compatible with this interception logic.
    """

    def __init__(
        self,
        agent: BaseAgent | Agent | None = None,
        short_term_memory: ShortTermMemory | None = None,
        app_name: str | None = None,
        user_id: str = "veadk_default_user",
        upload_inline_data_to_tos: bool = False,
        run_processor: "BaseRunProcessor | None" = None,
        *args,
        **kwargs,
    ) -> None:
        """Initialize a Runner instance.

        Selects the session service based on provided short-term memory or an
        external ``session_service``. If long-term memory or an external
        ``memory_service`` is provided, the passed service is preferred. After
        construction, it injects a message interception layer into the parent's
        ``run_async`` to support inline media upload and post-run handling.

        Args:
            agent (google.adk.agents.base_agent.BaseAgent | veadk.agent.Agent):
                The agent instance used to run interactions.
            short_term_memory (ShortTermMemory | None): Optional short-term memory; if
                not provided and no external `session_service` is supplied, an in-memory
                session service will be created.
            app_name (str): Application name. Defaults to `veadk_default_app`.
            user_id (str): Default user ID. Defaults to `veadk_default_user`.
            upload_inline_data_to_tos (bool): Whether to enable inline media upload. Defaults to `False`.
            run_processor (BaseRunProcessor | None): Optional run processor for intercepting agent execution.
                If not provided, will try to get from agent. If agent doesn't have one, uses NoOpRunProcessor.
            *args: Positional args passed through to `ADKRunner`.
            **kwargs: Keyword args passed through to `ADKRunner`; may include
                ``session_service`` and ``memory_service`` to override defaults.

        Returns:
            None

        Raises:
            None
        """
        self.user_id = user_id
        self.long_term_memory = None
        self.short_term_memory = short_term_memory
        self.upload_inline_data_to_tos = upload_inline_data_to_tos
        credential_service = kwargs.pop("credential_service", None)
        session_service = kwargs.pop("session_service", None)
        memory_service = kwargs.pop("memory_service", None)

        # Handle run_processor: priority is runner arg > agent.run_processor > NoOpRunProcessor
        if run_processor is not None:
            self.run_processor = run_processor
        elif hasattr(agent, "run_processor") and agent.run_processor is not None:  # type: ignore
            self.run_processor = agent.run_processor  # type: ignore
        else:
            from veadk.processors import NoOpRunProcessor

            self.run_processor = NoOpRunProcessor()

        if session_service:
            if short_term_memory:
                logger.warning(
                    "Short term memory is enabled, but session service is also provided. We will use session service from runner argument."
                )

        if not session_service:
            if short_term_memory:
                session_service = short_term_memory.session_service
                logger.debug(
                    f"Use session service {session_service} from short term memory."
                )
            else:
                logger.warning(
                    "No short term memory or session service provided, use an in-memory one instead."
                )
                short_term_memory = ShortTermMemory()
                self.short_term_memory = short_term_memory
                session_service = short_term_memory.session_service

        if memory_service:
            if hasattr(agent, "long_term_memory") and agent.long_term_memory:  # type: ignore
                self.long_term_memory = agent.long_term_memory  # type: ignore
                logger.warning(
                    "Long term memory in agent is enabled, but memory service is also provided. We will use memory service from runner argument."
                )

        if not memory_service:
            if hasattr(agent, "long_term_memory") and agent.long_term_memory:  # type: ignore
                self.long_term_memory = agent.long_term_memory  # type: ignore
                memory_service = agent.long_term_memory  # type: ignore
            else:
                logger.info("No long term memory provided.")

        # For forward compatibility, we pass app_name to ADKRunner.
        if not kwargs.get("app") and not app_name:
            app_name = "veadk_default_app"

        super().__init__(
            agent=agent,
            session_service=session_service,
            memory_service=memory_service,
            credential_service=credential_service,
            app_name=app_name,
            *args,
            **kwargs,
        )

        self.run_async = MethodType(
            intercept_new_message(_upload_image_to_tos)(super().run_async), self
        )

    async def run(
        self,
        messages: RunnerMessage,
        user_id: str = "",
        session_id: str = f"tmp-session-{formatted_timestamp()}",
        run_config: RunConfig | None = None,
        save_tracing_data: bool = False,
        upload_inline_data_to_tos: bool = False,
        run_processor: "BaseRunProcessor | None" = None,
    ):
        """Run a conversation with multi-turn text and multimodal inputs.

        When short-term memory is configured, a session is auto-created as needed.
        Inputs are converted into ADK message format. If ``upload_inline_data_to_tos``
        is ``True``, media upload is enabled temporarily for this run (does not change
        the Runner's global setting).

        Args:
            messages (RunnerMessage): Input messages (``str``, ``MediaMessage`` or a list of them).
            user_id (str): Override default user ID; if empty, uses the constructed ``user_id``.
            session_id (str): Session ID. Defaults to a timestamp-based temporary ID.
            run_config (google.adk.agents.RunConfig | None): Run config; if ``None``, a default
                config is created using the environment var ``MODEL_AGENT_MAX_LLM_CALLS``.
            save_tracing_data (bool): Whether to dump tracing data to disk after the run. Defaults to ``False``.
            upload_inline_data_to_tos (bool): Whether to enable media upload only for this run. Defaults to ``False``.
            run_processor (BaseRunProcessor | None): Optional run processor to use for this run.
                If not provided, uses the runner's default run_processor. Defaults to None.

        Returns:
            str: The textual output from the last event, if present; otherwise an empty string.

        Raises:
            ValueError: If an input contains an unsupported or unrecognized media type.
            AssertionError: If a media MIME type is not among ``image/*`` or ``video/*``.
            Exception: Exceptions from the underlying ADK/Agent execution may propagate.
        """
        if upload_inline_data_to_tos:
            _upload_inline_data_to_tos = self.upload_inline_data_to_tos
            self.upload_inline_data_to_tos = upload_inline_data_to_tos

        if not run_config:
            run_config = RunConfig(
                # streaming_mode=stream_mode,
                max_llm_calls=int(getenv("MODEL_AGENT_MAX_LLM_CALLS", 100)),
            )
        logger.info(f"Run config: {run_config}")

        user_id = user_id or self.user_id

        converted_messages: list = _convert_messages(
            messages, self.app_name, user_id, session_id
        )

        if self.short_term_memory:
            session = await self.short_term_memory.create_session(
                app_name=self.app_name, user_id=user_id, session_id=session_id
            )
            assert session, (
                f"Failed to create session with app_name={self.app_name}, user_id={user_id}, session_id={session_id}, "
            )
            logger.debug(
                f"Auto create session: {session.id}, user_id: {session.user_id}, app_name: {self.app_name}"
            )

        final_output = ""
        for converted_message in converted_messages:
            try:

                @(run_processor or self.run_processor).process_run(
                    runner=self, message=converted_message
                )
                async def event_generator():
                    async for event in self.run_async(
                        user_id=user_id,
                        session_id=session_id,
                        new_message=converted_message,
                        run_config=run_config,
                    ):
                        yield event

                async for event in event_generator():
                    if (
                        event.content is not None
                        and event.content.parts
                        and event.content.parts[0].text is not None
                        and len(event.content.parts[0].text.strip()) > 0
                    ):
                        final_output = event.content.parts[0].text
            except LlmCallsLimitExceededError as e:
                logger.warning(f"Max number of llm calls limit exceeded: {e}")
                final_output = ""

        # try to save tracing file
        if save_tracing_data:
            self.save_tracing_file(session_id)

        self._print_trace_id()

        if upload_inline_data_to_tos:
            self.upload_inline_data_to_tos = _upload_inline_data_to_tos  # type: ignore

        return final_output

    def get_trace_id(self) -> str:
        """Get the Trace ID from the current agent's tracer.

        If the agent is not a :class:`veadk.agent.Agent` or no tracer is configured,
        returns ``"<unknown_trace_id>"``.

        Returns:
            str: The Trace ID or ``"<unknown_trace_id>"``.

        Raises:
            None
        """
        if not isinstance(self.agent, Agent):
            logger.warning(
                ("The agent is not an instance of VeADK Agent, no trace id provided.")
            )
            return "<unknown_trace_id>"

        if not self.agent.tracers:
            logger.warning(
                "No tracer is configured in the agent, no trace id provided."
            )
            return "<unknown_trace_id>"

        try:
            trace_id = self.agent.tracers[0].trace_id  # type: ignore
            return trace_id
        except Exception as e:
            logger.warning(f"Get tracer id failed as {e}")
            return "<unknown_trace_id>"

    def _print_trace_id(self) -> None:
        """Log the current tracer's Trace ID.

        If the agent is not a :class:`veadk.agent.Agent` or no tracer is configured,
        nothing is printed.

        Returns:
            None

        Raises:
            None
        """
        if not isinstance(self.agent, Agent):
            logger.warning(
                ("The agent is not an instance of VeADK Agent, no trace id provided.")
            )
            return

        if not self.agent.tracers:
            logger.warning(
                "No tracer is configured in the agent, no trace id provided."
            )
            return

        try:
            trace_id = self.agent.tracers[0].trace_id  # type: ignore
            logger.info(f"Trace id: {trace_id}")
        except Exception as e:
            logger.warning(f"Get tracer id failed as {e}")
            return

    def save_tracing_file(self, session_id: str) -> str:
        """Dump tracing data to disk and return the last written path.

        Only effective when the agent is one of
        Agent/SequentialAgent/ParallelAgent/LoopAgent and a tracer is configured;
        otherwise returns an empty string.

        Args:
            session_id (str): Session ID used to associate the tracing with a session.

        Returns:
            str: The tracing file path; returns an empty string on failure or when no tracer.

        Examples:
            You can save the tracing data to a local file.

            ```python
            import asyncio

            from veadk import Agent, Runner

            agent = Agent()

            runner = Runner(agent=agent)

            session_id = "session"
            asyncio.run(runner.run(messages="Hi!", session_id=session_id))

            path = runner.save_tracing_file(session_id=session_id)
            print(path)
            ```
        """
        if not isinstance(
            self.agent, (Agent, SequentialAgent, ParallelAgent, LoopAgent)
        ):
            logger.warning(
                (
                    "The agent is not an instance of Agent, SequentialAgent, ParallelAgent or LoopAgent, cannot save tracing file."
                )
            )
            return ""

        if not self.agent.tracers:
            logger.warning("No tracer is configured in the agent.")
            return ""

        try:
            dump_path = ""
            for tracer in self.agent.tracers:
                dump_path = tracer.dump(user_id=self.user_id, session_id=session_id)

            return dump_path
        except Exception as e:
            logger.error(f"Failed to save tracing file: {e}")
            return ""

    async def save_eval_set(self, session_id: str, eval_set_id: str = "default") -> str:
        """Save the current session as part of an evaluation set and return its path.

        Args:
            session_id (str): Session ID.
            eval_set_id (str): Evaluation set identifier. Defaults to ``"default"``.

        Returns:
            str: The exported evaluation set file path.

        Examples:
            You can save a specific session as an evaluation set in Google ADK format.

            ```python
            import asyncio

            from veadk import Agent, Runner

            agent = Agent()

            runner = Runner(agent=agent)

            session_id = "session"
            asyncio.run(runner.run(messages="Hi!", session_id=session_id))

            # save_eval_set is a coroutine, so it must be awaited or run in an event loop
            path = asyncio.run(runner.save_eval_set(session_id=session_id))
            print(path)
            ```
        """
        eval_set_recorder = EvalSetRecorder(self.session_service, eval_set_id)
        eval_set_path = await eval_set_recorder.dump(
            self.app_name, self.user_id, session_id
        )
        return eval_set_path

    async def save_session_to_long_term_memory(
        self, session_id: str, user_id: str = "", app_name: str = "", **kwargs
    ) -> None:
        """Save the specified session to long-term memory.

        If ``long_term_memory`` is not configured, the function logs a warning and returns.
        It fetches the session from the session service and then calls the long-term memory's
        ``add_session_to_memory`` for persistence.

        Args:
            session_id (str): Session ID.
            user_id (str): Optional; override default user ID. If empty, uses ``self.user_id``.
            app_name (str): Optional; override default app name. If empty, uses ``self.app_name``.

        Examples:
            You can save a specific session to long-term memory.

            ```python
            import asyncio

            from veadk import Agent, Runner
            from veadk.memory import LongTermMemory

            APP_NAME = "app"

            agent = Agent(long_term_memory=LongTermMemory(backend="local", app_name=APP_NAME))

            session_id = "session"
            runner = Runner(agent=agent, app_name=APP_NAME)

            asyncio.run(runner.run(messages="Hi!", session_id=session_id))

            asyncio.run(runner.save_session_to_long_term_memory(session_id=session_id))
            ```
        """
        if not self.long_term_memory:
            logger.warning("Long-term memory is not enabled. Failed to save session.")
            return

        if not user_id:
            user_id = self.user_id

        if not app_name:
            app_name = self.app_name

        session = await self.session_service.get_session(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
        )

        if not session:
            logger.error(
                f"Session {session_id} (app_name={app_name}, user_id={user_id}) not found in session service, cannot save to long-term memory."
            )
            return

        await self.long_term_memory.add_session_to_memory(session, kwargs=kwargs)
        logger.info(f"Add session `{session.id}` to long term memory.")

__init__(agent=None, short_term_memory=None, app_name=None, user_id='veadk_default_user', upload_inline_data_to_tos=False, run_processor=None, *args, **kwargs)

Initialize a Runner instance.

Selects the session service based on provided short-term memory or an external session_service. If long-term memory or an external memory_service is provided, the passed service is preferred. After construction, it injects a message interception layer into the parent's run_async to support inline media upload and post-run handling.
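
The selection order described above can be illustrated with a small, self-contained sketch. `FakeShortTermMemory` and `select_session_service` are hypothetical stand-ins that only mirror the branching in the listed source; they are not part of VeADK.

```python
from dataclasses import dataclass, field


@dataclass
class FakeShortTermMemory:
    """Hypothetical stand-in for ShortTermMemory; it only carries a session service."""

    session_service: object = field(default_factory=object)


def select_session_service(session_service=None, short_term_memory=None):
    """Mirror the precedence: explicit service, then short-term memory, then a new in-memory one."""
    if session_service:
        # An explicitly passed service wins, even when short-term memory is also set.
        return session_service, short_term_memory
    if not short_term_memory:
        # Nothing was provided, so fall back to a fresh in-memory store.
        short_term_memory = FakeShortTermMemory()
    return short_term_memory.session_service, short_term_memory


explicit_service = object()
memory = FakeShortTermMemory()

chosen, _ = select_session_service(explicit_service, memory)
print(chosen is explicit_service)

chosen, created = select_session_service()
print(chosen is created.session_service)
```

The same pattern applies to the memory service in the listed source: an explicit `memory_service` takes priority over the agent's `long_term_memory`.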

Parameters:

Name Type Description Default
agent BaseAgent | Agent

The agent instance used to run interactions.

None
short_term_memory ShortTermMemory | None

Optional short-term memory; if not provided and no external session_service is supplied, an in-memory session service will be created.

None
app_name str

Application name. Defaults to veadk_default_app.

None
user_id str

Default user ID. Defaults to veadk_default_user.

'veadk_default_user'
upload_inline_data_to_tos bool

Whether to enable inline media upload. Defaults to False.

False
run_processor BaseRunProcessor | None

Optional run processor for intercepting agent execution. If not provided, the runner falls back to the agent's run_processor; if the agent has none, NoOpRunProcessor is used.

None
*args

Positional args passed through to ADKRunner.

()
**kwargs

Keyword args passed through to ADKRunner; may include session_service and memory_service to override defaults.

{}

Returns:

Type Description
None

None
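
The run processor priority described in the parameter list (runner argument, then the agent's processor, then a no-op) can be sketched as follows. `NoOpProcessor` and `resolve_run_processor` are hypothetical names used only for illustration.

```python
from types import SimpleNamespace


class NoOpProcessor:
    """Hypothetical stand-in for a processor that does nothing."""


def resolve_run_processor(runner_arg=None, agent=None):
    """Return the processor by priority: runner argument, then agent, then no-op."""
    if runner_arg is not None:
        return runner_arg
    agent_processor = getattr(agent, "run_processor", None)
    if agent_processor is not None:
        return agent_processor
    return NoOpProcessor()


agent_with_processor = SimpleNamespace(run_processor="agent-processor")
plain_agent = SimpleNamespace()

print(resolve_run_processor("runner-processor", agent_with_processor))
print(resolve_run_processor(None, agent_with_processor))
print(type(resolve_run_processor(None, plain_agent)).__name__)
```

In the listed source, `run()` applies one more layer on top of this: a `run_processor` passed to a single call takes precedence over the one resolved at construction.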

Source code in veadk/runner.py
def __init__(
    self,
    agent: BaseAgent | Agent | None = None,
    short_term_memory: ShortTermMemory | None = None,
    app_name: str | None = None,
    user_id: str = "veadk_default_user",
    upload_inline_data_to_tos: bool = False,
    run_processor: "BaseRunProcessor | None" = None,
    *args,
    **kwargs,
) -> None:
    """Initialize a Runner instance.

    Selects the session service based on provided short-term memory or an
    external ``session_service``. If long-term memory or an external
    ``memory_service`` is provided, the passed service is preferred. After
    construction, it injects a message interception layer into the parent's
    ``run_async`` to support inline media upload and post-run handling.

    Args:
        agent (google.adk.agents.base_agent.BaseAgent | veadk.agent.Agent):
            The agent instance used to run interactions.
        short_term_memory (ShortTermMemory | None): Optional short-term memory; if
            not provided and no external `session_service` is supplied, an in-memory
            session service will be created.
        app_name (str): Application name. Defaults to `veadk_default_app`.
        user_id (str): Default user ID. Defaults to `veadk_default_user`.
        upload_inline_data_to_tos (bool): Whether to enable inline media upload. Defaults to `False`.
        run_processor (BaseRunProcessor | None): Optional run processor for intercepting agent execution.
            If not provided, will try to get from agent. If agent doesn't have one, uses NoOpRunProcessor.
        *args: Positional args passed through to `ADKRunner`.
        **kwargs: Keyword args passed through to `ADKRunner`; may include
            ``session_service`` and ``memory_service`` to override defaults.

    Returns:
        None

    Raises:
        None
    """
    self.user_id = user_id
    self.long_term_memory = None
    self.short_term_memory = short_term_memory
    self.upload_inline_data_to_tos = upload_inline_data_to_tos
    credential_service = kwargs.pop("credential_service", None)
    session_service = kwargs.pop("session_service", None)
    memory_service = kwargs.pop("memory_service", None)

    # Handle run_processor: priority is runner arg > agent.run_processor > NoOpRunProcessor
    if run_processor is not None:
        self.run_processor = run_processor
    elif hasattr(agent, "run_processor") and agent.run_processor is not None:  # type: ignore
        self.run_processor = agent.run_processor  # type: ignore
    else:
        from veadk.processors import NoOpRunProcessor

        self.run_processor = NoOpRunProcessor()

    if session_service:
        if short_term_memory:
            logger.warning(
                "Short term memory is enabled, but session service is also provided. We will use session service from runner argument."
            )

    if not session_service:
        if short_term_memory:
            session_service = short_term_memory.session_service
            logger.debug(
                f"Use session service {session_service} from short term memory."
            )
        else:
            logger.warning(
                "No short term memory or session service provided, use an in-memory one instead."
            )
            short_term_memory = ShortTermMemory()
            self.short_term_memory = short_term_memory
            session_service = short_term_memory.session_service

    if memory_service:
        if hasattr(agent, "long_term_memory") and agent.long_term_memory:  # type: ignore
            self.long_term_memory = agent.long_term_memory  # type: ignore
            logger.warning(
                "Long term memory in agent is enabled, but memory service is also provided. We will use memory service from runner argument."
            )

    if not memory_service:
        if hasattr(agent, "long_term_memory") and agent.long_term_memory:  # type: ignore
            self.long_term_memory = agent.long_term_memory  # type: ignore
            memory_service = agent.long_term_memory  # type: ignore
        else:
            logger.info("No long term memory provided.")

    # For forward compatibility, we pass app_name to ADKRunner.
    if not kwargs.get("app") and not app_name:
        app_name = "veadk_default_app"

    super().__init__(
        agent=agent,
        session_service=session_service,
        memory_service=memory_service,
        credential_service=credential_service,
        app_name=app_name,
        *args,
        **kwargs,
    )

    self.run_async = MethodType(
        intercept_new_message(_upload_image_to_tos)(super().run_async), self
    )

get_trace_id()

Get the Trace ID from the current agent's tracer.

If the agent is not a veadk.agent.Agent or no tracer is configured, returns "<unknown_trace_id>".

Returns:

Name Type Description
str str

The Trace ID or "<unknown_trace_id>".
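
A minimal sketch of the fallback behaviour, assuming tracers expose a `trace_id` attribute as in the listed source. `first_trace_id` and the sample tracer objects are hypothetical.

```python
from types import SimpleNamespace

UNKNOWN_TRACE_ID = "<unknown_trace_id>"


def first_trace_id(tracers) -> str:
    """Return the first tracer's trace_id, or the sentinel when it is unavailable."""
    if not tracers:
        return UNKNOWN_TRACE_ID
    try:
        # Only the first tracer is consulted, even when several are configured.
        return tracers[0].trace_id
    except Exception:
        return UNKNOWN_TRACE_ID


tracers = [SimpleNamespace(trace_id="abc123"), SimpleNamespace(trace_id="def456")]
print(first_trace_id(tracers))
print(first_trace_id([]))
```

Callers that need a real ID can compare the result against the sentinel string before using it.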

Source code in veadk/runner.py
def get_trace_id(self) -> str:
    """Get the Trace ID from the current agent's tracer.

    If the agent is not a :class:`veadk.agent.Agent` or no tracer is configured,
    returns ``"<unknown_trace_id>"``.

    Returns:
        str: The Trace ID or ``"<unknown_trace_id>"``.

    Raises:
        None
    """
    if not isinstance(self.agent, Agent):
        logger.warning(
            ("The agent is not an instance of VeADK Agent, no trace id provided.")
        )
        return "<unknown_trace_id>"

    if not self.agent.tracers:
        logger.warning(
            "No tracer is configured in the agent, no trace id provided."
        )
        return "<unknown_trace_id>"

    try:
        trace_id = self.agent.tracers[0].trace_id  # type: ignore
        return trace_id
    except Exception as e:
        logger.warning(f"Get tracer id failed as {e}")
        return "<unknown_trace_id>"

run(messages, user_id='', session_id=f'tmp-session-{formatted_timestamp()}', run_config=None, save_tracing_data=False, upload_inline_data_to_tos=False, run_processor=None) async

Run a conversation with multi-turn text and multimodal inputs.

When short-term memory is configured, a session is auto-created as needed. Inputs are converted into ADK message format. If upload_inline_data_to_tos is True, media upload is enabled temporarily for this run (does not change the Runner's global setting).
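
The temporary, per-run override of `upload_inline_data_to_tos` follows a save/set/restore pattern. The sketch below shows that pattern with a context manager; `temporarily_enabled` and `runner_like` are hypothetical. Note one deliberate difference: the listed source restores the flag after the run completes, while this sketch uses `try`/`finally` so the flag is also restored if the block raises.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def temporarily_enabled(obj, attr):
    """Set a boolean attribute to True for the block, then restore the old value."""
    previous = getattr(obj, attr)
    setattr(obj, attr, True)
    try:
        yield obj
    finally:
        # Restore the previous value even if the block raises.
        setattr(obj, attr, previous)


runner_like = SimpleNamespace(upload_inline_data_to_tos=False)

with temporarily_enabled(runner_like, "upload_inline_data_to_tos"):
    print(runner_like.upload_inline_data_to_tos)  # enabled inside the block

print(runner_like.upload_inline_data_to_tos)  # back to the original setting
```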

Parameters:

Name Type Description Default
messages RunnerMessage

Input messages (str, MediaMessage or a list of them).

required
user_id str

Override default user ID; if empty, uses the constructed user_id.

''
session_id str

Session ID. Defaults to a timestamp-based temporary ID.

f'tmp-session-{formatted_timestamp()}'
run_config RunConfig | None

Run config; if None, a default config is created using the environment var MODEL_AGENT_MAX_LLM_CALLS.

None
save_tracing_data bool

Whether to dump tracing data to disk after the run. Defaults to False.

False
upload_inline_data_to_tos bool

Whether to enable media upload only for this run. Defaults to False.

False
run_processor BaseRunProcessor | None

Optional run processor to use for this run. If not provided, uses the runner's default run_processor. Defaults to None.

None
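
One detail of the `session_id` default is worth illustrating. Python evaluates default argument values once, when the function is defined, so an f-string default like the one in the signature above is computed a single time per process. Judging from the signature as listed, calls that omit `session_id` would therefore reuse the same temporary ID. The sketch below demonstrates the language behaviour only; `fake_timestamp`, `run_with_static_default`, and `run_with_fresh_default` are hypothetical stand-ins.

```python
import itertools
from typing import Optional

_counter = itertools.count()


def fake_timestamp() -> str:
    """Hypothetical stand-in for formatted_timestamp(); returns a new value per call."""
    return str(next(_counter))


def run_with_static_default(session_id: str = f"tmp-session-{fake_timestamp()}") -> str:
    # The f-string default was evaluated once, when this function was defined.
    return session_id


def run_with_fresh_default(session_id: Optional[str] = None) -> str:
    # Computing the value inside the body yields a new ID on every call.
    return session_id or f"tmp-session-{fake_timestamp()}"


print(run_with_static_default() == run_with_static_default())  # same ID both times
print(run_with_fresh_default() == run_with_fresh_default())  # different IDs
```

Passing an explicit `session_id` on every call avoids any reliance on the default.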

Returns:

Name Type Description
str

The textual output from the last event, if present; otherwise an empty string.
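
More precisely, the listed source keeps the text of the first part of the most recent event whose text is non-blank. A self-contained sketch of that selection rule, using hypothetical event objects built with `SimpleNamespace`:

```python
from types import SimpleNamespace


def make_event(*texts):
    """Build a hypothetical event with `content.parts`, or no content when no texts are given."""
    if not texts:
        return SimpleNamespace(content=None)
    parts = [SimpleNamespace(text=t) for t in texts]
    return SimpleNamespace(content=SimpleNamespace(parts=parts))


def last_text(events) -> str:
    """Return the first-part text of the last event that carries non-blank text."""
    output = ""
    for event in events:
        if (
            event.content is not None
            and event.content.parts
            and event.content.parts[0].text is not None
            and event.content.parts[0].text.strip()
        ):
            output = event.content.parts[0].text
    return output


events = [
    make_event("thinking"),
    make_event(None),          # e.g. a part without text
    make_event("final answer"),
    make_event("   "),         # blank text is skipped
    make_event(),              # event without content
]
print(last_text(events))
```

Only `parts[0]` is inspected, so text carried in later parts of an event does not affect the result.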

Raises:

Type Description
ValueError

If an input contains an unsupported or unrecognized media type.

AssertionError

If a media MIME type is not among image/* or video/*.

Exception

Exceptions from the underlying ADK/Agent execution may propagate.
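
When no `run_config` is passed, the listed source builds one from the environment variable `MODEL_AGENT_MAX_LLM_CALLS` with a fallback of 100, converting the value with `int()`. The sketch below reproduces only that parsing step; `default_max_llm_calls` is a hypothetical helper, and it accepts a mapping so the behaviour can be shown without touching the real environment.

```python
import os


def default_max_llm_calls(env=None) -> int:
    """Read MODEL_AGENT_MAX_LLM_CALLS, falling back to 100 when it is unset."""
    env = os.environ if env is None else env
    # int() raises ValueError for non-numeric strings such as "many".
    return int(env.get("MODEL_AGENT_MAX_LLM_CALLS", 100))


print(default_max_llm_calls({}))
print(default_max_llm_calls({"MODEL_AGENT_MAX_LLM_CALLS": "5"}))
```

A non-numeric value in the variable would surface as a `ValueError` from the `int()` conversion, so it is worth validating the setting in deployment configuration.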

Source code in veadk/runner.py
async def run(
    self,
    messages: RunnerMessage,
    user_id: str = "",
    session_id: str = f"tmp-session-{formatted_timestamp()}",
    run_config: RunConfig | None = None,
    save_tracing_data: bool = False,
    upload_inline_data_to_tos: bool = False,
    run_processor: "BaseRunProcessor | None" = None,
):
    """Run a conversation with multi-turn text and multimodal inputs.

    When short-term memory is configured, a session is auto-created as needed.
    Inputs are converted into ADK message format. If ``upload_inline_data_to_tos``
    is ``True``, media upload is enabled temporarily for this run (does not change
    the Runner's global setting).

    Args:
        messages (RunnerMessage): Input messages (``str``, ``MediaMessage`` or a list of them).
        user_id (str): Override default user ID; if empty, uses the constructed ``user_id``.
        session_id (str): Session ID. Defaults to a timestamp-based temporary ID.
        run_config (google.adk.agents.RunConfig | None): Run config; if ``None``, a default
            config is created using the environment var ``MODEL_AGENT_MAX_LLM_CALLS``.
        save_tracing_data (bool): Whether to dump tracing data to disk after the run. Defaults to ``False``.
        upload_inline_data_to_tos (bool): Whether to enable media upload only for this run. Defaults to ``False``.
        run_processor (BaseRunProcessor | None): Optional run processor to use for this run.
            If not provided, uses the runner's default run_processor. Defaults to None.

    Returns:
        str: The textual output from the last event, if present; otherwise an empty string.

    Raises:
        ValueError: If an input contains an unsupported or unrecognized media type.
        AssertionError: If a media MIME type is not among ``image/*`` or ``video/*``.
        Exception: Exceptions from the underlying ADK/Agent execution may propagate.
    """
    if upload_inline_data_to_tos:
        _upload_inline_data_to_tos = self.upload_inline_data_to_tos
        self.upload_inline_data_to_tos = upload_inline_data_to_tos

    if not run_config:
        run_config = RunConfig(
            # streaming_mode=stream_mode,
            max_llm_calls=int(getenv("MODEL_AGENT_MAX_LLM_CALLS", 100)),
        )
    logger.info(f"Run config: {run_config}")

    user_id = user_id or self.user_id

    converted_messages: list = _convert_messages(
        messages, self.app_name, user_id, session_id
    )

    if self.short_term_memory:
        session = await self.short_term_memory.create_session(
            app_name=self.app_name, user_id=user_id, session_id=session_id
        )
        assert session, (
            f"Failed to create session with app_name={self.app_name}, user_id={user_id}, session_id={session_id}, "
        )
        logger.debug(
            f"Auto create session: {session.id}, user_id: {session.user_id}, app_name: {self.app_name}"
        )

    final_output = ""
    for converted_message in converted_messages:
        try:

            @(run_processor or self.run_processor).process_run(
                runner=self, message=converted_message
            )
            async def event_generator():
                async for event in self.run_async(
                    user_id=user_id,
                    session_id=session_id,
                    new_message=converted_message,
                    run_config=run_config,
                ):
                    yield event

            async for event in event_generator():
                if (
                    event.content is not None
                    and event.content.parts
                    and event.content.parts[0].text is not None
                    and len(event.content.parts[0].text.strip()) > 0
                ):
                    final_output = event.content.parts[0].text
        except LlmCallsLimitExceededError as e:
            logger.warning(f"Max number of llm calls limit exceeded: {e}")
            final_output = ""

    # try to save tracing file
    if save_tracing_data:
        self.save_tracing_file(session_id)

    self._print_trace_id()

    if upload_inline_data_to_tos:
        self.upload_inline_data_to_tos = _upload_inline_data_to_tos  # type: ignore

    return final_output

save_eval_set(session_id, eval_set_id='default') async

Save the current session as part of an evaluation set and return its path.

Parameters:

Name Type Description Default
session_id str

Session ID.

required
eval_set_id str

Evaluation set identifier. Defaults to "default".

'default'

Returns:

Name Type Description
str str

The exported evaluation set file path.

Examples:

You can save a specific session as an evaluation set in Google ADK format.

import asyncio

from veadk import Agent, Runner

agent = Agent()

runner = Runner(agent=agent)

session_id = "session"
asyncio.run(runner.run(messages="Hi!", session_id=session_id))

# save_eval_set is a coroutine, so it must be awaited or run in an event loop
path = asyncio.run(runner.save_eval_set(session_id=session_id))
print(path)
Source code in veadk/runner.py
async def save_eval_set(self, session_id: str, eval_set_id: str = "default") -> str:
    """Save the current session as part of an evaluation set and return its path.

    Args:
        session_id (str): Session ID.
        eval_set_id (str): Evaluation set identifier. Defaults to ``"default"``.

    Returns:
        str: The exported evaluation set file path.

    Examples:
        You can save a specific session as an evaluation set in Google ADK format.

        ```python
        import asyncio

        from veadk import Agent, Runner

        agent = Agent()

        runner = Runner(agent=agent)

        session_id = "session"
        asyncio.run(runner.run(messages="Hi!", session_id=session_id))

        # save_eval_set is a coroutine, so it must be awaited or run in an event loop
        path = asyncio.run(runner.save_eval_set(session_id=session_id))
        print(path)
        ```
    """
    eval_set_recorder = EvalSetRecorder(self.session_service, eval_set_id)
    eval_set_path = await eval_set_recorder.dump(
        self.app_name, self.user_id, session_id
    )
    return eval_set_path

save_session_to_long_term_memory(session_id, user_id='', app_name='', **kwargs) async

Save the specified session to long-term memory.

If long_term_memory is not configured, the function logs a warning and returns. It fetches the session from the session service and then calls the long-term memory's add_session_to_memory for persistence.
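
The guard-then-persist flow described above can be sketched with in-memory stand-ins. `FakeSessionService`, `FakeLongTermMemory`, and `save_session` are hypothetical; unlike the real method, which returns `None`, this sketch returns a boolean so the two early exits are visible.

```python
import asyncio


class FakeSessionService:
    """Hypothetical session store keyed by (app_name, user_id, session_id)."""

    def __init__(self, sessions: dict) -> None:
        self._sessions = sessions

    async def get_session(self, *, app_name: str, user_id: str, session_id: str):
        return self._sessions.get((app_name, user_id, session_id))


class FakeLongTermMemory:
    """Hypothetical long-term memory that records what it was asked to persist."""

    def __init__(self) -> None:
        self.saved: list = []

    async def add_session_to_memory(self, session) -> None:
        self.saved.append(session)


async def save_session(memory, service, app_name: str, user_id: str, session_id: str) -> bool:
    """Persist a session; return False when memory or the session is missing."""
    if memory is None:
        return False  # long-term memory is not configured
    session = await service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )
    if session is None:
        return False  # unknown session, nothing to persist
    await memory.add_session_to_memory(session)
    return True


service = FakeSessionService({("app", "user", "s1"): {"id": "s1"}})
memory = FakeLongTermMemory()

print(asyncio.run(save_session(memory, service, "app", "user", "s1")))
print(asyncio.run(save_session(memory, service, "app", "user", "missing")))
```

Because the lookup key includes `app_name` and `user_id`, a session created under different values will not be found, which is why the real method lets callers override both.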

Parameters:

Name Type Description Default
session_id str

Session ID.

required
user_id str

Optional; override default user ID. If empty, uses self.user_id.

''
app_name str

Optional; override default app name. If empty, uses self.app_name.

''

Examples:

You can save a specific session to long-term memory.

import asyncio

from veadk import Agent, Runner
from veadk.memory import LongTermMemory

APP_NAME = "app"

agent = Agent(long_term_memory=LongTermMemory(backend="local", app_name=APP_NAME))

session_id = "session"
runner = Runner(agent=agent, app_name=APP_NAME)

asyncio.run(runner.run(messages="Hi!", session_id=session_id))

asyncio.run(runner.save_session_to_long_term_memory(session_id=session_id))
Source code in veadk/runner.py
async def save_session_to_long_term_memory(
    self, session_id: str, user_id: str = "", app_name: str = "", **kwargs
) -> None:
    """Save the specified session to long-term memory.

    If ``long_term_memory`` is not configured, the function logs a warning and returns.
    It fetches the session from the session service and then calls the long-term memory's
    ``add_session_to_memory`` for persistence.

    Args:
        session_id (str): Session ID.
        user_id (str): Optional; override default user ID. If empty, uses ``self.user_id``.
        app_name (str): Optional; override default app name. If empty, uses ``self.app_name``.

    Examples:
        You can save a specific session to long-term memory.

        ```python
        import asyncio

        from veadk import Agent, Runner
        from veadk.memory import LongTermMemory

        APP_NAME = "app"

        agent = Agent(long_term_memory=LongTermMemory(backend="local", app_name=APP_NAME))

        session_id = "session"
        runner = Runner(agent=agent, app_name=APP_NAME)

        asyncio.run(runner.run(messages="Hi!", session_id=session_id))

        asyncio.run(runner.save_session_to_long_term_memory(session_id=session_id))
        ```
    """
    if not self.long_term_memory:
        logger.warning("Long-term memory is not enabled. Failed to save session.")
        return

    if not user_id:
        user_id = self.user_id

    if not app_name:
        app_name = self.app_name

    session = await self.session_service.get_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id,
    )

    if not session:
        logger.error(
            f"Session {session_id} (app_name={app_name}, user_id={user_id}) not found in session service, cannot save to long-term memory."
        )
        return

    await self.long_term_memory.add_session_to_memory(session, kwargs=kwargs)
    logger.info(f"Add session `{session.id}` to long term memory.")

save_tracing_file(session_id)

Dump tracing data to disk and return the last written path.

Only effective when the agent is one of Agent/SequentialAgent/ParallelAgent/LoopAgent and a tracer is configured; otherwise returns an empty string.

Parameters:

Name Type Description Default
session_id str

Session ID used to associate the tracing with a session.

required

Returns:

Name Type Description
str str

The tracing file path; returns an empty string on failure or when no tracer.
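
"Last written path" matters when several tracers are configured: every tracer is dumped, but only the final path is returned. A minimal sketch of that loop, with a hypothetical `FakeTracer` and `dump_all` helper:

```python
class FakeTracer:
    """Hypothetical tracer whose dump() returns a fixed path."""

    def __init__(self, path: str) -> None:
        self.path = path

    def dump(self, user_id: str, session_id: str) -> str:
        return self.path


def dump_all(tracers, user_id: str, session_id: str) -> str:
    """Dump every tracer; return only the last path, or "" when nothing was written."""
    if not tracers:
        return ""
    try:
        dump_path = ""
        for tracer in tracers:
            # Each iteration overwrites the previous path.
            dump_path = tracer.dump(user_id=user_id, session_id=session_id)
        return dump_path
    except Exception:
        return ""


tracers = [FakeTracer("/tmp/first.json"), FakeTracer("/tmp/second.json")]
print(dump_all(tracers, user_id="user", session_id="session"))
```

An empty return value therefore covers three cases in the listed source: an unsupported agent type, no tracers, or a failure during dumping.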

Examples:

You can save the tracing data to a local file.

import asyncio

from veadk import Agent, Runner

agent = Agent()

runner = Runner(agent=agent)

session_id = "session"
asyncio.run(runner.run(messages="Hi!", session_id=session_id))

path = runner.save_tracing_file(session_id=session_id)
print(path)
Source code in veadk/runner.py
def save_tracing_file(self, session_id: str) -> str:
    """Dump tracing data to disk and return the last written path.

    Only effective when the agent is one of
    Agent/SequentialAgent/ParallelAgent/LoopAgent and a tracer is configured;
    otherwise returns an empty string.

    Args:
        session_id (str): Session ID used to associate the tracing with a session.

    Returns:
        str: The tracing file path; returns an empty string on failure or when no tracer.

    Examples:
        You can save the tracing data to a local file.

        ```python
        import asyncio

        from veadk import Agent, Runner

        agent = Agent()

        runner = Runner(agent=agent)

        session_id = "session"
        asyncio.run(runner.run(messages="Hi!", session_id=session_id))

        path = runner.save_tracing_file(session_id=session_id)
        print(path)
        ```
    """
    if not isinstance(
        self.agent, (Agent, SequentialAgent, ParallelAgent, LoopAgent)
    ):
        logger.warning(
            (
                "The agent is not an instance of Agent, SequentialAgent, ParallelAgent or LoopAgent, cannot save tracing file."
            )
        )
        return ""

    if not self.agent.tracers:
        logger.warning("No tracer is configured in the agent.")
        return ""

    try:
        dump_path = ""
        for tracer in self.agent.tracers:
            dump_path = tracer.dump(user_id=self.user_id, session_id=session_id)

        return dump_path
    except Exception as e:
        logger.error(f"Failed to save tracing file: {e}")
        return ""

ShortTermMemory

Bases: BaseModel

Short term memory for agent execution.

The short term memory represents the context of the agent model. All content in the short term memory is sent to the agent model directly, including the system prompt, historical user prompts, and historical model responses.

Attributes:

Name Type Description
backend Literal['local', 'mysql', 'sqlite', 'postgresql', 'database']

The backend of short term memory:
- local for in-memory storage
- mysql for MySQL / PostgreSQL storage
- sqlite for local SQLite storage

backend_configs dict

Configuration dict used to initialize the short term memory backend.

db_url str

Database connection URL used to initialize the short term memory backend, for example sqlite:///./test.db. Once set, it overrides the backend parameter.

local_database_path str

Local database path, only used when backend is sqlite. Defaults to /tmp/veadk_local_database.db.

after_load_memory_callback Callable | None

A callback to be called after loading memory from the backend. The callback function should accept Session as an input.
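
A minimal sketch of what such a callback might look like. `FakeSession` is a hypothetical stand-in with `id` and `events` fields, and `on_memory_loaded` is an illustrative name; how the real backend uses the callback's return value is not stated here, so this example only observes the session and returns nothing.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSession:
    """Hypothetical stand-in for a session object; only `id` and `events` are modelled."""

    id: str
    events: list = field(default_factory=list)


loaded_sessions: list = []


def on_memory_loaded(session) -> None:
    """Record the ID and event count of each session loaded from the backend."""
    loaded_sessions.append((session.id, len(session.events)))


# Simulate the backend invoking the callback after loading a session.
on_memory_loaded(FakeSession(id="s1", events=["My name is VeADK", "Nice to meet you"]))
print(loaded_sessions)
```

Based on the attribute list above, such a function would be supplied through the `after_load_memory_callback` field when constructing `ShortTermMemory`.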

Examples:

In-memory simple memory

You can initialize a short term memory with in-memory storage:

from veadk import Agent, Runner
from veadk.memory.short_term_memory import ShortTermMemory
import asyncio

session_id = "veadk_playground_session"

agent = Agent()
short_term_memory = ShortTermMemory(backend="local")

runner = Runner(
    agent=agent, short_term_memory=short_term_memory)

# This invocation will be stored in short-term memory
response = asyncio.run(runner.run(
    messages="My name is VeADK", session_id=session_id
))
print(response)

# The model can see the earlier invocation stored in short-term memory
response = asyncio.run(runner.run(
    messages="Do you remember my name?", session_id=session_id # keep the same `session_id`
))
print(response)
Memory with a Database URL

You can also use a database connection URL to initialize a short-term memory:

from veadk.memory.short_term_memory import ShortTermMemory

short_term_memory = ShortTermMemory(db_url="...")
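When the URL contains credentials with special characters, the constructor (see the source listing for this class) logs a warning that recommends encoding the username or password with `urllib.parse.quote_plus`. A minimal sketch of that encoding step follows. The helper `build_db_url`, the `mysql+pymysql` scheme, and the host and credentials are hypothetical values chosen for illustration.

```python
from urllib.parse import quote_plus


def build_db_url(scheme: str, user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a connection URL with percent-encoded credentials (hypothetical helper)."""
    return f"{scheme}://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"


# The `@` and `:` inside the password are encoded as %40 and %3A.
db_url = build_db_url("mysql+pymysql", "veadk", "p@ss:word", "127.0.0.1", 3306, "veadk_db")
print(db_url)
# → mysql+pymysql://veadk:p%40ss%3Aword@127.0.0.1:3306/veadk_db
```

The resulting string is what would be passed as `db_url`. Note that the warning is a simple count of `@` and `:` characters, so a URL with a scheme, a password, and a port may still trigger it even when the credentials are encoded correctly.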
Memory with SQLite

To back the short term memory with a local SQLite database, set the backend to sqlite. A local database is created at local_database_path:

from veadk.memory.short_term_memory import ShortTermMemory

short_term_memory = ShortTermMemory(backend="sqlite", local_database_path="")
Source code in veadk/memory/short_term_memory.py
class ShortTermMemory(BaseModel):
    """Short term memory for agent execution.

    The short term memory represents the context of the agent model. All content in the short term memory will be sent to agent model directly, including the system prompt, historical user prompt, and historical model responses.

    Attributes:
        backend (Literal["local", "mysql", "sqlite", "postgresql", "database"]):
            The backend of short term memory:
            - `local` for in-memory storage
            - `mysql` for MySQL storage
            - `sqlite` for local SQLite storage
            - `postgresql` for PostgreSQL storage
            - `database` is deprecated and treated as `sqlite`
        backend_configs (dict): Configuration dict used to initialize the short term memory backend.
        db_url (str):
            Database connection URL used to initialize the short term memory backend.
            For example, `sqlite:///./test.db`. Once set, it overrides the `backend` parameter.
        local_database_path (str):
            Local database path, only used when `backend` is `sqlite`.
            Defaults to `/tmp/veadk_local_database.db`.
        after_load_memory_callback (Callable | None):
            A callback to be called after loading memory from the backend. The callback function should accept `Session` as an input.

    Examples:
        ### In-memory simple memory

        You can initialize a short term memory with in-memory storage:

        ```python
        from veadk import Agent, Runner
        from veadk.memory.short_term_memory import ShortTermMemory
        import asyncio

        session_id = "veadk_playground_session"

        agent = Agent()
        short_term_memory = ShortTermMemory(backend="local")

        runner = Runner(
            agent=agent, short_term_memory=short_term_memory)

        # This invocation will be stored in short-term memory
        response = asyncio.run(runner.run(
            messages="My name is VeADK", session_id=session_id
        ))
        print(response)

        # The history invocation can be fetched by model
        response = asyncio.run(runner.run(
            messages="Do you remember my name?", session_id=session_id # keep the same `session_id`
        ))
        print(response)
        ```

        ### Memory with a Database URL

        You can also use a database connection URL to initialize a short-term memory:

        ```python
        from veadk.memory.short_term_memory import ShortTermMemory

        short_term_memory = ShortTermMemory(db_url="...")
        ```

        ### Memory with SQLite

        To back the short term memory with a local SQLite database, set the backend to `sqlite`. A local database is created at `local_database_path`:

        ```python
        from veadk.memory.short_term_memory import ShortTermMemory

        short_term_memory = ShortTermMemory(backend="sqlite", local_database_path="")
        ```
    """

    backend: Literal["local", "mysql", "sqlite", "postgresql", "database"] = "local"

    backend_configs: dict = Field(default_factory=dict)

    db_kwargs: dict = Field(default_factory=dict)

    db_url: str = ""

    local_database_path: str = "/tmp/veadk_local_database.db"

    after_load_memory_callback: Callable | None = None

    _session_service: BaseSessionService = PrivateAttr()

    def model_post_init(self, __context: Any) -> None:
        if self.db_url:
            logger.info("The `db_url` is set, ignore `backend` option.")
            if self.db_url.count("@") > 1 or self.db_url.count(":") > 2:
                logger.warning(
                    "Multiple `@` or `:` symbols detected in the database URL. "
                    "Please encode `username` or `password` with `urllib.parse.quote_plus`. "
                    "Examples: p@ssword→p%40ssword."
                )
            self._session_service = DatabaseSessionService(
                db_url=self.db_url, **self.db_kwargs
            )
        else:
            if self.backend == "database":
                logger.warning(
                    "Backend `database` is deprecated, use `sqlite` to create short term memory."
                )
                self.backend = "sqlite"
            match self.backend:
                case "local":
                    self._session_service = InMemorySessionService()
                case "mysql":
                    self._session_service = MysqlSTMBackend(
                        db_kwargs=self.db_kwargs, **self.backend_configs
                    ).session_service
                case "sqlite":
                    self._session_service = SQLiteSTMBackend(
                        local_path=self.local_database_path
                    ).session_service
                case "postgresql":
                    self._session_service = PostgreSqlSTMBackend(
                        db_kwargs=self.db_kwargs, **self.backend_configs
                    ).session_service

        if self.after_load_memory_callback:
            wrap_get_session_with_callbacks(
                self._session_service, self.after_load_memory_callback
            )

    @property
    def session_service(self) -> BaseSessionService:
        return self._session_service

    async def create_session(
        self,
        app_name: str,
        user_id: str,
        session_id: str,
    ) -> Session | None:
        """Create or retrieve a user session.

        Short term memory can attempt to create a new session for a given application and user. If a session with the same `session_id` already exists, it will be returned instead of creating a new one.

        If the underlying session service is backed by a database (`DatabaseSessionService`), the method first lists all existing sessions for the given `app_name` and `user_id` and logs the number of sessions found. It then checks whether a session with the specified `session_id` already exists:
        - If it exists → returns the existing session.
        - If it does not exist → creates and returns a new session.

        Args:
            app_name (str): The name of the application associated with the session.
            user_id (str): The unique identifier of the user.
            session_id (str): The unique identifier of the session to be created or retrieved.

        Returns:
            Session | None: The retrieved or newly created `Session` object, or `None` if the session creation failed.

        Examples:
            Create a new session manually:

            ```python
            import asyncio

            from veadk.memory import ShortTermMemory

            app_name = "app_name"
            user_id = "user_id"
            session_id = "session_id"

            short_term_memory = ShortTermMemory()

            session = asyncio.run(
                short_term_memory.create_session(
                    app_name=app_name, user_id=user_id, session_id=session_id
                )
            )

            print(session)

            session = asyncio.run(
                short_term_memory.session_service.get_session(
                    app_name=app_name, user_id=user_id, session_id=session_id
                )
            )

            print(session)
            ```
        """
        if isinstance(self._session_service, DatabaseSessionService):
            list_sessions_response = await self._session_service.list_sessions(
                app_name=app_name, user_id=user_id
            )

            logger.debug(
                f"Loaded {len(list_sessions_response.sessions)} sessions from db {self.db_url}."
            )

        session = await self._session_service.get_session(
            app_name=app_name, user_id=user_id, session_id=session_id
        )

        if session:
            logger.info(
                f"Session {session_id} already exists with app_name={app_name} user_id={user_id}."
            )
            return session
        else:
            return await self._session_service.create_session(
                app_name=app_name, user_id=user_id, session_id=session_id
            )

create_session(app_name, user_id, session_id) async

Create or retrieve a user session.

Short term memory can attempt to create a new session for a given application and user. If a session with the same session_id already exists, it will be returned instead of creating a new one.

If the underlying session service is backed by a database (DatabaseSessionService), the method first lists all existing sessions for the given app_name and user_id and logs the number of sessions found. It then checks whether a session with the specified session_id already exists:
- If it exists → returns the existing session.
- If it does not exist → creates and returns a new session.
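The get-or-create flow described above can be sketched without any database. `DictSessionService` and `get_or_create_session` are hypothetical names, and plain dicts stand in for real session objects; this is an illustration of the pattern, not the library's implementation.

```python
import asyncio


class DictSessionService:
    """Hypothetical in-memory session store keyed by (app_name, user_id, session_id)."""

    def __init__(self) -> None:
        self._sessions: dict[tuple[str, str, str], dict] = {}

    async def get_session(self, *, app_name: str, user_id: str, session_id: str):
        # Returns None when no session is stored under this key.
        return self._sessions.get((app_name, user_id, session_id))

    async def create_session(self, *, app_name: str, user_id: str, session_id: str) -> dict:
        session = {"app_name": app_name, "user_id": user_id, "id": session_id, "events": []}
        self._sessions[(app_name, user_id, session_id)] = session
        return session


async def get_or_create_session(service, app_name: str, user_id: str, session_id: str):
    """Return the existing session if there is one, otherwise create it."""
    session = await service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )
    if session is not None:
        return session
    return await service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )


async def main() -> bool:
    service = DictSessionService()
    first = await get_or_create_session(service, "app_name", "user_id", "session_id")
    second = await get_or_create_session(service, "app_name", "user_id", "session_id")
    # The second call finds the session created by the first call.
    return first is second


same_object = asyncio.run(main())
print(same_object)
```

Calling the helper twice with the same identifiers yields the same stored session, which is why repeated `create_session` calls are safe to make before each run.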

Parameters:

Name Type Description Default
app_name str

The name of the application associated with the session.

required
user_id str

The unique identifier of the user.

required
session_id str

The unique identifier of the session to be created or retrieved.

required

Returns:

Type Description
Session | None

The retrieved or newly created Session object, or None if the session creation failed.

Examples:

Create a new session manually:

import asyncio

from veadk.memory import ShortTermMemory

app_name = "app_name"
user_id = "user_id"
session_id = "session_id"

short_term_memory = ShortTermMemory()

session = asyncio.run(
    short_term_memory.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )
)

print(session)

session = asyncio.run(
    short_term_memory.session_service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )
)

print(session)
Source code in veadk/memory/short_term_memory.py
async def create_session(
    self,
    app_name: str,
    user_id: str,
    session_id: str,
) -> Session | None:
    """Create or retrieve a user session.

    Short term memory can attempt to create a new session for a given application and user. If a session with the same `session_id` already exists, it will be returned instead of creating a new one.

    If the underlying session service is backed by a database (`DatabaseSessionService`), the method first lists all existing sessions for the given `app_name` and `user_id` and logs the number of sessions found. It then checks whether a session with the specified `session_id` already exists:
    - If it exists → returns the existing session.
    - If it does not exist → creates and returns a new session.

    Args:
        app_name (str): The name of the application associated with the session.
        user_id (str): The unique identifier of the user.
        session_id (str): The unique identifier of the session to be created or retrieved.

    Returns:
        Session | None: The retrieved or newly created `Session` object, or `None` if the session creation failed.

    Examples:
        Create a new session manually:

        ```python
        import asyncio

        from veadk.memory import ShortTermMemory

        app_name = "app_name"
        user_id = "user_id"
        session_id = "session_id"

        short_term_memory = ShortTermMemory()

        session = asyncio.run(
            short_term_memory.create_session(
                app_name=app_name, user_id=user_id, session_id=session_id
            )
        )

        print(session)

        session = asyncio.run(
            short_term_memory.session_service.get_session(
                app_name=app_name, user_id=user_id, session_id=session_id
            )
        )

        print(session)
        ```
    """
    if isinstance(self._session_service, DatabaseSessionService):
        list_sessions_response = await self._session_service.list_sessions(
            app_name=app_name, user_id=user_id
        )

        logger.debug(
            f"Loaded {len(list_sessions_response.sessions)} sessions from db {self.db_url}."
        )

    session = await self._session_service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )

    if session:
        logger.info(
            f"Session {session_id} already exists with app_name={app_name} user_id={user_id}."
        )
        return session
    else:
        return await self._session_service.create_session(
            app_name=app_name, user_id=user_id, session_id=session_id
        )

LongTermMemory

Bases: BaseMemoryService, BaseModel

Manages long-term memory storage and retrieval for applications.

This class provides an interface to store, retrieve, and manage long-term contextual information using different backend types (e.g., OpenSearch, Redis). It supports configuration of the backend service and retrieval behavior.

Attributes:

Name Type Description
backend Union[Literal['local', 'opensearch', 'redis', 'viking', 'viking_mem', 'mem0'], BaseLongTermMemoryBackend]

The type or instance of the long-term memory backend. Defaults to "opensearch".

backend_config dict

Configuration parameters for the selected backend. Defaults to an empty dictionary.

top_k int

The number of top similar documents to retrieve during search. Defaults to 5.

index str

The name of the index or collection used for storing memory items. Defaults to an empty string.

app_name str

The name of the application that owns this memory instance. Defaults to an empty string.

user_id str

Deprecated attribute. Retained for backward compatibility. Defaults to an empty string.

Notes

Please ensure that you have set the embedding-related configurations in environment variables.
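Although `index` defaults to an empty string, the constructor logic in the source listing for this class falls back to `app_name`, and then to `default_app`, when neither a backend instance nor a `backend_config` is supplied. The sketch below mirrors that fallback order in isolation; `resolve_index` is a hypothetical helper name, not part of the library.

```python
def resolve_index(index: str = "", app_name: str = "") -> str:
    """Pick the storage index: explicit index, then app_name, then a fixed default."""
    return index or app_name or "default_app"


print(resolve_index(index="my_index", app_name="veadk_playground_app"))
print(resolve_index(app_name="veadk_playground_app"))
print(resolve_index())
```

Setting `app_name` (or `index`) explicitly keeps memories from different applications out of the shared `default_app` index.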

Examples:

Simple long-term memory

When a long-term memory is created without backend configuration arguments, all configurations come from environment variables.

import asyncio

from veadk import Agent, Runner
from veadk.memory.long_term_memory import LongTermMemory
from veadk.memory.short_term_memory import ShortTermMemory

app_name = "veadk_playground_app"
user_id = "veadk_playground_user"

long_term_memory = LongTermMemory(backend="local", app_name=app_name)

agent = Agent(long_term_memory=long_term_memory)

runner = Runner(
    agent=agent,
    app_name=app_name,
    user_id=user_id,
    short_term_memory=ShortTermMemory(),
)

# ===== add memory =====
session_id = "veadk_playground_session"
teaching_prompt = "I bought an ice cream last week."

asyncio.run(runner.run(messages=teaching_prompt, session_id=session_id))
asyncio.run(
    runner.save_session_to_long_term_memory(session_id=session_id)
)  # save session to long-term memory


# ===== check memory =====
session_id = "veadk_playground_session_2"  # use a new session
student_prompt = "What did I buy last week?"

response = asyncio.run(runner.run(messages=student_prompt, session_id=session_id))

print(response)
Create with a backend instance
from veadk.memory.long_term_memory import LongTermMemory
from veadk.memory.long_term_memory.backends import LongTermMemory

long_term_memory = LongTermMemory(backend=...)
Create with backend configurations
from veadk.memory.long_term_memory import LongTermMemory

long_term_memory = LongTermMemory(backend="", backend_config={})
Source code in veadk/memory/long_term_memory.py
class LongTermMemory(BaseMemoryService, BaseModel):
    """Manages long-term memory storage and retrieval for applications.

    This class provides an interface to store, retrieve, and manage long-term
    contextual information using different backend types (e.g., OpenSearch, Redis).
    It supports configuration of the backend service and retrieval behavior.

    Attributes:
        backend (Union[Literal["local", "opensearch", "redis", "viking", "viking_mem", "mem0"], BaseLongTermMemoryBackend]):
            The type or instance of the long-term memory backend. Defaults to "opensearch".

        backend_config (dict):
            Configuration parameters for the selected backend. Defaults to an empty dictionary.

        top_k (int):
            The number of top similar documents to retrieve during search. Defaults to 5.

        index (str):
            The name of the index or collection used for storing memory items. Defaults to an empty string.

        app_name (str):
            The name of the application that owns this memory instance. Defaults to an empty string.

        user_id (str):
            Deprecated attribute. Retained for backward compatibility. Defaults to an empty string.

    Notes:
        Please ensure that you have set the embedding-related configurations in environment variables.

    Examples:
        ### Simple long-term memory

        When a long-term memory is created without backend configuration arguments, all configurations come from **environment variables**.

        ```python
        import asyncio

        from veadk import Agent, Runner
        from veadk.memory.long_term_memory import LongTermMemory
        from veadk.memory.short_term_memory import ShortTermMemory

        app_name = "veadk_playground_app"
        user_id = "veadk_playground_user"

        long_term_memory = LongTermMemory(backend="local", app_name=app_name)

        agent = Agent(long_term_memory=long_term_memory)

        runner = Runner(
            agent=agent,
            app_name=app_name,
            user_id=user_id,
            short_term_memory=ShortTermMemory(),
        )

        # ===== add memory =====
        session_id = "veadk_playground_session"
        teaching_prompt = "I bought an ice cream last week."

        asyncio.run(runner.run(messages=teaching_prompt, session_id=session_id))
        asyncio.run(
            runner.save_session_to_long_term_memory(session_id=session_id)
        )  # save session to long-term memory


        # ===== check memory =====
        session_id = "veadk_playground_session_2"  # use a new session
        student_prompt = "What did I buy last week?"

        response = asyncio.run(runner.run(messages=student_prompt, session_id=session_id))

        print(response)
        ```

        ### Create with a backend instance

        ```python
        from veadk.memory.long_term_memory import LongTermMemory
        from veadk.memory.long_term_memory.backends import LongTermMemory

        long_term_memory = LongTermMemory(backend=...)
        ```

        ### Create with backend configurations

        ```python
        from veadk.memory.long_term_memory import LongTermMemory

        long_term_memory = LongTermMemory(backend="", backend_config={})
        ```
    """

    backend: Union[
        Literal["local", "opensearch", "redis", "viking", "viking_mem", "mem0"],
        BaseLongTermMemoryBackend,
    ] = "opensearch"

    backend_config: dict = Field(default_factory=dict)

    top_k: int = 5

    index: str = ""

    app_name: str = ""

    user_id: str = ""

    def model_post_init(self, __context: Any) -> None:
        # Once user define a backend instance, use it directly
        if isinstance(self.backend, BaseLongTermMemoryBackend):
            self._backend = self.backend
            self.index = self._backend.index
            logger.info(
                f"Initialized long term memory with provided backend instance {self._backend.__class__.__name__}, index={self.index}"
            )
            return

        # Once user define backend config, use it directly
        if self.backend_config:
            if "index" not in self.backend_config:
                logger.warning(
                    "Attribute `index` not provided in backend_config, use `index` or `app_name` instead."
                )
                self.backend_config["index"] = self.index or self.app_name

            logger.debug(
                f"Init {self.backend}, Use provided backend config: {self.backend_config}"
            )
            self._backend = _get_backend_cls(self.backend)(**self.backend_config)
            return

        # Check index
        self.index = self.index or self.app_name
        if not self.index:
            logger.warning(
                "Attribute `index` or `app_name` not provided, use `default_app` instead."
            )
            self.index = "default_app"

        # Backward compatibility
        if self.backend == "viking_mem":
            logger.warning(
                "The `viking_mem` backend is deprecated, change to `viking` instead."
            )
            self.backend = "viking"

        self._backend = _get_backend_cls(self.backend)(index=self.index)

        logger.info(
            f"Initialized long term memory with provided backend instance {self._backend.__class__.__name__}, index={self.index}"
        )

    def _filter_and_convert_events(self, events: list[Event]) -> list[str]:
        final_events = []
        for event in events:
            # filter: bad event
            if not event.content or not event.content.parts:
                continue

            # filter: only add user event to memory to enhance retrieve performance
            if not event.author == "user":
                continue

            # filter: discard function call and function response
            if not event.content.parts[0].text:
                continue

            # convert: to string-format for storage
            message = event.content.model_dump(exclude_none=True, mode="json")

            final_events.append(json.dumps(message, ensure_ascii=False))
        return final_events

    @override
    async def add_session_to_memory(
        self,
        session: Session,
        **kwargs,
    ):
        """Add a chat session's events to the long-term memory backend.

        This method extracts and filters the events from a given `Session` object,
        converts them into serialized strings, and stores them into the long-term
        memory system. It is typically called after a chat session ends or when
        important contextual data needs to be persisted for future retrieval.

        Args:
            session (Session):
                The session object containing user ID and a list of events to persist.

        Examples:
            ```python
            session = Session(
                user_id="user_123",
                events=[
                    Event(role="user", content="I like Go and Rust."),
                    Event(role="assistant", content="Got it! I'll remember that."),
                ]
            )

            await memory_service.add_session_to_memory(session)
            # Logs:
            # Adding 2 events to long term memory: index=main
            # Added 2 events to long term memory: index=main, user_id=user_123
            ```
        """
        user_id = session.user_id
        event_strings = self._filter_and_convert_events(session.events)

        logger.info(
            f"Adding {len(event_strings)} events to long term memory: index={self.index}"
        )
        if self.backend == "viking":
            self._backend.save_memory(
                user_id=user_id, event_strings=event_strings, **kwargs
            )
        else:
            self._backend.save_memory(user_id=user_id, event_strings=event_strings)
        logger.info(
            f"Added {len(event_strings)} events to long term memory: index={self.index}, user_id={user_id}"
        )

    @override
    async def search_memory(
        self, *, app_name: str, user_id: str, query: str
    ) -> SearchMemoryResponse:
        """Search memory entries for a given user and query.

        This method queries the memory backend to retrieve the most relevant stored
        memory chunks for a given user and text query. It then converts those raw
        memory chunks into structured `MemoryEntry` objects to be returned to the caller.

        Args:
            app_name (str): Name of the application requesting the memory search.
            user_id (str): Unique identifier for the user whose memory is being queried.
            query (str): The text query to match against stored memory content.

        Returns:
            SearchMemoryResponse:
                An object containing a list of `MemoryEntry` items representing
                the retrieved memory snippets relevant to the query.

        Examples:
            ```python
            response = await memory_service.search_memory(
                app_name="chat_app",
                user_id="user_123",
                query="favorite programming language"
            )

            for memory in response.memories:
                print(memory.content.parts[0].text)
            # Output:
            # User likes Python and TypeScript for backend development.
            ```
        """
        logger.info(f"Search memory with query={query}")

        memory_chunks = []
        try:
            memory_chunks = self._backend.search_memory(
                query=query, top_k=self.top_k, user_id=user_id
            )
        except Exception as e:
            logger.error(
                f"Exception occurred during memory search: {e}. Return empty memory chunks"
            )

        memory_events = []
        for memory in memory_chunks:
            try:
                memory_dict = json.loads(memory)
                try:
                    text = memory_dict["parts"][0]["text"]
                    role = memory_dict["role"]
                except KeyError as _:
                    # prevent not a standard text-based event
                    logger.warning(
                        f"Memory content: {memory_dict}. Skip return this memory."
                    )
                    continue
            except json.JSONDecodeError:
                # prevent the memory string is not dumped by `Event` class
                text = memory
                role = "user"

            memory_events.append(
                MemoryEntry(
                    author="user",
                    content=types.Content(parts=[types.Part(text=text)], role=role),
                )
            )

        logger.info(
            f"Return {len(memory_events)} memory events for query: {query} index={self.index} user_id={user_id}"
        )
        return SearchMemoryResponse(memories=memory_events)

add_session_to_memory(session, **kwargs) async

Add a chat session's events to the long-term memory backend.

This method extracts and filters the events from a given Session object, converts them into serialized strings, and stores them into the long-term memory system. It is typically called after a chat session ends or when important contextual data needs to be persisted for future retrieval.
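The filtering step can be pictured with plain dictionaries. Judging from the source listing for this class, only user-authored events whose first part carries text are kept, and each kept event's content is serialized to a JSON string. The sketch below assumes a simplified dict layout (`author`, `content.role`, `content.parts`) in place of real event objects, and `filter_and_convert` is a hypothetical name.

```python
import json


def filter_and_convert(events: list[dict]) -> list[str]:
    """Keep user-authored text events and serialize their content as JSON strings."""
    kept = []
    for event in events:
        content = event.get("content")
        # Skip events without content or without any parts.
        if not content or not content.get("parts"):
            continue
        # Only user events are stored.
        if event.get("author") != "user":
            continue
        # Skip function calls and responses, which carry no text in the first part.
        if not content["parts"][0].get("text"):
            continue
        kept.append(json.dumps(content, ensure_ascii=False))
    return kept


events = [
    {"author": "user", "content": {"role": "user", "parts": [{"text": "I like Go and Rust."}]}},
    {"author": "assistant", "content": {"role": "model", "parts": [{"text": "Got it!"}]}},
    {"author": "user", "content": {"role": "user", "parts": [{"function_call": {"name": "lookup"}}]}},
]

event_strings = filter_and_convert(events)
print(len(event_strings))
print(event_strings[0])
```

Of the three sample events, only the first survives: the second is not authored by the user and the third has no text in its first part.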

Parameters:

Name Type Description Default
session Session

The session object containing user ID and a list of events to persist.

required

Examples:

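# `types` is assumed to be `google.genai.types`; events are attributed via `author`.
session = Session(
    id="session_123",
    app_name="chat_app",
    user_id="user_123",
    events=[
        Event(
            author="user",
            content=types.Content(role="user", parts=[types.Part(text="I like Go and Rust.")]),
        ),
        Event(
            author="assistant",
            content=types.Content(role="model", parts=[types.Part(text="Got it! I'll remember that.")]),
        ),
    ],
)

await memory_service.add_session_to_memory(session)
# Logs (only the user-authored text event is stored):
# Adding 1 events to long term memory: index=main
# Added 1 events to long term memory: index=main, user_id=user_123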
Source code in veadk/memory/long_term_memory.py
@override
async def add_session_to_memory(
    self,
    session: Session,
    **kwargs,
):
    """Add a chat session's events to the long-term memory backend.

    This method extracts and filters the events from a given `Session` object,
    converts them into serialized strings, and stores them into the long-term
    memory system. It is typically called after a chat session ends or when
    important contextual data needs to be persisted for future retrieval.

    Args:
        session (Session):
            The session object containing user ID and a list of events to persist.

    Examples:
        ```python
        session = Session(
            user_id="user_123",
            events=[
                Event(role="user", content="I like Go and Rust."),
                Event(role="assistant", content="Got it! I'll remember that."),
            ]
        )

        await memory_service.add_session_to_memory(session)
        # Logs:
        # Adding 2 events to long term memory: index=main
        # Added 2 events to long term memory: index=main, user_id=user_123
        ```
    """
    user_id = session.user_id
    event_strings = self._filter_and_convert_events(session.events)

    logger.info(
        f"Adding {len(event_strings)} events to long term memory: index={self.index}"
    )
    if self.backend == "viking":
        self._backend.save_memory(
            user_id=user_id, event_strings=event_strings, **kwargs
        )
    else:
        self._backend.save_memory(user_id=user_id, event_strings=event_strings)
    logger.info(
        f"Added {len(event_strings)} events to long term memory: index={self.index}, user_id={user_id}"
    )

search_memory(*, app_name, user_id, query) async

Search memory entries for a given user and query.

This method queries the memory backend to retrieve the most relevant stored memory chunks for a given user and text query. It then converts those raw memory chunks into structured MemoryEntry objects to be returned to the caller.
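The conversion from raw chunks to entries can be sketched as a small parser. Based on the source listing for this method, a chunk is expected to be a JSON string with `parts` and `role`; a chunk that is not valid JSON is treated as plain user text, and a JSON chunk missing the expected keys is skipped. `parse_memory_chunk` is a hypothetical helper, and unlike the listing it also guards against `TypeError` and `IndexError`, which is an assumption made here for robustness.

```python
import json


def parse_memory_chunk(chunk: str):
    """Return (text, role) for a stored chunk, or None if the chunk should be skipped."""
    try:
        data = json.loads(chunk)
    except json.JSONDecodeError:
        # Not a serialized event: fall back to the raw string as user text.
        return chunk, "user"
    try:
        return data["parts"][0]["text"], data["role"]
    except (KeyError, TypeError, IndexError):
        # Valid JSON, but not a standard text-based event.
        return None


structured = parse_memory_chunk('{"role": "user", "parts": [{"text": "I like Go."}]}')
plain = parse_memory_chunk("plain text note")
skipped = parse_memory_chunk('{"role": "user"}')

print(structured)
print(plain)
print(skipped)
```

Each non-skipped pair would then be wrapped into a memory entry, with the text as the single content part and the role taken from the chunk.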

Parameters:

Name Type Description Default
app_name str

Name of the application requesting the memory search.

required
user_id str

Unique identifier for the user whose memory is being queried.

required
query str

The text query to match against stored memory content.

required

Returns:

Name Type Description
SearchMemoryResponse SearchMemoryResponse

An object containing a list of MemoryEntry items representing the retrieved memory snippets relevant to the query.

Examples:

response = await memory_service.search_memory(
    app_name="chat_app",
    user_id="user_123",
    query="favorite programming language"
)

for memory in response.memories:
    print(memory.content.parts[0].text)
# Output:
# User likes Python and TypeScript for backend development.
Source code in veadk/memory/long_term_memory.py
@override
async def search_memory(
    self, *, app_name: str, user_id: str, query: str
) -> SearchMemoryResponse:
    """Search memory entries for a given user and query.

    This method queries the memory backend to retrieve the most relevant stored
    memory chunks for a given user and text query. It then converts those raw
    memory chunks into structured `MemoryEntry` objects to be returned to the caller.

    Args:
        app_name (str): Name of the application requesting the memory search.
        user_id (str): Unique identifier for the user whose memory is being queried.
        query (str): The text query to match against stored memory content.

    Returns:
        SearchMemoryResponse:
            An object containing a list of `MemoryEntry` items representing
            the retrieved memory snippets relevant to the query.

    Examples:
        ```python
        response = await memory_service.search_memory(
            app_name="chat_app",
            user_id="user_123",
            query="favorite programming language"
        )

        for memory in response.memories:
            print(memory.content.parts[0].text)
        # Output:
        # User likes Python and TypeScript for backend development.
        ```
    """
    logger.info(f"Search memory with query={query}")

    memory_chunks = []
    try:
        memory_chunks = self._backend.search_memory(
            query=query, top_k=self.top_k, user_id=user_id
        )
    except Exception as e:
        logger.error(
            f"Exception occurred during memory search: {e}. Returning empty memory chunks"
        )

    memory_events = []
    for memory in memory_chunks:
        try:
            memory_dict = json.loads(memory)
            try:
                text = memory_dict["parts"][0]["text"]
                role = memory_dict["role"]
            except KeyError:
                # Skip entries that are not standard text-based events
                logger.warning(
                    f"Memory content: {memory_dict}. Skipping this memory."
                )
                continue
        except json.JSONDecodeError:
            # The memory string was not dumped by the `Event` class; treat it as plain user text
            text = memory
            role = "user"

        memory_events.append(
            MemoryEntry(
                author="user",
                content=types.Content(parts=[types.Part(text=text)], role=role),
            )
        )

    logger.info(
        f"Return {len(memory_events)} memory events for query: {query} index={self.index} user_id={user_id}"
    )
    return SearchMemoryResponse(memories=memory_events)
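
The chunk-parsing fallback in the listing above can be tried in isolation. The following is a minimal sketch, not part of veadk: `parse_memory_chunk` is a hypothetical helper that returns a `(text, role)` pair or `None`, and it catches a slightly wider set of exceptions than the listing (which catches only `KeyError`).

```python
import json
from typing import Optional, Tuple


def parse_memory_chunk(chunk: str) -> Optional[Tuple[str, str]]:
    """Return (text, role) for a stored chunk, or None if it should be skipped.

    Hypothetical helper that mirrors the branching in `search_memory`.
    """
    try:
        data = json.loads(chunk)
    except json.JSONDecodeError:
        # Not JSON: treat the raw string as plain text with the "user" role.
        return chunk, "user"

    try:
        return data["parts"][0]["text"], data["role"]
    except (KeyError, IndexError, TypeError):
        # Valid JSON, but not a standard text-based event: skip it.
        # (Assumption: the listing only catches KeyError; this sketch is broader.)
        return None


event_json = '{"role": "model", "parts": [{"text": "hi"}]}'
print(parse_memory_chunk(event_json))
print(parse_memory_chunk("plain text"))
print(parse_memory_chunk('{"role": "user"}'))
```

Returning `None` for skipped chunks keeps the caller's loop simple: it only appends an entry when a pair comes back.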

KnowledgeBase

Bases: BaseModel

A knowledge base for storing user-related information.

This class represents a knowledge base used to store and retrieve user-specific data. It supports multiple backend options, including in-memory, OpenSearch, Redis, and Volcengine's VikingDB. The knowledge base allows for efficient document retrieval based on similarity, with the ability to configure backend-specific settings.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `str` | The name of the knowledge base. Default is "user_knowledgebase". |
| `description` | `str` | A description of the knowledge base. Default is "This knowledgebase stores some user-related information." |
| `backend` | `Union[Literal['local', 'opensearch', 'viking', 'redis'], BaseKnowledgebaseBackend]` | The backend used to store and query the knowledge base. Supported options: 'local' for in-memory storage (data is lost when the program exits); 'opensearch' for OpenSearch (requires an OpenSearch cluster); 'viking' for Volcengine VikingDB (requires the VikingDB service); 'redis' for Redis with vector search capability (requires Redis). Default is 'local'. |
| `backend_config` | `dict` | Configuration dictionary for the selected backend. |
| `top_k` | `int` | The number of top similar documents to retrieve during a search. Default is 10. |
| `app_name` | `str` | The name of the application associated with the knowledge base. If `index` is not provided, this value is used as the index. |
| `index` | `str` | The name of the knowledge base index. |

Notes

Please ensure that you have set the embedding-related configurations in environment variables.
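
The fallback between `index` and `app_name` described in the attribute list can be sketched as a small standalone function. This is an illustration only: `resolve_index` is a hypothetical name, and the sketch covers just the case where neither a backend instance nor a `backend_config` is supplied.

```python
def resolve_index(index: str = "", app_name: str = "") -> str:
    """Prefer `index`; fall back to `app_name`; fail if both are empty.

    Hypothetical helper mirroring the fallback described for KnowledgeBase.
    """
    resolved = index or app_name
    if not resolved:
        raise ValueError("Either `index` or `app_name` must be provided.")
    return resolved


print(resolve_index(index="my_index", app_name="my_app"))
print(resolve_index(app_name="my_app"))
```

One consequence worth noting: constructing a knowledge base with only `backend="local"` and neither `index` nor `app_name` would hit the error branch.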

Examples:

Simple backend

Create a knowledgebase (this example uses the OpenSearch backend) and attach it to an agent:

from veadk import Agent, Runner
from veadk.knowledgebase.knowledgebase import KnowledgeBase
from veadk.memory.short_term_memory import ShortTermMemory

app_name = "veadk_playground_app"
user_id = "veadk_playground_user"
session_id = "veadk_playground_session"


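# Placeholder path: point this at a real document of your own.
knowledgebase_file = "path/to/knowledge.md"

knowledgebase = KnowledgeBase(backend="opensearch", app_name=app_name)
knowledgebase.add_from_files(files=[knowledgebase_file])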

agent = Agent(knowledgebase=knowledgebase)

runner = Runner(
    agent=agent,
    short_term_memory=ShortTermMemory(),
    app_name=app_name,
    user_id=user_id,
)

response = await runner.run(
    messages="Tell me the secret of green.", session_id=session_id
)
print(response)

Initialize knowledgebase with metadata

from veadk.knowledgebase import KnowledgeBase

knowledgebase = KnowledgeBase(
    name="user_data",
    description="A knowledgebase contains user hobbies.",
    index="my_app",
)

Initialize knowledgebase with backend instance

import veadk.config  # noqa

from veadk.knowledgebase import KnowledgeBase
from veadk.knowledgebase.backends.in_memory_backend import InMemoryKnowledgeBackend

backend = InMemoryKnowledgeBackend(
    index="my_app",
    embedding_config=...,
)

knowledgebase = KnowledgeBase(
    name="user_data",
    description="A knowledgebase contains user hobbies.",
    backend=backend,
)

Initialize knowledgebase with backend config

from veadk.knowledgebase import KnowledgeBase

knowledgebase = KnowledgeBase(
    name="user_data",
    description="A knowledgebase contains user hobbies.",
    backend="local",
    backend_config={"index": "user_app"},
)

Source code in veadk/knowledgebase/knowledgebase.py
class KnowledgeBase(BaseModel):
    """A knowledge base for storing user-related information.

    This class represents a knowledge base used to store and retrieve user-specific data.
    It supports multiple backend options, including in-memory, OpenSearch, Redis, and Volcengine's
    VikingDB. The knowledge base allows for efficient document retrieval based on similarity,
    with the ability to configure backend-specific settings.

    Attributes:
        name (str): The name of the knowledge base. Default is "user_knowledgebase".
        description (str): A description of the knowledge base. Default is "This knowledgebase stores some user-related information."
        backend (Union[Literal["local", "opensearch", "viking", "redis"], BaseKnowledgebaseBackend]):
            The type of backend to use for storing and querying the knowledge base. Supported options include:
            - 'local' for in-memory storage (data is lost when the program exits).
            - 'opensearch' for OpenSearch (requires OpenSearch cluster).
            - 'viking' for Volcengine VikingDB (requires VikingDB service).
            - 'redis' for Redis with vector search capability (requires Redis).
            Default is 'local'.
        backend_config (dict): Configuration dictionary for the selected backend.
        top_k (int): The number of top similar documents to retrieve during a search. Default is 10.
        app_name (str): The name of the application associated with the knowledge base. If `index` is not provided, this value is used as the index.
        index (str): The name of the knowledge base index.

    Notes:
        Please ensure that you have set the embedding-related configurations in environment variables.

    Examples:
        ### Simple backend

        Create a knowledgebase (this example uses the OpenSearch backend) and attach it to an agent:

        ```python
        from veadk import Agent, Runner
        from veadk.knowledgebase.knowledgebase import KnowledgeBase
        from veadk.memory.short_term_memory import ShortTermMemory

        app_name = "veadk_playground_app"
        user_id = "veadk_playground_user"
        session_id = "veadk_playground_session"


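        # Placeholder path: point this at a real document of your own.
        knowledgebase_file = "path/to/knowledge.md"

        knowledgebase = KnowledgeBase(backend="opensearch", app_name=app_name)
        knowledgebase.add_from_files(files=[knowledgebase_file])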

        agent = Agent(knowledgebase=knowledgebase)

        runner = Runner(
            agent=agent,
            short_term_memory=ShortTermMemory(),
            app_name=app_name,
            user_id=user_id,
        )

        response = await runner.run(
            messages="Tell me the secret of green.", session_id=session_id
        )
        print(response)
        ```

        ### Initialize knowledgebase with metadata

        ```python
        from veadk.knowledgebase import KnowledgeBase

        knowledgebase = KnowledgeBase(
            name="user_data",
            description="A knowledgebase contains user hobbies.",
            index="my_app",
        )
        ```

        ### Initialize knowledgebase with backend instance

        ```python
        import veadk.config  # noqa

        from veadk.knowledgebase import KnowledgeBase
        from veadk.knowledgebase.backends.in_memory_backend import InMemoryKnowledgeBackend

        backend = InMemoryKnowledgeBackend(
            index="my_app",
            embedding_config=...,
        )

        knowledgebase = KnowledgeBase(
            name="user_data",
            description="A knowledgebase contains user hobbies.",
            backend=backend,
        )
        ```

        ### Initialize knowledgebase with backend config

        ```python
        from veadk.knowledgebase import KnowledgeBase

        knowledgebase = KnowledgeBase(
            name="user_data",
            description="A knowledgebase contains user hobbies.",
            backend="local",
            backend_config={"index": "user_app"},
        )
        ```
    """

    name: str = "user_knowledgebase"

    description: str = "This knowledgebase stores some user-related information."

    backend: Union[
        Literal["local", "opensearch", "viking", "redis"], BaseKnowledgebaseBackend
    ] = "local"

    backend_config: dict = Field(default_factory=dict)

    top_k: int = 10

    app_name: str = ""

    index: str = ""

    def model_post_init(self, __context: Any) -> None:
        if isinstance(self.backend, BaseKnowledgebaseBackend):
            self._backend = self.backend
            self.index = self._backend.index
            logger.info(
                f"Initialized knowledgebase with provided backend instance {self._backend.__class__.__name__}"
            )
            return

        # Once user define backend config, use it directly
        if self.backend_config:
            self._backend = _get_backend_cls(self.backend)(**self.backend_config)
            return

        self.index = self.index or self.app_name
        if not self.index:
            raise ValueError("Either `index` or `app_name` must be provided.")

        logger.info(
            f"Initializing knowledgebase: backend={self.backend} index={self.index} top_k={self.top_k}"
        )
        self._backend = _get_backend_cls(self.backend)(index=self.index)
        logger.info(
            f"Initialized knowledgebase with backend {self._backend.__class__.__name__}"
        )

    def add_from_directory(self, directory: str, **kwargs) -> bool:
        """Add knowledge from a directory to the knowledgebase.

        Add the files in the directory to the knowledgebase backend.

        Args:
            directory (str): Path of the directory whose files should be stored.

        Returns:
            bool: True if the content was stored successfully, False otherwise.

        Examples:
            Store a directory in the knowledgebase:

            ```python
            knowledgebase = KnowledgeBase(backend="local", index="my_app")

            if knowledgebase.add_from_directory("./knowledgebase"):
                # added successfully
                ...
            else:
                raise RuntimeError("Failed to add the directory.")
            ```
        """
        return self._backend.add_from_directory(directory=directory, **kwargs)

    def add_from_files(self, files: list[str], **kwargs) -> bool:
        """Add knowledge files to the knowledgebase.

        Add a list of files to the knowledgebase backend.

        Args:
            files (list[str]): The list of file paths.

        Returns:
            bool: True if the content was stored successfully, False otherwise.

        Examples:
            Store files in the knowledgebase:

            ```python
            knowledgebase = KnowledgeBase(backend="local", index="my_app")

            if knowledgebase.add_from_files(["./knowledgebase/example.md"]):  # placeholder path
                # added successfully
                ...
            else:
                raise RuntimeError("Failed to add the files.")
            ```
        """
        return self._backend.add_from_files(files=files, **kwargs)

    def add_from_text(self, text: str | list[str], **kwargs) -> bool:
        """Add a piece of text or a list of texts to the knowledgebase.

        The `text` can be a string or a list of strings. The text will be embedded and stored by the corresponding backend.

        Args:
            text (str | list[str]): The text string or a list of text strings.

        Returns:
            bool: True if the content was stored successfully, False otherwise.

        Examples:
            Store a string or a list of strings in the knowledgebase:

            ```python
            knowledgebase = KnowledgeBase(backend="local", index="my_app")

            if knowledgebase.add_from_text(["First piece of text.", "Second piece of text."]):
                # added successfully
                ...
            else:
                raise RuntimeError("Failed to add the text.")
            ```
        """
        return self._backend.add_from_text(text=text, **kwargs)

    def search(self, query: str, top_k: int = 0, **kwargs) -> list[KnowledgebaseEntry]:
        """Search the knowledgebase for entries relevant to `query`.

        If `top_k` is 0, the instance-level `top_k` is used instead.
        """
        top_k = top_k if top_k != 0 else self.top_k

        _entries = self._backend.search(query=query, top_k=top_k, **kwargs)

        entries = []
        for entry in _entries:
            if isinstance(entry, KnowledgebaseEntry):
                entries.append(entry)
            elif isinstance(entry, str):
                entries.append(KnowledgebaseEntry(content=entry))
            else:
                logger.error(
                    f"Unsupported entry type from backend search method: {type(entry)} with {entry}. Expected `KnowledgebaseEntry` or `str`. Skip for this entry."
                )

        return entries

    def __getattr__(self, name) -> Callable:
        """Delegate attributes that `KnowledgeBase` does not define (`delete`, `list_chunks`, etc.) to the backend.

        For example, knowledgebase.delete(...) -> self._backend.delete(...)
        """
        return getattr(self._backend, name)

__getattr__(name)

Delegate attributes that KnowledgeBase does not define (delete, list_chunks, etc.) to the backend.

For example, knowledgebase.delete(...) -> self._backend.delete(...)

Source code in veadk/knowledgebase/knowledgebase.py
def __getattr__(self, name) -> Callable:
    """Delegate attributes that `KnowledgeBase` does not define (`delete`, `list_chunks`, etc.) to the backend.

    For example, knowledgebase.delete(...) -> self._backend.delete(...)
    """
    return getattr(self._backend, name)
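
The delegation pattern above can be seen in a self-contained sketch. `FakeBackend` and `Wrapper` are hypothetical stand-ins, not veadk classes, and the guard for `_backend` is an extra precaution added here that the listing does not include.

```python
class FakeBackend:
    """Hypothetical backend exposing a method the wrapper does not define."""

    def __init__(self):
        self.chunks = ["a", "b"]

    def list_chunks(self):
        return list(self.chunks)


class Wrapper:
    """Hypothetical wrapper that forwards unknown attributes to its backend."""

    def __init__(self, backend):
        self._backend = backend

    def __getattr__(self, name):
        # __getattr__ is only invoked when normal attribute lookup fails.
        if name == "_backend":
            # Extra guard (assumption): avoid infinite recursion if `_backend`
            # was never set on the instance.
            raise AttributeError(name)
        return getattr(self._backend, name)


wrapper = Wrapper(FakeBackend())
print(wrapper.list_chunks())  # forwarded to FakeBackend.list_chunks
```

Because Python calls `__getattr__` only after regular lookup fails, methods defined on the wrapper itself always take precedence over the backend's.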

add_from_directory(directory, **kwargs)

Add knowledge from a directory to the knowledgebase.

Add the files in the directory to the knowledgebase backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `directory` | `str` | Path of the directory whose files should be stored. | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the content was stored successfully, False otherwise. |

Examples:

Store a directory in the knowledgebase:

knowledgebase = KnowledgeBase(backend="local", index="my_app")

if knowledgebase.add_from_directory("./knowledgebase"):
    # added successfully
    ...
else:
    raise RuntimeError("Failed to add the directory.")

Source code in veadk/knowledgebase/knowledgebase.py
def add_from_directory(self, directory: str, **kwargs) -> bool:
    """Add knowledge from a directory to the knowledgebase.

    Add the files in the directory to the knowledgebase backend.

    Args:
        directory (str): Path of the directory whose files should be stored.

    Returns:
        bool: True if the content was stored successfully, False otherwise.

    Examples:
        Store a directory in the knowledgebase:

        ```python
        knowledgebase = KnowledgeBase(backend="local", index="my_app")

        if knowledgebase.add_from_directory("./knowledgebase"):
            # added successfully
            ...
        else:
            raise RuntimeError("Failed to add the directory.")
        ```
    """
    return self._backend.add_from_directory(directory=directory, **kwargs)

add_from_files(files, **kwargs)

Add knowledge files to the knowledgebase.

Add a list of files to the knowledgebase backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `files` | `list[str]` | The list of file paths. | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the content was stored successfully, False otherwise. |

Examples:

Store files in the knowledgebase:

knowledgebase = KnowledgeBase(backend="local", index="my_app")

if knowledgebase.add_from_files(["./knowledgebase/example.md"]):  # placeholder path
    # added successfully
    ...
else:
    raise RuntimeError("Failed to add the files.")

Source code in veadk/knowledgebase/knowledgebase.py
def add_from_files(self, files: list[str], **kwargs) -> bool:
    """Add knowledge files to the knowledgebase.

    Add a list of files to the knowledgebase backend.

    Args:
        files (list[str]): The list of file paths.

    Returns:
        bool: True if the content was stored successfully, False otherwise.

    Examples:
        Store files in the knowledgebase:

        ```python
        knowledgebase = KnowledgeBase(backend="local", index="my_app")

        if knowledgebase.add_from_files(["./knowledgebase/example.md"]):  # placeholder path
            # added successfully
            ...
        else:
            raise RuntimeError("Failed to add the files.")
        ```
    """
    return self._backend.add_from_files(files=files, **kwargs)

add_from_text(text, **kwargs)

Add a piece of text or a list of texts to the knowledgebase.

The text can be a string or a list of strings. The text will be embedded and stored by the corresponding backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `text` | `str \| list[str]` | The text string or a list of text strings. | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the content was stored successfully, False otherwise. |

Examples:

Store a string or a list of strings in the knowledgebase:

knowledgebase = KnowledgeBase(backend="local", index="my_app")

if knowledgebase.add_from_text(["First piece of text.", "Second piece of text."]):
    # added successfully
    ...
else:
    raise RuntimeError("Failed to add the text.")

Source code in veadk/knowledgebase/knowledgebase.py
def add_from_text(self, text: str | list[str], **kwargs) -> bool:
    """Add a piece of text or a list of texts to the knowledgebase.

    The `text` can be a string or a list of strings. The text will be embedded and stored by the corresponding backend.

    Args:
        text (str | list[str]): The text string or a list of text strings.

    Returns:
        bool: True if the content was stored successfully, False otherwise.

    Examples:
        Store a string or a list of strings in the knowledgebase:

        ```python
        knowledgebase = KnowledgeBase(backend="local", index="my_app")

        if knowledgebase.add_from_text(["First piece of text.", "Second piece of text."]):
            # added successfully
            ...
        else:
            raise RuntimeError("Failed to add the text.")
        ```
    """
    return self._backend.add_from_text(text=text, **kwargs)
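
Since `text` may be a single string or a list of strings, code that receives it has to handle both shapes. The sketch below shows one way this could be done. It is an assumption about how a backend might normalise the argument, not veadk's actual implementation, and `as_text_list` is a hypothetical name.

```python
from typing import List, Union


def as_text_list(text: Union[str, List[str]]) -> List[str]:
    """Normalise a `str | list[str]` argument into a list of strings.

    Hypothetical helper; real backends may handle this differently.
    """
    if isinstance(text, str):
        # A bare string becomes a one-element list rather than being
        # iterated character by character.
        return [text]
    return list(text)


print(as_text_list("single note"))
print(as_text_list(["first note", "second note"]))
```

Checking for `str` first matters because a string is itself iterable, so calling `list(text)` on it would split it into characters.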

search(query, top_k=0, **kwargs)

Search the knowledgebase for entries relevant to query. If top_k is 0, the instance-level top_k is used instead.

Source code in veadk/knowledgebase/knowledgebase.py
def search(self, query: str, top_k: int = 0, **kwargs) -> list[KnowledgebaseEntry]:
    """Search the knowledgebase for entries relevant to `query`.

    If `top_k` is 0, the instance-level `top_k` is used instead.
    """
    top_k = top_k if top_k != 0 else self.top_k

    _entries = self._backend.search(query=query, top_k=top_k, **kwargs)

    entries = []
    for entry in _entries:
        if isinstance(entry, KnowledgebaseEntry):
            entries.append(entry)
        elif isinstance(entry, str):
            entries.append(KnowledgebaseEntry(content=entry))
        else:
            logger.error(
                f"Unsupported entry type from backend search method: {type(entry)} with {entry}. Expected `KnowledgebaseEntry` or `str`. Skip for this entry."
            )

    return entries
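
The two behaviours in `search` (the `top_k` fallback and the normalisation of backend results) can be exercised without a real backend. This is a minimal sketch under stated assumptions: `Entry` is a hypothetical stand-in for `KnowledgebaseEntry` with only a `content` field, and `effective_top_k` and `normalize_entries` are illustrative names that do not exist in veadk.

```python
import logging
from dataclasses import dataclass
from typing import Any, Iterable, List

logger = logging.getLogger(__name__)


@dataclass
class Entry:
    """Hypothetical stand-in for KnowledgebaseEntry."""

    content: str


def effective_top_k(top_k: int, default_top_k: int) -> int:
    # 0 acts as a sentinel meaning "use the instance-level default".
    return top_k if top_k != 0 else default_top_k


def normalize_entries(raw: Iterable[Any]) -> List[Entry]:
    entries: List[Entry] = []
    for item in raw:
        if isinstance(item, Entry):
            entries.append(item)
        elif isinstance(item, str):
            # Plain strings are wrapped so callers always get Entry objects.
            entries.append(Entry(content=item))
        else:
            # Unsupported types are logged and skipped, not raised.
            logger.error("Unsupported entry type: %s", type(item))
    return entries


print(effective_top_k(0, 10))
print(normalize_entries([Entry("a"), "b", 42]))
```

Skipping unsupported items instead of raising means one malformed backend result does not discard the rest of the search results.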