
Commit cf983da

Move workloads to separate directory (tests/workloads) (#12883)
1 parent 9dd07f5 commit cf983da

File tree

10 files changed: +229 -8 lines changed


ydb/tests/stability/library/ya.make

Lines changed: 2 additions & 2 deletions

@@ -14,9 +14,9 @@ DEPENDS(
 )

 BUNDLE(
-    ydb/tools/simple_queue NAME simple_queue
+    ydb/tests/workloads/simple_queue NAME simple_queue
     ydb/tools/olap_workload NAME olap_workload
-    ydb/tools/statistics_workload NAME statistics_workload
+    ydb/tests/workloads/statistics_workload NAME statistics_workload
     ydb/tools/cfg/bin NAME cfg
     ydb/tests/tools/nemesis/driver NAME nemesis
 )

ydb/tests/stability/ydb/test_stability.py

Lines changed: 2 additions & 2 deletions

@@ -56,9 +56,9 @@ class TestSetupForStability(object):
     stress_binaries_deploy_path = '/Berkanavt/nemesis/bin/'
     artifacts = (
         yatest.common.binary_path('ydb/tests/tools/nemesis/driver/nemesis'),
-        yatest.common.binary_path('ydb/tools/simple_queue/simple_queue'),
+        yatest.common.binary_path('ydb/tests/workloads/simple_queue/simple_queue'),
         yatest.common.binary_path('ydb/tools/olap_workload/olap_workload'),
-        yatest.common.binary_path('ydb/tools/statistics_workload/statistics_workload'),
+        yatest.common.binary_path('ydb/tests/workloads/statistics_workload'),
     )

     @classmethod

ydb/tests/stability/ydb/ya.make

Lines changed: 2 additions & 2 deletions

@@ -12,9 +12,9 @@ DATA(
 )

 DEPENDS(
-    ydb/tools/simple_queue
+    ydb/tests/workloads/simple_queue
     ydb/tools/olap_workload
-    ydb/tools/statistics_workload
+    ydb/tests/workloads/statistics_workload
     ydb/tools/cfg/bin
     ydb/tests/tools/nemesis/driver
 )

ydb/tests/workloads/statistics_workload/__main__.py

Lines changed: 205 additions & 0 deletions

@@ -0,0 +1,205 @@
# -*- coding: utf-8 -*-
import argparse
import ydb
import logging
import time
import os
import random
import string
from ydb.tests.library.clients.kikimr_client import kikimr_client_factory
from ydb.tests.library.common.protobuf_ss import SchemeDescribeRequest

ydb.interceptor.monkey_patch_event_handler()


logger = logging.getLogger("StatisticsWorkload")


def table_name_with_prefix(table_prefix):
    table_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
    return os.path.join(table_prefix + "_" + table_suffix)


def random_string(length):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))


def random_type():
    return random.choice([ydb.PrimitiveType.Int64, ydb.PrimitiveType.String])


def random_value(type):
    if isinstance(type, ydb.OptionalType):
        return random_value(type.item)
    if type == ydb.PrimitiveType.Int64:
        return random.randint(0, 1 << 31)
    if type == ydb.PrimitiveType.String:
        return bytes(random_string(random.randint(1, 32)), encoding='utf8')


class Workload(object):
    def __init__(self, host, port, database, duration, batch_size, batch_count):
        self.database = database
        self.driver = ydb.Driver(ydb.DriverConfig(f"{host}:{port}", database))
        self.kikimr_client = kikimr_client_factory(host, port)
        self.pool = ydb.SessionPool(self.driver, size=200)
        self.duration = duration
        self.batch_size = batch_size
        self.batch_count = batch_count

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pool.stop()
        self.driver.stop()

    def run_query_ignore_errors(self, callee):
        try:
            self.pool.retry_operation_sync(callee)
        except Exception as e:
            logger.error(f'{type(e)}, {e}')

    def generate_batch(self, schema):
        data = []
        for i in range(self.batch_size):
            data.append({c.name: random_value(c.type) for c in schema})
        return data

    def create_table(self, table_name):
        def callee(session):
            session.execute_scheme(f"""
                CREATE TABLE `{table_name}` (
                    id Int64 NOT NULL,
                    value Int64,
                    PRIMARY KEY(id)
                )
                PARTITION BY HASH(id)
                WITH (
                    STORE = COLUMN
                )
            """)
        self.run_query_ignore_errors(callee)

    def enable_statistics(self, table_name):
        def callee(session):
            # Braces are doubled so the literal FEATURES payload survives f-string formatting.
            session.execute_scheme(f"""
                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['id']}}`);
            """)
            session.execute_scheme(f"""
                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['value']}}`);
            """)
        self.run_query_ignore_errors(callee)

    def drop_table(self, table_name):
        def callee(session):
            session.drop_table(table_name)
        self.run_query_ignore_errors(callee)

    def list_columns(self, table_path):
        def callee(session):
            return session.describe_table(table_path).columns
        return self.pool.retry_operation_sync(callee)

    def add_data(self, table_path, trace_id):
        logger.info(f"[{trace_id}] insert {self.batch_count} batches of {self.batch_size} rows each")
        schema = self.list_columns(table_path)
        column_types = ydb.BulkUpsertColumns()

        for c in schema:
            column_types.add_column(c.name, c.type)

        for i in range(self.batch_count):
            logger.info(f"[{trace_id}] add batch #{i}")
            batch = self.generate_batch(schema)
            self.driver.table_client.bulk_upsert(table_path, batch, column_types)

    def rows_count(self, table_name):
        return self.driver.table_client.scan_query(f"SELECT count(*) FROM `{table_name}`").next().result_set.rows[0][0]

    def statistics_count(self, table_statistics, path_id):
        query = f"SELECT count(*) FROM `{table_statistics}` WHERE local_path_id = {path_id}"
        return self.driver.table_client.scan_query(query).next().result_set.rows[0][0]

    def analyze(self, table_path):
        def callee(session):
            session.execute_scheme(f"ANALYZE `{table_path}`")
        self.run_query_ignore_errors(callee)

    def execute(self):
        table_prefix = "test_table"
        table_name = table_name_with_prefix(table_prefix)
        table_path = self.database + "/" + table_name
        table_statistics = ".metadata/_statistics"
        trace_id = random_string(5)

        try:
            logger.info(f"[{trace_id}] start new round")

            self.pool.acquire()

            logger.info(f"[{trace_id}] create table '{table_name}'")
            self.create_table(table_name)

            scheme = self.kikimr_client.send(
                SchemeDescribeRequest(table_path).protobuf,
                method='SchemeDescribe'
            )
            path_id = scheme.PathDescription.Self.PathId
            logger.info(f"[{trace_id}] table '{table_name}' path id: {path_id}")

            self.add_data(table_path, trace_id)
            count = self.rows_count(table_name)
            logger.info(f"[{trace_id}] number of rows in table '{table_name}': {count}")
            if count != self.batch_count * self.batch_size:
                raise Exception(f"[{trace_id}] the number of rows in table '{table_name}' does not match the expected value")

            logger.info(f"[{trace_id}] waiting to receive information about table '{table_name}' from the scheme shard")
            time.sleep(300)

            logger.info(f"[{trace_id}] analyze '{table_name}'")
            self.analyze(table_path)

            count = self.statistics_count(table_statistics, path_id)
            logger.info(f"[{trace_id}] number of rows in statistics table '{table_statistics}': {count}")
            if count == 0:
                raise Exception(f"[{trace_id}] statistics table '{table_statistics}' is empty")
        except Exception as e:
            logger.error(f"[{trace_id}] {type(e)}, {e}")

        logger.info(f"[{trace_id}] drop table '{table_name}'")
        self.drop_table(table_path)

    def run(self):
        started_at = time.time()

        while time.time() - started_at < self.duration:
            self.execute()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="statistics stability workload", formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--host', default='localhost', help="Host to connect to")
    parser.add_argument('--port', default='2135', help="Port to connect to")
    parser.add_argument('--database', default=None, required=True, help='Database to connect to')
    parser.add_argument('--duration', default=120, type=int, help='Duration of the workload in seconds')
    parser.add_argument('--batch_size', default=1000, type=int, help='Batch size for bulk insert')
    parser.add_argument('--batch_count', default=3, type=int, help='Number of batches to insert')
    parser.add_argument('--log_file', default=None, help='Append log to the specified file')

    args = parser.parse_args()

    if args.log_file:
        logging.basicConfig(
            filename=args.log_file,
            filemode='a',
            format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
            datefmt='%H:%M:%S',
            level=logging.INFO
        )

    with Workload(args.host, args.port, args.database, args.duration, args.batch_size, args.batch_count) as workload:
        workload.run()
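
For reference, a minimal invocation of the resulting statistics_workload binary, based on the argparse flags defined above (the endpoint and database path here are placeholder values):

    statistics_workload --host localhost --port 2135 --database /Root/db --duration 300 --batch_size 1000 --batch_count 3 --log_file statistics_workload.log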

ydb/tests/workloads/statistics_workload/ya.make

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
PY3_PROGRAM(statistics_workload)

PY_SRCS(
    __main__.py
)

PEERDIR(
    ydb/tests/library
    ydb/public/sdk/python
    library/python/monlib
)

END()

ydb/tests/workloads/ya.make

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
RECURSE(
    simple_queue
    statistics_workload
)

ydb/tests/ya.make

Lines changed: 1 addition & 0 deletions

@@ -10,4 +10,5 @@ RECURSE(
     stability
     supp
     tools
+    workloads
 )

ydb/tools/ya.make

Lines changed: 0 additions & 2 deletions

@@ -3,9 +3,7 @@ RECURSE(
     cfg
     query_replay
     query_replay_yt
-    simple_queue
     olap_workload
-    statistics_workload
     stress_tool
     tsserver
     tstool
