Skip to content

No Consumer Reader #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions tests/topics/test_topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,249 @@ async def wait(fut):

await reader0.close()
await reader1.close()


@pytest.fixture()
def topic_selector(topic_with_messages):
return ydb.TopicReaderSelector(path=topic_with_messages, partitions=[0])


@pytest.mark.asyncio
class TestTopicNoConsumerReaderAsyncIO:
async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
with pytest.raises(ydb.Error):
driver.topic_client.reader(
topic_with_messages,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)

async def test_reader_with_no_event_handler_raises(self, driver, topic_with_messages):
with pytest.raises(ydb.Error):
driver.topic_client.reader(
topic_with_messages,
consumer=None,
)

async def test_reader_with_no_partition_ids_selector_raises(self, driver, topic_selector):
topic_selector.partitions = None

with pytest.raises(ydb.Error):
driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)

async def test_reader_with_default_lambda(self, driver, topic_selector):
reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)
msg = await reader.receive_message()

assert msg.seqno == 1

await reader.close()

async def test_reader_with_sync_lambda(self, driver, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = await reader.receive_message()

assert msg.seqno == 2

await reader.close()

async def test_reader_with_async_lambda(self, driver, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
async def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = await reader.receive_message()

assert msg.seqno == 2

await reader.close()

async def test_commit_not_allowed(self, driver, topic_selector):
reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)
batch = await reader.receive_batch()

with pytest.raises(ydb.Error):
reader.commit(batch)

with pytest.raises(ydb.Error):
await reader.commit_with_ack(batch)

await reader.close()

async def test_offsets_updated_after_reconnect(self, driver, topic_selector):
current_offset = 0

class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
nonlocal current_offset
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)

reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)
msg = await reader.receive_message()

assert msg.seqno == current_offset + 1

current_offset += 2
reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

await asyncio.sleep(0)

msg = await reader.receive_message()

assert msg.seqno == current_offset + 1

await reader.close()


class TestTopicReaderWithoutConsumer:
def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messages):
with pytest.raises(ydb.Error):
driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)

def test_reader_with_no_event_handler_raises(self, driver_sync, topic_with_messages):
with pytest.raises(ydb.Error):
driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
)

def test_reader_with_no_partition_ids_selector_raises(self, driver_sync, topic_selector):
topic_selector.partitions = None

with pytest.raises(ydb.Error):
driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)

def test_reader_with_default_lambda(self, driver_sync, topic_selector):
reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)
msg = reader.receive_message()

assert msg.seqno == 1

reader.close()

def test_reader_with_sync_lambda(self, driver_sync, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = reader.receive_message()

assert msg.seqno == 2

reader.close()

def test_reader_with_async_lambda(self, driver_sync, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
async def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = reader.receive_message()

assert msg.seqno == 2

reader.close()

def test_commit_not_allowed(self, driver_sync, topic_selector):
reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=ydb.TopicReaderEvents.EventHandler(),
)
batch = reader.receive_batch()

with pytest.raises(ydb.Error):
reader.commit(batch)

with pytest.raises(ydb.Error):
reader.commit_with_ack(batch)

reader.close()

def test_offsets_updated_after_reconnect(self, driver_sync, topic_selector):
current_offset = 0

class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
nonlocal current_offset
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)

reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)
msg = reader.receive_message()

assert msg.seqno == current_offset + 1

current_offset += 2
reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

msg = reader.receive_message()

assert msg.seqno == current_offset + 1

reader.close()
5 changes: 3 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,13 @@ def from_proto(
@dataclass
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
consumer: str
consumer: Optional[str]
auto_partitioning_support: bool

def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
res.consumer = self.consumer
if self.consumer is not None:
res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
res.auto_partitioning_support = self.auto_partitioning_support
Expand Down
81 changes: 81 additions & 0 deletions ydb/_topic_reader/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Union

from ..issues import ClientInternalError

__all__ = [
"OnCommit",
"OnPartitionGetStartOffsetRequest",
"OnPartitionGetStartOffsetResponse",
"OnInitPartition",
"OnShutdownPartition",
"EventHandler",
]


class BaseReaderEvent:
pass


@dataclass
class OnCommit(BaseReaderEvent):
topic: str
offset: int


@dataclass
class OnPartitionGetStartOffsetRequest(BaseReaderEvent):
topic: str
partition_id: int


@dataclass
class OnPartitionGetStartOffsetResponse:
start_offset: int


class OnInitPartition(BaseReaderEvent):
pass


class OnShutdownPartition:
pass


TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]


class EventHandler:
def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
pass

def on_partition_get_start_offset(
self,
event: OnPartitionGetStartOffsetRequest,
) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
pass

def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
pass

def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
pass

async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]:
f = None
if isinstance(event, OnCommit):
f = self.on_commit
elif isinstance(event, OnPartitionGetStartOffsetRequest):
f = self.on_partition_get_start_offset
elif isinstance(event, OnInitPartition):
f = self.on_init_partition
elif isinstance(event, OnShutdownPartition):
f = self.on_shutdown_partition
else:
raise ClientInternalError("Unsupported topic reader event")

if asyncio.iscoroutinefunction(f):
return await f(event)

return f(event)
Loading
Loading