Skip to content

Commit 1429695

Browse files
authored
rework stress tests (#14551)
1 parent 745a010 commit 1429695

File tree

18 files changed

+1020
-1005
lines changed

18 files changed

+1020
-1005
lines changed

ydb/tests/stress/olap_workload/__main__.py

Lines changed: 2 additions & 237 deletions
Original file line numberDiff line numberDiff line change
@@ -1,242 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import argparse
3-
import ydb
4-
import time
5-
import random
6-
import threading
7-
from enum import Enum
8-
9-
from ydb.tests.stress.common.common import WorkloadBase, YdbClient
10-
11-
ydb.interceptor.monkey_patch_event_handler()
12-
13-
supported_pk_types = [
14-
# Bool https://github.com/ydb-platform/ydb/issues/13037
15-
"Int8",
16-
"Int16",
17-
"Int32",
18-
"Int64",
19-
"Uint8",
20-
"Uint16",
21-
"Uint32",
22-
"Uint64",
23-
"Decimal(22,9)",
24-
# "DyNumber", https://github.com/ydb-platform/ydb/issues/13048
25-
26-
"String",
27-
"Utf8",
28-
# Uuid", https://github.com/ydb-platform/ydb/issues/13047
29-
30-
"Date",
31-
"Datetime",
32-
"Datetime64",
33-
"Timestamp",
34-
# "Interval", https://github.com/ydb-platform/ydb/issues/13050
35-
]
36-
37-
supported_types = supported_pk_types + [
38-
"Float",
39-
"Double",
40-
"Json",
41-
"JsonDocument",
42-
"Yson"
43-
]
44-
45-
46-
class WorkloadTablesCreateDrop(WorkloadBase):
47-
class TableStatus(Enum):
48-
CREATING = "Creating",
49-
AVAILABLE = "Available",
50-
DELITING = "Deleting"
51-
52-
def __init__(self, client, prefix, stop, allow_nullables_in_pk):
53-
super().__init__(client, prefix, "create_drop", stop)
54-
self.allow_nullables_in_pk = allow_nullables_in_pk
55-
self.created = 0
56-
self.deleted = 0
57-
self.tables = {}
58-
self.lock = threading.Lock()
59-
60-
def get_stat(self):
61-
with self.lock:
62-
return f"Created: {self.created}, Deleted: {self.deleted}, Exists: {len(self.tables)}"
63-
64-
def _generate_new_table_n(self):
65-
while True:
66-
r = random.randint(1, 40000)
67-
with self.lock:
68-
if r not in self.tables:
69-
self.tables[r] = WorkloadTablesCreateDrop.TableStatus.CREATING
70-
return r
71-
72-
def _get_table_to_delete(self):
73-
with self.lock:
74-
for n, s in self.tables.items():
75-
if s == WorkloadTablesCreateDrop.TableStatus.AVAILABLE:
76-
self.tables[n] = WorkloadTablesCreateDrop.TableStatus.DELITING
77-
return n
78-
return None
79-
80-
def create_table(self, table):
81-
path = self.get_table_path(table)
82-
column_n = random.randint(1, 10000)
83-
primary_key_column_n = random.randint(1, column_n)
84-
partition_key_column_n = random.randint(1, primary_key_column_n)
85-
column_defs = []
86-
for i in range(column_n):
87-
if i < primary_key_column_n:
88-
c = random.choice(supported_pk_types)
89-
if not self.allow_nullables_in_pk or random.choice([False, True]):
90-
c += " NOT NULL"
91-
else:
92-
c = random.choice(supported_types)
93-
if random.choice([False, True]):
94-
c += " NOT NULL"
95-
column_defs.append(c)
96-
97-
stmt = f"""
98-
CREATE TABLE `{path}` (
99-
{", ".join(["c" + str(i) + " " + column_defs[i] for i in range(column_n)])},
100-
PRIMARY KEY({", ".join(["c" + str(i) for i in range(primary_key_column_n)])})
101-
)
102-
PARTITION BY HASH({", ".join(["c" + str(i) for i in range(partition_key_column_n)])})
103-
WITH (
104-
STORE = COLUMN
105-
)
106-
"""
107-
self.client.query(stmt, True)
108-
109-
def _create_tables_loop(self):
110-
while not self.is_stop_requested():
111-
n = self._generate_new_table_n()
112-
self.create_table(str(n))
113-
with self.lock:
114-
self.tables[n] = WorkloadTablesCreateDrop.TableStatus.AVAILABLE
115-
self.created += 1
116-
117-
def _delete_tables_loop(self):
118-
while not self.is_stop_requested():
119-
n = self._get_table_to_delete()
120-
if n is None:
121-
print("create_drop: No tables to delete")
122-
time.sleep(10)
123-
continue
124-
self.client.drop_table(self.get_table_path(str(n)))
125-
with self.lock:
126-
del self.tables[n]
127-
self.deleted += 1
128-
129-
def get_workload_thread_funcs(self):
130-
r = [self._create_tables_loop for x in range(0, 10)]
131-
r.append(self._delete_tables_loop)
132-
return r
133-
134-
135-
class WorkloadInsertDelete(WorkloadBase):
136-
def __init__(self, client, prefix, stop):
137-
super().__init__(client, prefix, "insert_delete", stop)
138-
self.inserted = 0
139-
self.current = 0
140-
self.table_name = "table"
141-
self.lock = threading.Lock()
142-
143-
def get_stat(self):
144-
with self.lock:
145-
return f"Inserted: {self.inserted}, Current: {self.current}"
146-
147-
def _loop(self):
148-
table_path = self.get_table_path(self.table_name)
149-
self.client.query(
150-
f"""
151-
CREATE TABLE `{table_path}` (
152-
id Int64 NOT NULL,
153-
i64Val Int64,
154-
PRIMARY KEY(id)
155-
)
156-
PARTITION BY HASH(id)
157-
WITH (
158-
STORE = COLUMN
159-
)
160-
""",
161-
True,
162-
)
163-
i = 1
164-
while not self.is_stop_requested():
165-
self.client.query(
166-
f"""
167-
INSERT INTO `{table_path}` (`id`, `i64Val`)
168-
VALUES
169-
({i * 2}, {i * 10}),
170-
({i * 2 + 1}, {i * 10 + 1})
171-
""",
172-
False,
173-
)
174-
175-
self.client.query(
176-
f"""
177-
DELETE FROM `{table_path}`
178-
WHERE i64Val % 2 == 1
179-
""",
180-
False,
181-
)
182-
183-
actual = self.client.query(
184-
f"""
185-
SELECT COUNT(*) as cnt, SUM(i64Val) as vals, SUM(id) as ids FROM `{table_path}`
186-
""",
187-
False,
188-
)[0].rows[0]
189-
expected = {"cnt": i, "vals": i * (i + 1) * 5, "ids": i * (i + 1)}
190-
if actual != expected:
191-
raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}")
192-
i += 1
193-
with self.lock:
194-
self.inserted += 2
195-
self.current = actual["cnt"]
196-
197-
def get_workload_thread_funcs(self):
198-
return [self._loop]
199-
200-
201-
class WorkloadRunner:
202-
def __init__(self, client, name, duration, allow_nullables_in_pk):
203-
self.client = client
204-
self.name = args.path
205-
self.tables_prefix = "/".join([self.client.database, self.name])
206-
self.duration = args.duration
207-
self.allow_nullables_in_pk = allow_nullables_in_pk
208-
209-
def __enter__(self):
210-
self._cleanup()
211-
return self
212-
213-
def __exit__(self, exc_type, exc_value, traceback):
214-
self._cleanup()
215-
216-
def _cleanup(self):
217-
print(f"Cleaning up {self.tables_prefix}...")
218-
deleted = client.remove_recursively(self.tables_prefix)
219-
print(f"Cleaning up {self.tables_prefix}... done, {deleted} tables deleted")
220-
221-
def run(self):
222-
stop = threading.Event()
223-
workloads = [
224-
WorkloadTablesCreateDrop(self.client, self.name, stop, self.allow_nullables_in_pk),
225-
WorkloadInsertDelete(self.client, self.name, stop),
226-
]
227-
for w in workloads:
228-
w.start()
229-
started_at = started_at = time.time()
230-
while time.time() - started_at < self.duration:
231-
print(f"Elapsed {(int)(time.time() - started_at)} seconds, stat:")
232-
for w in workloads:
233-
print(f"\t{w.name}: {w.get_stat()}")
234-
time.sleep(10)
235-
stop.set()
236-
print("Waiting for stop...")
237-
for w in workloads:
238-
w.join()
239-
print("Waiting for stop... stopped")
3+
from ydb.tests.stress.common.common import YdbClient
4+
from ydb.tests.stress.olap_workload.workload import WorkloadRunner
2405

2416

2427
if __name__ == "__main__":
Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# -*- coding: utf-8 -*-
2-
import yatest
3-
42
from ydb.tests.library.harness.kikimr_runner import KiKiMR
53
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
64
from ydb.tests.library.common.types import Erasure
5+
from ydb.tests.stress.common.common import YdbClient
6+
from ydb.tests.stress.olap_workload.workload import WorkloadRunner
77

88

99
class TestYdbWorkload(object):
@@ -22,14 +22,7 @@ def teardown_class(cls):
2222
cls.cluster.stop()
2323

2424
def test(self):
25-
workload_path = yatest.common.build_path("ydb/tests/stress/olap_workload/olap_workload")
26-
yatest.common.execute(
27-
[
28-
workload_path,
29-
"--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}",
30-
"--database=/Root",
31-
"--duration", "120",
32-
"--allow-nullables-in-pk", "1",
33-
],
34-
wait=True
35-
)
25+
client = YdbClient(f'grpc://localhost:{self.cluster.nodes[1].grpc_port}', '/Root', True)
26+
client.wait_connection()
27+
with WorkloadRunner(client, 'olap_workload', 120, True) as runner:
28+
runner.run()

ydb/tests/stress/olap_workload/tests/ya.make

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ SIZE(MEDIUM)
1313

1414
DEPENDS(
1515
ydb/apps/ydbd
16-
ydb/apps/ydb
17-
ydb/tests/stress/olap_workload
1816
)
1917

2018
PEERDIR(
2119
ydb/tests/library
20+
ydb/tests/stress/common
21+
ydb/tests/stress/olap_workload/workload
2222
)
2323

2424

0 commit comments

Comments
 (0)