Skip to content

Commit 58b0a70

Browse files
committed
CommitOffset with readsessionid
1 parent 7de9687 commit 58b0a70

File tree

5 files changed

+68
-2
lines changed

5 files changed

+68
-2
lines changed

tests/topics/test_topic_reader.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,28 @@ async def test_commit_offset_works(self, driver, topic_with_messages, topic_cons
7474
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
7575
)
7676

77+
async def test_commit_offset_with_session_id_works(self, driver, topic_with_messages, topic_consumer):
78+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
79+
msg1 = await reader.receive_message()
80+
assert msg1.seqno == 1
81+
msg2 = await reader.receive_message()
82+
assert msg2.seqno == 2
83+
84+
await driver.topic_client.commit_offset(
85+
topic_with_messages,
86+
topic_consumer,
87+
msg1.partition_id,
88+
msg1.offset + 1,
89+
reader.read_session_id,
90+
)
91+
92+
msg3 = await reader.receive_message()
93+
assert msg3.seqno == 3
94+
95+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
96+
msg2 = await reader.receive_message()
97+
assert msg2.seqno == 2
98+
7799
async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer):
78100
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
79101
for out in ["123", "456", "789", "0"]:
@@ -213,6 +235,28 @@ def test_commit_offset_works(self, driver_sync, topic_with_messages, topic_consu
213235
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
214236
)
215237

238+
def test_commit_offset_with_session_id_works(self, driver_sync, topic_with_messages, topic_consumer):
239+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
240+
msg1 = reader.receive_message()
241+
assert msg1.seqno == 1
242+
msg2 = reader.receive_message()
243+
assert msg2.seqno == 2
244+
245+
driver_sync.topic_client.commit_offset(
246+
topic_with_messages,
247+
topic_consumer,
248+
msg1.partition_id,
249+
msg1.offset + 1,
250+
reader.read_session_id,
251+
)
252+
253+
msg3 = reader.receive_message()
254+
assert msg3.seqno == 3
255+
256+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
257+
msg2 = reader.receive_message()
258+
assert msg2.seqno == 2
259+
216260
def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer):
217261
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
218262
for out in ["123", "456", "789", "0"]:

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,15 @@ class CommitOffsetRequest(IToProto):
143143
consumer: str
144144
partition_id: int
145145
offset: int
146+
read_session_id: Optional[str]
146147

147148
def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
148149
return ydb_topic_pb2.CommitOffsetRequest(
149150
path=self.path,
150151
consumer=self.consumer,
151152
partition_id=self.partition_id,
152153
offset=self.offset,
154+
read_session_id=self.read_session_id,
153155
)
154156

155157

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ async def close(self, flush: bool = True):
190190
self._closed = True
191191
await self._reconnector.close(flush)
192192

193+
@property
194+
def read_session_id(self) -> Optional[str]:
195+
return self._reconnector.read_session_id
196+
193197

194198
class ReaderReconnector:
195199
_static_reader_reconnector_counter = AtomicCounter()
@@ -373,6 +377,12 @@ def _set_first_error(self, err: issues.Error):
373377
# skip if already has result
374378
pass
375379

380+
@property
381+
def read_session_id(self) -> Optional[str]:
382+
if not self._stream_reader:
383+
return None
384+
return self._stream_reader._session_id
385+
376386

377387
class ReaderStream:
378388
_static_id_counter = AtomicCounter()

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,7 @@ def close(self, *, flush: bool = True, timeout: TimeoutType = None):
204204
def _check_closed(self):
205205
if self._closed:
206206
raise TopicReaderClosedError()
207+
208+
@property
209+
def read_session_id(self) -> Optional[str]:
210+
return self._async_reader.read_session_id

ydb/topic.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,15 @@ def tx_writer(
340340

341341
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
342342

343-
async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
343+
async def commit_offset(
344+
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
345+
) -> None:
344346
req = _ydb_topic.CommitOffsetRequest(
345347
path=path,
346348
consumer=consumer,
347349
partition_id=partition_id,
348350
offset=offset,
351+
read_session_id=read_session_id,
349352
)
350353

351354
await self._driver(
@@ -618,12 +621,15 @@ def tx_writer(
618621

619622
return TopicTxWriter(tx, self._driver, settings, _parent=self)
620623

621-
def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
624+
def commit_offset(
625+
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
626+
) -> None:
622627
req = _ydb_topic.CommitOffsetRequest(
623628
path=path,
624629
consumer=consumer,
625630
partition_id=partition_id,
626631
offset=offset,
632+
read_session_id=read_session_id,
627633
)
628634

629635
self._driver(

0 commit comments

Comments
 (0)