Skip to content

Commit bf65dc3

Browse files
committed
review fixes
1 parent ffdd843 commit bf65dc3

File tree

2 files changed

+5
-21
lines changed

2 files changed

+5
-21
lines changed

ydb/_topic_reader/datatypes.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ def _pop(self) -> Tuple[List[PublicMessage], bool]:
182182

183183
def _pop_batch(self, size: int) -> PublicBatch:
184184
initial_length = len(self.messages)
185+
186+
if size >= initial_length:
187+
raise ValueError("Pop batch with size >= actual size is not supported.")
188+
185189
one_message_size = self._bytes_size // initial_length
186190

187191
new_batch = PublicBatch(
@@ -192,6 +196,6 @@ def _pop_batch(self, size: int) -> PublicBatch:
192196
)
193197

194198
self.messages = self.messages[size:]
195-
self._bytes_size = one_message_size * (initial_length - size)
199+
self._bytes_size = self._bytes_size - new_batch._bytes_size
196200

197201
return new_batch

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -388,26 +388,6 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
388388
partition_session_id, batch = self._message_batches.popitem(last=False)
389389
return partition_session_id, batch
390390

391-
def _cut_batch_by_max_messages(
392-
self,
393-
batch: datatypes.PublicBatch,
394-
max_messages: int,
395-
) -> typing.Tuple[datatypes.PublicBatch, datatypes.PublicBatch]:
396-
initial_length = len(batch.messages)
397-
one_message_size = batch._bytes_size // initial_length
398-
399-
new_batch = datatypes.PublicBatch(
400-
messages=batch.messages[:max_messages],
401-
_partition_session=batch._partition_session,
402-
_bytes_size=one_message_size * max_messages,
403-
_codec=batch._codec,
404-
)
405-
406-
batch.messages = batch.messages[max_messages:]
407-
batch._bytes_size = one_message_size * (initial_length - max_messages)
408-
409-
return new_batch, batch
410-
411391
def receive_batch_nowait(self, max_messages: Optional[int] = None):
412392
if self._get_first_error():
413393
raise self._get_first_error()

0 commit comments

Comments
 (0)