Skip to content

WIP: Query Service SLO #466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions .github/workflows/slo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
if: env.DOCKER_REPO != null
env:
DOCKER_REPO: ${{ secrets.SLO_DOCKER_REPO }}
continue-on-error: true
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
KUBECONFIG_B64: ${{ secrets.SLO_KUBE_CONFIG }}
Expand All @@ -48,11 +49,17 @@ jobs:
timeBetweenPhases: 30
shutdownTime: 30

language_id0: sync
language0: python-sync
language_id0: sync-python-table
language0: Python SDK over Table Service
workload_path0: tests/slo
workload_build_context0: ../..
workload_build_options0: -f Dockerfile
workload_build_options0: -f Dockerfile --build-arg SDK_SERVICE=sync-python-table

language_id1: sync-python-query
language1: Python SDK over Query Service
workload_path1: tests/slo
workload_build_context1: ../..
workload_build_options1: -f Dockerfile --build-arg SDK_SERVICE=sync-python-query

- uses: actions/upload-artifact@v3
if: env.DOCKER_REPO != null
Expand Down
2 changes: 2 additions & 0 deletions tests/slo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ COPY . /src
WORKDIR /src
RUN python -m pip install --upgrade pip && python -m pip install -e . && python -m pip install -r tests/slo/requirements.txt
WORKDIR tests/slo
ARG SDK_SERVICE
ENV SDK_SERVICE=$SDK_SERVICE

ENTRYPOINT ["python", "src"]
145 changes: 138 additions & 7 deletions tests/slo/src/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import dataclasses
from random import randint
from typing import Callable, Tuple
from typing import Callable, Tuple, Union
from ratelimiter import RateLimiter

import threading
Expand Down Expand Up @@ -31,12 +31,24 @@
);
"""

# QUERY_READ_QUERY_TEMPLATE = """
# SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id);
# """

# QUERY_WRITE_QUERY_TEMPLATE = """
# UPSERT INTO `{}` (
# object_id, object_hash, payload_str, payload_double, payload_timestamp
# ) VALUES (
# $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp
# );
# """

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class RequestParams:
pool: ydb.SessionPool
pool: Union[ydb.SessionPool, ydb.QuerySessionPool]
query: str
params: dict
metrics: Metrics
Expand All @@ -56,7 +68,7 @@ def transaction(session):

result = session.transaction().execute(
params.query,
params.params,
parameters=params.params,
commit_tx=True,
settings=params.request_settings,
)
Expand All @@ -82,7 +94,7 @@ def transaction(session):
def run_reads(driver, query, max_id, metrics, limiter, runtime, timeout):
start_time = time.time()

logger.info("Start read workload")
logger.info("Start read workload over table service")

request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
retry_setting = ydb.RetrySettings(
Expand Down Expand Up @@ -116,7 +128,7 @@ def check_result(result):


def run_read_jobs(args, driver, tb_name, max_id, metrics):
logger.info("Start read jobs")
logger.info("Start read jobs over table service")

session = ydb.retry_operation_sync(lambda: driver.table_client.session().create())
read_q = session.prepare(READ_QUERY_TEMPLATE.format(tb_name))
Expand All @@ -135,10 +147,65 @@ def run_read_jobs(args, driver, tb_name, max_id, metrics):
return futures


def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout):
    """Read workload loop over the query service.

    Repeatedly executes *query* with a random ``$object_id`` in ``[1, max_id]``
    through a ``ydb.QuerySessionPool`` until *runtime* seconds have elapsed.
    Each request is rate-limited by *limiter* and recorded into *metrics*
    by ``execute_query``.

    :param driver: connected ``ydb.Driver``
    :param query: read query text (``READ_QUERY_TEMPLATE`` already formatted)
    :param max_id: inclusive upper bound for generated object ids
    :param metrics: ``Metrics`` collector shared across workers
    :param limiter: ``RateLimiter`` bounding the request rate
    :param runtime: wall-clock duration of the loop, seconds
    :param timeout: per-request and session-acquire timeout, seconds
    """
    start_time = time.time()

    logger.info("Start read workload over query service")

    request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
    retry_setting = ydb.RetrySettings(
        idempotent=True,
        max_session_acquire_timeout=timeout,
    )

    def check_result(result):
        # The SELECT must return at least one row for the requested id;
        # hoisted out of the loop — it captures nothing loop-dependent.
        res = next(result)
        assert res.rows[0]

    with ydb.QuerySessionPool(driver) as pool:
        logger.info("Session pool for read requests created")

        while time.time() - start_time < runtime:
            # Distinct names for the query parameters and the request wrapper:
            # the original rebound `params` from dict to RequestParams mid-loop.
            query_params = {"$object_id": (randint(1, max_id), ydb.PrimitiveType.Uint64)}
            with limiter:
                request = RequestParams(
                    pool=pool,
                    query=query,
                    params=query_params,
                    metrics=metrics,
                    labels=(JOB_READ_LABEL,),
                    request_settings=request_settings,
                    retry_settings=retry_setting,
                    check_result_cb=check_result,
                )
                execute_query(request)

    logger.info("Stop read workload")


def run_read_jobs_query(args, driver, tb_name, max_id, metrics):
    """Spawn reader threads running the query-service read workload.

    Starts ``args.read_threads`` daemon-less threads, each executing
    ``run_reads_query`` against table *tb_name*, and returns the started
    ``threading.Thread`` objects so the caller can ``join()`` them.
    """
    logger.info("Start read jobs over query service")

    read_query = READ_QUERY_TEMPLATE.format(tb_name)
    limiter = RateLimiter(max_calls=args.read_rps, period=1)

    worker_args = (driver, read_query, max_id, metrics, limiter, args.time, args.read_timeout / 1000)

    threads = []
    for _ in range(args.read_threads):
        worker = threading.Thread(
            name="slo_run_read",
            target=run_reads_query,
            args=worker_args,
        )
        worker.start()
        threads.append(worker)
    return threads


def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout):
start_time = time.time()

logger.info("Start write workload")
logger.info("Start write workload over table service")

request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
retry_setting = ydb.RetrySettings(
Expand All @@ -157,6 +224,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)
"$payload_double": row.payload_double,
"$payload_timestamp": row.payload_timestamp,
}

with limiter:
params = RequestParams(
pool=pool,
Expand All @@ -173,7 +241,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)


def run_write_jobs(args, driver, tb_name, max_id, metrics):
logger.info("Start write jobs")
logger.info("Start write jobs over table service")

session = ydb.retry_operation_sync(lambda: driver.table_client.session().create())
write_q = session.prepare(WRITE_QUERY_TEMPLATE.format(tb_name))
Expand All @@ -194,6 +262,69 @@ def run_write_jobs(args, driver, tb_name, max_id, metrics):
return futures


def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout):
    """Write workload loop over the query service.

    Repeatedly executes the upsert *query* with rows produced by
    *row_generator* through a ``ydb.QuerySessionPool`` until *runtime*
    seconds have elapsed. Each request is rate-limited by *limiter* and
    recorded into *metrics* by ``execute_query``.

    :param driver: connected ``ydb.Driver``
    :param query: write query text (``WRITE_QUERY_TEMPLATE`` already formatted)
    :param row_generator: ``RowGenerator`` yielding rows via ``.get()``
    :param metrics: ``Metrics`` collector shared across workers
    :param limiter: ``RateLimiter`` bounding the request rate
    :param runtime: wall-clock duration of the loop, seconds
    :param timeout: per-request and session-acquire timeout, seconds
    """
    start_time = time.time()

    logger.info("Start write workload over query service")

    request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
    retry_setting = ydb.RetrySettings(
        idempotent=True,
        max_session_acquire_timeout=timeout,
    )

    def check_result(result):
        # Upsert returns no rows; draining the result via the context
        # manager is enough to surface any error. Hoisted out of the loop —
        # it captures nothing loop-dependent.
        with result:
            pass

    with ydb.QuerySessionPool(driver) as pool:
        # Fixed copy-paste: this pool serves WRITE requests, not read.
        logger.info("Session pool for write requests created")

        while time.time() - start_time < runtime:
            row = row_generator.get()
            # Distinct names for the query parameters and the request wrapper:
            # the original rebound `params` from dict to RequestParams mid-loop.
            query_params = {
                "$object_id": (row.object_id, ydb.PrimitiveType.Uint64),
                "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8),
                "$payload_double": (row.payload_double, ydb.PrimitiveType.Double),
                "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp),
            }

            with limiter:
                request = RequestParams(
                    pool=pool,
                    query=query,
                    params=query_params,
                    metrics=metrics,
                    labels=(JOB_WRITE_LABEL,),
                    request_settings=request_settings,
                    retry_settings=retry_setting,
                    check_result_cb=check_result,
                )
                execute_query(request)

    logger.info("Stop write workload")


def run_write_jobs_query(args, driver, tb_name, max_id, metrics):
    """Spawn writer threads running the query-service write workload.

    Starts ``args.write_threads`` threads, each executing
    ``run_writes_query`` against table *tb_name* with rows from a shared
    ``RowGenerator``, and returns the started ``threading.Thread`` objects
    so the caller can ``join()`` them.
    """
    # Consistent wording with the sibling job starters
    # ("... jobs over <table|query> service").
    logger.info("Start write jobs over query service")

    write_q = WRITE_QUERY_TEMPLATE.format(tb_name)

    write_limiter = RateLimiter(max_calls=args.write_rps, period=1)
    row_generator = RowGenerator(max_id)

    futures = []
    for _ in range(args.write_threads):
        future = threading.Thread(
            name="slo_run_write",
            target=run_writes_query,
            args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000),
        )
        future.start()
        futures.append(future)
    return futures


def push_metric(limiter, runtime, metrics):
start_time = time.time()
logger.info("Start push metrics")
Expand Down
7 changes: 5 additions & 2 deletions tests/slo/src/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
JOB_READ_LABEL, JOB_WRITE_LABEL = "read", "write"
JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err"

# Which SDK flavor this workload run exercises; injected via the SDK_SERVICE
# build arg / env var (see tests/slo/Dockerfile). Used as the push-gateway
# job name and "sdk" grouping label. Defaults to the sync table-service client.
SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "sync-python-table")
print(f"SDK_SERVICE_NAME = {SDK_SERVICE_NAME}")


class Metrics:
def __init__(self, push_gateway):
Expand Down Expand Up @@ -102,10 +105,10 @@ def stop(self, labels, start_time, attempts=1, error=None):
def push(self):
push_to_gateway(
self._push_gtw,
job="workload-sync",
job=f"workload-{SDK_SERVICE_NAME}",
registry=self._registry,
grouping_key={
"sdk": "python-sync",
"sdk": SDK_SERVICE_NAME,
"sdkVersion": version("ydb"),
},
)
Expand Down
34 changes: 24 additions & 10 deletions tests/slo/src/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

from jobs import run_read_jobs, run_write_jobs, run_metric_job
from metrics import Metrics
from jobs import (
run_read_jobs,
run_write_jobs,
run_read_jobs_query,
run_write_jobs_query,
run_metric_job,
)
from metrics import Metrics, SDK_SERVICE_NAME

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -85,17 +91,25 @@ def run_slo(args, driver, tb_name):
logger.info("Max ID: %s", max_id)

metrics = Metrics(args.prom_pgw)

futures = (
*run_read_jobs(args, driver, tb_name, max_id, metrics),
*run_write_jobs(args, driver, tb_name, max_id, metrics),
run_metric_job(args, metrics),
)
if SDK_SERVICE_NAME == "sync-python-table":
futures = (
*run_read_jobs(args, driver, tb_name, max_id, metrics),
*run_write_jobs(args, driver, tb_name, max_id, metrics),
run_metric_job(args, metrics),
)
elif SDK_SERVICE_NAME == "sync-python-query":
futures = (
*run_read_jobs_query(args, driver, tb_name, max_id, metrics),
*run_write_jobs_query(args, driver, tb_name, max_id, metrics),
run_metric_job(args, metrics),
)
else:
raise ValueError(f"Unsupported service: {SDK_SERVICE_NAME}")

for future in futures:
future.join()

metrics.reset()
# metrics.reset()


def run_cleanup(args, driver, tb_name):
Expand All @@ -114,7 +128,7 @@ def run_from_args(args):
table_name = path.join(args.db, args.table_name)

with ydb.Driver(driver_config) as driver:
driver.wait(timeout=5)
driver.wait(timeout=300)
try:
if args.subcommand == "create":
run_create(args, driver, table_name)
Expand Down
Loading
Loading