Skip to content

Commit baf6413

Browse files
authored
Add scan/insert while alter test (#8094)
1 parent a334994 commit baf6413

File tree

1 file changed

+146
-31
lines changed

1 file changed

+146
-31
lines changed

ydb/tools/olap_workload/__main__.py

Lines changed: 146 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import ydb
44
import time
55
import os
6+
import random
7+
import string
68

79
ydb.interceptor.monkey_patch_event_handler()
810

@@ -15,12 +17,31 @@ def table_name_with_timestamp():
1517
return os.path.join("column_table_" + str(timestamp()))
1618

1719

20+
def random_string(length):
    """Return *length* random lowercase ASCII letters as a UTF-8 bytes object.

    Used to generate String column values for bulk upserts.
    """
    letters = string.ascii_lowercase
    # random.choices draws all characters in one call instead of a
    # Python-level loop of random.choice with an unused index variable.
    return ''.join(random.choices(letters, k=length)).encode('utf8')
23+
24+
25+
def random_type():
    """Pick one of the column types this workload exercises (Int64 or String)."""
    candidates = (ydb.PrimitiveType.Int64, ydb.PrimitiveType.String)
    return random.choice(candidates)
27+
28+
29+
def random_value(type):
    """Generate a random value for the given YDB column type.

    Optional wrappers are unwrapped down to the inner type; Int64 yields a
    non-negative random integer, String yields random lowercase bytes.
    Unsupported types fall through and return None.
    """
    # Peel off any Optional wrappers before dispatching on the primitive type.
    while isinstance(type, ydb.OptionalType):
        type = type.item
    if type == ydb.PrimitiveType.Int64:
        return random.randint(0, 1 << 31)
    if type == ydb.PrimitiveType.String:
        return random_string(random.randint(1, 32))
36+
37+
1838
class Workload(object):
    """Stress workload mixing scans and bulk inserts with concurrent ALTERs.

    One long-lived column table ("queries_while_alter") is continuously
    scanned, bulk-upserted into and ALTERed, while throwaway
    "column_table_<timestamp>" tables are created and dropped around it.
    Individual query failures (expected under concurrent schema changes)
    are logged and ignored so the workload keeps running.
    """

    def __init__(self, endpoint, database, duration, batch_size):
        """Connect to YDB.

        endpoint/database -- cluster location; duration -- seconds to run;
        batch_size -- rows per bulk-upsert batch.
        """
        self.database = database
        self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
        self.pool = ydb.SessionPool(self.driver, size=200)
        self.duration = duration
        self.batch_size = batch_size

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Stop the session pool before the driver it borrows connections from.
        self.pool.stop()
        self.driver.stop()

    def run_query_ignore_errors(self, callee):
        """Run *callee* via the session pool; log and swallow any failure.

        Broad catch is deliberate: scheme races (table/column dropped by a
        concurrent step) are expected here and must not abort the workload.
        """
        try:
            self.pool.retry_operation_sync(callee)
        except Exception as e:
            print(type(e), e)

    def create_table(self, table_name):
        """Create a column-store table keyed by Int64 id; errors are ignored."""
        print(f"Create table {table_name}")

        def callee(session):
            session.execute_scheme(f"""
                CREATE TABLE {table_name} (
                id Int64 NOT NULL,
                i64Val Int64,
                PRIMARY KEY(id)
                )
                PARTITION BY HASH(id)
                WITH (
                    STORE = COLUMN
                )
            """)

        self.run_query_ignore_errors(callee)

    def drop_table(self, table_name):
        """Drop *table_name* (path relative to the database); errors ignored."""
        print(f"Drop table {table_name}")

        def callee(session):
            session.drop_table(self.database + "/" + table_name)

        self.run_query_ignore_errors(callee)

    def add_column(self, table_name, col_name, col_type):
        """ALTER TABLE ... ADD COLUMN; errors are ignored."""
        print(f"Add column {table_name}.{col_name} {str(col_type)}")

        def callee(session):
            session.execute_scheme(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {str(col_type)}")

        self.run_query_ignore_errors(callee)

    def drop_column(self, table_name, col_name):
        """ALTER TABLE ... DROP COLUMN; errors are ignored."""
        print(f"Drop column {table_name}.{col_name}")

        def callee(session):
            session.execute_scheme(f"ALTER TABLE {table_name} DROP COLUMN {col_name}")

        self.run_query_ignore_errors(callee)

    def generate_batch(self, schema):
        """Build batch_size rows of random values matching *schema* columns."""
        data = []

        # Underscore: the loop index itself is unused.
        for _ in range(self.batch_size):
            data.append({c.name: random_value(c.type) for c in schema})

        return data

    def add_batch(self, table_name, schema):
        """Bulk-upsert one random batch into *table_name*.

        Unlike the scheme operations above this is not wrapped in
        run_query_ignore_errors; failures propagate to run()'s catch.
        """
        print(f"Add batch {table_name}")

        column_types = ydb.BulkUpsertColumns()

        for c in schema:
            column_types.add_column(c.name, c.type)

        batch = self.generate_batch(schema)

        self.driver.table_client.bulk_upsert(self.database + "/" + table_name, batch, column_types)

    def list_tables(self):
        """Return names of column tables in the database root directory."""
        db = self.driver.scheme_client.list_directory(self.database)
        return [t.name for t in db.children if t.type == ydb.SchemeEntryType.COLUMN_TABLE]

    def list_columns(self, table_name):
        """Return the column descriptions of *table_name* via describe_table."""
        path = self.database + "/" + table_name

        def callee(session):
            return session.describe_table(path).columns

        return self.pool.retry_operation_sync(callee)

    def rows_count(self, table_name):
        """Return the row count of *table_name* using a scan query.

        First result set, first row, first column of SELECT count(*).
        """
        return self.driver.table_client.scan_query(f"SELECT count(*) FROM {table_name}").next().result_set.rows[0][0]

    def select_n(self, table_name, limit):
        """Scan up to *limit* rows; the result is fetched and discarded."""
        print(f"Select {limit} from {table_name}")
        self.driver.table_client.scan_query(f"SELECT * FROM {table_name} limit {limit}").next()

    def drop_all_tables(self):
        """Drop every throwaway column_table_* table, keeping the ALTER table."""
        for t in self.list_tables():
            if t.startswith("column_table_"):
                self.drop_table(t)

    def drop_all_columns(self, table_name):
        """Drop every column of *table_name* except the primary key "id"."""
        for c in self.list_columns(table_name):
            if c.name != "id":
                self.drop_column(table_name, c.name)

    def queries_while_alter(self):
        """One iteration of scans + inserts racing with schema changes.

        Interleaves scans and bulk inserts, prunes all non-key columns once
        the schema grows past 50 columns, drops the whole table past 100k
        rows, and finishes by adding one fresh randomly-typed column.
        """
        table_name = "queries_while_alter"

        schema = self.list_columns(table_name)

        self.select_n(table_name, 1000)
        self.add_batch(table_name, schema)
        self.select_n(table_name, 100)
        self.add_batch(table_name, schema)
        self.select_n(table_name, 300)

        if len(schema) > 50:
            self.drop_all_columns(table_name)

        if self.rows_count(table_name) > 100000:
            self.drop_table(table_name)

        # Timestamp-based name keeps column names unique across iterations.
        col = "col_" + str(timestamp())
        self.add_column(table_name, col, random_type())

    def run(self):
        """Drive the workload until self.duration seconds have elapsed.

        Each pass (re)creates the ALTER-target table, clears leftover
        throwaway tables, runs one query/alter round and creates a new
        throwaway table. Any failure is logged and the loop continues.
        """
        started_at = time.time()

        while time.time() - started_at < self.duration:
            try:
                self.create_table("queries_while_alter")

                self.drop_all_tables()

                self.queries_while_alter()

                table_name = table_name_with_timestamp()
                self.create_table(table_name)
            except Exception as e:
                print(type(e), e)
73187

74188

75189
if __name__ == '__main__':
@@ -78,7 +192,8 @@ def run(self):
78192
)
79193
parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used")
80194
parser.add_argument('--database', default=None, required=True, help='A database to connect')
81-
parser.add_argument('--duration', default=10**9, type=lambda x: int(x), help='A duration of workload in seconds.')
195+
parser.add_argument('--duration', default=120, type=lambda x: int(x), help='A duration of workload in seconds.')
196+
parser.add_argument('--batch_size', default=1000, help='Batch size for bulk insert')
82197
args = parser.parse_args()
83-
with Workload(args.endpoint, args.database, args.duration) as workload:
198+
with Workload(args.endpoint, args.database, args.duration, args.batch_size) as workload:
84199
workload.run()

0 commit comments

Comments
 (0)