3
3
import logging
4
4
import dataclasses
5
5
from random import randint
6
- from typing import Callable , Tuple
6
+ from typing import Callable , Tuple , Union
7
7
from ratelimiter import RateLimiter
8
8
9
9
import threading
31
31
);
32
32
"""
33
33
34
+
34
35
logger = logging .getLogger (__name__ )
35
36
36
37
37
38
@dataclasses .dataclass
38
39
class RequestParams :
39
- pool : ydb .SessionPool
40
+ pool : Union [ ydb .SessionPool , ydb . QuerySessionPool ]
40
41
query : str
41
42
params : dict
42
43
metrics : Metrics
@@ -56,7 +57,7 @@ def transaction(session):
56
57
57
58
result = session .transaction ().execute (
58
59
params .query ,
59
- params .params ,
60
+ parameters = params .params ,
60
61
commit_tx = True ,
61
62
settings = params .request_settings ,
62
63
)
@@ -82,7 +83,7 @@ def transaction(session):
82
83
def run_reads (driver , query , max_id , metrics , limiter , runtime , timeout ):
83
84
start_time = time .time ()
84
85
85
- logger .info ("Start read workload" )
86
+ logger .info ("Start read workload over table service " )
86
87
87
88
request_settings = ydb .BaseRequestSettings ().with_timeout (timeout )
88
89
retry_setting = ydb .RetrySettings (
@@ -116,7 +117,7 @@ def check_result(result):
116
117
117
118
118
119
def run_read_jobs (args , driver , tb_name , max_id , metrics ):
119
- logger .info ("Start read jobs" )
120
+ logger .info ("Start read jobs over table service " )
120
121
121
122
session = ydb .retry_operation_sync (lambda : driver .table_client .session ().create ())
122
123
read_q = session .prepare (READ_QUERY_TEMPLATE .format (tb_name ))
@@ -135,10 +136,65 @@ def run_read_jobs(args, driver, tb_name, max_id, metrics):
135
136
return futures
136
137
137
138
139
+ def run_reads_query (driver , query , max_id , metrics , limiter , runtime , timeout ):
140
+ start_time = time .time ()
141
+
142
+ logger .info ("Start read workload over query service" )
143
+
144
+ request_settings = ydb .BaseRequestSettings ().with_timeout (timeout )
145
+ retry_setting = ydb .RetrySettings (
146
+ idempotent = True ,
147
+ max_session_acquire_timeout = timeout ,
148
+ )
149
+
150
+ with ydb .QuerySessionPool (driver ) as pool :
151
+ logger .info ("Session pool for read requests created" )
152
+
153
+ while time .time () - start_time < runtime :
154
+ params = {"$object_id" : (randint (1 , max_id ), ydb .PrimitiveType .Uint64 )}
155
+ with limiter :
156
+
157
+ def check_result (result ):
158
+ res = next (result )
159
+ assert res .rows [0 ]
160
+
161
+ params = RequestParams (
162
+ pool = pool ,
163
+ query = query ,
164
+ params = params ,
165
+ metrics = metrics ,
166
+ labels = (JOB_READ_LABEL ,),
167
+ request_settings = request_settings ,
168
+ retry_settings = retry_setting ,
169
+ check_result_cb = check_result ,
170
+ )
171
+ execute_query (params )
172
+
173
+ logger .info ("Stop read workload" )
174
+
175
+
176
+ def run_read_jobs_query (args , driver , tb_name , max_id , metrics ):
177
+ logger .info ("Start read jobs over query service" )
178
+
179
+ read_q = READ_QUERY_TEMPLATE .format (tb_name )
180
+
181
+ read_limiter = RateLimiter (max_calls = args .read_rps , period = 1 )
182
+ futures = []
183
+ for _ in range (args .read_threads ):
184
+ future = threading .Thread (
185
+ name = "slo_run_read" ,
186
+ target = run_reads_query ,
187
+ args = (driver , read_q , max_id , metrics , read_limiter , args .time , args .read_timeout / 1000 ),
188
+ )
189
+ future .start ()
190
+ futures .append (future )
191
+ return futures
192
+
193
+
138
194
def run_writes (driver , query , row_generator , metrics , limiter , runtime , timeout ):
139
195
start_time = time .time ()
140
196
141
- logger .info ("Start write workload" )
197
+ logger .info ("Start write workload over table service " )
142
198
143
199
request_settings = ydb .BaseRequestSettings ().with_timeout (timeout )
144
200
retry_setting = ydb .RetrySettings (
@@ -157,6 +213,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)
157
213
"$payload_double" : row .payload_double ,
158
214
"$payload_timestamp" : row .payload_timestamp ,
159
215
}
216
+
160
217
with limiter :
161
218
params = RequestParams (
162
219
pool = pool ,
@@ -173,7 +230,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)
173
230
174
231
175
232
def run_write_jobs (args , driver , tb_name , max_id , metrics ):
176
- logger .info ("Start write jobs" )
233
+ logger .info ("Start write jobs over table service " )
177
234
178
235
session = ydb .retry_operation_sync (lambda : driver .table_client .session ().create ())
179
236
write_q = session .prepare (WRITE_QUERY_TEMPLATE .format (tb_name ))
@@ -194,6 +251,70 @@ def run_write_jobs(args, driver, tb_name, max_id, metrics):
194
251
return futures
195
252
196
253
254
+ def run_writes_query (driver , query , row_generator , metrics , limiter , runtime , timeout ):
255
+ start_time = time .time ()
256
+
257
+ logger .info ("Start write workload over query service" )
258
+
259
+ request_settings = ydb .BaseRequestSettings ().with_timeout (timeout )
260
+ retry_setting = ydb .RetrySettings (
261
+ idempotent = True ,
262
+ max_session_acquire_timeout = timeout ,
263
+ )
264
+
265
+ with ydb .QuerySessionPool (driver ) as pool :
266
+ logger .info ("Session pool for read requests created" )
267
+
268
+ while time .time () - start_time < runtime :
269
+ row = row_generator .get ()
270
+ params = {
271
+ "$object_id" : (row .object_id , ydb .PrimitiveType .Uint64 ),
272
+ "$payload_str" : (row .payload_str , ydb .PrimitiveType .Utf8 ),
273
+ "$payload_double" : (row .payload_double , ydb .PrimitiveType .Double ),
274
+ "$payload_timestamp" : (row .payload_timestamp , ydb .PrimitiveType .Timestamp ),
275
+ }
276
+
277
+ def check_result (result ):
278
+ # we have to close stream by reading it till the end
279
+ with result :
280
+ pass
281
+
282
+ with limiter :
283
+ params = RequestParams (
284
+ pool = pool ,
285
+ query = query ,
286
+ params = params ,
287
+ metrics = metrics ,
288
+ labels = (JOB_WRITE_LABEL ,),
289
+ request_settings = request_settings ,
290
+ retry_settings = retry_setting ,
291
+ check_result_cb = check_result ,
292
+ )
293
+ execute_query (params )
294
+
295
+ logger .info ("Stop write workload" )
296
+
297
+
298
+ def run_write_jobs_query (args , driver , tb_name , max_id , metrics ):
299
+ logger .info ("Start write jobs for query service" )
300
+
301
+ write_q = WRITE_QUERY_TEMPLATE .format (tb_name )
302
+
303
+ write_limiter = RateLimiter (max_calls = args .write_rps , period = 1 )
304
+ row_generator = RowGenerator (max_id )
305
+
306
+ futures = []
307
+ for _ in range (args .write_threads ):
308
+ future = threading .Thread (
309
+ name = "slo_run_write" ,
310
+ target = run_writes_query ,
311
+ args = (driver , write_q , row_generator , metrics , write_limiter , args .time , args .write_timeout / 1000 ),
312
+ )
313
+ future .start ()
314
+ futures .append (future )
315
+ return futures
316
+
317
+
197
318
def push_metric (limiter , runtime , metrics ):
198
319
start_time = time .time ()
199
320
logger .info ("Start push metrics" )
0 commit comments