diff --git a/contributing/samples/pubsub_publisher/__init__.py b/contributing/samples/pubsub_publisher/__init__.py new file mode 100644 index 000000000..44f7dab56 --- /dev/null +++ b/contributing/samples/pubsub_publisher/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import agent \ No newline at end of file diff --git a/contributing/samples/pubsub_publisher/agent.py b/contributing/samples/pubsub_publisher/agent.py new file mode 100644 index 000000000..353a1fd91 --- /dev/null +++ b/contributing/samples/pubsub_publisher/agent.py @@ -0,0 +1,116 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# --- Imports --- +# Vertex Agent Modules +from google.adk.agents import Agent # Base class for creating agents + + +# Other Python Modules +import warnings # For suppressing warnings +import logging # For controlling logging output +from dotenv import load_dotenv # For loading environment variables from a .env file + + +# Modules for Publishing Events to GCP Pub/Sub +import json # For working with JSON data, such as sending events to Pub/Sub +import uuid # For generating unique identifiers used for idempotency in event publishing +from typing import Optional # For type hinting optional parameters for gcp_project_id and pubsub_topic_id +from google.adk.tools import publish_event # For publishing events to Pub/Sub (not used in this script, but included for completeness) + + +# --- Configuration --- +load_dotenv() + + +# Ignore all warnings +warnings.filterwarnings("ignore") +# Set logging level to ERROR to suppress informational messages +logging.basicConfig(level=logging.INFO) + + +# --- Agent Tool Definitions --- + +# Tool function to publish an event to GCP Pub/Sub +def publish_to_gcp_pubsub_tool( + event_data_json: str, + event_type: str = "custom_agent_event", + gcp_project_id: Optional[str] = 'YOUR_GCP_PROJECT_ID', # If gcp_project_id is not provided, it will use the environment variable GOOGLE_CLOUD_PROJECT + pubsub_topic_id: Optional[str] = 'YOUR_PUBSUB_TOPIC_ID' # If pubsub_topic_id is not provided, it will use the environment variable PUBSUB_TOPIC_ID +) -> str: + """ + Publishes a structured event from the agent to a GCP Pub/Sub topic. + The agent should provide the event data as a JSON string. + Optionally, gcp_project_id and pubsub_topic_id can be provided to override environment settings. + + Args: + event_data_json: A JSON string representing the structured data for the event. + event_type: A string to categorize the event (e.g., 'decision_made', 'action_taken'). + gcp_project_id: The Google Cloud project ID. If not provided, uses the environment variable. + pubsub_topic_id: The Pub/Sub topic ID. If not provided, uses the environment variable. + """ + try: + event_data = json.loads(event_data_json) + event_data["app_message_id"] = str(uuid.uuid4()) + publish_event( + event_data=event_data, + event_type=event_type, + gcp_project_id=gcp_project_id, + pubsub_topic_id=pubsub_topic_id + ) + return f"Event (type: {event_type}) with data '{event_data_json}' has been queued for publishing." + except json.JSONDecodeError: + return "Error: The provided event_data_json was not valid JSON." + except Exception as e: + print(f"[AgentTool ERROR] Error attempting to publish event to GCP Pub/Sub: {e}") + return f"Error attempting to publish event to GCP Pub/Sub: {str(e)}" + + + +# --- Root Agent Definition --- +# @title Define the Root Agent + +# Initialize root agent variables +root_agent = None +runner_root = None # Initialize runner variable (although runner is created later) + + # Define the root agent (coordinator) +root_agent = Agent( + name="root_support_agent", # Name for the root agent + model="gemini-2.5-flash", # Model for the root agent (orchestration) + description="The main coordinator agent. Handles user requests and delegates tasks to specialist sub-agents and tools.", + instruction= # The core instructions defining the workflow + """ + You are the lead support coordinator agent. Your goal is to understand the customer's question or topic and provide insightful answers. + + You have access to specialized tools and sub-agents: + 1. Tool `publish_to_gcp_pubsub_tool`: Use this tool to publish information to GCP Pub/Sub. + + + + Your workflow: + 1. You will be provided with a request from a user. + 2. Inform the user you will begin the research (e.g., "Okay, I'll start researching that for you. Please wait a moment."). + 3. For key findings, use `publish_to_gcp_pubsub_tool` with `event_type="key_finding_identified"` and `event_data_json` containing the finding details in JSON format. + 4. Provide a summary of the research findings to the user. + + + """, + tools=[ + publish_to_gcp_pubsub_tool, # Tool to publish events to GCP Pub/Sub + ], + sub_agents=[ + ], + +) diff --git a/contributing/samples/pubsub_publisher/main.py b/contributing/samples/pubsub_publisher/main.py new file mode 100755 index 000000000..e38f3fbaa --- /dev/null +++ b/contributing/samples/pubsub_publisher/main.py @@ -0,0 +1,66 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import time +import warnings + +import agent +from dotenv import load_dotenv +from google.adk import Runner +from google.adk.artifacts import InMemoryArtifactService +from google.adk.cli.utils import logs +from google.adk.sessions import InMemorySessionService +from google.adk.sessions import Session +from google.genai import types + +load_dotenv(override=True) +warnings.filterwarnings('ignore', category=UserWarning) +logs.log_to_tmp_folder() + + +async def main(): + app_name = 'my_app' + user_id_1 = 'user1' + session_service = InMemorySessionService() + artifact_service = InMemoryArtifactService() + runner = Runner( + app_name=app_name, + agent=agent.root_agent, + artifact_service=artifact_service, + session_service=session_service, + ) + session_11 = await session_service.create_session(app_name=app_name, user_id=user_id_1) + + async def run_prompt(session: Session, new_message: str): + content = types.Content( + role='user', parts=[types.Part.from_text(text=new_message)] + ) + print('** User says:', content.model_dump(exclude_none=True)) + async for event in runner.run_async( + user_id=user_id_1, + session_id=session.id, + new_message=content, + ): + if event.content.parts and event.content.parts[0].text: + print(f'** {event.author}: {event.content.parts[0].text}') + + print('------------------------------------') + await run_prompt(session_11, 'Give me a summary of the first moon landing.') + + print('------------------------------------') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/src/google/adk/tools/pubsub_publisher.py b/src/google/adk/tools/pubsub_publisher.py new file mode 100644 index 000000000..cf74ba189 --- /dev/null +++ b/src/google/adk/tools/pubsub_publisher.py @@ -0,0 +1,116 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import asyncio +import os +import datetime +from typing import Dict, Any, Optional + +try: + from google.cloud import pubsub_v1 + from google.auth.exceptions import DefaultCredentialsError +except ImportError: + pubsub_v1 = None + DefaultCredentialsError = None + print("[PubSubPublisher WARNING] google-cloud-pubsub not installed. Pub/Sub features will be unavailable.") + + +_loop: Optional[asyncio.AbstractEventLoop] = None +_pubsub_publisher: Optional[Any] = None +_pubsub_topic_path: Optional[str] = None +_pubsub_enabled: bool = False + +def _pubsub_callback(future: Any) -> None: + """Callback for Pub/Sub publish results.""" + try: + message_id = future.result() + print(f"[PubSubPublisher INFO] Published Pub/Sub message with ID: {message_id}") + except Exception as e: + print(f"[PubSubPublisher ERROR] Failed to publish Pub/Sub message: {e}") + +def setup_pubsub_publisher_async( + gcp_project_id: Optional[str] = None, + pubsub_topic_id: Optional[str] = None +) -> None: + """ + Initializes the GCP Pub/Sub publisher. + Uses provided arguments or falls back to environment variables. + """ + global _loop, _pubsub_publisher, _pubsub_topic_path, _pubsub_enabled + + _loop = asyncio.get_running_loop() + + if pubsub_v1 is None: + print("[PubSubPublisher INFO] GCP Pub/Sub client library not found. Publishing disabled.") + _pubsub_enabled = False + return + + # Use provided args or get from environment + project_id = gcp_project_id or os.environ.get("GOOGLE_CLOUD_PROJECT") + topic_id = pubsub_topic_id or os.environ.get("PUBSUB_TOPIC_ID") + + if project_id and topic_id: + try: + _pubsub_publisher = pubsub_v1.PublisherClient() + _pubsub_topic_path = _pubsub_publisher.topic_path(project_id, topic_id) + _pubsub_enabled = True + print(f"[PubSubPublisher] GCP Pub/Sub publishing enabled for topic: {_pubsub_topic_path}") + except DefaultCredentialsError: + print("[PubSubPublisher ERROR] GCP Default Credentials not found. Publishing disabled.") + _pubsub_enabled = False + except Exception as e: + print(f"[PubSubPublisher ERROR] Failed to initialize GCP Pub/Sub publisher: {e}. Publishing disabled.") + _pubsub_enabled = False + else: + print("[PubSubPublisher INFO] GOOGLE_CLOUD_PROJECT and/or PUBSUB_TOPIC_ID not set. Publishing disabled.") + _pubsub_enabled = False + +def publish_event( + event_data: Dict[str, Any], + event_type: str = "custom_event", + gcp_project_id: Optional[str] = None, + pubsub_topic_id: Optional[str] = None +) -> None: + """ + Publishes a structured event to the configured GCP Pub/Sub topic. + """ + if not _pubsub_enabled: + setup_pubsub_publisher_async(gcp_project_id=gcp_project_id, pubsub_topic_id=pubsub_topic_id) + + if not _pubsub_enabled or _pubsub_publisher is None or _pubsub_topic_path is None or _loop is None: + return + + timestamp = datetime.datetime.utcnow().isoformat() + "Z" + + payload = { + "event_type": event_type, + "timestamp": timestamp, + "data": event_data, + } + + try: + data_bytes = json.dumps(payload).encode("utf-8") + + def do_publish(): + try: + publish_future = _pubsub_publisher.publish(_pubsub_topic_path, data_bytes) + publish_future.add_done_callback(_pubsub_callback) + except Exception as e: + print(f"[PubSubPublisher ERROR] Error when trying to initiate publish: {e}") + + _loop.call_soon_threadsafe(do_publish) + + except Exception as e: + print(f"[PubSubPublisher ERROR] Failed to prepare event for publishing: {e}")