From 95099cf7f6c1e87356b0ab5a66af0b355810b91c Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:17:55 +0300 Subject: [PATCH 01/17] specify service to slo via arg --- .github/workflows/slo.yml | 12 +++++++++--- tests/slo/Dockerfile | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 701d6d98..23da75bc 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -48,11 +48,17 @@ jobs: timeBetweenPhases: 30 shutdownTime: 30 - language_id0: sync - language0: python-sync + language_id0: sync-table + language0: Python SDK over Table Service workload_path0: tests/slo workload_build_context0: ../.. - workload_build_options0: -f Dockerfile + workload_build_options0: -f Dockerfile --build-arg SDK_SERVICE=table-service + + language_id1: sync-query + language1: Python SDK over Query Service + workload_path1: tests/slo + workload_build_context1: ../.. + workload_build_options1: -f Dockerfile --build-arg SDK_SERVICE=query-service - uses: actions/upload-artifact@v3 if: env.DOCKER_REPO != null diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile index bcb01d72..a948d69c 100644 --- a/tests/slo/Dockerfile +++ b/tests/slo/Dockerfile @@ -3,5 +3,6 @@ COPY . /src WORKDIR /src RUN python -m pip install --upgrade pip && python -m pip install -e . && python -m pip install -r tests/slo/requirements.txt WORKDIR tests/slo +ARG SDK_SERVICE ENTRYPOINT ["python", "src"] From d2c8526828f1d56f2a8ec344a227d86f32045b9b Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:17:55 +0300 Subject: [PATCH 02/17] fix hardcoded metric name --- .github/workflows/slo.yml | 4 ++-- tests/slo/src/metrics.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 23da75bc..a0be4442 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -48,13 +48,13 @@ jobs: timeBetweenPhases: 30 shutdownTime: 30 - language_id0: sync-table + language_id0: sync-python-table language0: Python SDK over Table Service workload_path0: tests/slo workload_build_context0: ../.. workload_build_options0: -f Dockerfile --build-arg SDK_SERVICE=table-service - language_id1: sync-query + language_id1: sync-python-query language1: Python SDK over Query Service workload_path1: tests/slo workload_build_context1: ../.. diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 34e6ca80..14abd8e7 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -13,6 +13,8 @@ JOB_READ_LABEL, JOB_WRITE_LABEL = "read", "write" JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err" +SDK_SERVICE_NAME = "sync-python-table" + class Metrics: def __init__(self, push_gateway): @@ -105,7 +107,7 @@ def push(self): job="workload-sync", registry=self._registry, grouping_key={ - "sdk": "python-sync", + "sdk": SDK_SERVICE_NAME, "sdkVersion": version("ydb"), }, ) From ebc1d51adce0abf5618e50b9e99faa741108a4d3 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:17:55 +0300 Subject: [PATCH 03/17] switch between different slo services --- tests/slo/src/metrics.py | 2 +- tests/slo/src/runner.py | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 14abd8e7..722c66eb 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -13,7 +13,7 @@ JOB_READ_LABEL, JOB_WRITE_LABEL = "read", "write" JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err" -SDK_SERVICE_NAME = "sync-python-table" +SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "table-service") class Metrics: diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index d2957d62..bd1ee636 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -8,7 +8,7 @@ from concurrent.futures import ThreadPoolExecutor from jobs import run_read_jobs, run_write_jobs, run_metric_job -from metrics import Metrics +from metrics import Metrics, SDK_SERVICE_NAME logger = logging.getLogger(__name__) @@ -85,12 +85,20 @@ def run_slo(args, driver, tb_name): logger.info("Max ID: %s", max_id) metrics = Metrics(args.prom_pgw) - - futures = ( - *run_read_jobs(args, driver, tb_name, max_id, metrics), - *run_write_jobs(args, driver, tb_name, max_id, metrics), - run_metric_job(args, metrics), - ) + if SDK_SERVICE_NAME == "table-service": + futures = ( + *run_read_jobs(args, driver, tb_name, max_id, metrics), + *run_write_jobs(args, driver, tb_name, max_id, metrics), + run_metric_job(args, metrics), + ) + elif SDK_SERVICE_NAME == "query-service": + futures = ( + *run_read_jobs(args, driver, tb_name, max_id, metrics), + *run_write_jobs(args, driver, tb_name, max_id, metrics), + run_metric_job(args, metrics), + ) + else: + raise ValueError(f"Unsupported service: {SDK_SERVICE_NAME}") for future in futures: future.join() From 88fbd4516859d0c9f08a69d2899c3f093538cff2 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:18:25 +0300 Subject: [PATCH 04/17] SLO for query service --- tests/slo/src/jobs.py | 124 +++++++++++++++++++++++++++++++++++++++- tests/slo/src/runner.py | 12 +++- 2 files changed, 130 insertions(+), 6 deletions(-) diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py index 8a2f5f20..3d19bc67 100644 --- a/tests/slo/src/jobs.py +++ b/tests/slo/src/jobs.py @@ -3,7 +3,7 @@ import logging import dataclasses from random import randint -from typing import Callable, Tuple +from typing import Callable, Tuple, Union from ratelimiter import RateLimiter import threading @@ -36,7 +36,7 @@ @dataclasses.dataclass class RequestParams: - pool: ydb.SessionPool + pool: Union[ydb.SessionPool, ydb.QuerySessionPool] query: str params: dict metrics: Metrics @@ -56,7 +56,7 @@ def transaction(session): result = session.transaction().execute( params.query, - params.params, + parameters=params.params, commit_tx=True, settings=params.request_settings, ) @@ -135,6 +135,61 @@ def run_read_jobs(args, driver, tb_name, max_id, metrics): return futures +def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout): + start_time = time.time() + + logger.info("Start read workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(timeout) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=timeout, + ) + + with ydb.QuerySessionPool(driver) as pool: + logger.info("Session pool for read requests created") + + while time.time() - start_time < runtime: + params = {"$object_id": randint(1, max_id)} + with limiter: + + def check_result(result): + res = next(result) + assert res[0].rows[0] + + params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=metrics, + labels=(JOB_READ_LABEL,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(params) + + logger.info("Stop read workload") + + +def run_read_jobs_query(args, driver, tb_name, max_id, metrics): + logger.info("Start read jobs for query service") + + read_q = READ_QUERY_TEMPLATE.format(tb_name) + + read_limiter = RateLimiter(max_calls=args.read_rps, period=1) + futures = [] + for _ in range(args.read_threads): + future = threading.Thread( + name="slo_run_read", + target=run_reads_query, + args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000), + ) + future.start() + futures.append(future) + return futures + + def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout): start_time = time.time() @@ -157,6 +212,11 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout) "$payload_double": row.payload_double, "$payload_timestamp": row.payload_timestamp, } + + def check_result(result): + with result: + pass + with limiter: params = RequestParams( pool=pool, @@ -166,6 +226,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout) labels=(JOB_WRITE_LABEL,), request_settings=request_settings, retry_settings=retry_setting, + check_result_cb=check_result, ) execute_query(params) @@ -194,6 +255,63 @@ def run_write_jobs(args, driver, tb_name, max_id, metrics): return futures +def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout): + start_time = time.time() + + logger.info("Start write workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(timeout) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=timeout, + ) + + with ydb.QuerySessionPool(driver) as pool: + logger.info("Session pool for read requests created") + + while time.time() - start_time < runtime: + row = row_generator.get() + params = { + "$object_id": (row.object_id, ydb.PrimitiveType.Int64), + "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8), + "$payload_double": (row.payload_double, ydb.PrimitiveType.Double), + "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp), + } + with limiter: + params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=metrics, + labels=(JOB_WRITE_LABEL,), + request_settings=request_settings, + retry_settings=retry_setting, + ) + execute_query(params) + + logger.info("Stop write workload") + + +def run_write_jobs_query(args, driver, tb_name, max_id, metrics): + logger.info("Start write jobs for query service") + + write_q = WRITE_QUERY_TEMPLATE.format(tb_name) + + write_limiter = RateLimiter(max_calls=args.write_rps, period=1) + row_generator = RowGenerator(max_id) + + futures = [] + for _ in range(args.write_threads): + future = threading.Thread( + name="slo_run_write", + target=run_writes_query, + args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000), + ) + future.start() + futures.append(future) + return futures + + def push_metric(limiter, runtime, metrics): start_time = time.time() logger.info("Start push metrics") diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index bd1ee636..8daeb9c6 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -7,7 +7,13 @@ import concurrent.futures from concurrent.futures import ThreadPoolExecutor -from jobs import run_read_jobs, run_write_jobs, run_metric_job +from jobs import ( + run_read_jobs, + run_write_jobs, + run_read_jobs_query, + run_write_jobs_query, + run_metric_job, +) from metrics import Metrics, SDK_SERVICE_NAME logger = logging.getLogger(__name__) @@ -93,8 +99,8 @@ def run_slo(args, driver, tb_name): ) elif SDK_SERVICE_NAME == "query-service": futures = ( - *run_read_jobs(args, driver, tb_name, max_id, metrics), - *run_write_jobs(args, driver, tb_name, max_id, metrics), + *run_read_jobs_query(args, driver, tb_name, max_id, metrics), + *run_write_jobs_query(args, driver, tb_name, max_id, metrics), run_metric_job(args, metrics), ) else: From d21ae3ca42e204862fdbc7661801937f66affe19 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:18:57 +0300 Subject: [PATCH 05/17] try fix charts --- .github/workflows/slo.yml | 4 ++-- tests/slo/src/jobs.py | 33 +++++++++++++++++++++++---------- tests/slo/src/metrics.py | 2 +- tests/slo/src/runner.py | 4 ++-- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index a0be4442..87144db0 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -52,13 +52,13 @@ jobs: language0: Python SDK over Table Service workload_path0: tests/slo workload_build_context0: ../.. - workload_build_options0: -f Dockerfile --build-arg SDK_SERVICE=table-service + workload_build_options0: -f Dockerfile --build-arg SDK_SERVICE=sync-python-table language_id1: sync-python-query language1: Python SDK over Query Service workload_path1: tests/slo workload_build_context1: ../.. - workload_build_options1: -f Dockerfile --build-arg SDK_SERVICE=query-service + workload_build_options1: -f Dockerfile --build-arg SDK_SERVICE=sync-python-query - uses: actions/upload-artifact@v3 if: env.DOCKER_REPO != null diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py index 3d19bc67..5df16c44 100644 --- a/tests/slo/src/jobs.py +++ b/tests/slo/src/jobs.py @@ -31,6 +31,18 @@ ); """ +QUERY_READ_QUERY_TEMPLATE = """ +SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); +""" + +QUERY_WRITE_QUERY_TEMPLATE = """ +UPSERT INTO `{}` ( + object_id, object_hash, payload_str, payload_double, payload_timestamp +) VALUES ( + $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp +); +""" + logger = logging.getLogger(__name__) @@ -150,12 +162,12 @@ def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout): logger.info("Session pool for read requests created") while time.time() - start_time < runtime: - params = {"$object_id": randint(1, max_id)} + params = {"$object_id": (randint(1, max_id), ydb.PrimitiveType.Uint64)} with limiter: def check_result(result): res = next(result) - assert res[0].rows[0] + assert res.rows[0] params = RequestParams( pool=pool, @@ -175,7 +187,7 @@ def check_result(result): def run_read_jobs_query(args, driver, tb_name, max_id, metrics): logger.info("Start read jobs for query service") - read_q = READ_QUERY_TEMPLATE.format(tb_name) + read_q = QUERY_READ_QUERY_TEMPLATE.format(tb_name) read_limiter = RateLimiter(max_calls=args.read_rps, period=1) futures = [] @@ -213,10 +225,6 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout) "$payload_timestamp": row.payload_timestamp, } - def check_result(result): - with result: - pass - with limiter: params = RequestParams( pool=pool, @@ -226,7 +234,6 @@ def check_result(result): labels=(JOB_WRITE_LABEL,), request_settings=request_settings, retry_settings=retry_setting, - check_result_cb=check_result, ) execute_query(params) @@ -272,11 +279,16 @@ def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, ti while time.time() - start_time < runtime: row = row_generator.get() params = { - "$object_id": (row.object_id, ydb.PrimitiveType.Int64), + "$object_id": (row.object_id, ydb.PrimitiveType.Uint64), "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8), "$payload_double": (row.payload_double, ydb.PrimitiveType.Double), "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp), } + + def check_result(result): + with result: + pass + with limiter: params = RequestParams( pool=pool, @@ -286,6 +298,7 @@ def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, ti labels=(JOB_WRITE_LABEL,), request_settings=request_settings, retry_settings=retry_setting, + check_result_cb=check_result, ) execute_query(params) @@ -295,7 +308,7 @@ def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, ti def run_write_jobs_query(args, driver, tb_name, max_id, metrics): logger.info("Start write jobs for query service") - write_q = WRITE_QUERY_TEMPLATE.format(tb_name) + write_q = QUERY_WRITE_QUERY_TEMPLATE.format(tb_name) write_limiter = RateLimiter(max_calls=args.write_rps, period=1) row_generator = RowGenerator(max_id) diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 722c66eb..9dc201f4 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -13,7 +13,7 @@ JOB_READ_LABEL, JOB_WRITE_LABEL = "read", "write" JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err" -SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "table-service") +SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "sync-python-table") class Metrics: diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index 8daeb9c6..cc8908ca 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -91,13 +91,13 @@ def run_slo(args, driver, tb_name): logger.info("Max ID: %s", max_id) metrics = Metrics(args.prom_pgw) - if SDK_SERVICE_NAME == "table-service": + if SDK_SERVICE_NAME == "sync-python-table": futures = ( *run_read_jobs(args, driver, tb_name, max_id, metrics), *run_write_jobs(args, driver, tb_name, max_id, metrics), run_metric_job(args, metrics), ) - elif SDK_SERVICE_NAME == "query-service": + elif SDK_SERVICE_NAME == "sync-python-query": futures = ( *run_read_jobs_query(args, driver, tb_name, max_id, metrics), *run_write_jobs_query(args, driver, tb_name, max_id, metrics), From 67ca0f0359921c5d79531220fa0bbfc33c89bf02 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:18:57 +0300 Subject: [PATCH 06/17] try to increase timeout on driver wait --- tests/slo/src/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index cc8908ca..091a975a 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -128,7 +128,7 @@ def run_from_args(args): table_name = path.join(args.db, args.table_name) with ydb.Driver(driver_config) as driver: - driver.wait(timeout=5) + driver.wait(timeout=60) try: if args.subcommand == "create": run_create(args, driver, table_name) From 83bcbe1e19df888fe112c2bf20b6d0f4140a0655 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:18:57 +0300 Subject: [PATCH 07/17] try something new --- tests/slo/src/metrics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 9dc201f4..ea989a0c 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -109,6 +109,7 @@ def push(self): grouping_key={ "sdk": SDK_SERVICE_NAME, "sdkVersion": version("ydb"), + "jobName": SDK_SERVICE_NAME, }, ) From 473a8960eba3477639f45b77452eef6b32b19aaa Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:18:57 +0300 Subject: [PATCH 08/17] increase wait timeout --- tests/slo/src/metrics.py | 1 - tests/slo/src/runner.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index ea989a0c..9dc201f4 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -109,7 +109,6 @@ def push(self): grouping_key={ "sdk": SDK_SERVICE_NAME, "sdkVersion": version("ydb"), - "jobName": SDK_SERVICE_NAME, }, ) diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index 091a975a..b9380436 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -128,7 +128,7 @@ def run_from_args(args): table_name = path.join(args.db, args.table_name) with ydb.Driver(driver_config) as driver: - driver.wait(timeout=60) + driver.wait(timeout=300) try: if args.subcommand == "create": run_create(args, driver, table_name) From a7b06ce52d3f88a4b6aeffd94fc71b511585e5ea Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:18:57 +0300 Subject: [PATCH 09/17] try again --- tests/slo/src/metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 9dc201f4..b9d33a5c 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -104,7 +104,7 @@ def stop(self, labels, start_time, attempts=1, error=None): def push(self): push_to_gateway( self._push_gtw, - job="workload-sync", + job=f"workload-{SDK_SERVICE_NAME}", registry=self._registry, grouping_key={ "sdk": SDK_SERVICE_NAME, From 9ab156560bc16220cb110fc8a8ecca1dd9b1957a Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 19 Aug 2024 13:18:57 +0300 Subject: [PATCH 10/17] hack to disable reset after run --- tests/slo/src/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index b9380436..59110400 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -109,7 +109,7 @@ def run_slo(args, driver, tb_name): for future in futures: future.join() - metrics.reset() + # metrics.reset() def run_cleanup(args, driver, tb_name): From 6f1d7a329d5b2cec89e48e58ac69743631569eb8 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 20 Aug 2024 14:14:36 +0300 Subject: [PATCH 11/17] extend job logs --- tests/slo/src/jobs.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py index 5df16c44..5ed80938 100644 --- a/tests/slo/src/jobs.py +++ b/tests/slo/src/jobs.py @@ -94,7 +94,7 @@ def transaction(session): def run_reads(driver, query, max_id, metrics, limiter, runtime, timeout): start_time = time.time() - logger.info("Start read workload") + logger.info("Start read workload over table service") request_settings = ydb.BaseRequestSettings().with_timeout(timeout) retry_setting = ydb.RetrySettings( @@ -128,7 +128,7 @@ def check_result(result): def run_read_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start read jobs") + logger.info("Start read jobs over table service") session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) read_q = session.prepare(READ_QUERY_TEMPLATE.format(tb_name)) @@ -150,7 +150,7 @@ def run_read_jobs(args, driver, tb_name, max_id, metrics): def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout): start_time = time.time() - logger.info("Start read workload") + logger.info("Start read workload over query service") request_settings = ydb.BaseRequestSettings().with_timeout(timeout) retry_setting = ydb.RetrySettings( @@ -185,7 +185,7 @@ def check_result(result): def run_read_jobs_query(args, driver, tb_name, max_id, metrics): - logger.info("Start read jobs for query service") + logger.info("Start read jobs over query service") read_q = QUERY_READ_QUERY_TEMPLATE.format(tb_name) @@ -205,7 +205,7 @@ def run_read_jobs_query(args, driver, tb_name, max_id, metrics): def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout): start_time = time.time() - logger.info("Start write workload") + logger.info("Start write workload over table service") request_settings = ydb.BaseRequestSettings().with_timeout(timeout) retry_setting = ydb.RetrySettings( @@ -241,7 +241,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout) def run_write_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start write jobs") + logger.info("Start write jobs over table service") session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) write_q = session.prepare(WRITE_QUERY_TEMPLATE.format(tb_name)) @@ -265,7 +265,7 @@ def run_write_jobs(args, driver, tb_name, max_id, metrics): def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout): start_time = time.time() - logger.info("Start write workload") + logger.info("Start write workload over query service") request_settings = ydb.BaseRequestSettings().with_timeout(timeout) retry_setting = ydb.RetrySettings( From 5412369ca360c30834d9815eba0c5d2ca6a42ec4 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 20 Aug 2024 14:24:12 +0300 Subject: [PATCH 12/17] additional logs to SLO run --- tests/slo/src/metrics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index b9d33a5c..467314d1 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -14,6 +14,7 @@ JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err" SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "sync-python-table") +print(f"SDK_SERVICE_NAME = {SDK_SERVICE_NAME}") class Metrics: From 331b724cff5fde168f319e191d7fe827a6403abd Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 20 Aug 2024 14:51:16 +0300 Subject: [PATCH 13/17] try to fix docker envs --- tests/slo/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile index a948d69c..e705e624 100644 --- a/tests/slo/Dockerfile +++ b/tests/slo/Dockerfile @@ -4,5 +4,6 @@ WORKDIR /src RUN python -m pip install --upgrade pip && python -m pip install -e . && python -m pip install -r tests/slo/requirements.txt WORKDIR tests/slo ARG SDK_SERVICE +ENV SDK_SERVICE=$SDK_SERVICE ENTRYPOINT ["python", "src"] From b6f4f88a0aa0607893440e82632f8b41bb381ef6 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 20 Aug 2024 15:11:19 +0300 Subject: [PATCH 14/17] skip on failure --- .github/workflows/slo.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 87144db0..4ca0adac 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -31,6 +31,7 @@ jobs: if: env.DOCKER_REPO != null env: DOCKER_REPO: ${{ secrets.SLO_DOCKER_REPO }} + continue-on-error: true with: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} KUBECONFIG_B64: ${{ secrets.SLO_KUBE_CONFIG }} From e68168ab14f7c23204b90a334c0d637d862554c8 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 20 Aug 2024 15:26:23 +0300 Subject: [PATCH 15/17] try to fix --- tests/slo/src/jobs.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py index 5ed80938..545b345c 100644 --- a/tests/slo/src/jobs.py +++ b/tests/slo/src/jobs.py @@ -31,17 +31,17 @@ ); """ -QUERY_READ_QUERY_TEMPLATE = """ -SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); -""" - -QUERY_WRITE_QUERY_TEMPLATE = """ -UPSERT INTO `{}` ( - object_id, object_hash, payload_str, payload_double, payload_timestamp -) VALUES ( - $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp -); -""" +# QUERY_READ_QUERY_TEMPLATE = """ +# SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); +# """ + +# QUERY_WRITE_QUERY_TEMPLATE = """ +# UPSERT INTO `{}` ( +# object_id, object_hash, payload_str, payload_double, payload_timestamp +# ) VALUES ( +# $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp +# ); +# """ logger = logging.getLogger(__name__) @@ -187,7 +187,7 @@ def check_result(result): def run_read_jobs_query(args, driver, tb_name, max_id, metrics): logger.info("Start read jobs over query service") - read_q = QUERY_READ_QUERY_TEMPLATE.format(tb_name) + read_q = READ_QUERY_TEMPLATE.format(tb_name) read_limiter = RateLimiter(max_calls=args.read_rps, period=1) futures = [] @@ -308,7 +308,7 @@ def check_result(result): def run_write_jobs_query(args, driver, tb_name, max_id, metrics): logger.info("Start write jobs for query service") - write_q = QUERY_WRITE_QUERY_TEMPLATE.format(tb_name) + write_q = WRITE_QUERY_TEMPLATE.format(tb_name) write_limiter = RateLimiter(max_calls=args.write_rps, period=1) row_generator = RowGenerator(max_id) From a27d6798892196b06dfe5387c48913af02290f28 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 20 Aug 2024 16:03:38 +0300 Subject: [PATCH 16/17] change settings obj on requests --- ydb/aio/query/transaction.py | 19 +++++++++--------- ydb/query/transaction.py | 39 +++++++++++++++++++----------------- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index e9993fcc..429ba125 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -5,6 +5,7 @@ from .base import AsyncResponseContextIterator from ... import issues +from ...settings import BaseRequestSettings from ...query import base from ...query.transaction import ( BaseQueryTxContext, @@ -46,25 +47,25 @@ async def _ensure_prev_stream_finished(self) -> None: pass self._prev_stream = None - async def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextAsync": + async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextAsync": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: None or exception if begin is failed """ await self._begin_call(settings) return self - async def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: + async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls commit on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -79,13 +80,13 @@ async def commit(self, settings: Optional[base.QueryClientSettings] = None) -> N await self._commit_call(settings) - async def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: + async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -108,7 +109,7 @@ async def execute( syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, - settings: Optional[base.QueryClientSettings] = None, + settings: Optional[BaseRequestSettings] = None, ) -> AsyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -137,9 +138,9 @@ async def execute( exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, + settings=settings, ) - settings = settings if settings is not None else self.session._settings self._prev_stream = AsyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -147,7 +148,7 @@ async def execute( response_pb=resp, tx=self, commit_tx=commit_tx, - settings=settings, + settings=self.session._settings, ), ) return self._prev_stream diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 750a94b0..be7396b1 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -15,6 +15,7 @@ from ..connection import _RpcState as RpcState from . import base +from ..settings import BaseRequestSettings logger = logging.getLogger(__name__) @@ -214,7 +215,7 @@ def tx_id(self) -> Optional[str]: """ return self._tx_state.tx_id - def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + def _begin_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) return self._driver( @@ -226,7 +227,7 @@ def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQuer (self._session_state, self._tx_state, self), ) - def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + def _commit_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED) return self._driver( @@ -238,7 +239,7 @@ def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQue (self._session_state, self._tx_state, self), ) - def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + def _rollback_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED) return self._driver( @@ -253,11 +254,12 @@ def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQ def _execute_call( self, query: str, - commit_tx: bool = False, - syntax: base.QuerySyntax = None, - exec_mode: base.QueryExecMode = None, - parameters: dict = None, - concurrent_result_sets: bool = False, + commit_tx: Optional[bool], + syntax: Optional[base.QuerySyntax], + exec_mode: Optional[base.QueryExecMode], + parameters: Optional[dict], + concurrent_result_sets: Optional[bool], + settings: Optional[BaseRequestSettings], ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: self._tx_state._check_tx_ready_to_use() @@ -277,6 +279,7 @@ def _execute_call( request.to_proto(), _apis.QueryService.Stub, _apis.QueryService.ExecuteQuery, + settings=settings, ) def _move_to_beginned(self, tx_id: str) -> None: @@ -323,12 +326,12 @@ def _ensure_prev_stream_finished(self) -> None: pass self._prev_stream = None - def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextSync": + def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextSync": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: Transaction object or exception if begin is failed """ @@ -336,13 +339,13 @@ def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTx return self - def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: + def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls commit on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -357,13 +360,13 @@ def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: self._commit_call(settings) - def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: + def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -386,7 +389,7 @@ def execute( syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, - settings: Optional[base.QueryClientSettings] = None, + settings: Optional[BaseRequestSettings] = None, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -403,7 +406,7 @@ def execute( 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; - :param settings: An additional request settings QueryClientSettings; + :param settings: An additional request settings BaseRequestSettings; :return: Iterator with result sets """ @@ -416,9 +419,9 @@ def execute( exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, + settings=settings, ) - settings = settings if settings is not None else self.session._settings self._prev_stream = base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -426,7 +429,7 @@ def execute( response_pb=resp, tx=self, commit_tx=commit_tx, - settings=settings, + settings=self.session._settings, ), ) return self._prev_stream From 197c428f38d847a87cc16c06311b088e44e53322 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 20 Aug 2024 19:22:47 +0300 Subject: [PATCH 17/17] review fixes --- tests/slo/src/jobs.py | 12 +----------- tests/slo/src/metrics.py | 1 - tests/slo/src/runner.py | 2 +- 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py index 545b345c..3fb1833a 100644 --- a/tests/slo/src/jobs.py +++ b/tests/slo/src/jobs.py @@ -31,17 +31,6 @@ ); """ -# QUERY_READ_QUERY_TEMPLATE = """ -# SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); -# """ - -# QUERY_WRITE_QUERY_TEMPLATE = """ -# UPSERT INTO `{}` ( -# object_id, object_hash, payload_str, payload_double, payload_timestamp -# ) VALUES ( -# $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp -# ); -# """ logger = logging.getLogger(__name__) @@ -286,6 +275,7 @@ def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, ti } def check_result(result): + # we have to close stream by reading it till the end with result: pass diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 467314d1..b9d33a5c 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -14,7 +14,6 @@ JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err" SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "sync-python-table") -print(f"SDK_SERVICE_NAME = {SDK_SERVICE_NAME}") class Metrics: diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index 59110400..b9380436 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -109,7 +109,7 @@ def run_slo(args, driver, tb_name): for future in futures: future.join() - # metrics.reset() + metrics.reset() def run_cleanup(args, driver, tb_name):