Skip to content

Commit eeef3fa

Browse files
authored
Merge pull request #581 from ydb-platform/no_stream_commit_offset
CommitOffset non-stream
2 parents f81788f + 0c32132 commit eeef3fa

File tree

8 files changed

+116
-3
lines changed

8 files changed

+116
-3
lines changed

docker-compose-tls.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: "3.9"
22
services:
33
ydb:
4-
image: ydbplatform/local-ydb:trunk
4+
image: ydbplatform/local-ydb:latest
55
restart: always
66
ports:
77
- 2136:2136

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: "3.3"
22
services:
33
ydb:
4-
image: ydbplatform/local-ydb:trunk
4+
image: ydbplatform/local-ydb:latest
55
restart: always
66
ports:
77
- 2136:2136

tests/query/test_query_parameters.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,73 +4,84 @@
44
import ydb
55

66

7-
query = """SELECT $a AS value"""
7+
query_template = "DECLARE $a as %s; SELECT $a AS value"
88

99

1010
def test_select_implicit_int(pool: ydb.QuerySessionPool):
11+
query = query_template % "Int64"
1112
expected_value = 111
1213
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
1314
actual_value = res[0].rows[0]["value"]
1415
assert expected_value == actual_value
1516

1617

1718
def test_select_implicit_float(pool: ydb.QuerySessionPool):
19+
query = query_template % "Double"
1820
expected_value = 11.1
1921
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
2022
actual_value = res[0].rows[0]["value"]
2123
assert expected_value == pytest.approx(actual_value)
2224

2325

2426
def test_select_implicit_bool(pool: ydb.QuerySessionPool):
27+
query = query_template % "Bool"
2528
expected_value = False
2629
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
2730
actual_value = res[0].rows[0]["value"]
2831
assert expected_value == actual_value
2932

3033

3134
def test_select_implicit_str(pool: ydb.QuerySessionPool):
35+
query = query_template % "Utf8"
3236
expected_value = "text"
3337
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
3438
actual_value = res[0].rows[0]["value"]
3539
assert expected_value == actual_value
3640

3741

3842
def test_select_implicit_bytes(pool: ydb.QuerySessionPool):
43+
query = query_template % "String"
3944
expected_value = b"text"
4045
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
4146
actual_value = res[0].rows[0]["value"]
4247
assert expected_value == actual_value
4348

4449

4550
def test_select_implicit_list(pool: ydb.QuerySessionPool):
51+
query = query_template % "List<Int64>"
4652
expected_value = [1, 2, 3]
4753
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
4854
actual_value = res[0].rows[0]["value"]
4955
assert expected_value == actual_value
5056

5157

5258
def test_select_implicit_dict(pool: ydb.QuerySessionPool):
59+
query = query_template % "Dict<Utf8, Int64>"
5360
expected_value = {"a": 1, "b": 2}
5461
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
5562
actual_value = res[0].rows[0]["value"]
5663
assert expected_value == actual_value
5764

5865

5966
def test_select_implicit_list_nested(pool: ydb.QuerySessionPool):
67+
query = query_template % "List<Dict<Utf8, Int64>>"
6068
expected_value = [{"a": 1}, {"b": 2}]
6169
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
6270
actual_value = res[0].rows[0]["value"]
6371
assert expected_value == actual_value
6472

6573

6674
def test_select_implicit_dict_nested(pool: ydb.QuerySessionPool):
75+
query = query_template % "Dict<Utf8, List<Int64>>"
6776
expected_value = {"a": [1, 2, 3], "b": [4, 5]}
6877
res = pool.execute_with_retries(query, parameters={"$a": expected_value})
6978
actual_value = res[0].rows[0]["value"]
7079
assert expected_value == actual_value
7180

7281

7382
def test_select_implicit_custom_type_raises(pool: ydb.QuerySessionPool):
83+
query = query_template % "Struct"
84+
7485
class CustomClass:
7586
pass
7687

@@ -80,25 +91,29 @@ class CustomClass:
8091

8192

8293
def test_select_implicit_empty_list_raises(pool: ydb.QuerySessionPool):
94+
query = query_template % "List<Int64>"
8395
expected_value = []
8496
with pytest.raises(ValueError):
8597
pool.execute_with_retries(query, parameters={"$a": expected_value})
8698

8799

88100
def test_select_implicit_empty_dict_raises(pool: ydb.QuerySessionPool):
101+
query = query_template % "Dict<Int64, Int64>"
89102
expected_value = {}
90103
with pytest.raises(ValueError):
91104
pool.execute_with_retries(query, parameters={"$a": expected_value})
92105

93106

94107
def test_select_explicit_primitive(pool: ydb.QuerySessionPool):
108+
query = query_template % "Int64"
95109
expected_value = 111
96110
res = pool.execute_with_retries(query, parameters={"$a": (expected_value, ydb.PrimitiveType.Int64)})
97111
actual_value = res[0].rows[0]["value"]
98112
assert expected_value == actual_value
99113

100114

101115
def test_select_explicit_list(pool: ydb.QuerySessionPool):
116+
query = query_template % "List<Int64>"
102117
expected_value = [1, 2, 3]
103118
type_ = ydb.ListType(ydb.PrimitiveType.Int64)
104119
res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)})
@@ -107,6 +122,7 @@ def test_select_explicit_list(pool: ydb.QuerySessionPool):
107122

108123

109124
def test_select_explicit_dict(pool: ydb.QuerySessionPool):
125+
query = query_template % "Dict<Utf8, Utf8>"
110126
expected_value = {"key": "value"}
111127
type_ = ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Utf8)
112128
res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)})
@@ -115,6 +131,7 @@ def test_select_explicit_dict(pool: ydb.QuerySessionPool):
115131

116132

117133
def test_select_explicit_empty_list_not_raises(pool: ydb.QuerySessionPool):
134+
query = query_template % "List<Int64>"
118135
expected_value = []
119136
type_ = ydb.ListType(ydb.PrimitiveType.Int64)
120137
res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)})
@@ -123,6 +140,7 @@ def test_select_explicit_empty_list_not_raises(pool: ydb.QuerySessionPool):
123140

124141

125142
def test_select_explicit_empty_dict_not_raises(pool: ydb.QuerySessionPool):
143+
query = query_template % "Dict<Utf8, Utf8>"
126144
expected_value = {}
127145
type_ = ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Utf8)
128146
res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)})
@@ -131,6 +149,7 @@ def test_select_explicit_empty_dict_not_raises(pool: ydb.QuerySessionPool):
131149

132150

133151
def test_select_typedvalue_full_primitive(pool: ydb.QuerySessionPool):
152+
query = query_template % "Int64"
134153
expected_value = 111
135154
typed_value = ydb.TypedValue(expected_value, ydb.PrimitiveType.Int64)
136155
res = pool.execute_with_retries(query, parameters={"$a": typed_value})
@@ -139,6 +158,7 @@ def test_select_typedvalue_full_primitive(pool: ydb.QuerySessionPool):
139158

140159

141160
def test_select_typedvalue_implicit_primitive(pool: ydb.QuerySessionPool):
161+
query = query_template % "Int64"
142162
expected_value = 111
143163
typed_value = ydb.TypedValue(expected_value)
144164
res = pool.execute_with_retries(query, parameters={"$a": typed_value})
@@ -147,6 +167,8 @@ def test_select_typedvalue_implicit_primitive(pool: ydb.QuerySessionPool):
147167

148168

149169
def test_select_typevalue_custom_type_raises(pool: ydb.QuerySessionPool):
170+
query = query_template % "Struct"
171+
150172
class CustomClass:
151173
pass
152174

tests/topics/test_topic_reader.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,26 @@ async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic
6464

6565
assert message != batch.messages[0]
6666

67+
async def test_commit_offset_works(self, driver, topic_with_messages, topic_consumer):
68+
for out in ["123", "456", "789", "0"]:
69+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
70+
message = await reader.receive_message()
71+
assert message.data.decode() == out
72+
73+
await driver.topic_client.commit_offset(
74+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
75+
)
76+
77+
async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer):
78+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
79+
for out in ["123", "456", "789", "0"]:
80+
message = await reader.receive_message()
81+
assert message.data.decode() == out
82+
83+
await driver.topic_client.commit_offset(
84+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
85+
)
86+
6787
async def test_read_compressed_messages(self, driver, topic_path, topic_consumer):
6888
async with driver.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:
6989
await writer.write("123")
@@ -183,6 +203,26 @@ def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_
183203

184204
assert message != batch.messages[0]
185205

206+
def test_commit_offset_works(self, driver_sync, topic_with_messages, topic_consumer):
207+
for out in ["123", "456", "789", "0"]:
208+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
209+
message = reader.receive_message()
210+
assert message.data.decode() == out
211+
212+
driver_sync.topic_client.commit_offset(
213+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
214+
)
215+
216+
def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer):
217+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
218+
for out in ["123", "456", "789", "0"]:
219+
message = reader.receive_message()
220+
assert message.data.decode() == out
221+
222+
driver_sync.topic_client.commit_offset(
223+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
224+
)
225+
186226
def test_read_compressed_messages(self, driver_sync, topic_path, topic_consumer):
187227
with driver_sync.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:
188228
writer.write("123")

ydb/_apis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class TopicService(object):
117117
StreamRead = "StreamRead"
118118
StreamWrite = "StreamWrite"
119119
UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction"
120+
CommitOffset = "CommitOffset"
120121

121122

122123
class QueryService(object):

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,22 @@ def from_proto(msg: ydb_topic_pb2.UpdateTokenResponse) -> typing.Any:
137137
return UpdateTokenResponse()
138138

139139

140+
@dataclass
141+
class CommitOffsetRequest(IToProto):
142+
path: str
143+
consumer: str
144+
partition_id: int
145+
offset: int
146+
147+
def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
148+
return ydb_topic_pb2.CommitOffsetRequest(
149+
path=self.path,
150+
consumer=self.consumer,
151+
partition_id=self.partition_id,
152+
offset=self.offset,
153+
)
154+
155+
140156
########################################################################################################################
141157
# StreamWrite
142158
########################################################################################################################

ydb/_topic_reader/datatypes.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ def _commit_get_offsets_range(self) -> OffsetsRange:
5656
def alive(self) -> bool:
5757
return not self._partition_session.closed
5858

59+
@property
60+
def partition_id(self) -> int:
61+
return self._partition_session.partition_id
62+
5963

6064
@dataclass
6165
class PartitionSession:

ydb/topic.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,21 @@ def tx_writer(
340340

341341
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
342342

343+
async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
344+
req = _ydb_topic.CommitOffsetRequest(
345+
path=path,
346+
consumer=consumer,
347+
partition_id=partition_id,
348+
offset=offset,
349+
)
350+
351+
await self._driver(
352+
req.to_proto(),
353+
_apis.TopicService.Stub,
354+
_apis.TopicService.CommitOffset,
355+
_wrap_operation,
356+
)
357+
343358
def close(self):
344359
if self._closed:
345360
return
@@ -603,6 +618,21 @@ def tx_writer(
603618

604619
return TopicTxWriter(tx, self._driver, settings, _parent=self)
605620

621+
def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
622+
req = _ydb_topic.CommitOffsetRequest(
623+
path=path,
624+
consumer=consumer,
625+
partition_id=partition_id,
626+
offset=offset,
627+
)
628+
629+
self._driver(
630+
req.to_proto(),
631+
_apis.TopicService.Stub,
632+
_apis.TopicService.CommitOffset,
633+
_wrap_operation,
634+
)
635+
606636
def close(self):
607637
if self._closed:
608638
return

0 commit comments

Comments
 (0)