Skip to content

Commit 03548c9

Browse files
authored
Add column mode in simple_queue (#12480)
1 parent 15933e4 commit 03548c9

File tree

1 file changed

+31
-11
lines changed

1 file changed

+31
-11
lines changed

ydb/tools/simple_queue/__main__.py

Lines changed: 31 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -76,11 +76,20 @@ def list(cls):
7676
)
7777

7878

79-
def get_table_description(table_name):
80-
return """
79+
def get_table_description(table_name, mode):
80+
if mode == "row":
81+
store_entry = "STORE = ROW,"
82+
ttl_entry = """TTL = Interval("PT240S") ON `timestamp` AS SECONDS,"""
83+
elif mode == "column":
84+
store_entry = "STORE = COLUMN,"
85+
ttl_entry = ""
86+
else:
87+
raise RuntimeError("Unkown mode: {}".format(mode))
88+
89+
return f"""
8190
CREATE TABLE `{table_name}` (
8291
key Uint64 NOT NULL,
83-
`timestamp` Timestamp, -- NOT NULL, -- not working for now
92+
`timestamp` Uint64 NOT NULL,
8493
value Utf8 FAMILY lz4_family NOT NULL,
8594
PRIMARY KEY (key),
8695
FAMILY lz4_family (
@@ -89,7 +98,8 @@ def get_table_description(table_name):
8998
INDEX by_timestamp GLOBAL ON (`timestamp`)
9099
)
91100
WITH (
92-
TTL = Interval("PT240S") ON `timestamp`,
101+
{store_entry}
102+
{ttl_entry}
93103
AUTO_PARTITIONING_BY_SIZE = ENABLED,
94104
AUTO_PARTITIONING_BY_LOAD = ENABLED,
95105
AUTO_PARTITIONING_PARTITION_SIZE_MB = 128,
@@ -100,7 +110,7 @@ def get_table_description(table_name):
100110

101111

102112
def timestamp():
103-
return int(1000 * time.time())
113+
return int(time.time())
104114

105115

106116
def extract_keys(response):
@@ -180,7 +190,7 @@ def print_stats(self):
180190

181191

182192
class YdbQueue(object):
183-
def __init__(self, idx, database, stats, driver, pool):
193+
def __init__(self, idx, database, stats, driver, pool, mode):
184194
self.working_dir = os.path.join(database, socket.gethostname().split('.')[0].replace('-', '_') + "_" + str(idx))
185195
self.copies_dir = os.path.join(self.working_dir, 'copies')
186196
self.table_name = self.table_name_with_timestamp()
@@ -195,6 +205,7 @@ def __init__(self, idx, database, stats, driver, pool):
195205
self.ops = ydb.BaseRequestSettings().with_operation_timeout(19).with_timeout(20)
196206
self.driver.scheme_client.make_directory(self.working_dir)
197207
self.driver.scheme_client.make_directory(self.copies_dir)
208+
self.mode = mode
198209
print("Working dir %s" % self.working_dir)
199210
f = self.prepare_new_queue(self.table_name)
200211
f.result()
@@ -212,7 +223,7 @@ def table_name_with_timestamp(self, working_dir=None):
212223
def prepare_new_queue(self, table_name=None):
213224
session = self.pool.acquire()
214225
table_name = self.table_name_with_timestamp() if table_name is None else table_name
215-
f = session.async_execute_scheme(get_table_description(table_name), settings=self.ops)
226+
f = session.async_execute_scheme(get_table_description(table_name, self.mode), settings=self.ops)
216227
f.add_done_callback(lambda x: self.on_received_response(session, x, 'create'))
217228
return f
218229

@@ -230,6 +241,12 @@ def on_received_response(self, session, response, event, callback=None):
230241
response.result()
231242
self.stats.save_event(event)
232243
except ydb.Error as e:
244+
debug = False
245+
if debug:
246+
print(event)
247+
print(e)
248+
print()
249+
233250
self.stats.save_event(event, e.status)
234251

235252
def send_query(self, query, parameters, event_kind, callback=None):
@@ -335,7 +352,7 @@ def write(self):
335352
DECLARE $key as Uint64;
336353
DECLARE $value as Utf8;
337354
DECLARE $timestamp as Uint64;
338-
UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, CAST($timestamp as Timestamp), $value);
355+
UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, $timestamp, $value);
339356
""".format(self.table_name),
340357
{
341358
'$key': ydb.PrimitiveType.Uint64.proto,
@@ -429,16 +446,18 @@ def copy_table(self):
429446

430447

431448
class Workload(object):
432-
def __init__(self, endpoint, database, duration):
449+
def __init__(self, endpoint, database, duration, mode):
433450
self.database = database
434451
self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
435452
self.pool = ydb.SessionPool(self.driver, size=200)
436453
self.round_size = 1000
437454
self.duration = duration
438455
self.delayed_events = queue.Queue()
439456
self.workload_stats = WorkloadStats(*EventKind.list())
457+
# TODO: run both modes in parallel?
458+
self.mode = mode
440459
self.ydb_queues = [
441-
YdbQueue(idx, database, self.workload_stats, self.driver, self.pool)
460+
YdbQueue(idx, database, self.workload_stats, self.driver, self.pool, self.mode)
442461
for idx in range(2)
443462
]
444463

@@ -511,7 +530,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
511530
parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used")
512531
parser.add_argument('--database', default=None, required=True, help='A database to connect')
513532
parser.add_argument('--duration', default=10 ** 9, type=lambda x: int(x), help='A duration of workload in seconds.')
533+
parser.add_argument('--mode', default="row", choices=["row", "column"], help='STORE mode for CREATE TABLE')
514534
args = parser.parse_args()
515-
with Workload(args.endpoint, args.database, args.duration) as workload:
535+
with Workload(args.endpoint, args.database, args.duration, args.mode) as workload:
516536
for handle in workload.loop():
517537
handle()

0 commit comments

Comments
 (0)