Skip to content

Commit 54d0643

Browse files
committed
Added stress tests for the transfer (#18812)
1 parent 56201a6 commit 54d0643

File tree

7 files changed

+225
-0
lines changed

7 files changed

+225
-0
lines changed

ydb/tests/stress/transfer/__main__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# -*- coding: utf-8 -*-
2+
import argparse
3+
from ydb.tests.stress.transfer.workload import Workload
4+
5+
if __name__ == '__main__':
6+
text = """\033[92mTransfer workload\x1b[0m"""
7+
parser = argparse.ArgumentParser(description=text, formatter_class=argparse.RawDescriptionHelpFormatter)
8+
parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used")
9+
parser.add_argument('--database', default=None, required=True, help='A database to connect')
10+
parser.add_argument('--duration', default=10 ** 9, type=lambda x: int(x), help='A duration of workload in seconds.')
11+
parser.add_argument('--mode', default="row", choices=["row", "column"], help='STORE mode for CREATE TABLE')
12+
args = parser.parse_args()
13+
with Workload(args.endpoint, args.database, args.duration, args.mode) as workload:
14+
workload.loop()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# -*- coding: utf-8 -*-
2+
import os
3+
import pytest
4+
import yatest
5+
6+
from ydb.tests.library.harness.kikimr_runner import KiKiMR
7+
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
8+
from ydb.tests.library.common.types import Erasure
9+
10+
11+
class TestYdbTransferWorkload(object):
12+
@classmethod
13+
def setup_class(cls):
14+
cls.cluster = KiKiMR(KikimrConfigGenerator(
15+
erasure=Erasure.MIRROR_3_DC,
16+
extra_feature_flags={
17+
"enable_topic_transfer": True,
18+
}
19+
))
20+
cls.cluster.start()
21+
22+
@classmethod
23+
def teardown_class(cls):
24+
cls.cluster.stop()
25+
26+
@pytest.mark.parametrize("store_type", ["row", "column"])
27+
def test(self, store_type):
28+
cmd = [
29+
yatest.common.binary_path(os.getenv("YDB_TEST_PATH")),
30+
"--endpoint", f'grpc://localhost:{self.cluster.nodes[1].grpc_port}',
31+
"--database", "/Root",
32+
"--duration", "60",
33+
"--mode", store_type
34+
]
35+
yatest.common.execute(cmd, wait=True)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
IF (NOT WITH_VALGRIND)
2+
PY3TEST()
3+
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
4+
ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
5+
ENV(YDB_ERASURE=mirror_3_dc)
6+
ENV(YDB_USE_IN_MEMORY_PDISKS=true)
7+
ENV(YDB_TEST_PATH="ydb/tests/stress/transfer/transfer")
8+
9+
TEST_SRCS(
10+
test_workload.py
11+
)
12+
13+
IF (SANITIZER_TYPE)
14+
REQUIREMENTS(ram:32)
15+
ENDIF()
16+
17+
SIZE(MEDIUM)
18+
19+
DEPENDS(
20+
ydb/apps/ydbd
21+
ydb/apps/ydb
22+
ydb/tests/stress/transfer
23+
)
24+
25+
PEERDIR(
26+
ydb/tests/library
27+
ydb/tests/stress/transfer/workload
28+
)
29+
30+
31+
END()
32+
33+
ENDIF()
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# -*- coding: utf-8 -*-
2+
import ydb
3+
4+
import time
5+
import unittest
6+
import uuid
7+
8+
9+
class Workload(unittest.TestCase):
10+
def __init__(self, endpoint, database, duration, mode):
11+
self.database = database
12+
self.endpoint = endpoint
13+
self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
14+
self.pool = ydb.QuerySessionPool(self.driver)
15+
self.duration = duration
16+
self.mode = mode
17+
self.id = f"{uuid.uuid1()}".replace("-", "_")
18+
self.table_name = f"transfer_target_table_{mode}_{self.id}"
19+
self.topic_name = f"transfer_source_topic_{mode}_{self.id}"
20+
self.transfer_name = f"transfer_{mode}_{self.id}"
21+
22+
def create_table(self):
23+
self.pool.execute_with_retries(
24+
f"""
25+
CREATE TABLE {self.table_name} (
26+
partition Uint32 NOT NULL,
27+
offset Uint64 NOT NULL,
28+
message Utf8,
29+
PRIMARY KEY (partition, offset)
30+
) WITH (
31+
STORE = {self.mode}
32+
);
33+
"""
34+
)
35+
36+
def create_topic(self):
37+
self.pool.execute_with_retries(
38+
f"CREATE TOPIC {self.topic_name};"
39+
)
40+
41+
def create_transfer(self):
42+
lmb = '''
43+
$l = ($x) -> {
44+
return [
45+
<|
46+
partition:CAST($x._partition AS Uint32),
47+
offset:CAST($x._offset AS Uint64),
48+
message:CAST($x._data AS Utf8)
49+
|>
50+
];
51+
};
52+
'''
53+
54+
self.pool.execute_with_retries(
55+
f"""
56+
{lmb}
57+
58+
CREATE TRANSFER {self.transfer_name}
59+
FROM {self.topic_name} TO {self.table_name} USING $l
60+
WITH (
61+
CONNECTION_STRING = '{self.endpoint}/?database={self.database}',
62+
FLUSH_INTERVAL = Interval('PT1S'),
63+
BATCH_SIZE_BYTES = 8388608
64+
);
65+
"""
66+
)
67+
68+
def write_to_topic(self):
69+
finished_at = time.time() + self.duration
70+
self.message_count = 0
71+
72+
with self.driver.topic_client.writer(self.topic_name, producer_id="producer-id") as writer:
73+
while time.time() < finished_at:
74+
writer.write(ydb.TopicWriterMessage(f"message-{time.time()}"))
75+
self.message_count += 1
76+
77+
def wait_transfer_finished(self):
78+
iterations = 30
79+
80+
last_offset = -1
81+
82+
for i in range(iterations):
83+
time.sleep(1)
84+
85+
rss = self.pool.execute_with_retries(
86+
f"""
87+
SELECT MAX(offset) AS last_offset
88+
FROM {self.table_name};
89+
"""
90+
)
91+
rs = rss[0]
92+
last_offset = rs.rows[0].last_offset
93+
94+
if last_offset + 1 == self.message_count:
95+
return
96+
97+
raise Exception(f"Transfer still work after {iterations} seconds. Last offset is {last_offset}")
98+
99+
def loop(self):
100+
self.create_table()
101+
self.create_topic()
102+
self.create_transfer()
103+
104+
self.write_to_topic()
105+
106+
self.wait_transfer_finished()
107+
108+
def __enter__(self):
109+
return self
110+
111+
def __exit__(self, exc_type, exc_val, exc_tb):
112+
self.pool.stop()
113+
self.driver.stop()
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
PY3_LIBRARY()
2+
3+
PY_SRCS(
4+
__init__.py
5+
)
6+
7+
PEERDIR(
8+
library/python/monlib
9+
ydb/public/sdk/python
10+
ydb/public/sdk/python/enable_v3_new_behavior
11+
)
12+
13+
END()

ydb/tests/stress/transfer/ya.make

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
PY3_PROGRAM(transfer)
2+
3+
PY_SRCS(
4+
__main__.py
5+
)
6+
7+
PEERDIR(
8+
ydb/tests/stress/transfer/workload
9+
)
10+
11+
END()
12+
13+
RECURSE_FOR_TESTS(
14+
tests
15+
)
16+

ydb/tests/stress/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ RECURSE(
55
oltp_workload
66
simple_queue
77
statistics_workload
8+
transfer
89
)

0 commit comments

Comments
 (0)