Skip to content

CommitOffset with read session id #673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions tests/topics/test_topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ async def test_commit_offset_works(self, driver, topic_with_messages, topic_cons
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
)

async def test_commit_offset_with_session_id_works(self, driver, topic_with_messages, topic_consumer):
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
msg1 = await reader.receive_message()
assert msg1.seqno == 1
msg2 = await reader.receive_message()
assert msg2.seqno == 2

await driver.topic_client.commit_offset(
topic_with_messages,
topic_consumer,
msg1.partition_id,
msg1.offset + 1,
reader.read_session_id,
)

msg3 = await reader.receive_message()
assert msg3.seqno == 3

async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
msg2 = await reader.receive_message()
assert msg2.seqno == 2

async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer):
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
for out in ["123", "456", "789", "0"]:
Expand Down Expand Up @@ -213,6 +235,28 @@ def test_commit_offset_works(self, driver_sync, topic_with_messages, topic_consu
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
)

def test_commit_offset_with_session_id_works(self, driver_sync, topic_with_messages, topic_consumer):
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
msg1 = reader.receive_message()
assert msg1.seqno == 1
msg2 = reader.receive_message()
assert msg2.seqno == 2

driver_sync.topic_client.commit_offset(
topic_with_messages,
topic_consumer,
msg1.partition_id,
msg1.offset + 1,
reader.read_session_id,
)

msg3 = reader.receive_message()
assert msg3.seqno == 3

with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
msg2 = reader.receive_message()
assert msg2.seqno == 2

def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer):
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
for out in ["123", "456", "789", "0"]:
Expand Down
2 changes: 2 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ class CommitOffsetRequest(IToProto):
consumer: str
partition_id: int
offset: int
read_session_id: Optional[str]

def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
return ydb_topic_pb2.CommitOffsetRequest(
path=self.path,
consumer=self.consumer,
partition_id=self.partition_id,
offset=self.offset,
read_session_id=self.read_session_id,
)


Expand Down
1,579 changes: 1,206 additions & 373 deletions ydb/_grpc/v3/draft/protos/ydb_federated_query_pb2.py

Large diffs are not rendered by default.

198 changes: 158 additions & 40 deletions ydb/_grpc/v3/protos/ydb_export_pb2.py

Large diffs are not rendered by default.

491 changes: 453 additions & 38 deletions ydb/_grpc/v3/protos/ydb_import_pb2.py

Large diffs are not rendered by default.

252 changes: 141 additions & 111 deletions ydb/_grpc/v3/protos/ydb_topic_pb2.py

Large diffs are not rendered by default.

16 changes: 13 additions & 3 deletions ydb/_grpc/v3/ydb_import_v1_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions ydb/_grpc/v3/ydb_import_v1_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=protos_dot_ydb__import__pb2.ImportFromS3Request.SerializeToString,
response_deserializer=protos_dot_ydb__import__pb2.ImportFromS3Response.FromString,
)
self.ListObjectsInS3Export = channel.unary_unary(
'/Ydb.Import.V1.ImportService/ListObjectsInS3Export',
request_serializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportRequest.SerializeToString,
response_deserializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportResponse.FromString,
)
self.ImportData = channel.unary_unary(
'/Ydb.Import.V1.ImportService/ImportData',
request_serializer=protos_dot_ydb__import__pb2.ImportDataRequest.SerializeToString,
Expand All @@ -37,6 +42,13 @@ def ImportFromS3(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def ListObjectsInS3Export(self, request, context):
"""List objects from existing export stored in S3 bucket
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def ImportData(self, request, context):
"""Writes data to a table.
Method accepts serialized data in the selected format and writes it non-transactionally.
Expand All @@ -53,6 +65,11 @@ def add_ImportServiceServicer_to_server(servicer, server):
request_deserializer=protos_dot_ydb__import__pb2.ImportFromS3Request.FromString,
response_serializer=protos_dot_ydb__import__pb2.ImportFromS3Response.SerializeToString,
),
'ListObjectsInS3Export': grpc.unary_unary_rpc_method_handler(
servicer.ListObjectsInS3Export,
request_deserializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportRequest.FromString,
response_serializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportResponse.SerializeToString,
),
'ImportData': grpc.unary_unary_rpc_method_handler(
servicer.ImportData,
request_deserializer=protos_dot_ydb__import__pb2.ImportDataRequest.FromString,
Expand Down Expand Up @@ -85,6 +102,23 @@ def ImportFromS3(request,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def ListObjectsInS3Export(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/Ydb.Import.V1.ImportService/ListObjectsInS3Export',
protos_dot_ydb__import__pb2.ListObjectsInS3ExportRequest.SerializeToString,
protos_dot_ydb__import__pb2.ListObjectsInS3ExportResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def ImportData(request,
target,
Expand Down
21 changes: 3 additions & 18 deletions ydb/_grpc/v3/ydb_query_v1_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@


class QueryServiceStub(object):
"""! WARNING: Experimental API
! This API is currently in experimental state and is a subject for changes.
! No backward and/or forward compatibility guarantees are provided.
! DO NOT USE for production workloads.

"""
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.
Expand Down Expand Up @@ -68,12 +63,7 @@ def __init__(self, channel):


class QueryServiceServicer(object):
"""! WARNING: Experimental API
! This API is currently in experimental state and is a subject for changes.
! No backward and/or forward compatibility guarantees are provided.
! DO NOT USE for production workloads.

"""
"""Missing associated documentation comment in .proto file."""

def CreateSession(self, request, context):
"""Sessions are basic primitives for communicating with YDB Query Service. The are similar to
Expand Down Expand Up @@ -214,12 +204,7 @@ def add_QueryServiceServicer_to_server(servicer, server):

# This class is part of an EXPERIMENTAL API.
class QueryService(object):
"""! WARNING: Experimental API
! This API is currently in experimental state and is a subject for changes.
! No backward and/or forward compatibility guarantees are provided.
! DO NOT USE for production workloads.

"""
"""Missing associated documentation comment in .proto file."""

@staticmethod
def CreateSession(request,
Expand Down
510 changes: 286 additions & 224 deletions ydb/_grpc/v4/draft/protos/ydb_federated_query_pb2.py

Large diffs are not rendered by default.

Loading
Loading