31
31
);
32
32
"""
33
33
34
+ QUERY_READ_QUERY_TEMPLATE = """
35
+ SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id);
36
+ """
37
+
38
+ QUERY_WRITE_QUERY_TEMPLATE = """
39
+ UPSERT INTO `{}` (
40
+ object_id, object_hash, payload_str, payload_double, payload_timestamp
41
+ ) VALUES (
42
+ $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp
43
+ );
44
+ """
45
+
34
46
logger = logging .getLogger (__name__ )
35
47
36
48
@@ -150,12 +162,12 @@ def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout):
150
162
logger .info ("Session pool for read requests created" )
151
163
152
164
while time .time () - start_time < runtime :
153
- params = {"$object_id" : randint (1 , max_id )}
165
+ params = {"$object_id" : ( randint (1 , max_id ), ydb . PrimitiveType . Uint64 )}
154
166
with limiter :
155
167
156
168
def check_result (result ):
157
169
res = next (result )
158
- assert res [ 0 ] .rows [0 ]
170
+ assert res .rows [0 ]
159
171
160
172
params = RequestParams (
161
173
pool = pool ,
@@ -175,7 +187,7 @@ def check_result(result):
175
187
def run_read_jobs_query (args , driver , tb_name , max_id , metrics ):
176
188
logger .info ("Start read jobs for query service" )
177
189
178
- read_q = READ_QUERY_TEMPLATE .format (tb_name )
190
+ read_q = QUERY_READ_QUERY_TEMPLATE .format (tb_name )
179
191
180
192
read_limiter = RateLimiter (max_calls = args .read_rps , period = 1 )
181
193
futures = []
@@ -213,10 +225,6 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)
213
225
"$payload_timestamp" : row .payload_timestamp ,
214
226
}
215
227
216
- def check_result (result ):
217
- with result :
218
- pass
219
-
220
228
with limiter :
221
229
params = RequestParams (
222
230
pool = pool ,
@@ -226,7 +234,6 @@ def check_result(result):
226
234
labels = (JOB_WRITE_LABEL ,),
227
235
request_settings = request_settings ,
228
236
retry_settings = retry_setting ,
229
- check_result_cb = check_result ,
230
237
)
231
238
execute_query (params )
232
239
@@ -272,11 +279,16 @@ def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, ti
272
279
while time .time () - start_time < runtime :
273
280
row = row_generator .get ()
274
281
params = {
275
- "$object_id" : (row .object_id , ydb .PrimitiveType .Int64 ),
282
+ "$object_id" : (row .object_id , ydb .PrimitiveType .Uint64 ),
276
283
"$payload_str" : (row .payload_str , ydb .PrimitiveType .Utf8 ),
277
284
"$payload_double" : (row .payload_double , ydb .PrimitiveType .Double ),
278
285
"$payload_timestamp" : (row .payload_timestamp , ydb .PrimitiveType .Timestamp ),
279
286
}
287
+
288
+ def check_result (result ):
289
+ with result :
290
+ pass
291
+
280
292
with limiter :
281
293
params = RequestParams (
282
294
pool = pool ,
@@ -286,6 +298,7 @@ def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, ti
286
298
labels = (JOB_WRITE_LABEL ,),
287
299
request_settings = request_settings ,
288
300
retry_settings = retry_setting ,
301
+ check_result_cb = check_result ,
289
302
)
290
303
execute_query (params )
291
304
@@ -295,7 +308,7 @@ def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, ti
295
308
def run_write_jobs_query (args , driver , tb_name , max_id , metrics ):
296
309
logger .info ("Start write jobs for query service" )
297
310
298
- write_q = WRITE_QUERY_TEMPLATE .format (tb_name )
311
+ write_q = QUERY_WRITE_QUERY_TEMPLATE .format (tb_name )
299
312
300
313
write_limiter = RateLimiter (max_calls = args .write_rps , period = 1 )
301
314
row_generator = RowGenerator (max_id )
0 commit comments