Skip to content

Commit a1888c7

Browse files
committed
review fixes
1 parent 60a4504 commit a1888c7

File tree

3 files changed

+26
-32
lines changed

3 files changed

+26
-32
lines changed

tests/topics/test_topic_writer.py

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,10 @@ async def test_random_producer_id(self, driver: ydb.aio.Driver, topic_path, topi
4141
async with driver.topic_client.writer(topic_path) as writer:
4242
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
4343

44-
batch = await topic_reader.receive_batch()
44+
msg1 = await topic_reader.receive_message()
45+
msg2 = await topic_reader.receive_message()
4546

46-
if len(batch.messages) == 1:
47-
batch2 = await topic_reader.receive_batch()
48-
batch.messages.extend(batch2.messages)
49-
50-
assert batch.messages[0].producer_id != batch.messages[1].producer_id
47+
assert msg1.producer_id != msg2.producer_id
5148

5249
async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
5350
async with driver.topic_client.writer(
@@ -80,18 +77,16 @@ async def test_write_multi_message_with_ack(
8077
assert res1.offset == 0
8178
assert res2.offset == 1
8279

83-
batch = await topic_reader.receive_batch()
80+
msg1 = await topic_reader.receive_message()
81+
msg2 = await topic_reader.receive_message()
8482

85-
assert batch.messages[0].offset == 0
86-
assert batch.messages[0].seqno == 1
87-
assert batch.messages[0].data == "123".encode()
83+
assert msg1.offset == 0
84+
assert msg1.seqno == 1
85+
assert msg1.data == "123".encode()
8886

89-
# # remove second recieve batch when implement batching
90-
# # https://github.com/ydb-platform/ydb-python-sdk/issues/142
91-
# batch = await topic_reader.receive_batch()
92-
assert batch.messages[1].offset == 1
93-
assert batch.messages[1].seqno == 2
94-
assert batch.messages[1].data == "456".encode()
87+
assert msg2.offset == 1
88+
assert msg2.seqno == 2
89+
assert msg2.data == "456".encode()
9590

9691
@pytest.mark.parametrize(
9792
"codec",
@@ -189,9 +184,10 @@ def test_random_producer_id(
189184
with driver_sync.topic_client.writer(topic_path) as writer:
190185
writer.write(ydb.TopicWriterMessage(data="123".encode()))
191186

192-
batch = topic_reader_sync.receive_batch()
187+
msg1 = topic_reader_sync.receive_message()
188+
msg2 = topic_reader_sync.receive_message()
193189

194-
assert batch.messages[0].producer_id != batch.messages[1].producer_id
190+
assert msg1.producer_id != msg2.producer_id
195191

196192
def test_write_multi_message_with_ack(
197193
self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReader
@@ -204,18 +200,16 @@ def test_write_multi_message_with_ack(
204200
]
205201
)
206202

207-
batch = topic_reader_sync.receive_batch()
208-
if len(batch.messages) == 1:
209-
batch2 = topic_reader_sync.receive_batch()
210-
batch.messages.extend(batch2.messages)
203+
msg1 = topic_reader_sync.receive_message()
204+
msg2 = topic_reader_sync.receive_message()
211205

212-
assert batch.messages[0].offset == 0
213-
assert batch.messages[0].seqno == 1
214-
assert batch.messages[0].data == "123".encode()
206+
assert msg1.offset == 0
207+
assert msg1.seqno == 1
208+
assert msg1.data == "123".encode()
215209

216-
assert batch.messages[1].offset == 1
217-
assert batch.messages[1].seqno == 2
218-
assert batch.messages[1].data == "456".encode()
210+
assert msg2.offset == 1
211+
assert msg2.seqno == 2
212+
assert msg2.data == "456".encode()
219213

220214
@pytest.mark.parametrize(
221215
"codec",

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ class ReaderStream:
264264

265265
_state_changed: asyncio.Event
266266
_closed: bool
267-
_message_batches: typing.Dict[int, datatypes.PublicBatch]
267+
_message_batches: typing.Dict[int, datatypes.PublicBatch] # keys are partition session ID
268268
_first_error: asyncio.Future[YdbError]
269269

270270
_update_token_interval: Union[int, float]
@@ -360,8 +360,8 @@ async def wait_messages(self):
360360
self._state_changed.clear()
361361

362362
def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
363-
first_id, batch = self._message_batches.popitem(last=False)
364-
return first_id, batch
363+
partition_session_id, batch = self._message_batches.popitem(last=False)
364+
return partition_session_id, batch
365365

366366
def receive_batch_nowait(self):
367367
if self._get_first_error():

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def batch_size():
232232
return len(stream_reader._message_batches[partition_session_id].messages)
233233

234234
initial_batches = batch_count()
235-
initial_batch_size = batch_size() if not new_batch else 0
235+
initial_batch_size = 0 if new_batch else batch_size()
236236

237237
stream = stream_reader._stream # type: StreamMock
238238
stream.from_server.put_nowait(

0 commit comments

Comments
 (0)