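An AI agent that searches a target site, built with Agent2Agent (A2A)
Notes on a small Python project for quickly trying the whole flow: building an AI agent, then an A2A server and client.
What it does: you enter a search query, the agent uses a search engine to find relevant articles on my own site, and it returns them as a list.
An improved version of https://blog.uni-3.app/gatsbyjs-gemini-search/
I wanted to try A2A, so I defined the agent with Google ADK and built it from there.
This covers the code up to running it and checking the results in the terminal. If the goal were only to write something and try it, Gradio or the MCP SDK would probably have been quicker, but that's a separate matter.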
Environment
[project]
name = "agent"
version = "0.1.0"
description = "A multi-agent AI system"
requires-python = "~=3.11"
dependencies = [
"requests==2.32.3",
"beautifulsoup4==4.13.4",
"googlesearch-python==1.3.0",
"python-dotenv==1.1.0",
"starlette==0.46.2",
"a2a-sdk>=0.2.4",
"uvicorn[standard]>=0.29.0",
"google-adk>=1.2.1",
"google-genai>=1.19.0",
]
- uv 0.6.16
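Architecture
The architecture looks like this. I had an AI draw the diagram, so don't worry about the details.
The client, server, agent, and tools interact in a chain: client → server → agent → tools.
The input is the user's search query. The output is Markdown, with a link and a summary for each article.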
Code
The source folder layout:
├── agent
├── client
└── server
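The agent and server parts are based on the official sample code at https://github.com/google-a2a/a2a-samples/tree/6e32a978569708840b5c320b121a1620ea05047b/samples/python/agents/google_adk
The prompts are written in Japanese and set up fairly loosely.
Here, "client" means the test code that talks to the server. In a real setup the client serves as the interface to the AI agent, and it is apparently also meant to be embedded in applications.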
Configuration
Prompt
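# constants.py
# Agent instruction (kept in Japanese). In English it reads roughly:
# "Search for articles on site:{domain} together with related keywords based on the input.
#  For each result, return its description if it has one; otherwise describe the article
#  concretely in about {max_length} characters based on its body.
#  - Write in Japanese
#  - Introduce each article as a Markdown bullet
#  - Return only the results"
SYSTEM_PROMPT = (
    "site:{}内の記事についてinputに基づく関連ワードとともに検索してください\n"
    "結果をもとに、descriptionがあればそれを返す、なければ本文から約{}字程度の文字数で具体的に記事の内容を紹介して\n"
    "- 日本語で作成すること\n"
    "- 記事ごとにmarkdown形式の箇条書きで紹介すること\n"
    "- 結果のみ返すこと\n"
)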
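The two `{}` placeholders are filled positionally, so the argument order in `constants.SYSTEM_PROMPT.format(self.target_domain, self.max_length)` matters: domain first, then the approximate summary length. A minimal check that reuses the same string (the values `"blog.uni-3.app"` and `100` are just the defaults used elsewhere in this post):

```python
# Same prompt string as constants.py
SYSTEM_PROMPT = (
    "site:{}内の記事についてinputに基づく関連ワードとともに検索してください\n"
    "結果をもとに、descriptionがあればそれを返す、なければ本文から約{}字程度の文字数で具体的に記事の内容を紹介して\n"
    "- 日本語で作成すること\n"
    "- 記事ごとにmarkdown形式の箇条書きで紹介すること\n"
    "- 結果のみ返すこと\n"
)
# Positional order: target domain first, then the approximate summary length
instruction = SYSTEM_PROMPT.format("blog.uni-3.app", 100)
print(instruction.splitlines()[0])
```

Swapping the two arguments would silently produce a prompt like `site:100内の...`, which is why the order is worth a glance.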
.env: GOOGLE_API_KEY is set because this uses Gemini.
GOOGLE_API_KEY="AIxxxx"
TARGET_DOMAIN="blog.uni-3.app"
How to get an API key:
https://ai.google.dev/gemini-api/docs/api-key?hl=ja
agent
Defines the agent and what it returns. It also streams progress, reporting which function (step) is currently running.
Code
# src/agent/agent.py
from typing import Any, AsyncIterable
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai.types import Content, Part
# tools
from agent.keyword_extractor import extract_keywords
from agent.searcher import search_google
from agent.summarizer import fetch_article_text, summarize_text
from agent import constants
class SearchAgent:
    """An agent that searches a target domain and summarizes the matching articles."""
    SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']
    def __init__(self, model_name: str, target_domain: str, max_length: int = 100):
        self.model_name = model_name
        self.target_domain = target_domain
        self.max_length = max_length
        self.max_article = 3
        self._agent = self._build_agent()
        self._user_id = 'search_agent'
        self._runner = Runner(
            app_name=self._agent.name,
            agent=self._agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )
    def get_processing_message(self, tool_name: str) -> str:
        return f'Processing with {tool_name}...'
    def _build_agent(self) -> LlmAgent:
        """Builds the LLM agent for the search agent."""
        return LlmAgent(
            model=self.model_name,
            name='search_agent',
            description=(
                'AI Agent that takes a user query, finds relevant articles from a specific domain, '
                'and returns their URLs and summaries.'
            ),
            instruction=constants.SYSTEM_PROMPT.format(self.target_domain, self.max_length),
            # Plain Python functions are registered directly as tools
            tools=[
                extract_keywords,
                search_google,
                fetch_article_text,
                summarize_text,
            ],
        )
    async def stream(self, query: str, session_id: str) -> AsyncIterable[dict[str, Any]]:
        # Reuse the session for this context ID, or create it on first use
        session = await self._runner.session_service.get_session(
            app_name=self._agent.name,
            user_id=self._user_id,
            session_id=session_id,
        )
        if session is None:
            session = await self._runner.session_service.create_session(
                app_name=self._agent.name,
                user_id=self._user_id,
                state={},
                session_id=session_id,
            )
        content = Content(role='user', parts=[Part.from_text(text=query)])
        async for event in self._runner.run_async(
            user_id=self._user_id, session_id=session.id, new_message=content
        ):
            # Some events carry no content; treat them as having no parts
            parts = event.content.parts if event.content and event.content.parts else []
            if event.is_final_response():
                response: Any = ''
                if parts and parts[0].text:
                    # Final text answer: join all text parts
                    response = '\n'.join(p.text for p in parts if p.text)
                elif any(p.function_response for p in parts):
                    # Final event carrying a tool result: return the first function response as a dict
                    response = next(
                        p.function_response.model_dump()
                        for p in parts
                        if p.function_response
                    )
                yield {
                    'is_task_complete': True,
                    'content': response,
                }
            elif parts and parts[0].function_call:
                # Intermediate event: report which tool is being called
                yield {
                    'is_task_complete': False,
                    'updates': self.get_processing_message(parts[0].function_call.name),
                }
    def invoke(self, query: str, session_id: str) -> AsyncIterable[dict[str, Any]]:
        # Convenience alias that returns the same async stream as stream()
        return self.stream(query, session_id)
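`stream()` yields plain dicts: `{'is_task_complete': False, 'updates': ...}` while tools run, then one `{'is_task_complete': True, 'content': ...}` at the end. The sketch below drives a hypothetical `fake_stream()` that only imitates that shape (no ADK or Gemini involved), to show how a caller such as the executor can consume it.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_stream() -> AsyncIterator[dict[str, Any]]:
    # Hypothetical stand-in for SearchAgent.stream(); yields dicts of the same shape
    yield {"is_task_complete": False, "updates": "Processing with search_google..."}
    yield {"is_task_complete": False, "updates": "Processing with fetch_article_text..."}
    yield {"is_task_complete": True, "content": "* https://example.com/post: summary"}


async def collect(stream: AsyncIterator[dict[str, Any]]) -> tuple[list[str], Any]:
    """Gather progress messages until the final item, then return (updates, content)."""
    updates: list[str] = []
    async for item in stream:
        if item["is_task_complete"]:
            return updates, item["content"]
        updates.append(item["updates"])
    return updates, None  # stream ended without a final item


updates, final = asyncio.run(collect(fake_stream()))
print(updates)
print(final)
```

Stopping at the first completed item mirrors what the executor does with `break`.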
tools
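These are plain functions, and how the roles are split is fairly arbitrary. The keyword-generation step may not even be necessary, but I haven't tried without it. The summarization step calls an AI model.
Keyword generation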
# src/agent/keyword_extractor.py
from google import genai
import agent.constants as constants
# genai.Client() reads the API key from the GOOGLE_API_KEY environment variable (set in .env).
def extract_keywords(text: str) -> list[str]:
    """
    Extracts relevant search keywords from the given text using a generative AI model.
    Args:
        text: The input text from which to extract keywords.
    Returns:
        A list of extracted keywords, or an empty list if an error occurs.
    """
    try:
        client = genai.Client()
        prompt = (
            "Extract the most relevant search keywords from the following text. "
            f"Return them as a comma-separated list. Text: {text}"
        )
        response = client.models.generate_content(
            model=constants.MODEL_NAME,
            contents=prompt,
        )
        # response.text can be None (e.g. a blocked prompt), so check before splitting
        if response.text:
            return [kw.strip() for kw in response.text.split(',') if kw.strip()]
        print("Error: response.text is empty or None.")
        return []
    except Exception as e:
        print(f"An error occurred during keyword extraction: {e}")
        return []
Search
# src/agent/searcher.py
from googlesearch import search
def search_google(query: str, domain: str, num_results: int) -> list[str]:
    """
    Performs a Google search for the given query within a specific domain.
    Args:
        query: The search query (e.g., keywords).
        domain: The domain to restrict the search to (e.g., "example.com").
        num_results: The desired number of search results.
    Returns:
        A list of URLs found, or an empty list if an error occurs or no results.
    """
    try:
        site_specific_query = f"{query} site:{domain}"
        print(f"Searching for: {site_specific_query}")  # For logging/debugging
        # googlesearch-python's search() returns a generator of result URLs.
        # num_results caps the number of results, unique=True drops duplicate URLs,
        # and lang='en' sets the interface language of the request.
        urls = list(search(
            site_specific_query,
            num_results=num_results,
            lang='en',
            unique=True,
        ))
        if not urls:
            print(f"No results found for query: {site_specific_query}")
        return urls
    except Exception as e:
        # A broad catch keeps a failed search from crashing the agent; the error is logged instead
        print(f"An error occurred during Google search: {e}")
        return []
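The `site:` restriction depends on the search engine honoring it. A cheap extra guard is to keep only URLs whose host is the target domain. A stdlib-only sketch; `build_site_query` and `filter_domain_urls` are hypothetical helpers, not part of googlesearch-python.

```python
from urllib.parse import urlparse


def build_site_query(keywords: list[str], domain: str) -> str:
    # Space-join the non-empty keywords and append the site: operator
    terms = " ".join(k.strip() for k in keywords if k.strip())
    return f"{terms} site:{domain}".strip()


def filter_domain_urls(urls: list[str], domain: str) -> list[str]:
    # Keep URLs whose hostname is the domain itself or one of its subdomains
    kept = []
    for url in urls:
        host = urlparse(url).hostname or ""
        if host == domain or host.endswith("." + domain):
            kept.append(url)
    return kept


print(build_site_query(["コウペンちゃん", "Flutter"], "blog.uni-3.app"))
```

Checking the parsed hostname, rather than searching for the domain string anywhere in the URL, avoids accepting links like `https://other.example/blog.uni-3.app`.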
Summarization
# src/agent/summarizer.py
import requests
from bs4 import BeautifulSoup
from google import genai
import agent.constants as constants
def fetch_article_text(url: str) -> str:
    """
    Fetches the main textual content from a given article URL.
    Args:
        url: The URL of the article.
    Returns:
        The extracted text content, or an empty string if fetching or parsing fails.
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:133.0) Gecko/20100101 Firefox/133.0"
        }
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # Raises an HTTPError for 4XX/5XX responses
        soup = BeautifulSoup(response.content, 'html.parser')
        # Prefer the main content area (<article>, then <main>); otherwise search the whole page
        article_body = soup.find('article') or soup.find('main')
        paragraphs = (article_body or soup).find_all('p')
        if not paragraphs:
            # No <p> tags: fall back to all text on the page (may need further cleaning)
            print(f"Warning: No <p> tags found in {url}. Falling back to soup.get_text().")
            return soup.get_text(separator='\n', strip=True)
        return '\n'.join(p.get_text(strip=True) for p in paragraphs)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching URL {url}: {e}")
        return ""
    except Exception as e:
        print(f"Error parsing content from {url}: {e}")
        return ""
def summarize_text(text: str, max_words: int) -> str:
    """
    Summarizes the given text using a generative AI model.
    Args:
        text: The text to summarize.
        max_words: Approximate maximum length of the summary (characters, for Japanese text).
    Returns:
        The summarized text, or an empty string if an error occurs.
    """
    if not text.strip():
        print("Error: Cannot summarize empty text.")
        return ""
    # Too little content to summarize (the 100-character threshold is arbitrary).
    # The message means: "Could not get the article content, or there is not enough to summarize."
    if len(text.strip()) < 100:
        print("Info: Input text is too short to summarize.")
        return "記事の内容を取得できなかったか、要約するには情報が不足しています。"
    try:
        # Truncate overly long input BEFORE building the prompt (a conservative limit)
        max_input_length = 30000
        if len(text) > max_input_length:
            print(f"Warning: Input text length ({len(text)} chars) exceeds max_input_length ({max_input_length}). Truncating text.")
            text = text[:max_input_length]
        prompt = constants.SUMMARIZE_TEXT_PROMPT_TEMPLATE.format(max_words, text)
        client = genai.Client()
        response = client.models.generate_content(
            model=constants.MODEL_NAME,
            contents=prompt,
        )
        if response.text:
            return response.text.strip()
        print("Error: No summary content in response from API or content was filtered.")
        if getattr(response, 'prompt_feedback', None):
            print(f"Prompt feedback: {response.prompt_feedback}")
        return ""
    except Exception as e:
        print(f"An error occurred during summarization: {e}")
        return ""
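`fetch_article_text` relies on BeautifulSoup. For comparison, here is a stdlib-only sketch of the same idea using `html.parser.HTMLParser`: prefer `<p>` text inside `<article>`, otherwise take every `<p>` on the page. This is a swapped-in technique, not the code above. It skips the `<main>` and `get_text()` fallbacks and assumes `<p>` tags are explicitly closed. `ParagraphExtractor` and `extract_paragraphs` are hypothetical names.

```python
from html.parser import HTMLParser


class ParagraphExtractor(HTMLParser):
    """Collects <p> text, tracking separately the paragraphs inside <article>."""

    def __init__(self) -> None:
        super().__init__()  # convert_charrefs=True by default, so &amp; arrives as "&"
        self.article_depth = 0
        self.in_p = False
        self.p_in_article = False
        self.current: list[str] = []
        self.article_paragraphs: list[str] = []
        self.all_paragraphs: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "article":
            self.article_depth += 1
        elif tag == "p":
            self.in_p = True
            self.p_in_article = self.article_depth > 0
            self.current = []

    def handle_endtag(self, tag):
        if tag == "article" and self.article_depth > 0:
            self.article_depth -= 1
        elif tag == "p" and self.in_p:
            text = "".join(self.current).strip()
            if text:
                self.all_paragraphs.append(text)
                if self.p_in_article:
                    self.article_paragraphs.append(text)
            self.in_p = False

    def handle_data(self, data):
        # Text of nested inline tags (e.g. <b>) is still inside the <p>
        if self.in_p:
            self.current.append(data)


def extract_paragraphs(html: str) -> str:
    parser = ParagraphExtractor()
    parser.feed(html)
    parser.close()
    # Prefer paragraphs inside <article>; fall back to all paragraphs
    return "\n".join(parser.article_paragraphs or parser.all_paragraphs)


sample = "<p>nav</p><article><p>Hello <b>world</b></p><p>Second &amp; last</p></article>"
print(extract_paragraphs(sample))
```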
agent executor
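Sets up the return values and the state of the task the agent is running (working, completed, and so on). It acts as the interface between the agent and the server, handling the exchange with each side.
Code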
# src/agent/agent_executor.py
import json
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import DataPart, Part, Task, TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_parts_message, new_agent_text_message, new_task
from a2a.utils.errors import ServerError
from agent.agent import SearchAgent
class SearchAgentExecutor(AgentExecutor):
    """
    AI Agent that takes a user query, finds relevant articles from a specific domain,
    and returns their URLs and summaries. Implements A2A AgentExecutor.
    """
    SUPPORTED_CONTENT_TYPES = SearchAgent.SUPPORTED_CONTENT_TYPES
    def __init__(self, model_name: str = 'gemini-2.0-flash-001', target_domain: str = 'blog.uni-3.app', summary_max_length: int = 100):
        self.agent = SearchAgent(model_name=model_name, target_domain=target_domain, max_length=summary_max_length)
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        query = context.get_user_input()
        task = context.current_task
        # This agent always produces Task objects. If this request has
        # no current task, create a new one and use it.
        if not task:
            task = new_task(context.message)
            event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.contextId)
        # Invoke the underlying agent with streaming; each streamed item becomes a status or artifact event
        async for item in self.agent.stream(query, task.contextId):
            if not item['is_task_complete']:
                # Intermediate progress: report which tool is running
                updater.update_status(
                    TaskState.working,
                    new_agent_text_message(item['updates'], task.contextId, task.id),
                )
                continue
            content = item['content']
            if isinstance(content, dict):
                # A dict is a raw function response; treat it as a form only if it has the expected shape
                if 'response' in content and 'result' in content['response']:
                    data = json.loads(content['response']['result'])
                    updater.update_status(
                        TaskState.input_required,
                        new_agent_parts_message(
                            [Part(root=DataPart(data=data))],
                            task.contextId,
                            task.id,
                        ),
                        final=True,
                    )
                else:
                    updater.update_status(
                        TaskState.failed,
                        new_agent_text_message('Reaching an unexpected state', task.contextId, task.id),
                        final=True,
                    )
                break
            # Text result: emit it as an artifact and mark the task completed
            updater.add_artifact([Part(root=TextPart(text=content))])
            updater.complete()
            break
    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> Task | None:
        # Cancellation is not supported by this agent
        raise ServerError(error=UnsupportedOperationError())
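The branching in `execute()` boils down to a mapping from each streamed item to a task state plus a "final" flag. The sketch below shows that decision with no dependencies. `State` is a hypothetical enum whose values mirror the A2A task-state strings (`working`, `input-required`, `completed`, `failed`); it is not the a2a-sdk `TaskState` class.

```python
from enum import Enum
from typing import Any


class State(str, Enum):
    # Hypothetical mirror of a few A2A task-state strings
    working = "working"
    input_required = "input-required"
    completed = "completed"
    failed = "failed"


def classify(item: dict[str, Any]) -> tuple[State, bool]:
    """Return (state, final) for one item yielded by SearchAgent.stream()."""
    if not item["is_task_complete"]:
        return State.working, False
    content = item["content"]
    if isinstance(content, dict):
        # A function response shaped like a form asks the user for more input
        if "result" in content.get("response", {}):
            return State.input_required, True
        return State.failed, True
    # Plain text is the final answer
    return State.completed, True


print(classify({"is_task_complete": False, "updates": "Processing with search_google..."}))
```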
server
Starts the server. Written largely by feel.
Since I plan to host it externally, responses are streamed over SSE (Server-Sent Events).
Code
# src/server/app.py
import logging
import os
import traceback
import click
import uvicorn
from dotenv import load_dotenv
# A2A SDK components
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
# The search agent, wrapped as an A2A AgentExecutor
from agent.agent_executor import SearchAgentExecutor
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
TARGET_DOMAIN = os.getenv("TARGET_DOMAIN", "example.com")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")  # Checked at startup so a missing key fails early
class MissingAPIKeyError(Exception):
    """Raised when GOOGLE_API_KEY is not set."""
@click.command()
@click.option('--host', default=os.getenv('A2A_SERVER_HOST', 'localhost'), show_default=True, help="Host to bind the server to.")
@click.option('--port', default=int(os.getenv('A2A_PORT', '10008')), show_default=True, help="Port to bind the server to.")
def main(host: str, port: int):
    try:
        if not GOOGLE_API_KEY:
            raise MissingAPIKeyError('GOOGLE_API_KEY environment variable not set.')
        agent_executor_instance = SearchAgentExecutor(target_domain=TARGET_DOMAIN)
        logger.info("AgentExecutor initialized successfully.")
    except MissingAPIKeyError as e:
        logger.error(f'Error: {e}')
        return
    except Exception as e:
        # Any other failure while constructing the agent (ValueError included) aborts startup
        logger.error(f"CRITICAL: Error initializing AgentExecutor: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        logger.error("Server will not start due to agent initialization failure.")
        return
    # Skill and Agent Card advertised to clients
    skill = AgentSkill(
        id='process_query',
        name='Process User Query',
        description='Takes a user query, performs web searches on the configured domain, and returns summarized results.',
        tags=['research', 'summarization', 'web_search'],
    )
    agent_card = AgentCard(
        name='AI Research Agent',
        description='summarizes and show link for search results.',
        url=f'http://{host}:{port}/',
        version='1.0.0',
        defaultInputModes=SearchAgentExecutor.SUPPORTED_CONTENT_TYPES,
        defaultOutputModes=SearchAgentExecutor.SUPPORTED_CONTENT_TYPES,
        capabilities=AgentCapabilities(streaming=True),
        skills=[skill],
    )
    request_handler = DefaultRequestHandler(
        agent_executor=agent_executor_instance,
        task_store=InMemoryTaskStore(),
    )
    a2a_app_starlette = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler,
    )
    logger.info(f"Starting AI Agent server (A2A SDK) on http://{host}:{port}")
    logger.info(f"Agent configured for target domain: {TARGET_DOMAIN}")
    # a2a-sdk 0.2.x serves the Agent Card at /.well-known/agent.json
    logger.info(f"Agent Card available at: http://{host}:{port}/.well-known/agent.json")
    uvicorn.run(a2a_app_starlette.build(), host=host, port=port)
if __name__ == "__main__":
    main()
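The `AgentCard` above is what the server publishes and what the client fetches first to learn how to talk to the agent. As a rough illustration, the card corresponds to JSON like the following. This assumes the camelCase field names of a2a-sdk 0.2.x; the exact serialized field set depends on the SDK version, and newer versions add more fields. `build_agent_card` is a hypothetical helper that only reproduces the values set in `app.py`.

```python
import json
from typing import Any


def build_agent_card(host: str, port: int) -> dict[str, Any]:
    # Same values as the AgentCard/AgentSkill in app.py, as a plain dict (assumed field names)
    return {
        "name": "AI Research Agent",
        "description": "summarizes and show link for search results.",
        "url": f"http://{host}:{port}/",
        "version": "1.0.0",
        "defaultInputModes": ["text", "text/plain"],
        "defaultOutputModes": ["text", "text/plain"],
        "capabilities": {"streaming": True},
        "skills": [
            {
                "id": "process_query",
                "name": "Process User Query",
                "description": "Takes a user query, performs web searches on the configured domain, and returns summarized results.",
                "tags": ["research", "summarization", "web_search"],
            }
        ],
    }


card_json = json.dumps(build_agent_card("localhost", 10008), indent=2)
print(card_json)
```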
client
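A script that talks to the server from the command line. It reads a query from the terminal with input() and prints the responses as they arrive.
Most of the code is reused from existing sample code; I only wrote the response-parsing function. The structures and helper functions will probably keep changing, though.
The JSON-RPC protocol that A2A uses for these exchanges is defined here: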
https://github.com/google-a2a/A2A/blob/main/types/src/types.ts
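On the wire, `send_message_streaming` sends a JSON-RPC 2.0 request with the method `message/stream`, and the server answers with a stream of Server-Sent Events. The sketch below builds the request body that the client code produces. `build_stream_request` is a hypothetical helper, and the field names follow the A2A v0.2 types used elsewhere in this post.

```python
import json
from typing import Any
from uuid import uuid4


def build_stream_request(text: str) -> dict[str, Any]:
    """JSON-RPC 2.0 body for an A2A streaming request carrying one text part."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),  # JSON-RPC request id
        "method": "message/stream",
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
                "messageId": uuid4().hex,  # A2A message id, separate from the JSON-RPC id
            }
        },
    }


body = json.dumps(build_stream_request("コウペンちゃん"), ensure_ascii=False)
print(body)
```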
Code
# src/client/cli.py
import asyncio
import logging
from typing import Any
from uuid import uuid4
import httpx
from a2a.client import A2AClient
from a2a.types import (
    JSONRPCErrorResponse,
    MessageSendParams,
    SendStreamingMessageRequest,
    SendStreamingMessageResponse,
    SendStreamingMessageSuccessResponse,
    TaskArtifactUpdateEvent,
    TaskStatusUpdateEvent,
)
# Show INFO-level logs (e.g. the HTTP requests made by httpx)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def parse_streaming_res(res: SendStreamingMessageResponse) -> str:
    """
    Turn one streamed response chunk into a printable line.
    """
    root = res.root
    if isinstance(root, JSONRPCErrorResponse):
        return str(root.error)
    if not isinstance(root, SendStreamingMessageSuccessResponse):
        return ""
    result = root.result
    if isinstance(result, TaskStatusUpdateEvent):
        if result.final:
            return "finish!"
        # Status updates may come without a message, or with a non-text part
        message = result.status.message
        text = getattr(message.parts[0].root, 'text', '') if message and message.parts else ''
        return f"state: {result.status.state.value} message: {text}"
    if isinstance(result, TaskArtifactUpdateEvent):
        return getattr(result.artifact.parts[0].root, 'text', '')
    # Other events (the initial Task, echoed Messages) are not printed
    return ""
async def main() -> None:
    base_url = 'http://localhost:10008'
    async with httpx.AsyncClient(timeout=600) as httpx_client:
        # Fetch the Agent Card from the server and build a client from it
        client = await A2AClient.get_client_from_agent_card_url(
            httpx_client, base_url=base_url
        )
        logger.info('A2AClient initialized.')
        while True:
            print("Please enter your query (type 'exit' to quit):")
            user_input = input("\n> ")
            if user_input.lower() == 'exit':
                print("exit bye!")
                return
            # Request message: a single text part from the user
            send_message_payload: dict[str, Any] = {
                'message': {
                    'role': 'user',
                    'parts': [{'kind': 'text', 'text': user_input}],
                    'messageId': uuid4().hex,
                },
            }
            streaming_request = SendStreamingMessageRequest(
                id=str(uuid4()),
                params=MessageSendParams(**send_message_payload),
            )
            # Print each event as it arrives
            async for chunk in client.send_message_streaming(streaming_request):
                line = parse_streaming_res(chunk)
                if line:
                    print(line)
if __name__ == '__main__':
    asyncio.run(main())
Usage
Running
First, start the server that the client will connect to:
uv run src/server/app.py
INFO:__main__:AgentExecutor initialized successfully.
INFO:__main__:Starting AI Agent server (A2A SDK) on http://localhost:10008
INFO:__main__:Agent configured for target domain: blog.uni-3.app
INFO:__main__:Agent Card available at: http://localhost:10008/.well-known/agent-card
INFO: Started server process [88696]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:10008 (Press CTRL+C to quit)
...
Then, in another terminal:
uv run src/client/cli.py
Please enter your query (type 'exit' to quit):
response
Searching for Koupen-chan (コウペンちゃん) articles. This is exactly what I wanted to do.
- server
Structured logs are printed automatically, including token counts.
...
Function calls:
name: search_google, args: {'query': 'コウペンちゃん', 'num_results': 3, 'domain': 'blog.uni-3.app'}
Raw response:
{"candidates":[{"content":{"parts":[{"function_call":{"args":{"query":"コウペンちゃん","num_results":3,"domain":"blog.uni-3.app"},"name":"search_google"}}],"role":"model"},"finish_reason":"STOP","avg_logprobs":-0.0001001852919886771}],"model_version":"gemini-2.0-flash-001","usage_metadata":{"candidates_token_count":19,"candidates_tokens_details":[{"modality":"TEXT","token_count":19}],"prompt_token_count":450,"prompt_tokens_details":[{"modality":"TEXT","token_count":450}],"total_token_count":469},"automatic_function_calling_history":[]}
....
- client
> コウペンちゃん
INFO:httpx:HTTP Request: POST http://localhost:10008/ "HTTP/1.1 200 OK"
state: working message: Processing with extract_keywords...
state: working message: Processing with search_google...
state: working message: Processing with fetch_article_text...
* https://blog.uni-3.app/flutter-art-style-classify/: モバイルアプリで画像分類を行う方法を紹介。h5形式のモデルをtfliteに変換し、Flutterで予測モデルを読み込む手順を解説。画像パスの取得にはimage\_pickerを使用。記事では、画風分類モデルを使用し、コウペンちゃんがシュルレアリスムに分類される例を紹介。ソースコードはGitHubで公開されており、widgetのレイアウトとスタイルの分離、共通スタイルのまとめ、推論速度の改善などが今後の課題。
finish!
Please enter your query (type 'exit' to quit):
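The `usage_metadata` in the raw server log above is plain JSON, so token counts can be read out directly. The sketch below uses a trimmed copy of that log line, keeping only the fields it needs. For this particular response, prompt tokens plus candidate tokens equal the total. `token_usage` is a hypothetical helper.

```python
import json

# Trimmed from the "Raw response" log line above
raw = (
    '{"model_version":"gemini-2.0-flash-001",'
    '"usage_metadata":{"candidates_token_count":19,'
    '"prompt_token_count":450,"total_token_count":469}}'
)


def token_usage(raw_response: str) -> dict[str, int]:
    # Missing fields default to 0 so partial logs do not raise
    usage = json.loads(raw_response).get("usage_metadata", {})
    return {
        "prompt": usage.get("prompt_token_count", 0),
        "output": usage.get("candidates_token_count", 0),
        "total": usage.get("total_token_count", 0),
    }


counts = token_usage(raw)
print(counts)
```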
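Impressions
- All exchanges are plain text and the task isn't complex, so once I understood the code and how the server and client talk to each other, there wasn't much left to trip over.
- I didn't experiment much with the external settings (target site, number of pages, etc.) or with the prompt, yet loosely chosen function arguments and a rough prompt were enough for it to make sensible function calls.
- I'm using Gemini 2.0 Flash. Switching to Gemini 1.5 made the returned content worse and it followed the instructions less accurately, so how well the agent performs seems to depend heavily on the model.
Since the implementation is essentially an instruction sheet, it seems hard to judge which items are truly necessary and how much tuning is enough.
What I really want is to build this into the site's search feature, so I'd like to take it that far.
References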
a2a server and client
- https://towardsdatascience.com/multi-agent-communication-with-the-a2a-python-sdk/
- https://github.com/google-a2a/a2a-samples/blob/d4bc2b2c214d57b1cf429a4a7cc70b69cb363fd2/samples/python/agents/azureaifoundry_sdk/azurefoundryagent/foundry_agent_executor.py#L27
a2a client with typescript