Skip to content

Commit 4b0fe7f

Browse files
committed
update query processing for benchmark
1 parent b8cf852 commit 4b0fe7f

File tree

3 files changed

+47
-33
lines changed

3 files changed

+47
-33
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
152152
, LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ")
153153
, FreeSpace(freeSpace)
154154
, TopicClient(Driver, GetTopicClientSettings())
155+
, File("data/result/" + std::get<TString>(TxId), EOpenModeFlag::CreateAlways | EOpenModeFlag::WrOnly)
155156
{
156157
EgressStats.Level = statsLevel;
157158
}
@@ -193,9 +194,11 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
193194

194195
TString data(dataCol.AsStringRef());
195196

197+
TString logData(TStringBuilder() << Now() << " " << TxId << ": " << data << "\n");
198+
File.Write(logData.data(), logData.size());
199+
196200
LWPROBE(PqWriteDataToSend, TString(TStringBuilder() << TxId), SinkParams.GetTopicPath(), data);
197201
SINK_LOG_T("Received data for sending: " << data);
198-
Cerr << Now() << " -------------- PQ sink: send data, count " << data << Endl;
199202

200203
const auto messageSize = GetItemSize(data);
201204
if (messageSize > MaxMessageSize) {
@@ -484,6 +487,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
484487
std::queue<TString> Buffer;
485488
std::queue<TAckInfo> WaitingAcks; // Size of items which are waiting for acks (used to update free space)
486489
std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints;
490+
TFile File;
487491
};
488492

489493
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(

ydb/library/yql/tools/dqrun/dqrun.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@
9999
#include <library/cpp/digest/md5/md5.h>
100100
#include <ydb/library/actors/http/http_proxy.h>
101101

102-
#include <util/folder/iterator.h>
103102
#include <util/generic/string.h>
104103
#include <util/generic/hash.h>
105104
#include <util/generic/scope.h>
@@ -1134,11 +1133,11 @@ int RunMain(int argc, const char* argv[])
11341133
);
11351134

11361135
TVector<TProgramPtr> programs;
1137-
for (const auto& entry : TDirIterator(progFiles)) {
1138-
if (entry.fts_type != FTS_F) {
1139-
continue;
1136+
for (int i = 0;; ++i) {
1137+
auto progFile = TString("data/query/" + std::to_string(i) + ".txt");
1138+
if (!NFs::Exists(progFile)) {
1139+
break;
11401140
}
1141-
const auto& progFile = entry.fts_path;
11421141

11431142
TProgramPtr program;
11441143
if (res.Has("replay") && res.Has("capture")) {

ydb/library/yql/tools/dqrun/gen.py

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66

77

88
OUTPUT_DIR = "/home/vokayndzop/ydb/ydb/library/yql/tools/dqrun/data/"
9-
QUERIES_NUM = 2
9+
QUERIES_NUM = 100
1010
TABLE_NAME = "pq.`match`"
11-
ROW_NUM = 100
12-
UINT64_COLUMN_NUM = 3#750
13-
STR32_COLUMN_NUM = 0#375
14-
STR64_COLUMN_NUM = 0#375
11+
ROW_NUM = 1000
12+
UINT64_COLUMN_NUM = 10
13+
STR32_COLUMN_NUM = 5
14+
STR64_COLUMN_NUM = 5
1515
COLUMN_NUM = UINT64_COLUMN_NUM + STR32_COLUMN_NUM + STR64_COLUMN_NUM
16-
FILLED_COLUMN_NUM = 2
17-
MAX_VALUE = 2
16+
FILLED_COLUMN_NUM = 20
1817
Row = List
1918

2019

@@ -33,8 +32,8 @@ class Table:
3332

3433

3534
def validate() -> None:
36-
assert(0 < FILLED_COLUMN_NUM < COLUMN_NUM)
37-
assert(0 < MAX_VALUE < 2 ** 64)
35+
assert(QUERIES_NUM < ROW_NUM)
36+
assert(0 < FILLED_COLUMN_NUM <= COLUMN_NUM)
3837

3938

4039
def gen_column_names(column_num: int) -> List[str]:
@@ -61,31 +60,29 @@ def type_to_sql(column_type: str) -> str:
6160
def int_to_str(n: int, length: int) -> str:
6261
result = ""
6362
while n > 0:
64-
div, mod = divmod(n, 10)
63+
div, mod = divmod(n, 26)
6564
result += chr(ord('a') + mod)
6665
n = div
67-
result += '_' * (length - len(result))
66+
result += 'a' * (length - len(result))
6867
return result
6968

7069

71-
def gen_value(column_type: str):
72-
value = random.randint(0, MAX_VALUE - 1)
70+
def gen_cell(row_index: int, column_type: str):
71+
value = 2 * row_index
7372
if column_type == "uint64":
7473
return value
7574
elif column_type == "str32":
7675
return int_to_str(value, 32)
77-
# return "".join(random.choices(string.ascii_lowercase, k=32))
7876
elif column_type == "str64":
7977
return int_to_str(value, 64)
80-
# return "".join(random.choices(string.ascii_lowercase, k=64))
8178
else:
8279
raise RuntimeError()
8380

8481

85-
def gen_row(column_types: List[str]) -> Row:
82+
def gen_row(row_index: int, column_types: List[str]) -> Row:
8683
are_filled = [i < FILLED_COLUMN_NUM for i in range(len(column_types))]
8784
random.shuffle(are_filled)
88-
return [gen_value(column_type) if is_filled else None for is_filled, column_type in zip(are_filled, column_types)]
85+
return [gen_cell(row_index, column_type) if is_filled else None for is_filled, column_type in zip(are_filled, column_types)]
8986

9087

9188
def gen_table(name: str, column_names: List[str], column_types: List[str], rows: List[Row]) -> Table:
@@ -105,18 +102,33 @@ def write_table(filename: str, table: Table) -> None:
105102
file.write('\n')
106103

107104

108-
def gen_query(table: Table, column_index: int, value) -> str:
109-
return f"""SELECT *
105+
def gen_value(query_index: int, table: Table) -> str:
106+
column_type = table.column_types[query_index % len(table.column_types)]
107+
value = query_index % (2 * ROW_NUM)
108+
if column_type == "uint64":
109+
return f"{value}UL"
110+
elif column_type == "str32":
111+
return f'"{int_to_str(value, 32)}"'
112+
elif column_type == "str64":
113+
return f'"{int_to_str(value, 64)}"'
114+
else:
115+
raise RuntimeError()
116+
117+
118+
def gen_query(query_index: int, table: Table, value: str) -> str:
119+
return f"""$match = SELECT *
110120
FROM {table.name}
111121
WITH (
112122
FORMAT=json_each_row,
113123
SCHEMA
114124
(
115-
{",".join([name + " " + type_to_sql(type) + "?" for name, type in zip(table.column_names, table.column_types)])}
125+
{",".join([name + " " + type_to_sql(type) for name, type in zip(table.column_names, table.column_types)])}
116126
)
117127
)
118-
WHERE {table.column_names[column_index]} IS NOT NULL AND {table.column_names[column_index]} == {value}
119-
LIMIT 20;
128+
WHERE {table.column_names[query_index % len(table.column_names)]} == {value};
129+
130+
INSERT INTO pq.`match`
131+
SELECT ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow())))) FROM $match;
120132
"""
121133

122134

@@ -130,15 +142,14 @@ def main():
130142

131143
column_names = gen_column_names(COLUMN_NUM)
132144
column_types = gen_column_types(COLUMN_NUM)
133-
rows = [gen_row(column_types) for _ in range(ROW_NUM)]
145+
rows = [gen_row(row_index, column_types) for row_index in range(ROW_NUM)]
134146
table = gen_table(TABLE_NAME, column_names, column_types, rows)
135147
write_table(f"{OUTPUT_DIR}/data.txt", table)
136148

137149
pathlib.Path(f"{OUTPUT_DIR}/query/").mkdir(parents=True, exist_ok=True)
138-
for i in range(QUERIES_NUM):
139-
column_index = i % COLUMN_NUM
140-
value = gen_value(column_types[column_index])
141-
write_query(f"{OUTPUT_DIR}/query/{i}.txt", gen_query(table, column_index, value))
150+
for query_index in range(QUERIES_NUM):
151+
value = gen_value(query_index, table)
152+
write_query(f"{OUTPUT_DIR}/query/{query_index}.txt", gen_query(query_index, table, value))
142153

143154

144155
if __name__ == "__main__":

0 commit comments

Comments
 (0)