Skip to content

feat: Added the pubsub_publisher tool #1699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions contributing/samples/pubsub_publisher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import agent
116 changes: 116 additions & 0 deletions contributing/samples/pubsub_publisher/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# --- Imports ---
# Vertex Agent Modules
from google.adk.agents import Agent # Base class for creating agents


# Other Python Modules
import warnings # For suppressing warnings
import logging # For controlling logging output
from dotenv import load_dotenv # For loading environment variables from a .env file


# Modules for Publishing Events to GCP Pub/Sub
import json # For working with JSON data, such as sending events to Pub/Sub
import uuid # For generating unique identifiers used for idempotency in event publishing
from typing import Optional # For type hinting optional parameters for gcp_project_id and pubsub_topic_id
from google.adk.tools import publish_event # For publishing events to Pub/Sub (not used in this script, but included for completeness)


# --- Configuration ---
load_dotenv()


# Ignore all warnings
warnings.filterwarnings("ignore")
# Set logging level to ERROR to suppress informational messages
logging.basicConfig(level=logging.INFO)


# --- Agent Tool Definitions ---

# Tool function to publish an event to GCP Pub/Sub
def publish_to_gcp_pubsub_tool(
event_data_json: str,
event_type: str = "custom_agent_event",
gcp_project_id: Optional[str] = 'YOUR_GCP_PROJECT_ID', # If gcp_project_id is not provided, it will use the environment variable GOOGLE_CLOUD_PROJECT
pubsub_topic_id: Optional[str] = 'YOUR_PUBSUB_TOPIC_ID' # If pubsub_topic_id is not provided, it will use the environment variable PUBSUB_TOPIC_ID
) -> str:
"""
Publishes a structured event from the agent to a GCP Pub/Sub topic.
The agent should provide the event data as a JSON string.
Optionally, gcp_project_id and pubsub_topic_id can be provided to override environment settings.

Args:
event_data_json: A JSON string representing the structured data for the event.
event_type: A string to categorize the event (e.g., 'decision_made', 'action_taken').
gcp_project_id: The Google Cloud project ID. If not provided, uses the environment variable.
pubsub_topic_id: The Pub/Sub topic ID. If not provided, uses the environment variable.
"""
try:
event_data = json.loads(event_data_json)
event_data["app_message_id"] = str(uuid.uuid4())
publish_event(
event_data=event_data,
event_type=event_type,
gcp_project_id=gcp_project_id,
pubsub_topic_id=pubsub_topic_id
)
return f"Event (type: {event_type}) with data '{event_data_json}' has been queued for publishing."
except json.JSONDecodeError:
return "Error: The provided event_data_json was not valid JSON."
except Exception as e:
print(f"[AgentTool ERROR] Error attempting to publish event to GCP Pub/Sub: {e}")
return f"Error attempting to publish event to GCP Pub/Sub: {str(e)}"



# --- Root Agent Definition ---
# @title Define the Root Agent

# Initialize root agent variables
root_agent = None
runner_root = None # Initialize runner variable (although runner is created later)

# Define the root agent (coordinator)
root_agent = Agent(
name="root_support_agent", # Name for the root agent
model="gemini-2.5-flash", # Model for the root agent (orchestration)
description="The main coordinator agent. Handles user requests and delegates tasks to specialist sub-agents and tools.",
instruction= # The core instructions defining the workflow
"""
You are the lead support coordinator agent. Your goal is to understand the customer's question or topic and provide insightful answers.

You have access to specialized tools and sub-agents:
1. Tool `publish_to_gcp_pubsub_tool`: Use this tool to publish information to GCP Pub/Sub.



Your workflow:
1. You will be provided with a request from a user.
2. Inform the user you will begin the research (e.g., "Okay, I'll start researching that for you. Please wait a moment.").
3. For key findings, use `publish_to_gcp_pubsub_tool` with `event_type="key_finding_identified"` and `event_data_json` containing the finding details in JSON format.
4. Provide a summary of the research findings to the user.


""",
tools=[
publish_to_gcp_pubsub_tool, # Tool to publish events to GCP Pub/Sub
],
sub_agents=[
],

)
66 changes: 66 additions & 0 deletions contributing/samples/pubsub_publisher/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import time
import warnings

import agent
from dotenv import load_dotenv
from google.adk import Runner
from google.adk.artifacts import InMemoryArtifactService
from google.adk.cli.utils import logs
from google.adk.sessions import InMemorySessionService
from google.adk.sessions import Session
from google.genai import types

load_dotenv(override=True)
warnings.filterwarnings('ignore', category=UserWarning)
logs.log_to_tmp_folder()


async def main():
app_name = 'my_app'
user_id_1 = 'user1'
session_service = InMemorySessionService()
artifact_service = InMemoryArtifactService()
runner = Runner(
app_name=app_name,
agent=agent.root_agent,
artifact_service=artifact_service,
session_service=session_service,
)
session_11 = await session_service.create_session(app_name=app_name, user_id=user_id_1)

async def run_prompt(session: Session, new_message: str):
content = types.Content(
role='user', parts=[types.Part.from_text(text=new_message)]
)
print('** User says:', content.model_dump(exclude_none=True))
async for event in runner.run_async(
user_id=user_id_1,
session_id=session.id,
new_message=content,
):
if event.content.parts and event.content.parts[0].text:
print(f'** {event.author}: {event.content.parts[0].text}')

print('------------------------------------')
await run_prompt(session_11, 'Give me a summary of the first moon landing.')

print('------------------------------------')


if __name__ == '__main__':
asyncio.run(main())
116 changes: 116 additions & 0 deletions src/google/adk/tools/pubsub_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import asyncio
import os
import datetime
from typing import Dict, Any, Optional

try:
from google.cloud import pubsub_v1
from google.auth.exceptions import DefaultCredentialsError
except ImportError:
pubsub_v1 = None
DefaultCredentialsError = None
print("[PubSubPublisher WARNING] google-cloud-pubsub not installed. Pub/Sub features will be unavailable.")


_loop: Optional[asyncio.AbstractEventLoop] = None
_pubsub_publisher: Optional[Any] = None
_pubsub_topic_path: Optional[str] = None
_pubsub_enabled: bool = False

def _pubsub_callback(future: Any) -> None:
"""Callback for Pub/Sub publish results."""
try:
message_id = future.result()
print(f"[PubSubPublisher INFO] Published Pub/Sub message with ID: {message_id}")
except Exception as e:
print(f"[PubSubPublisher ERROR] Failed to publish Pub/Sub message: {e}")

def setup_pubsub_publisher_async(
gcp_project_id: Optional[str] = None,
pubsub_topic_id: Optional[str] = None
) -> None:
"""
Initializes the GCP Pub/Sub publisher.
Uses provided arguments or falls back to environment variables.
"""
global _loop, _pubsub_publisher, _pubsub_topic_path, _pubsub_enabled

_loop = asyncio.get_running_loop()

if pubsub_v1 is None:
print("[PubSubPublisher INFO] GCP Pub/Sub client library not found. Publishing disabled.")
_pubsub_enabled = False
return

# Use provided args or get from environment
project_id = gcp_project_id or os.environ.get("GOOGLE_CLOUD_PROJECT")
topic_id = pubsub_topic_id or os.environ.get("PUBSUB_TOPIC_ID")

if project_id and topic_id:
try:
_pubsub_publisher = pubsub_v1.PublisherClient()
_pubsub_topic_path = _pubsub_publisher.topic_path(project_id, topic_id)
_pubsub_enabled = True
print(f"[PubSubPublisher] GCP Pub/Sub publishing enabled for topic: {_pubsub_topic_path}")
except DefaultCredentialsError:
print("[PubSubPublisher ERROR] GCP Default Credentials not found. Publishing disabled.")
_pubsub_enabled = False
except Exception as e:
print(f"[PubSubPublisher ERROR] Failed to initialize GCP Pub/Sub publisher: {e}. Publishing disabled.")
_pubsub_enabled = False
else:
print("[PubSubPublisher INFO] GOOGLE_CLOUD_PROJECT and/or PUBSUB_TOPIC_ID not set. Publishing disabled.")
_pubsub_enabled = False

def publish_event(
event_data: Dict[str, Any],
event_type: str = "custom_event",
gcp_project_id: Optional[str] = None,
pubsub_topic_id: Optional[str] = None
) -> None:
"""
Publishes a structured event to the configured GCP Pub/Sub topic.
"""
if not _pubsub_enabled:
setup_pubsub_publisher_async(gcp_project_id=gcp_project_id, pubsub_topic_id=pubsub_topic_id)

if not _pubsub_enabled or _pubsub_publisher is None or _pubsub_topic_path is None or _loop is None:
return

timestamp = datetime.datetime.utcnow().isoformat() + "Z"

payload = {
"event_type": event_type,
"timestamp": timestamp,
"data": event_data,
}

try:
data_bytes = json.dumps(payload).encode("utf-8")

def do_publish():
try:
publish_future = _pubsub_publisher.publish(_pubsub_topic_path, data_bytes)
publish_future.add_done_callback(_pubsub_callback)
except Exception as e:
print(f"[PubSubPublisher ERROR] Error when trying to initiate publish: {e}")

_loop.call_soon_threadsafe(do_publish)

except Exception as e:
print(f"[PubSubPublisher ERROR] Failed to prepare event for publishing: {e}")
Loading