Skip to content

Commit 8c54519

Browse files
Bulk upsert integration test (#280)
1 parent 12555eb commit 8c54519

File tree

5 files changed

+277
-0
lines changed

5 files changed

+277
-0
lines changed

tests/integration/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
add_subdirectory(basic_example_it)
2+
add_subdirectory(bulk_upsert_simple_it)
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Registers the bulk_upsert_simple_it test target via the project's add_ydb_test helper:
# sources are main.cpp plus bulk_upsert.cpp/.h, and the target carries the "integration" label.
# NOTE(review): add_ydb_test is defined elsewhere in the project; its exact semantics are not visible here.
add_ydb_test(NAME bulk_upsert_simple_it
2+
SOURCES
3+
main.cpp
4+
bulk_upsert.cpp
5+
bulk_upsert.h
6+
LINK_LIBRARIES
7+
yutil
8+
YDB-CPP-SDK::Table
9+
library-getopt
10+
GTest::gtest_main
11+
LABELS
12+
integration
13+
)
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
#include "bulk_upsert.h"
2+
3+
#include <filesystem>
4+
5+
static constexpr size_t BATCH_SIZE = 1000;
6+
7+
static void ThrowOnError(const TStatus& status) {
8+
if (!status.IsSuccess()) {
9+
throw TYdbErrorException(status) << status;
10+
}
11+
}
12+
13+
// Joins `path` onto `basePath` using std::filesystem path semantics.
//
// Returns `path` unchanged when `basePath` is empty. Otherwise returns
// `basePath / path` rendered in the generic ('/'-separated) format.
//
// The conversion to std::string is explicit: relying on the implicit
// path -> string_type conversion only compiles on platforms where
// path::string_type is std::string, and the generic format keeps '/'
// separators regardless of the platform's native separator.
static std::string JoinPath(const std::string& basePath, const std::string& path) {
    if (basePath.empty()) {
        return path;
    }

    return (std::filesystem::path(basePath) / path).generic_string();
}
23+
24+
TRunArgs GetRunArgs() {
25+
26+
std::string database = std::getenv("YDB_DATABASE");
27+
std::string endpoint = std::getenv("YDB_ENDPOINT");
28+
29+
auto driverConfig = TDriverConfig()
30+
.SetEndpoint(endpoint)
31+
.SetDatabase(database)
32+
.SetAuthToken(std::getenv("YDB_TOKEN") ? std::getenv("YDB_TOKEN") : "");
33+
34+
TDriver driver(driverConfig);
35+
return {driver, JoinPath(database, "bulk")};
36+
}
37+
38+
// Creates the log table at path `table`, retrying through the table client
// with default retry settings.
//
// Columns: App, Timestamp, Host, HttpCode and Message are nullable; Id is
// non-nullable. The primary key is (App, Timestamp, Host, Id).
//
// Returns the final status of the retried operation; does not throw on failure.
TStatus CreateTable(TTableClient& client, const std::string& table) {
    const auto createOperation = [&table](TSession session) {
        auto description = TTableBuilder()
            .AddNullableColumn("App", EPrimitiveType::Utf8)
            .AddNullableColumn("Timestamp", EPrimitiveType::Timestamp)
            .AddNullableColumn("Host", EPrimitiveType::Utf8)
            .AddNonNullableColumn("Id", EPrimitiveType::Uint64)
            .AddNullableColumn("HttpCode", EPrimitiveType::Uint32)
            .AddNullableColumn("Message", EPrimitiveType::Utf8)
            .SetPrimaryKeyColumns({"App", "Timestamp", "Host", "Id"})
            .Build();

        return session.CreateTable(table, std::move(description)).GetValueSync();
    };

    TRetryOperationSettings retrySettings;
    return client.RetryOperationSync(createOperation, retrySettings);
}
56+
57+
// Refills `logBatch` with BATCH_SIZE synthetic log messages and returns the
// statistics a later aggregate query over exactly these rows should report.
//
// Parameters:
//   logOffset  - batch index; selects the App suffix (logOffset % 10) and the
//                last Host octet (logOffset % 11) shared by every row of the batch
//   logBatch   - output vector; previous contents are discarded
//   lastNumber - Id assigned to the first row; subsequent rows get consecutive Ids
//
// Returns {sum of App suffixes, sum of Host suffixes, number of rows}.
TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, uint32_t lastNumber) {
    logBatch.clear();
    logBatch.reserve(BATCH_SIZE);

    // App and Host depend only on the batch offset, so build them once.
    const uint64_t appSuffix = logOffset % 10;
    const uint64_t hostSuffix = logOffset % 11;
    const std::string app = "App_" + std::to_string(appSuffix);
    const std::string host = "192.168.0." + std::to_string(hostSuffix);

    uint64_t correctSumApp = 0;
    uint64_t correctSumHost = 0;
    uint64_t correctRowCount = 0;

    for (size_t i = 0; i < BATCH_SIZE; ++i) {
        TLogMessage message;
        // Widen before adding so the 64-bit Id cannot wrap in 32-bit arithmetic.
        message.Pk.Id = static_cast<uint64_t>(lastNumber) + correctRowCount;
        message.Pk.App = app;
        message.Pk.Host = host;
        message.Pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000);
        message.HttpCode = 200;
        message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1";
        logBatch.emplace_back(std::move(message));

        correctSumApp += appSuffix;
        correctSumHost += hostSuffix;
        ++correctRowCount;
    }

    return {correctSumApp, correctSumHost, correctRowCount};
}
80+
81+
// Writes `logBatch` into `table` with a single BulkUpsert call, retried
// according to `retrySettings`.
//
// The batch is encoded as a list of structs with members Id, App, Host,
// Timestamp, HttpCode and Message. The encoded value is built once and kept
// inside the retry closure; each attempt hands a fresh copy to BulkUpsert.
//
// Returns the final status of the retried operation.
TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
                      const TRetryOperationSettings& retrySettings) {
    TValueBuilder batchBuilder;
    batchBuilder.BeginList();
    for (const TLogMessage& entry : logBatch) {
        batchBuilder.AddListItem()
            .BeginStruct()
            .AddMember("Id").Uint64(entry.Pk.Id)
            .AddMember("App").Utf8(entry.Pk.App)
            .AddMember("Host").Utf8(entry.Pk.Host)
            .AddMember("Timestamp").Timestamp(entry.Pk.Timestamp)
            .AddMember("HttpCode").Uint32(entry.HttpCode)
            .AddMember("Message").Utf8(entry.Message)
            .EndStruct();
    }
    batchBuilder.EndList();

    return tableClient.RetryOperationSync(
        [table, payload = batchBuilder.Build()](TTableClient& retryClient) {
            // BulkUpsert takes the rows by rvalue, so give every attempt its own copy.
            TValue attemptRows = payload;
            return retryClient.BulkUpsert(table, std::move(attemptRows)).GetValueSync();
        },
        retrySettings);
}
106+
107+
// Runs the verification query against the table at `path` in one
// serializable read-write transaction that is committed with the query.
//
// `path` is split into its parent directory (used as TablePathPrefix) and its
// last component (used as the table name). The query yields one row with the
// sum of the numeric App suffixes, the sum of the numeric Host suffixes and
// the row count.
//
// On success the first result set is stored into `resultSet`; on failure
// `resultSet` is left untouched. The query status is returned either way.
static TStatus SelectTransaction(TSession session, const std::string& path,
                                 std::optional<TResultSet>& resultSet) {
    const std::filesystem::path tablePath(path);
    const std::string pathPrefix = tablePath.parent_path().string();
    const std::string tableName = tablePath.filename().string();

    const auto queryText = std::format(R"(
PRAGMA TablePathPrefix("{}");

SELECT
SUM(CAST(SUBSTRING(CAST(App as string), 4) as Int32)),
SUM(CAST(SUBSTRING(CAST(Host as string), 10) as Int32)),
COUNT(*)
FROM {}
)", pathPrefix, tableName);

    auto queryResult = session.ExecuteDataQuery(
        queryText,
        TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync();

    if (!queryResult.IsSuccess()) {
        return queryResult;
    }

    resultSet = queryResult.GetResultSet(0);
    return queryResult;
}
132+
133+
// Queries the aggregate statistics of the table at `path`.
//
// Runs SelectTransaction through the client's retry loop and parses the
// single result row into {sum of App suffixes, sum of Host suffixes, row count}.
//
// Throws TYdbErrorException when the query fails, or when the result set does
// not consist of exactly 3 columns and 1 row.
TStatistic Select(TTableClient& client, const std::string& path) {
    std::optional<TResultSet> resultSet;
    ThrowOnError(client.RetryOperationSync([path, &resultSet](TSession session) {
        return SelectTransaction(session, path, resultSet);
    }));

    TResultSetParser parser(*resultSet);

    uint64_t sumApp = 0;
    uint64_t sumHost = 0;
    uint64_t rowCount = 0;

    if (parser.ColumnsCount() != 3 || parser.RowsCount() != 1) {
        throw TYdbErrorException(TStatus(EStatus::GENERIC_ERROR,
            {NYql::TIssue("The number of columns should be: 3.\nThe number of rows should be: 1")}));
    }

    if (parser.TryNextRow()) {
        // SUM over zero rows is NULL, so the optional may be empty; treat that as a
        // sum of 0 rather than dereferencing an empty optional.
        sumApp = parser.ColumnParser("column0").GetOptionalInt64().value_or(0);
        sumHost = parser.ColumnParser("column1").GetOptionalInt64().value_or(0);
        rowCount = parser.ColumnParser("column2").GetUint64();
    }

    return {sumApp, sumHost, rowCount};
}
158+
159+
// Drops the table at `path`, retrying through the table client.
// Throws TYdbErrorException when the final status is not a success.
void DropTable(TTableClient& client, const std::string& path) {
    const auto dropOperation = [path](TSession session) {
        return session.DropTable(path).ExtractValueSync();
    };

    ThrowOnError(client.RetryOperationSync(dropOperation));
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#pragma once
2+
3+
#include <ydb-cpp-sdk/client/driver/driver.h>
4+
#include <ydb-cpp-sdk/client/table/table.h>
5+
#include <src/library/getopt/last_getopt.h>
6+
7+
using namespace NYdb;
8+
using namespace NYdb::NTable;
9+
10+
struct TRunArgs {
11+
TDriver Driver;
12+
std::string Path;
13+
};
14+
15+
struct TLogMessage {
16+
struct TPrimaryKeyLogMessage {
17+
std::string App;
18+
std::string Host;
19+
TInstant Timestamp;
20+
uint64_t Id;
21+
};
22+
23+
TPrimaryKeyLogMessage Pk;
24+
uint32_t HttpCode;
25+
std::string Message;
26+
};
27+
28+
class TYdbErrorException : public yexception {
29+
public:
30+
TYdbErrorException(const NYdb::TStatus& status)
31+
: Status(status) {}
32+
33+
NYdb::TStatus Status;
34+
};
35+
36+
// Aggregate figures describing a set of log rows: used both for the values
// expected from generated batches and for the values read back by Select().
//
// Members are zero-initialized by default so `TStatistic s;` is well defined;
// the type remains an aggregate, so `{a, b, c}` initialization and structured
// bindings keep working.
struct TStatistic {
    uint64_t SumApp = 0;    // sum of the numeric App suffixes
    uint64_t SumHost = 0;   // sum of the numeric Host suffixes
    uint64_t RowCount = 0;  // number of rows
};
41+
42+
// Builds the driver from the YDB_ENDPOINT, YDB_DATABASE and YDB_TOKEN environment
// variables and returns it together with the table path "<database>/bulk".
TRunArgs GetRunArgs();
43+
// Creates the log table at `table` (retried); returns the final status.
TStatus CreateTable(TTableClient& client, const std::string& table);
44+
// Replaces the contents of `logBatch` with a generated batch for `logOffset`, numbering
// row Ids from `lastNumber`; returns the statistics expected for that batch.
TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, uint32_t lastNumber);
45+
// Bulk-upserts `logBatch` into `table`, retried per `retrySettings`; returns the final status.
TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
46+
const TRetryOperationSettings& retrySettings);
47+
// Reads the aggregate statistics of the table at `path`; throws TYdbErrorException on failure.
TStatistic Select(TTableClient& client, const std::string& path);
48+
// Drops the table at `path`; throws TYdbErrorException on failure.
void DropTable(TTableClient& client, const std::string& path);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#include "bulk_upsert.h"
2+
3+
#include <gtest/gtest.h>
4+
5+
// End-to-end BulkUpsert check: creates the table, writes 1000 generated
// batches, then verifies that the aggregates read back from the table match
// the statistics accumulated while generating the data.
//
// Once the table has been created, every path through the test drops it and
// stops the driver, so a failed run does not leave the table behind for the
// next one.
TEST(Integration, BulkUpsert) {
    constexpr uint32_t batchCount = 1000;

    uint64_t correctSumApp = 0;
    uint64_t correctSumHost = 0;
    uint32_t correctRowCount = 0;

    auto [driver, path] = GetRunArgs();

    TTableClient client(driver);

    const TStatus statusCreate = CreateTable(client, path);
    if (!statusCreate.IsSuccess()) {
        // Nothing was created, so there is nothing to drop; just release the driver.
        driver.Stop(true);
        FAIL() << "Create table failed with status: " << statusCreate << std::endl;
    }

    TRetryOperationSettings writeRetrySettings;
    writeRetrySettings
        .Idempotent(true)
        .MaxRetries(20);

    bool allWritesSucceeded = true;
    std::vector<TLogMessage> logBatch;
    for (uint32_t offset = 0; offset < batchCount; ++offset) {
        auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, correctRowCount);
        correctSumApp += batchSumApp;
        correctSumHost += batchSumHost;
        correctRowCount += batchRowCount;

        const TStatus statusWrite = WriteLogBatch(client, path, logBatch, writeRetrySettings);
        if (!statusWrite.IsSuccess()) {
            // Record the failure but fall through to the cleanup below.
            ADD_FAILURE() << "Write failed with status: " << statusWrite << std::endl;
            allWritesSucceeded = false;
            break;
        }
    }

    if (allWritesSucceeded) {
        try {
            auto [sumApp, sumHost, rowCount] = Select(client, path);
            EXPECT_EQ(rowCount, correctRowCount);
            EXPECT_EQ(sumApp, correctSumApp);
            EXPECT_EQ(sumHost, correctSumHost);
        } catch (const TYdbErrorException& e) {
            ADD_FAILURE() << "Execution failed due to fatal error:\nStatus: " << ToString(e.Status.GetStatus()) << std::endl << e.Status.GetIssues().ToString();
        }
    }

    // DropTable throws on failure; catch it so the driver is still stopped.
    try {
        DropTable(client, path);
    } catch (const TYdbErrorException& e) {
        ADD_FAILURE() << "Drop table failed:\nStatus: " << ToString(e.Status.GetStatus()) << std::endl << e.Status.GetIssues().ToString();
    }

    driver.Stop(true);
}

0 commit comments

Comments
 (0)