-
Notifications
You must be signed in to change notification settings - Fork 215
Open
Labels
api: pubsubIssues related to the googleapis/python-pubsub API.Issues related to the googleapis/python-pubsub API.
Description
Has someone used this in peoduction scenarios? I am stuck and unable to pull messages using the async streaming client and any help would be benefecial.
Is wrapping the standard streaming pull in an asyncio executor gonna give me the same behaviour as below client?
def streaming_pull( |
This is my current usage:
from google import pubsub_v1
subscriber = pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber client
async def fetch_and_decode_msgs(self):
try:
print(f"Listening for messages on {self.subscription_path}...")
request = pubsub_v1.StreamingPullRequest(
subscription=self.subscription_path,
)
# Not sure if there is any other way to do this
async def request_generator():
yield request
print(f"stream: {request}"). # Code gets blocked here
stream = await self.subscriber.streaming_pull(requests=request_generator())
print(f"stream: {stream}")
# Handle the response
async for response in stream:
for received_message in response.received_messages:
print("Received message: ", received_message.message.data.decode('utf-8'))
except Exception as e:
raise e
kbondar17
Metadata
Metadata
Assignees
Labels
api: pubsubIssues related to the googleapis/python-pubsub API.Issues related to the googleapis/python-pubsub API.