From b2e88b5cb1d5ceb1e1e06614c2b121e8875c67c8 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 27 Sep 2024 13:36:59 +0300 Subject: [PATCH 1/7] Implement max_messages on recieve_batch --- ydb/_topic_reader/topic_reader_asyncio.py | 46 +++++++++++++++++++---- ydb/_topic_reader/topic_reader_sync.py | 4 +- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 68ac5451..de8a7f91 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -99,6 +99,7 @@ async def wait_message(self): async def receive_batch( self, + max_messages: typing.Union[int, None] = None, ) -> typing.Union[datatypes.PublicBatch, None]: """ Get one messages batch from reader. @@ -107,7 +108,9 @@ async def receive_batch( use asyncio.wait_for for wait with timeout. """ await self._reconnector.wait_message() - return self._reconnector.receive_batch_nowait() + return self._reconnector.receive_batch_nowait( + max_messages=max_messages, + ) async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: """ @@ -214,8 +217,10 @@ async def wait_message(self): await self._state_changed.wait() self._state_changed.clear() - def receive_batch_nowait(self): - return self._stream_reader.receive_batch_nowait() + def receive_batch_nowait(self, max_messages: Optional[int] = None): + return self._stream_reader.receive_batch_nowait( + max_messages=max_messages, + ) def receive_message_nowait(self): return self._stream_reader.receive_message_nowait() @@ -383,17 +388,44 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]: partition_session_id, batch = self._message_batches.popitem(last=False) return partition_session_id, batch - def receive_batch_nowait(self): + def _cut_batch_by_max_messages( + batch: datatypes.PublicBatch, + max_messages: int, + ) -> typing.Tuple[datatypes.PublicBatch, datatypes.PublicBatch]: + initial_length = len(batch.messages) + one_message_size = batch._bytes_size // initial_length + + new_batch = datatypes.PublicBatch( + messages=batch.messages[:max_messages], + _partition_session=batch._partition_session, + _bytes_size=one_message_size*max_messages, + _codec=batch._codec, + ) + + batch.messages = batch.messages[max_messages:] + batch._bytes_size = one_message_size * (initial_length - max_messages) + + return new_batch, batch + + def receive_batch_nowait(self, max_messages: Optional[int] = None): if self._get_first_error(): raise self._get_first_error() if not self._message_batches: return None - _, batch = self._get_first_batch() - self._buffer_release_bytes(batch._bytes_size) + part_sess_id, batch = self._get_first_batch() + + if max_messages is None or len(batch.messages) <= max_messages: + self._buffer_release_bytes(batch._bytes_size) + return batch + + cutted_batch, remaining_batch = self._cut_batch_by_max_messages(batch, max_messages) + + self._message_batches[part_sess_id] = remaining_batch + self._buffer_release_bytes(cutted_batch._bytes_size) - return batch + return cutted_batch def receive_message_nowait(self): if self._get_first_error(): diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index c266de82..3048d3c4 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -103,7 +103,9 @@ def receive_batch( self._check_closed() return self._caller.safe_call_with_result( - self._async_reader.receive_batch(), + self._async_reader.receive_batch( + max_messages=max_messages, + ), timeout, ) From b68cdd46adb00d702146e92e3e7a72116a6d3d6d Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 27 Sep 2024 13:36:59 +0300 Subject: [PATCH 2/7] style fixes --- ydb/_topic_reader/topic_reader_asyncio.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index de8a7f91..5c177e8b 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -389,8 +389,8 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]: return partition_session_id, batch def _cut_batch_by_max_messages( - batch: datatypes.PublicBatch, - max_messages: int, + batch: datatypes.PublicBatch, + max_messages: int, ) -> typing.Tuple[datatypes.PublicBatch, datatypes.PublicBatch]: initial_length = len(batch.messages) one_message_size = batch._bytes_size // initial_length @@ -398,7 +398,7 @@ def _cut_batch_by_max_messages( new_batch = datatypes.PublicBatch( messages=batch.messages[:max_messages], _partition_session=batch._partition_session, - _bytes_size=one_message_size*max_messages, + _bytes_size=one_message_size * max_messages, _codec=batch._codec, ) From 56e37000037ab4c2181831c52d19e3ea850bdef3 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 27 Sep 2024 13:36:59 +0300 Subject: [PATCH 3/7] added tests on max_messages --- ydb/_topic_reader/topic_reader_asyncio.py | 1 + .../topic_reader_asyncio_test.py | 90 +++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 5c177e8b..74747a67 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -389,6 +389,7 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]: return partition_session_id, batch def _cut_batch_by_max_messages( + self, batch: datatypes.PublicBatch, max_messages: int, ) -> typing.Tuple[datatypes.PublicBatch, datatypes.PublicBatch]: diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 77bf57c3..40565133 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1148,6 +1148,96 @@ async def test_read_message( assert mess == expected_message assert dict(stream_reader._message_batches) == batches_after + @pytest.mark.parametrize( + "batches_before,max_messages,actual_messages,batches_after", + [ + ( + { + 0: PublicBatch( + messages=[stub_message(1)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ) + }, + None, + 1, + {}, + ), + ( + { + 0: PublicBatch( + messages=[stub_message(1), stub_message(2)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + 1: PublicBatch( + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(1), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + }, + 1, + 1, + { + 1: PublicBatch( + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(1), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + 0: PublicBatch( + messages=[stub_message(2)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + }, + ), + ( + { + 0: PublicBatch( + messages=[stub_message(1)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + 1: PublicBatch( + messages=[stub_message(2), stub_message(3)], + _partition_session=stub_partition_session(1), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + }, + 100, + 1, + { + 1: PublicBatch( + messages=[stub_message(2), stub_message(3)], + _partition_session=stub_partition_session(1), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ) + }, + ), + ], + ) + async def test_read_batch_max_messages( + self, + stream_reader, + batches_before: typing.List[datatypes.PublicBatch], + max_messages: typing.Optional[int], + actual_messages: int, + batches_after: typing.List[datatypes.PublicBatch], + ): + stream_reader._message_batches = OrderedDict(batches_before) + batch = stream_reader.receive_batch_nowait(max_messages=max_messages) + + assert len(batch.messages) == actual_messages + assert stream_reader._message_batches == OrderedDict(batches_after) + async def test_receive_batch_nowait(self, stream, stream_reader, partition_session): assert stream_reader.receive_batch_nowait() is None From ffdd843a10a0419c42e2ffce90d56c5b61907dde Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 27 Sep 2024 13:47:20 +0300 Subject: [PATCH 4/7] refactor cutting new batch --- ydb/_topic_reader/datatypes.py | 16 ++++++++++++++++ ydb/_topic_reader/topic_reader_asyncio.py | 4 ++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 0f15ff85..3e0a2797 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -179,3 +179,19 @@ def _extend(self, batch: PublicBatch) -> None: def _pop(self) -> Tuple[List[PublicMessage], bool]: msgs_left = True if len(self.messages) > 1 else False return self.messages.pop(0), msgs_left + + def _pop_batch(self, size: int) -> PublicBatch: + initial_length = len(self.messages) + one_message_size = self._bytes_size // initial_length + + new_batch = PublicBatch( + messages=self.messages[:size], + _partition_session=self._partition_session, + _bytes_size=one_message_size * size, + _codec=self._codec, + ) + + self.messages = self.messages[size:] + self._bytes_size = one_message_size * (initial_length - size) + + return new_batch diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 74747a67..f021ca24 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -421,9 +421,9 @@ def receive_batch_nowait(self, max_messages: Optional[int] = None): self._buffer_release_bytes(batch._bytes_size) return batch - cutted_batch, remaining_batch = self._cut_batch_by_max_messages(batch, max_messages) + cutted_batch = batch._pop_batch(size=max_messages) - self._message_batches[part_sess_id] = remaining_batch + self._message_batches[part_sess_id] = batch self._buffer_release_bytes(cutted_batch._bytes_size) return cutted_batch From bf65dc358f12092d24718321ecaa5e3f3aa85ae4 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 30 Sep 2024 11:02:32 +0300 Subject: [PATCH 5/7] review fixes --- ydb/_topic_reader/datatypes.py | 6 +++++- ydb/_topic_reader/topic_reader_asyncio.py | 20 -------------------- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 3e0a2797..d3ecd180 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -182,6 +182,10 @@ def _pop(self) -> Tuple[List[PublicMessage], bool]: def _pop_batch(self, size: int) -> PublicBatch: initial_length = len(self.messages) + + if size >= initial_length: + raise ValueError("Pop batch with size >= actual size is not supported.") + one_message_size = self._bytes_size // initial_length new_batch = PublicBatch( @@ -192,6 +196,6 @@ def _pop_batch(self, size: int) -> PublicBatch: ) self.messages = self.messages[size:] - self._bytes_size = one_message_size * (initial_length - size) + self._bytes_size = self._bytes_size - new_batch._bytes_size return new_batch diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index f021ca24..4926246f 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -388,26 +388,6 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]: partition_session_id, batch = self._message_batches.popitem(last=False) return partition_session_id, batch - def _cut_batch_by_max_messages( - self, - batch: datatypes.PublicBatch, - max_messages: int, - ) -> typing.Tuple[datatypes.PublicBatch, datatypes.PublicBatch]: - initial_length = len(batch.messages) - one_message_size = batch._bytes_size // initial_length - - new_batch = datatypes.PublicBatch( - messages=batch.messages[:max_messages], - _partition_session=batch._partition_session, - _bytes_size=one_message_size * max_messages, - _codec=batch._codec, - ) - - batch.messages = batch.messages[max_messages:] - batch._bytes_size = one_message_size * (initial_length - max_messages) - - return new_batch, batch - def receive_batch_nowait(self, max_messages: Optional[int] = None): if self._get_first_error(): raise self._get_first_error() From 6edda440c45447b87c95d88b48b92910d006462a Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 30 Sep 2024 13:46:17 +0300 Subject: [PATCH 6/7] review fixes --- ydb/_topic_reader/datatypes.py | 10 +++++----- ydb/_topic_reader/topic_reader_asyncio.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index d3ecd180..01501638 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -180,22 +180,22 @@ def _pop(self) -> Tuple[List[PublicMessage], bool]: msgs_left = True if len(self.messages) > 1 else False return self.messages.pop(0), msgs_left - def _pop_batch(self, size: int) -> PublicBatch: + def _pop_batch(self, message_count: int) -> PublicBatch: initial_length = len(self.messages) - if size >= initial_length: + if message_count >= initial_length: raise ValueError("Pop batch with size >= actual size is not supported.") one_message_size = self._bytes_size // initial_length new_batch = PublicBatch( - messages=self.messages[:size], + messages=self.messages[:message_count], _partition_session=self._partition_session, - _bytes_size=one_message_size * size, + _bytes_size=one_message_size * message_count, _codec=self._codec, ) - self.messages = self.messages[size:] + self.messages = self.messages[message_count:] self._bytes_size = self._bytes_size - new_batch._bytes_size return new_batch diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 4926246f..6833492d 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -401,7 +401,7 @@ def receive_batch_nowait(self, max_messages: Optional[int] = None): self._buffer_release_bytes(batch._bytes_size) return batch - cutted_batch = batch._pop_batch(size=max_messages) + cutted_batch = batch._pop_batch(message_count=max_messages) self._message_batches[part_sess_id] = batch self._buffer_release_bytes(cutted_batch._bytes_size) From 10de36bc3a6258210f21c068a4748f0c36747d3b Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 30 Sep 2024 13:58:56 +0300 Subject: [PATCH 7/7] fix bytes_size on tests --- ydb/_topic_reader/topic_reader_asyncio_test.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 40565133..4c76cd1d 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1156,7 +1156,7 @@ async def test_read_message( 0: PublicBatch( messages=[stub_message(1)], _partition_session=stub_partition_session(), - _bytes_size=0, + _bytes_size=4, _codec=Codec.CODEC_RAW, ) }, @@ -1169,13 +1169,13 @@ async def test_read_message( 0: PublicBatch( messages=[stub_message(1), stub_message(2)], _partition_session=stub_partition_session(), - _bytes_size=0, + _bytes_size=4, _codec=Codec.CODEC_RAW, ), 1: PublicBatch( messages=[stub_message(3), stub_message(4)], _partition_session=stub_partition_session(1), - _bytes_size=0, + _bytes_size=4, _codec=Codec.CODEC_RAW, ), }, @@ -1185,13 +1185,13 @@ async def test_read_message( 1: PublicBatch( messages=[stub_message(3), stub_message(4)], _partition_session=stub_partition_session(1), - _bytes_size=0, + _bytes_size=4, _codec=Codec.CODEC_RAW, ), 0: PublicBatch( messages=[stub_message(2)], _partition_session=stub_partition_session(), - _bytes_size=0, + _bytes_size=2, _codec=Codec.CODEC_RAW, ), }, @@ -1201,13 +1201,13 @@ async def test_read_message( 0: PublicBatch( messages=[stub_message(1)], _partition_session=stub_partition_session(), - _bytes_size=0, + _bytes_size=4, _codec=Codec.CODEC_RAW, ), 1: PublicBatch( messages=[stub_message(2), stub_message(3)], _partition_session=stub_partition_session(1), - _bytes_size=0, + _bytes_size=4, _codec=Codec.CODEC_RAW, ), }, @@ -1217,7 +1217,7 @@ async def test_read_message( 1: PublicBatch( messages=[stub_message(2), stub_message(3)], _partition_session=stub_partition_session(1), - _bytes_size=0, + _bytes_size=4, _codec=Codec.CODEC_RAW, ) },