Skip to content

Commit a3a2bc8

Browse files
committed
a
2 parents 51b1fa6 + 4b0fe7f commit a3a2bc8

File tree

3 files changed

+66
-36
lines changed

3 files changed

+66
-36
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
152152
, LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ")
153153
, FreeSpace(freeSpace)
154154
, TopicClient(Driver, GetTopicClientSettings())
155-
, File(std::get<TString>(TxId), EOpenModeFlag::CreateAlways | EOpenModeFlag::WrOnly)
156-
{
155+
, File("data/result/" + std::get<TString>(TxId), EOpenModeFlag::CreateAlways | EOpenModeFlag::WrOnly)
156+
{
157157
EgressStats.Level = statsLevel;
158158
}
159159

ydb/library/yql/tools/dqrun/dqrun.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@
9999
#include <library/cpp/digest/md5/md5.h>
100100
#include <ydb/library/actors/http/http_proxy.h>
101101

102-
#include <util/folder/iterator.h>
103102
#include <util/generic/string.h>
104103
#include <util/generic/hash.h>
105104
#include <util/generic/scope.h>
@@ -1133,11 +1132,11 @@ int RunMain(int argc, const char* argv[])
11331132
);
11341133

11351134
TVector<TProgramPtr> programs;
1136-
for (const auto& entry : TDirIterator(progFiles)) {
1137-
if (entry.fts_type != FTS_F) {
1138-
continue;
1135+
for (int i = 0;; ++i) {
1136+
auto progFile = TString("data/query/" + std::to_string(i) + ".txt");
1137+
if (!NFs::Exists(progFile)) {
1138+
break;
11391139
}
1140-
const auto& progFile = entry.fts_path;
11411140

11421141
TProgramPtr program;
11431142
if (res.Has("replay") && res.Has("capture")) {

ydb/library/yql/tools/dqrun/gen.py

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,31 @@
77
import random
88
from typing import List
99

10-
1110
OUTPUT_DIR = "/home/kardymon-d/ydb3/ydb/ydb/library/yql/tools/dqrun/data"
12-
QUERIES_NUM = 2
11+
QUERIES_NUM = 200
1312
TABLE_NAME = "pq.`match`"
14-
ROW_NUM = 1000
15-
UINT64_COLUMN_NUM = 3#750
16-
STR32_COLUMN_NUM = 0#375
17-
STR64_COLUMN_NUM = 0#375
13+
ROW_NUM = 550000
14+
UINT64_COLUMN_NUM = 20
15+
STR32_COLUMN_NUM = 10
16+
STR64_COLUMN_NUM = 10
1817
COLUMN_NUM = UINT64_COLUMN_NUM + STR32_COLUMN_NUM + STR64_COLUMN_NUM
19-
FILLED_COLUMN_NUM = 2
20-
MAX_VALUE = 2
18+
FILLED_COLUMN_NUM = 40
2119
Row = List
2220

2321

22+
23+
# OUTPUT_DIR = "/home/kardymon-d/ydb3/ydb/ydb/library/yql/tools/dqrun/data"
24+
# QUERIES_NUM = 1
25+
# TABLE_NAME = "pq.`match`"
26+
# ROW_NUM = 600000
27+
# UINT64_COLUMN_NUM = 3
28+
# STR32_COLUMN_NUM = 1
29+
# STR64_COLUMN_NUM = 1
30+
# COLUMN_NUM = UINT64_COLUMN_NUM + STR32_COLUMN_NUM + STR64_COLUMN_NUM
31+
# FILLED_COLUMN_NUM = 5
32+
# Row = List
33+
34+
2435
def get_timer() -> int:
2536
get_timer.timer += 1
2637
return get_timer.timer
@@ -36,8 +47,8 @@ class Table:
3647

3748

3849
def validate() -> None:
39-
assert(0 < FILLED_COLUMN_NUM < COLUMN_NUM)
40-
assert(0 < MAX_VALUE < 2 ** 64)
50+
assert(QUERIES_NUM < ROW_NUM)
51+
assert(0 < FILLED_COLUMN_NUM <= COLUMN_NUM)
4152

4253

4354
def gen_column_names(column_num: int) -> List[str]:
@@ -64,31 +75,29 @@ def type_to_sql(column_type: str) -> str:
6475
def int_to_str(n: int, length: int) -> str:
6576
result = ""
6677
while n > 0:
67-
div, mod = divmod(n, 10)
78+
div, mod = divmod(n, 26)
6879
result += chr(ord('a') + mod)
6980
n = div
70-
result += '_' * (length - len(result))
81+
result += 'a' * (length - len(result))
7182
return result
7283

7384

74-
def gen_value(column_type: str):
75-
value = random.randint(0, MAX_VALUE - 1)
85+
def gen_cell(row_index: int, column_type: str):
86+
value = 2 * row_index
7687
if column_type == "uint64":
7788
return value
7889
elif column_type == "str32":
7990
return int_to_str(value, 32)
80-
# return "".join(random.choices(string.ascii_lowercase, k=32))
8191
elif column_type == "str64":
82-
return int_to_str(value, 64)
83-
# return "".join(random.choices(string.ascii_lowercase, k=64))
92+
return int_to_str(value, 100)
8493
else:
8594
raise RuntimeError()
8695

8796

88-
def gen_row(column_types: List[str]) -> Row:
97+
def gen_row(row_index: int, column_types: List[str]) -> Row:
8998
are_filled = [i < FILLED_COLUMN_NUM for i in range(len(column_types))]
9099
random.shuffle(are_filled)
91-
return [gen_value(column_type) if is_filled else None for is_filled, column_type in zip(are_filled, column_types)]
100+
return [gen_cell(row_index, column_type) if is_filled else None for is_filled, column_type in zip(are_filled, column_types)]
92101

93102

94103
def gen_table(name: str, column_names: List[str], column_types: List[str], rows: List[Row]) -> Table:
@@ -108,18 +117,41 @@ def write_table(filename: str, table: Table) -> None:
108117
file.write('\n')
109118

110119

111-
def gen_query(table: Table, column_index: int, value) -> str:
112-
return f"""SELECT *
120+
def gen_value(query_index: int, table: Table) -> str:
121+
index = query_index % len(table.column_types)
122+
column_type = table.column_types[index]
123+
124+
l = len(table.rows)
125+
print(f"ffff {l}")
126+
value = table.rows[l - 2][index]
127+
print(f" value {value} type {column_type}")
128+
#return value
129+
130+
#value = query_index % (2 * ROW_NUM)
131+
if column_type == "uint64":
132+
return f"{value}UL"
133+
elif column_type == "str32":
134+
return f'"{value}"'
135+
elif column_type == "str64":
136+
return f'"{value}"'
137+
else:
138+
raise RuntimeError()
139+
140+
141+
def gen_query(query_index: int, table: Table, value: str) -> str:
142+
return f"""$match = SELECT *
113143
FROM {table.name}
114144
WITH (
115145
FORMAT=json_each_row,
116146
SCHEMA
117147
(
118-
{",".join([name + " " + type_to_sql(type) + "?" for name, type in zip(table.column_names, table.column_types)])}
148+
{",".join([name + " " + type_to_sql(type) for name, type in zip(table.column_names, table.column_types)])}
119149
)
120150
)
121-
WHERE {table.column_names[column_index]} IS NOT NULL AND {table.column_names[column_index]} == {value}
122-
LIMIT 20;
151+
WHERE {table.column_names[query_index % len(table.column_names)]} == {value};
152+
153+
INSERT INTO pq.`match`
154+
SELECT ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow())))) FROM $match;
123155
"""
124156

125157

@@ -133,15 +165,14 @@ def main():
133165

134166
column_names = gen_column_names(COLUMN_NUM)
135167
column_types = gen_column_types(COLUMN_NUM)
136-
rows = [gen_row(column_types) for _ in range(ROW_NUM)]
168+
rows = [gen_row(row_index, column_types) for row_index in range(ROW_NUM)]
137169
table = gen_table(TABLE_NAME, column_names, column_types, rows)
138170
write_table(f"{OUTPUT_DIR}/data.txt", table)
139171

140172
pathlib.Path(f"{OUTPUT_DIR}/query/").mkdir(parents=True, exist_ok=True)
141-
for i in range(QUERIES_NUM):
142-
column_index = i % COLUMN_NUM
143-
value = gen_value(column_types[column_index])
144-
write_query(f"{OUTPUT_DIR}/query/{i}.txt", gen_query(table, column_index, value))
173+
for query_index in range(QUERIES_NUM):
174+
value = gen_value(query_index, table)
175+
write_query(f"{OUTPUT_DIR}/query/{query_index}.txt", gen_query(query_index, table, value))
145176

146177

147178
if __name__ == "__main__":

0 commit comments

Comments
 (0)