Skip to content

Commit 06b2c92

Browse files
bulk upsert test
1 parent ab43cd3 commit 06b2c92

File tree

4 files changed

+290
-0
lines changed

4 files changed

+290
-0
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Integration test target for the simple BulkUpsert scenario.
add_ydb_test(NAME bulk_upsert_simple_it
  SOURCES main.cpp bulk_upsert.cpp bulk_upsert.h
  LINK_LIBRARIES yutil YDB-CPP-SDK::Table library-getopt GTest::gtest_main
  LABELS integration
)
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#include "bulk_upsert.h"

#include <src/library/getopt/last_getopt.h>

#include <cstdlib>
#include <filesystem>
#include <format>
#include <iostream>
#include <stdexcept>
6+
7+
// Number of rows produced for every generated batch.
static constexpr size_t BATCH_SIZE{1000};
8+
9+
static void ThrowOnError(const TStatus& status) {
10+
if (!status.IsSuccess()) {
11+
throw TYdbErrorException(status) << status;
12+
}
13+
}
14+
15+
/// Joins a base path and a relative path into one '/'-separated path.
///
/// @param basePath  Prefix to prepend; when empty, `path` is returned as is.
/// @param path      Component appended with std::filesystem::path::operator/=.
/// @return The joined path in generic ('/'-separated) form.
static std::string JoinPath(const std::string& basePath, const std::string& path) {
    if (basePath.empty()) {
        return path;
    }

    std::filesystem::path joined(basePath);
    joined /= path;

    // Convert explicitly: the implicit path -> std::string conversion only
    // compiles where path::string_type is std::string, and the generic format
    // keeps '/' as the separator regardless of the platform's preferred one.
    return joined.generic_string();
}
25+
26+
/// Builds the driver and the table path for the test from the environment.
///
/// Reads YDB_DATABASE and YDB_ENDPOINT (both required) and YDB_TOKEN
/// (optional; an empty token is used when it is not set).
///
/// @return The constructed driver and the path "bulk" joined to the database.
/// @throws std::runtime_error if YDB_DATABASE or YDB_ENDPOINT is not set.
TRunArgs GetRunArgs() {
    // std::getenv returns nullptr for an unset variable and constructing a
    // std::string from nullptr is undefined behaviour, so validate first.
    const auto requireEnv = [](const char* name) -> std::string {
        const char* value = std::getenv(name);
        if (value == nullptr) {
            throw std::runtime_error(std::string("Environment variable ") + name + " is not set");
        }
        return value;
    };

    const std::string database = requireEnv("YDB_DATABASE");
    const std::string endpoint = requireEnv("YDB_ENDPOINT");

    // Read the optional token once instead of querying the environment twice.
    const char* token = std::getenv("YDB_TOKEN");

    auto driverConfig = TDriverConfig()
        .SetEndpoint(endpoint)
        .SetDatabase(database)
        .SetAuthToken(token ? token : "");

    TDriver driver(driverConfig);
    return {driver, JoinPath(database, "bulk")};
}
39+
40+
/// Creates the log table at `table` through the client's retry helper.
///
/// The table has nullable columns App, Timestamp, Host, HttpCode and Message
/// and the primary key (App, Host, Timestamp).
///
/// @return The status produced by RetryOperationSync.
TStatus CreateLogTable(TTableClient& client, const std::string& table) {
    std::cerr << "Create table " << table << "\n";

    // The description is rebuilt on every attempt because it is handed over
    // to CreateTable by value.
    const auto createOperation = [&table](TSession session) {
        TTableBuilder builder;
        builder.AddNullableColumn("App", EPrimitiveType::Utf8);
        builder.AddNullableColumn("Timestamp", EPrimitiveType::Timestamp);
        builder.AddNullableColumn("Host", EPrimitiveType::Utf8);
        builder.AddNullableColumn("HttpCode", EPrimitiveType::Uint32);
        builder.AddNullableColumn("Message", EPrimitiveType::Utf8);
        builder.SetPrimaryKeyColumns({"App", "Host", "Timestamp"});

        return session.CreateTable(table, builder.Build()).GetValueSync();
    };

    return client.RetryOperationSync(createOperation, TRetryOperationSettings());
}
59+
60+
/// Fills `logBatch` with BATCH_SIZE generated messages for `logOffset`.
///
/// Every message of the batch shares the same App ("App_<offset % 10>") and
/// Host ("192.168.0.<offset % 11>"); only the timestamp and the text vary.
/// Messages whose primary key is not yet in `setMessage` are inserted there
/// and counted in the returned statistic.
///
/// @return Sums of the App and Host suffixes and the number of rows, taken
///         over the newly inserted messages only.
TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, std::set<TLogMessage>& setMessage) {
    logBatch.clear();

    // These values depend on the offset only, so they are the same for every row.
    const uint64_t appIndex = logOffset % 10;
    const uint64_t hostIndex = logOffset % 11;
    const std::string appName = "App_" + ToString(appIndex);
    const std::string hostName = "192.168.0." + ToString(hostIndex);

    TStatistic batchStat{0, 0, 0};

    for (size_t row = 0; row != BATCH_SIZE; ++row) {
        TLogMessage message;
        message.pk.App = appName;
        message.pk.Host = hostName;
        message.pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(row % 1000);
        message.HttpCode = 200;
        if (row % 2) {
            message.Message = "GET / HTTP/1.1";
        } else {
            message.Message = "GET /images/logo.png HTTP/1.1";
        }
        logBatch.emplace_back(message);

        // insert() reports whether the key was new, which replaces a separate lookup.
        if (setMessage.insert(message).second) {
            batchStat.sumApp += appIndex;
            batchStat.sumHost += hostIndex;
            ++batchStat.rowCount;
        }
    }

    return batchStat;
}
85+
86+
/// Writes `logBatch` into `table` with BulkUpsert, retried per `retrySettings`.
///
/// The batch is converted once into a list of structs with the members
/// App, Host, Timestamp, HttpCode and Message; each attempt sends a copy of
/// that value.
///
/// @return The status produced by RetryOperationSync.
TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
                      const TRetryOperationSettings& retrySettings) {
    TValueBuilder builder;
    builder.BeginList();
    for (const TLogMessage& entry : logBatch) {
        builder.AddListItem().BeginStruct();
        builder.AddMember("App").Utf8(entry.pk.App);
        builder.AddMember("Host").Utf8(entry.pk.Host);
        builder.AddMember("Timestamp").Timestamp(entry.pk.Timestamp);
        builder.AddMember("HttpCode").Uint32(entry.HttpCode);
        builder.AddMember("Message").Utf8(entry.Message);
        builder.EndStruct();
    }
    builder.EndList();

    const TValue payload = builder.Build();

    // BulkUpsert consumes its argument, so every attempt works on its own copy.
    auto upsertOperation = [table, payload](TTableClient& retryClient) {
        TValue attemptRows = payload;
        return retryClient.BulkUpsert(table, std::move(attemptRows)).GetValueSync();
    };

    return tableClient.RetryOperationSync(upsertOperation, retrySettings);
}
110+
111+
/// Runs `SELECT *` on the table at `path` as a scan query and collects the
/// streamed result sets.
///
/// @param client           Table client used to execute the scan query.
/// @param path             Full table path; its parent becomes TablePathPrefix
///                         and its last component the table name.
/// @param vectorResultSet  Output; cleared on entry and filled with every
///                         result set read from the stream.
/// @return The failing status if the query or a stream part fails, otherwise
///         a SUCCESS status once the stream reports end-of-stream.
static TStatus ScanQuerySelect(TTableClient& client, const std::string& path, std::vector <TResultSet>& vectorResultSet) {
    // This function is re-invoked by the retry wrapper. Drop whatever a
    // previous, partially read attempt collected so rows are not duplicated.
    vectorResultSet.clear();

    const std::filesystem::path filesystemPath(path);
    const std::string tablePrefix = filesystemPath.parent_path().string();
    const std::string tableName = filesystemPath.filename().string();

    auto query = std::format(R"(
--!syntax_v1
PRAGMA TablePathPrefix("{}");

SELECT *
FROM {}
)", tablePrefix, tableName);

    auto resultScanQuery = client.StreamExecuteScanQuery(query).GetValueSync();

    if (!resultScanQuery.IsSuccess()) {
        return resultScanQuery;
    }

    while (true) {
        auto streamPart = resultScanQuery.ReadNext().ExtractValueSync();

        if (!streamPart.IsSuccess()) {
            // End-of-stream is reported as a non-success part; anything else
            // is a real error and is returned to the caller.
            if (streamPart.EOS()) {
                break;
            }
            return streamPart;
        }

        if (streamPart.HasResultSet()) {
            vectorResultSet.push_back(streamPart.ExtractResultSet());
        }
    }

    return TStatus(EStatus::SUCCESS, NYql::TIssues());
}
147+
148+
/// Reads the whole table at `path` with a scan query and aggregates it.
///
/// For every row the trailing digit of App and the trailing one or two digits
/// of Host (the part after the last '.') are added to the respective sums.
///
/// @return The App sum, the Host sum and the number of rows read.
/// @throws TYdbErrorException if the retried scan query does not succeed.
TStatistic ScanQuerySelect(TTableClient& client, const std::string& path) {
    std::vector<TResultSet> parts;
    const auto scanStatus = client.RetryOperationSync([path, &parts](TTableClient& retryClient) {
        return ScanQuerySelect(retryClient, path, parts);
    });
    ThrowOnError(scanStatus);

    TStatistic total{0, 0, 0};

    for (TResultSet& part : parts) {
        TResultSetParser parser(part);

        while (parser.TryNextRow()) {
            ++total.rowCount;

            // App looks like "App_<d>": the last character is the digit.
            total.sumApp += ToString(parser.ColumnParser("App").GetOptionalUtf8()).back() - '0';

            // Host ends with ".<d>" or ".<dd>": the character before the last
            // one is either the dot or the tens digit.
            const std::string host = ToString(parser.ColumnParser("Host").GetOptionalUtf8());
            const char tensOrDot = host[host.size() - 2];
            const int tens = (tensOrDot == '.') ? 0 : (tensOrDot - '0') * 10;
            total.sumHost += host.back() - '0' + tens;
        }
    }

    return total;
}
174+
175+
/// Drops the table at `path` through the client's retry helper.
///
/// @throws TYdbErrorException if the retried operation does not succeed.
void DropTable(TTableClient& client, const std::string& path) {
    const auto dropOperation = [path](TSession session) {
        return session.DropTable(path).ExtractValueSync();
    };

    const auto dropStatus = client.RetryOperationSync(dropOperation);
    ThrowOnError(dropStatus);
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#pragma once
2+
3+
#include <ydb-cpp-sdk/client/driver/driver.h>
4+
#include <ydb-cpp-sdk/client/table/table.h>
5+
6+
using namespace NYdb;
7+
using namespace NYdb::NTable;
8+
9+
struct TRunArgs {
10+
TDriver driver;
11+
std::string path;
12+
};
13+
14+
class TLogMessage {
15+
struct TPrimaryKeyLogMessage {
16+
std::string App;
17+
std::string Host;
18+
TInstant Timestamp;
19+
auto operator<=>(const TPrimaryKeyLogMessage&) const = default;
20+
};
21+
22+
public:
23+
TPrimaryKeyLogMessage pk;
24+
uint32_t HttpCode;
25+
std::string Message;
26+
auto operator<=>(const TLogMessage& o) const {return pk <=> o.pk;};
27+
};
28+
29+
class TYdbErrorException : public yexception {
30+
public:
31+
TYdbErrorException(const NYdb::TStatus& status)
32+
: Status(status) {}
33+
34+
NYdb::TStatus Status;
35+
};
36+
37+
/// Checksums of a set of log rows: expected values come from GetLogBatch,
/// actual values from ScanQuerySelect.
struct TStatistic {
    /// Sum of the numeric App suffixes.
    uint32_t sumApp;

    /// Sum of the numeric Host suffixes.
    uint32_t sumHost;

    /// Number of rows counted.
    uint32_t rowCount;
};
42+
43+
TRunArgs GetRunArgs();
44+
TStatus CreateLogTable(TTableClient& client, const std::string& table);
45+
TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, std::set<TLogMessage>& setMessage);
46+
TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
47+
const TRetryOperationSettings& retrySettings);
48+
TStatistic ScanQuerySelect(TTableClient& client, const std::string& path);
49+
void DropTable(TTableClient& client, const std::string& path);
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#include "bulk_upsert.h"
2+
3+
#include <gtest/gtest.h>
4+
5+
#include <set>
6+
7+
// Creates the log table, bulk-upserts `count` generated batches, reads the
// table back with a scan query and compares the row count and both checksums
// with the values tracked while generating the data.
TEST(Integration, BulkUpsert) {
    uint32_t correctSumApp = 0;
    uint32_t correctSumHost = 0;
    std::set<TLogMessage> setMessage;

    auto [driver, path] = GetRunArgs();

    TTableClient client(driver);
    uint32_t count = 1000;
    TStatus statusCreate = CreateLogTable(client, path);
    if (!statusCreate.IsSuccess()) {
        // FAIL() would return immediately and leave the driver running, so
        // record the failure first and shut down explicitly.
        ADD_FAILURE() << "Create table failed with status: " << statusCreate << std::endl;
        driver.Stop(true);
        return;
    }

    TRetryOperationSettings writeRetrySettings;
    writeRetrySettings
        .Idempotent(true)
        .MaxRetries(20);

    bool allWritten = true;
    std::vector<TLogMessage> logBatch;
    for (uint32_t offset = 0; offset < count; ++offset) {
        auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, setMessage);
        correctSumApp += batchSumApp;
        correctSumHost += batchSumHost;

        TStatus statusWrite = WriteLogBatch(client, path, logBatch, writeRetrySettings);
        if (!statusWrite.IsSuccess()) {
            // Stop writing but fall through to the cleanup below so the table
            // is not left behind for the next run.
            ADD_FAILURE() << "Write failed with status: " << statusWrite << std::endl;
            allWritten = false;
            break;
        }
    }

    // Verification only makes sense when every batch reached the table.
    if (allWritten) {
        auto [sumApp, sumHost, rowCount] = ScanQuerySelect(client, path);

        EXPECT_EQ(rowCount, setMessage.size());
        EXPECT_EQ(sumApp, correctSumApp);
        EXPECT_EQ(sumHost, correctSumHost);
    }

    DropTable(client, path);
    driver.Stop(true);
}

0 commit comments

Comments
 (0)