Skip to content

Commit 4877cc8

Browse files
authored
Merge pull request #491 from ydb-platform/topic_batch_messages
Ability to batch messages in topic reader
2 parents c84538d + 2560e5f commit 4877cc8

File tree

5 files changed

+126
-103
lines changed

5 files changed

+126
-103
lines changed

tests/topics/test_topic_reader.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,14 @@ async def test_read_and_commit_with_close_reader(self, driver, topic_with_messag
4040
assert message != message2
4141

4242
async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic_consumer):
43-
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
44-
batch = await reader.receive_batch()
45-
await reader.commit_with_ack(batch)
43+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
44+
message = await reader.receive_message()
45+
await reader.commit_with_ack(message)
4646

47-
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
48-
batch2 = await reader.receive_batch()
49-
assert batch.messages[0] != batch2.messages[0]
47+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
48+
batch = await reader.receive_batch()
5049

51-
await reader.close()
50+
assert message != batch.messages[0]
5251

5352
async def test_read_compressed_messages(self, driver, topic_path, topic_consumer):
5453
async with driver.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:
@@ -147,12 +146,12 @@ def test_read_and_commit_with_close_reader(self, driver_sync, topic_with_message
147146

148147
def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_consumer):
149148
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
150-
batch = reader.receive_batch()
151-
reader.commit_with_ack(batch)
149+
message = reader.receive_message()
150+
reader.commit_with_ack(message)
152151

153152
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
154-
batch2 = reader.receive_batch()
155-
assert batch.messages[0] != batch2.messages[0]
153+
batch = reader.receive_batch()
154+
assert message != batch.messages[0]
156155

157156
def test_read_compressed_messages(self, driver_sync, topic_path, topic_consumer):
158157
with driver_sync.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:

tests/topics/test_topic_writer.py

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ async def test_random_producer_id(self, driver: ydb.aio.Driver, topic_path, topi
4141
async with driver.topic_client.writer(topic_path) as writer:
4242
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
4343

44-
batch1 = await topic_reader.receive_batch()
45-
batch2 = await topic_reader.receive_batch()
44+
msg1 = await topic_reader.receive_message()
45+
msg2 = await topic_reader.receive_message()
4646

47-
assert batch1.messages[0].producer_id != batch2.messages[0].producer_id
47+
assert msg1.producer_id != msg2.producer_id
4848

4949
async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
5050
async with driver.topic_client.writer(
@@ -77,18 +77,16 @@ async def test_write_multi_message_with_ack(
7777
assert res1.offset == 0
7878
assert res2.offset == 1
7979

80-
batch = await topic_reader.receive_batch()
80+
msg1 = await topic_reader.receive_message()
81+
msg2 = await topic_reader.receive_message()
8182

82-
assert batch.messages[0].offset == 0
83-
assert batch.messages[0].seqno == 1
84-
assert batch.messages[0].data == "123".encode()
83+
assert msg1.offset == 0
84+
assert msg1.seqno == 1
85+
assert msg1.data == "123".encode()
8586

86-
# remove second recieve batch when implement batching
87-
# https://github.com/ydb-platform/ydb-python-sdk/issues/142
88-
batch = await topic_reader.receive_batch()
89-
assert batch.messages[0].offset == 1
90-
assert batch.messages[0].seqno == 2
91-
assert batch.messages[0].data == "456".encode()
87+
assert msg2.offset == 1
88+
assert msg2.seqno == 2
89+
assert msg2.data == "456".encode()
9290

9391
@pytest.mark.parametrize(
9492
"codec",
@@ -186,10 +184,10 @@ def test_random_producer_id(
186184
with driver_sync.topic_client.writer(topic_path) as writer:
187185
writer.write(ydb.TopicWriterMessage(data="123".encode()))
188186

189-
batch1 = topic_reader_sync.receive_batch()
190-
batch2 = topic_reader_sync.receive_batch()
187+
msg1 = topic_reader_sync.receive_message()
188+
msg2 = topic_reader_sync.receive_message()
191189

192-
assert batch1.messages[0].producer_id != batch2.messages[0].producer_id
190+
assert msg1.producer_id != msg2.producer_id
193191

194192
def test_write_multi_message_with_ack(
195193
self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReader
@@ -202,18 +200,16 @@ def test_write_multi_message_with_ack(
202200
]
203201
)
204202

205-
batch = topic_reader_sync.receive_batch()
203+
msg1 = topic_reader_sync.receive_message()
204+
msg2 = topic_reader_sync.receive_message()
206205

207-
assert batch.messages[0].offset == 0
208-
assert batch.messages[0].seqno == 1
209-
assert batch.messages[0].data == "123".encode()
206+
assert msg1.offset == 0
207+
assert msg1.seqno == 1
208+
assert msg1.data == "123".encode()
210209

211-
# remove second recieve batch when implement batching
212-
# https://github.com/ydb-platform/ydb-python-sdk/issues/142
213-
batch = topic_reader_sync.receive_batch()
214-
assert batch.messages[0].offset == 1
215-
assert batch.messages[0].seqno == 2
216-
assert batch.messages[0].data == "456".encode()
210+
assert msg2.offset == 1
211+
assert msg2.seqno == 2
212+
assert msg2.data == "456".encode()
217213

218214
@pytest.mark.parametrize(
219215
"codec",

ydb/_topic_reader/datatypes.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from collections import deque
88
from dataclasses import dataclass, field
99
import datetime
10-
from typing import Union, Any, List, Dict, Deque, Optional
10+
from typing import Union, Any, List, Dict, Deque, Optional, Tuple
1111

1212
from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange, Codec
1313
from ydb._topic_reader import topic_reader_asyncio
@@ -171,3 +171,11 @@ def alive(self) -> bool:
171171

172172
def pop_message(self) -> PublicMessage:
173173
return self.messages.pop(0)
174+
175+
def _extend(self, batch: PublicBatch) -> None:
176+
self.messages.extend(batch.messages)
177+
self._bytes_size += batch._bytes_size
178+
179+
def _pop(self) -> Tuple[List[PublicMessage], bool]:
180+
msgs_left = True if len(self.messages) > 1 else False
181+
return self.messages.pop(0), msgs_left

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import gzip
66
import typing
77
from asyncio import Task
8-
from collections import deque
8+
from collections import OrderedDict
99
from typing import Optional, Set, Dict, Union, Callable
1010

1111
import ydb
@@ -266,7 +266,7 @@ class ReaderStream:
266266

267267
_state_changed: asyncio.Event
268268
_closed: bool
269-
_message_batches: typing.Deque[datatypes.PublicBatch]
269+
_message_batches: typing.Dict[int, datatypes.PublicBatch] # keys are partition session ID
270270
_first_error: asyncio.Future[YdbError]
271271

272272
_update_token_interval: Union[int, float]
@@ -298,7 +298,7 @@ def __init__(
298298
self._closed = False
299299
self._first_error = asyncio.get_running_loop().create_future()
300300
self._batches_to_decode = asyncio.Queue()
301-
self._message_batches = deque()
301+
self._message_batches = OrderedDict()
302302

303303
self._update_token_interval = settings.update_token_interval
304304
self._get_token_function = get_token_function
@@ -379,29 +379,38 @@ async def wait_messages(self):
379379
await self._state_changed.wait()
380380
self._state_changed.clear()
381381

382+
def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
383+
partition_session_id, batch = self._message_batches.popitem(last=False)
384+
return partition_session_id, batch
385+
382386
def receive_batch_nowait(self):
383387
if self._get_first_error():
384388
raise self._get_first_error()
385389

386390
if not self._message_batches:
387391
return None
388392

389-
batch = self._message_batches.popleft()
393+
_, batch = self._get_first_batch()
390394
self._buffer_release_bytes(batch._bytes_size)
395+
391396
return batch
392397

393398
def receive_message_nowait(self):
394399
if self._get_first_error():
395400
raise self._get_first_error()
396401

397-
try:
398-
batch = self._message_batches[0]
399-
message = batch.pop_message()
400-
except IndexError:
402+
if not self._message_batches:
401403
return None
402404

403-
if batch.empty():
404-
self.receive_batch_nowait()
405+
part_sess_id, batch = self._get_first_batch()
406+
407+
message, msgs_left = batch._pop()
408+
409+
if not msgs_left:
410+
self._buffer_release_bytes(batch._bytes_size)
411+
else:
412+
# TODO: we should somehow release bytes from single message as well
413+
self._message_batches[part_sess_id] = batch
405414

406415
return message
407416

@@ -625,9 +634,17 @@ async def _decode_batches_loop(self):
625634
while True:
626635
batch = await self._batches_to_decode.get()
627636
await self._decode_batch_inplace(batch)
628-
self._message_batches.append(batch)
637+
self._add_batch_to_queue(batch)
629638
self._state_changed.set()
630639

640+
def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
641+
part_sess_id = batch._partition_session.id
642+
if part_sess_id in self._message_batches:
643+
self._message_batches[part_sess_id]._extend(batch)
644+
return
645+
646+
self._message_batches[part_sess_id] = batch
647+
631648
async def _decode_batch_inplace(self, batch):
632649
if batch._codec == Codec.CODEC_RAW:
633650
return

0 commit comments

Comments
 (0)