Skip to content

Async Streaming Pull Usage #1174

@ash2703

Description

@ash2703

Has someone used this in peoduction scenarios? I am stuck and unable to pull messages using the async streaming client and any help would be benefecial.

Is wrapping the standard streaming pull in an asyncio executor gonna give me the same behaviour as below client?

This is my current usage:

from google import pubsub_v1
subscriber = pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber client
async def fetch_and_decode_msgs(self):
    try:
        print(f"Listening for messages on {self.subscription_path}...")
        request = pubsub_v1.StreamingPullRequest(
            subscription=self.subscription_path,
        )

        # Not sure if there is any other way to do this
        async def request_generator():
            yield request
        print(f"stream: {request}"). # Code gets blocked here


        stream = await self.subscriber.streaming_pull(requests=request_generator())
        print(f"stream: {stream}")
        # Handle the response
        async for response in stream:
            for received_message in response.received_messages:
                print("Received message: ", received_message.message.data.decode('utf-8'))
    except Exception as e:
        raise e

Metadata

Metadata

Labels

api: pubsubIssues related to the googleapis/python-pubsub API.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions