Skip to content

Commit 4c5593f

Browse files
committed
SLO for query service
1 parent 8608a6f commit 4c5593f

File tree

3 files changed

+131
-6
lines changed

3 files changed

+131
-6
lines changed

tests/slo/src/jobs.py

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import dataclasses
55
from random import randint
6-
from typing import Callable, Tuple
6+
from typing import Callable, Tuple, Union
77
from ratelimiter import RateLimiter
88

99
import threading
@@ -36,7 +36,7 @@
3636

3737
@dataclasses.dataclass
3838
class RequestParams:
39-
pool: ydb.SessionPool
39+
pool: Union[ydb.SessionPool, ydb.QuerySessionPool]
4040
query: str
4141
params: dict
4242
metrics: Metrics
@@ -56,7 +56,7 @@ def transaction(session):
5656

5757
result = session.transaction().execute(
5858
params.query,
59-
params.params,
59+
parameters=params.params,
6060
commit_tx=True,
6161
settings=params.request_settings,
6262
)
@@ -135,6 +135,61 @@ def run_read_jobs(args, driver, tb_name, max_id, metrics):
135135
return futures
136136

137137

138+
def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout):
139+
start_time = time.time()
140+
141+
logger.info("Start read workload")
142+
143+
request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
144+
retry_setting = ydb.RetrySettings(
145+
idempotent=True,
146+
max_session_acquire_timeout=timeout,
147+
)
148+
149+
with ydb.QuerySessionPool(driver) as pool:
150+
logger.info("Session pool for read requests created")
151+
152+
while time.time() - start_time < runtime:
153+
params = {"$object_id": randint(1, max_id)}
154+
with limiter:
155+
156+
def check_result(result):
157+
res = next(result)
158+
assert res[0].rows[0]
159+
160+
params = RequestParams(
161+
pool=pool,
162+
query=query,
163+
params=params,
164+
metrics=metrics,
165+
labels=(JOB_READ_LABEL,),
166+
request_settings=request_settings,
167+
retry_settings=retry_setting,
168+
check_result_cb=check_result,
169+
)
170+
execute_query(params)
171+
172+
logger.info("Stop read workload")
173+
174+
175+
def run_read_jobs_query(args, driver, tb_name, max_id, metrics):
176+
logger.info("Start read jobs for query service")
177+
178+
read_q = READ_QUERY_TEMPLATE.format(tb_name)
179+
180+
read_limiter = RateLimiter(max_calls=args.read_rps, period=1)
181+
futures = []
182+
for _ in range(args.read_threads):
183+
future = threading.Thread(
184+
name="slo_run_read",
185+
target=run_reads_query,
186+
args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000),
187+
)
188+
future.start()
189+
futures.append(future)
190+
return futures
191+
192+
138193
def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout):
139194
start_time = time.time()
140195

@@ -157,6 +212,11 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)
157212
"$payload_double": row.payload_double,
158213
"$payload_timestamp": row.payload_timestamp,
159214
}
215+
216+
def check_result(result):
217+
with result:
218+
pass
219+
160220
with limiter:
161221
params = RequestParams(
162222
pool=pool,
@@ -166,6 +226,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)
166226
labels=(JOB_WRITE_LABEL,),
167227
request_settings=request_settings,
168228
retry_settings=retry_setting,
229+
check_result_cb=check_result,
169230
)
170231
execute_query(params)
171232

@@ -194,6 +255,63 @@ def run_write_jobs(args, driver, tb_name, max_id, metrics):
194255
return futures
195256

196257

258+
def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout):
259+
start_time = time.time()
260+
261+
logger.info("Start write workload")
262+
263+
request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
264+
retry_setting = ydb.RetrySettings(
265+
idempotent=True,
266+
max_session_acquire_timeout=timeout,
267+
)
268+
269+
with ydb.QuerySessionPool(driver) as pool:
270+
logger.info("Session pool for read requests created")
271+
272+
while time.time() - start_time < runtime:
273+
row = row_generator.get()
274+
params = {
275+
"$object_id": (row.object_id, ydb.PrimitiveType.Int64),
276+
"$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8),
277+
"$payload_double": (row.payload_double, ydb.PrimitiveType.Double),
278+
"$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp),
279+
}
280+
with limiter:
281+
params = RequestParams(
282+
pool=pool,
283+
query=query,
284+
params=params,
285+
metrics=metrics,
286+
labels=(JOB_WRITE_LABEL,),
287+
request_settings=request_settings,
288+
retry_settings=retry_setting,
289+
)
290+
execute_query(params)
291+
292+
logger.info("Stop write workload")
293+
294+
295+
def run_write_jobs_query(args, driver, tb_name, max_id, metrics):
296+
logger.info("Start write jobs for query service")
297+
298+
write_q = WRITE_QUERY_TEMPLATE.format(tb_name)
299+
300+
write_limiter = RateLimiter(max_calls=args.write_rps, period=1)
301+
row_generator = RowGenerator(max_id)
302+
303+
futures = []
304+
for _ in range(args.write_threads):
305+
future = threading.Thread(
306+
name="slo_run_write",
307+
target=run_writes_query,
308+
args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000),
309+
)
310+
future.start()
311+
futures.append(future)
312+
return futures
313+
314+
197315
def push_metric(limiter, runtime, metrics):
198316
start_time = time.time()
199317
logger.info("Start push metrics")

tests/slo/src/runner.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@
77
import concurrent.futures
88
from concurrent.futures import ThreadPoolExecutor
99

10-
from jobs import run_read_jobs, run_write_jobs, run_metric_job
10+
from jobs import (
11+
run_read_jobs,
12+
run_write_jobs,
13+
run_read_jobs_query,
14+
run_write_jobs_query,
15+
run_metric_job,
16+
)
1117
from metrics import Metrics, SDK_SERVICE_NAME
1218

1319
logger = logging.getLogger(__name__)
@@ -93,8 +99,8 @@ def run_slo(args, driver, tb_name):
9399
)
94100
elif SDK_SERVICE_NAME == "query-service":
95101
futures = (
96-
*run_read_jobs(args, driver, tb_name, max_id, metrics),
97-
*run_write_jobs(args, driver, tb_name, max_id, metrics),
102+
*run_read_jobs_query(args, driver, tb_name, max_id, metrics),
103+
*run_write_jobs_query(args, driver, tb_name, max_id, metrics),
98104
run_metric_job(args, metrics),
99105
)
100106
else:

ydb/query/transaction.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ def execute(
370370
exec_mode: Optional[base.QueryExecMode] = None,
371371
parameters: Optional[dict] = None,
372372
concurrent_result_sets: Optional[bool] = False,
373+
settings: dict = None, # TODO: temporary hack
373374
) -> base.SyncResponseContextIterator:
374375
"""WARNING: This API is experimental and could be changed.
375376

0 commit comments

Comments
 (0)