Skip to content

Commit e5f6c12

Browse files
authored
Simple queue improvements (#12426)
1 parent 34cd26c commit e5f6c12

File tree

1 file changed

+79
-92
lines changed

1 file changed

+79
-92
lines changed

ydb/tools/simple_queue/__main__.py

Lines changed: 79 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -9,22 +9,18 @@
99
import threading
1010
import collections
1111
import itertools
12-
import six
13-
from six.moves import queue
12+
import queue
1413
import ydb
1514
from library.python.monlib.metric_registry import MetricRegistry
1615
import socket
1716

18-
ydb.interceptor.monkey_patch_event_handler()
19-
2017
logger = logging.getLogger(__name__)
2118

2219
BLOB_MIN_SIZE = 128 * 1024
23-
WINDOW_SIZE = 5000
2420

2521

2622
def random_string(size):
    """Return a random string made of ``size`` lowercase ASCII letters.

    :param size: number of characters to generate; ``0`` yields ``''``.
    :return: a ``str`` of length ``size`` drawn from ``string.ascii_lowercase``.
    """
    # random.choices draws all characters in one call instead of one
    # random.choice call per character.
    return ''.join(random.choices(string.ascii_lowercase, k=size))
2824

2925

3026
def generate_blobs(count=32):
@@ -84,7 +80,7 @@ def get_table_description(table_name):
8480
return """
8581
CREATE TABLE `{table_name}` (
8682
key Uint64 NOT NULL,
87-
`timestamp` Timestamp NOT NULL,
83+
`timestamp` Timestamp, -- NOT NULL, -- not working for now
8884
value Utf8 FAMILY lz4_family NOT NULL,
8985
PRIMARY KEY (key),
9086
FAMILY lz4_family (
@@ -158,17 +154,27 @@ def save_event(self, ev_kind, details=None):
158154

159155
def print_stats(self):
    """Print a summary of response counts per event kind to stdout.

    ``self.by_events_stats`` is iterated as a mapping of event kind to a
    mapping of response kind to a counter object exposing ``get()``.
    Event kinds whose counters sum to zero are skipped entirely.  For the
    remaining ones, every response kind with a non-zero count is listed,
    and the row whose response kind equals ``status_code_to_label()`` is
    always listed, together with its share of the total as a percentage.
    """
    separator = "=" * 120
    report = [separator]
    # The label compared against is the same for every row, so resolve it
    # once.  NOTE(review): presumably this is the "success" label - the
    # helper is defined elsewhere in the file.
    success_label = status_code_to_label()

    for event_kind, stats in self.by_events_stats.items():
        total_response_count = sum(counter.get() for counter in stats.values())
        if total_response_count == 0:
            # Nothing recorded for this event kind; also keeps the
            # percentage below from dividing by zero.
            continue

        something_appended = False
        for response_kind, responses_count in stats.items():
            value = responses_count.get()
            is_success = response_kind == success_label
            if value > 0 or is_success:
                something_appended = True
                line = "EventKind: {event_kind}, {response_kind} responses count: {responses_count}".format(
                    event_kind=event_kind,
                    response_kind=response_kind,
                    responses_count=value,
                )
                if is_success:
                    line += " ({:.2f}%)".format(100.0 * value / total_response_count)
                report.append(line)

        if something_appended:
            # Blank line between event kinds.
            report.append("")

    report.append(separator)
    print("\n".join(report))
174180

@@ -187,77 +193,31 @@ def __init__(self, idx, database, stats, driver, pool):
187193
self.outdated_period = 60 * 2
188194
self.database = database
189195
self.ops = ydb.BaseRequestSettings().with_operation_timeout(19).with_timeout(20)
190-
self.prepare_test()
191-
self.initialize_queries()
196+
self.driver.scheme_client.make_directory(self.working_dir)
197+
self.driver.scheme_client.make_directory(self.copies_dir)
198+
print("Working dir %s" % self.working_dir)
199+
f = self.prepare_new_queue(self.table_name)
200+
f.result()
192201
# a queue with tables to drop
193202
self.drop_queue = collections.deque()
194203
# a set with keys that are ready to be removed
195204
self.outdated_keys = collections.deque()
196-
# a number that stores largest outdated key. That helps avoiding duplicates in deque.
197-
self.largest_outdated_key = 0
198205
self.outdated_keys_max_size = 50
199206

200207
def table_name_with_timestamp(self, working_dir=None):
    """Build a table path of the form ``<dir>/queue_<timestamp()>``.

    :param working_dir: directory to place the table in; when ``None``,
        ``self.working_dir`` is used instead.
    :return: the directory joined with ``"queue_" + str(timestamp())``.
    """
    base_dir = self.working_dir if working_dir is None else working_dir
    return os.path.join(base_dir, "queue_" + str(timestamp()))
204211

205-
def prepare_test(self):
206-
self.driver.scheme_client.make_directory(self.working_dir)
207-
self.driver.scheme_client.make_directory(self.copies_dir)
208-
print("Working dir %s" % self.working_dir)
209-
f = self.prepare_new_queue(self.table_name)
210-
f.result()
211-
212212
def prepare_new_queue(self, table_name=None):
    """Start asynchronous creation of a queue table.

    :param table_name: path of the table to create; when ``None`` a name
        is generated with ``self.table_name_with_timestamp()``.
    :return: the future returned by ``session.async_execute_scheme``; the
        caller may wait on it with ``.result()``.
    """
    session = self.pool.acquire()
    if table_name is None:
        table_name = self.table_name_with_timestamp()

    create_future = session.async_execute_scheme(
        get_table_description(table_name), settings=self.ops
    )
    # The response is handed to on_received_response together with the
    # session, recorded under the 'create' event.
    create_future.add_done_callback(
        lambda response: self.on_received_response(session, response, 'create')
    )
    return create_future
218218

219-
def initialize_queries(self):
220-
self.queries = {
221-
# use reverse iteration here
222-
EventKind.WRITE: ydb.DataQuery(
223-
"""
224-
--!syntax_v1
225-
DECLARE $key as Uint64;
226-
DECLARE $value as Utf8;
227-
DECLARE $timestamp as Uint64;
228-
UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, CAST($timestamp as Timestamp), $value);
229-
""".format(self.table_name), {
230-
'$key': ydb.PrimitiveType.Uint64.proto,
231-
'$value': ydb.PrimitiveType.Utf8.proto,
232-
'$timestamp': ydb.PrimitiveType.Uint64.proto,
233-
}
234-
),
235-
EventKind.FIND_OUTDATED: ydb.DataQuery(
236-
"""
237-
--!syntax_v1
238-
DECLARE $key as Uint64;
239-
SELECT `key` FROM `{}`
240-
WHERE `key` <= $key
241-
ORDER BY `key`
242-
LIMIT 50;
243-
""".format(self.table_name), {
244-
'$key': ydb.PrimitiveType.Uint64.proto
245-
}
246-
),
247-
EventKind.REMOVE_OUTDATED: ydb.DataQuery(
248-
"""
249-
--!syntax_v1
250-
DECLARE $keys as List<Struct<key: Uint64>>;
251-
DELETE FROM `{}` ON SELECT `key` FROM AS_TABLE($keys);
252-
""".format(self.table_name), {
253-
'$keys': ydb.ListType(ydb.StructType().add_member('key', ydb.PrimitiveType.Uint64)).proto
254-
}
255-
)
256-
}
257-
258219
def switch(self, switch_to):
    """Point this queue at another table.

    :param switch_to: name of the table to use from now on.
    """
    self.table_name = switch_to
    # Drop the keys collected so far.
    # NOTE(review): presumably they belong to the previous table - confirm.
    self.outdated_keys.clear()
262222

263223
def on_received_response(self, session, response, event, callback=None):
@@ -272,13 +232,13 @@ def on_received_response(self, session, response, event, callback=None):
272232
except ydb.Error as e:
273233
self.stats.save_event(event, e.status)
274234

275-
def send_query(self, query, parameters, event_kind, callback=None):
    """Execute ``query`` asynchronously in a single committed transaction.

    :param query: the query to run, passed as-is to ``async_execute``.
    :param parameters: query parameters, or ``None`` when the query has
        none.
    :param event_kind: event label forwarded to ``on_received_response``
        for this request.
    :param callback: optional callable forwarded to
        ``on_received_response``.
    :return: the future returned by ``async_execute``.
    """
    session = self.pool.acquire()
    future = session.transaction().async_execute(
        query, parameters=parameters, commit_tx=True, settings=self.ops)
    future.add_done_callback(
        lambda response: self.on_received_response(
            session, response, event_kind, callback
        )
    )
    return future
@@ -328,13 +288,20 @@ def remove_outdated(self):
328288
except IndexError:
329289
return
330290

331-
return
332-
return self.send_query(
333-
EventKind.REMOVE_OUTDATED,
334-
params={
335-
'$keys': keys_set
291+
query = ydb.DataQuery(
292+
"""
293+
--!syntax_v1
294+
DECLARE $keys as List<Struct<key: Uint64>>;
295+
DELETE FROM `{}` ON SELECT `key` FROM AS_TABLE($keys);
296+
""".format(self.table_name), {
297+
'$keys': ydb.ListType(ydb.StructType().add_member('key', ydb.PrimitiveType.Uint64)).proto
336298
}
337299
)
300+
parameters = {
301+
'$keys': keys_set
302+
}
303+
304+
return self.send_query(query=query, event_kind=EventKind.REMOVE_OUTDATED, parameters=parameters)
338305

339306
def on_find_outdated(self, resp):
340307
try:
@@ -349,24 +316,40 @@ def find_outdated(self):
349316
return
350317

351318
outdated_timestamp = timestamp() - self.outdated_period
352-
return self.send_query(
353-
EventKind.FIND_OUTDATED,
354-
callback=self.on_find_outdated,
355-
params={
356-
'$key': outdated_timestamp
357-
}
358-
)
319+
query = """
320+
--!syntax_v1
321+
SELECT `key` FROM `{table_name}`
322+
WHERE `key` <= {outdated_timestamp}
323+
ORDER BY `key`
324+
LIMIT 50;
325+
""".format(table_name=self.table_name, outdated_timestamp=outdated_timestamp)
326+
parameters = None
327+
return self.send_query(query=query, event_kind=EventKind.FIND_OUTDATED, parameters=parameters, callback=self.on_find_outdated)
359328

360329
def write(self):
    """Upsert one row into the current table and return the request future.

    The row uses the current ``timestamp()`` value both as ``key`` and as
    the ``timestamp`` column (cast to ``Timestamp`` inside the query), and
    the next item of ``self.blobs_iter`` as ``value``.

    :return: the future returned by ``self.send_query``.
    """
    current_timestamp = timestamp()
    blob = next(self.blobs_iter)
    # The query text embeds self.table_name, so it is built on every call.
    query = ydb.DataQuery(
        """
        --!syntax_v1
        DECLARE $key as Uint64;
        DECLARE $value as Utf8;
        DECLARE $timestamp as Uint64;
        UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, CAST($timestamp as Timestamp), $value);
        """.format(self.table_name),
        {
            '$key': ydb.PrimitiveType.Uint64.proto,
            '$value': ydb.PrimitiveType.Utf8.proto,
            '$timestamp': ydb.PrimitiveType.Uint64.proto,
        }
    )
    parameters = {
        '$key': current_timestamp,
        '$value': blob,
        '$timestamp': current_timestamp,
    }

    return self.send_query(query=query, event_kind=EventKind.WRITE, parameters=parameters)
370353

371354
def move_iterator(self, it, callback):
372355
next_f = next(it)
@@ -406,9 +389,12 @@ def start_read_table(self):
406389

407390
def alter_table(self):
408391
session = self.pool.acquire()
409-
add_column = ydb.Column('column_%d' % random.randint(1, 100000), ydb.OptionalType(ydb.PrimitiveType.Utf8))
410-
f = session.async_alter_table(
411-
self.table_name, add_columns=(add_column, ), drop_columns=(), settings=self.ops)
392+
query = "ALTER TABLE `{table_name}` ADD COLUMN column_{val} Utf8".format(
393+
table_name=self.table_name,
394+
val=random.randint(1, 100000),
395+
)
396+
397+
f = session.async_execute_scheme(query, settings=self.ops)
412398
f.add_done_callback(
413399
lambda response: self.on_received_response(
414400
session, response, EventKind.ALTER_TABLE,
@@ -469,6 +455,7 @@ def loop(self):
469455
for ydb_queue in self.ydb_queues:
470456
ydb_queue.list_working_dir()
471457
ydb_queue.list_copies_dir()
458+
print("Table name: %s" % ydb_queue.table_name)
472459

473460
round_id = next(round_id_it)
474461
if round_id % 10 == 0:
@@ -487,8 +474,8 @@ def loop(self):
487474
schedule = collections.deque(list(sorted(schedule)))
488475

489476
print("Starting round_id %d" % round_id)
490-
print("Round schedule %s", schedule)
491-
for step_id in six.moves.range(self.round_size):
477+
print("Round schedule %s" % schedule)
478+
for step_id in range(self.round_size):
492479

493480
if time.time() - started_at > self.duration:
494481
break

0 commit comments

Comments
 (0)