import asyncio
from typing import Any, Dict

from google.adk.tools.tool_context import ToolContext
from veadk import Agent, Runner


def calculator(
    a: float, b: float, operation: str, tool_context: ToolContext
) -> Dict[str, Any]:
    """Perform one basic arithmetic operation on two operands.

    Args:
        a (float): The first operand.
        b (float): The second operand.
        operation (str): The arithmetic operation to perform. Supported
            operations are "add", "subtract", "multiply", and "divide".
        tool_context (ToolContext): Accepted as a parameter but not read
            by this function.

    Returns:
        Dict[str, Any]: On success, the result of the operation, the
        operator symbol, and status "success". Division by zero yields
        an error text under "result" with status "error". An unsupported
        operation yields status "error" plus a "message".
    """
    if operation == "add":
        symbol, value = "+", a + b
    elif operation == "subtract":
        symbol, value = "-", a - b
    elif operation == "multiply":
        symbol, value = "*", a * b
    elif operation == "divide":
        if b != 0:
            symbol, value = "/", a / b
        else:
            # Division by zero is reported in-band instead of raising.
            return {
                "result": "Error, divisor cannot be zero",
                "operation": "/",
                "status": "error",
            }
    else:
        return {"status": "error", "message": "Unsupported operation"}

    return {"result": value, "operation": symbol, "status": "success"}


# Wire the tool into an agent and run a single query end to end.
agent = Agent(
    name="computing_agent",
    instruction="Please use the `calculator` tool to perform user-required calculations",
    tools=[calculator],
)
runner = Runner(agent=agent)

response = asyncio.run(runner.run(messages="Add 2 and 3"))
print(response)
import asyncio

from google.adk.tools.tool_context import ToolContext
from veadk import Agent, Runner


def message_checker(user_message: str, tool_context: ToolContext) -> str:
    """Check a user message and print details of the current invocation.

    Args:
        user_message (str): The user message to check.
        tool_context (ToolContext): Context of the tool call; its
            invocation context is read to print the agent name, app
            name, user id and session id.

    Returns:
        str: The checked message (the input upper-cased, with a
        "Checked message: " prefix).
    """
    print(f"user_message: {user_message}")

    # All remaining diagnostics come from the same invocation context.
    invocation = tool_context._invocation_context
    print(f"current running agent name: {invocation.agent.name}")
    print(f"app_name: {invocation.app_name}")
    print(f"user_id: {invocation.user_id}")
    print(f"session_id: {invocation.session.id}")

    checked = user_message.upper()
    return f"Checked message: {checked}"


# Build an agent that exposes the checker as a tool and run one message.
agent = Agent(
    name="context_agent",
    tools=[message_checker],
    instruction="Use message_checker tool to check user message, and show the checked message",
)
runner = Runner(agent=agent)

response = asyncio.run(runner.run(messages="Hello world!"))
print(response)
def big_data_processing(data_url: str) -> dict[str, Any]:
    """Process the big data from a specific data url.

    Args:
        data_url (str): The url of the big data to process.

    Returns:
        dict[str, Any]: A dictionary containing the status of the
        processing ("pending" or "finish"), the data url processed, and
        the id of the processing task. This implementation always
        reports "pending" with a fixed task id.
    """
    # Describe the newly created processing task; no work is done here.
    task_descriptor = {
        "status": "pending",
        "data-url": data_url,
        "task-id": "big-data-processing-1",
    }
    return task_descriptor


# Wrap the function so it can be registered as a long-running tool.
long_running_tool = LongRunningFunctionTool(func=big_data_processing)
# Identifiers shared by the runner and the session created below.
APP_NAME = "long_running_tool_app"
USER_ID = "long_running_tool_user"
SESSION_ID = "long_running_tool_session"

# Agent that exposes the long-running tool.
agent = Agent(
    instruction="Use long_running_tool to process big data",
    tools=[long_running_tool],
    name="long_running_tool_agent",
)
runner = Runner(app_name=APP_NAME, agent=agent)

# Initialize the session
session = asyncio.run(
    runner.short_term_memory.create_session(
        session_id=SESSION_ID,
        user_id=USER_ID,
        app_name=APP_NAME,
    )
)
async def call_agent_async(query):
    """Run one query through the agent and complete its long-running tool call.

    The query is sent as a user message via ``runner.run_async`` and the
    text of every resulting event is printed. While streaming, the first
    long-running function call is recorded, followed by the function
    response that carries the same id. If such a response was seen, a deep
    copy of it with its payload replaced by ``{"status": "finish"}`` is
    sent back as a second user message, and the text of those events is
    printed as well.

    Args:
        query: Text of the user request to send to the agent.
    """

    def get_long_running_function_call(event: Event) -> FunctionCall | None:
        """Return the function call in ``event`` whose id is listed in
        ``event.long_running_tool_ids``, or None when there is none."""
        if not (
            event.long_running_tool_ids
            and event.content
            and event.content.parts
        ):
            return None
        for part in event.content.parts:
            if (
                part
                and part.function_call
                and part.function_call.id in event.long_running_tool_ids
            ):
                return part.function_call
        return None

    def get_function_response(
        event: Event, function_call_id: str
    ) -> FunctionResponse | None:
        """Return the function response in ``event`` whose id equals
        ``function_call_id``, or None when there is none."""
        if not event.content or not event.content.parts:
            return None
        for part in event.content.parts:
            if (
                part
                and part.function_response
                and part.function_response.id == function_call_id
            ):
                return part.function_response
        return None

    def print_event_text(event: Event) -> None:
        """Print the concatenated text parts of ``event``, if non-empty."""
        if not event.content or not event.content.parts:
            return
        if text := "".join(part.text or "" for part in event.content.parts):
            print(f"[{event.author}]: {text}")

    content = Content(role="user", parts=[Part(text=query)])

    print("Running agent...")
    events_async = runner.run_async(
        session_id=session.id, user_id=USER_ID, new_message=content
    )

    long_running_function_call = None
    long_running_function_response = None
    async for event in events_async:
        if not long_running_function_call:
            # Still looking for the long-running function call itself.
            long_running_function_call = get_long_running_function_call(event)
        elif matched_response := get_function_response(
            event, long_running_function_call.id
        ):
            # Only update when this event carries the matching response.
            long_running_function_response = matched_response
        print_event_text(event)

    if long_running_function_response:
        # big_data_processing puts a "task-id" in its initial response; this
        # sample does not read it and reports completion right away with a
        # final response.
        updated_response = long_running_function_response.model_copy(deep=True)
        updated_response.response = {"status": "finish"}
        final_message = Content(
            parts=[Part(function_response=updated_response)], role="user"
        )
        async for event in runner.run_async(
            session_id=session.id,
            user_id=USER_ID,
            new_message=final_message,
        ):
            print_event_text(event)


asyncio.run(
    call_agent_async("Process the big data from https://example.com/data.csv")
)