Skip to content

Bulk upsert integration test #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions include/ydb-cpp-sdk/util/datetime/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class TTimeBase {
return Value_;
}

auto operator<=>(const TTimeBase&) const = default;

protected:
TValue Value_; // microseconds count
};
Expand Down Expand Up @@ -581,36 +583,6 @@ ::NPrivate::TPrintableLocalTime<true, true> FormatIsoLocalUpToSeconds(TInstant i
::NPrivate::TPrintableLocalTime<true, false> FormatLocalUpToSeconds(TInstant instant);
///@}

template <class S>
static constexpr bool operator<(const TTimeBase<S>& l, const TTimeBase<S>& r) noexcept {
return l.GetValue() < r.GetValue();
}

template <class S>
static constexpr bool operator<=(const TTimeBase<S>& l, const TTimeBase<S>& r) noexcept {
return l.GetValue() <= r.GetValue();
}

template <class S>
static constexpr bool operator==(const TTimeBase<S>& l, const TTimeBase<S>& r) noexcept {
return l.GetValue() == r.GetValue();
}

template <class S>
static constexpr bool operator!=(const TTimeBase<S>& l, const TTimeBase<S>& r) noexcept {
return l.GetValue() != r.GetValue();
}

template <class S>
static constexpr bool operator>(const TTimeBase<S>& l, const TTimeBase<S>& r) noexcept {
return l.GetValue() > r.GetValue();
}

template <class S>
static constexpr bool operator>=(const TTimeBase<S>& l, const TTimeBase<S>& r) noexcept {
return l.GetValue() >= r.GetValue();
}

namespace NDateTimeHelpers {
template <typename T>
static constexpr T SumWithSaturation(T a, T b) {
Expand Down
1 change: 1 addition & 0 deletions tests/integration/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
add_subdirectory(basic_example_it)
add_subdirectory(bulk_upsert_simple_it)
13 changes: 13 additions & 0 deletions tests/integration/bulk_upsert_simple_it/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
add_ydb_test(NAME bulk_upsert_simple_it
SOURCES
main.cpp
bulk_upsert.cpp
bulk_upsert.h
LINK_LIBRARIES
yutil
YDB-CPP-SDK::Table
library-getopt
GTest::gtest_main
LABELS
integration
)
179 changes: 179 additions & 0 deletions tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#include "bulk_upsert.h"

#include <src/library/getopt/last_getopt.h>

#include <filesystem>

static constexpr size_t BATCH_SIZE = 1000;

static void ThrowOnError(const TStatus& status) {
if (!status.IsSuccess()) {
throw TYdbErrorException(status) << status;
}
}

static std::string JoinPath(const std::string& basePath, const std::string& path) {
if (basePath.empty()) {
return path;
}

std::filesystem::path prefixPathSplit(basePath);
prefixPathSplit /= path;

return prefixPathSplit;
}

TRunArgs GetRunArgs() {

std::string database = std::getenv("YDB_DATABASE");
std::string endpoint = std::getenv("YDB_ENDPOINT");

auto driverConfig = TDriverConfig()
.SetEndpoint(endpoint)
.SetDatabase(database)
.SetAuthToken(std::getenv("YDB_TOKEN") ? std::getenv("YDB_TOKEN") : "");

TDriver driver(driverConfig);
return {driver, JoinPath(database, "bulk")};
}

TStatus CreateLogTable(TTableClient& client, const std::string& table) {
std::cerr << "Create table " << table << "\n";

TRetryOperationSettings settings;
auto status = client.RetryOperationSync([&table](TSession session) {
auto tableDesc = TTableBuilder()
.AddNullableColumn("App", EPrimitiveType::Utf8)
.AddNullableColumn("Timestamp", EPrimitiveType::Timestamp)
.AddNullableColumn("Host", EPrimitiveType::Utf8)
.AddNullableColumn("HttpCode", EPrimitiveType::Uint32)
.AddNullableColumn("Message", EPrimitiveType::Utf8)
.SetPrimaryKeyColumns({"App", "Host", "Timestamp"})
.Build();

return session.CreateTable(table, std::move(tableDesc)).GetValueSync();
}, settings);

return status;
}

TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, std::set<TLogMessage>& setMessage) {
logBatch.clear();
uint32_t correctSumApp = 0;
uint32_t correctSumHost = 0;
uint32_t correctRowCount = 0;

for (size_t i = 0; i < BATCH_SIZE; ++i) {
TLogMessage message;
message.pk.App = "App_" + ToString(logOffset % 10);
message.pk.Host = "192.168.0." + ToString(logOffset % 11);
message.pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000);
message.HttpCode = 200;
message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1";
logBatch.emplace_back(message);

if (!setMessage.contains(message)) {
correctSumApp += logOffset % 10;
correctSumHost += logOffset % 11;
++correctRowCount;
setMessage.insert(message);
}

}
return {correctSumApp, correctSumHost, correctRowCount};
}

TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
const TRetryOperationSettings& retrySettings) {
TValueBuilder rows;
rows.BeginList();
for (const auto& message : logBatch) {
rows.AddListItem()
.BeginStruct()
.AddMember("App").Utf8(message.pk.App)
.AddMember("Host").Utf8(message.pk.Host)
.AddMember("Timestamp").Timestamp(message.pk.Timestamp)
.AddMember("HttpCode").Uint32(message.HttpCode)
.AddMember("Message").Utf8(message.Message)
.EndStruct();
}
rows.EndList();
auto bulkUpsertOperation = [table, rowsValue = rows.Build()](TTableClient& tableClient) {
TValue r = rowsValue;
auto status = tableClient.BulkUpsert(table, std::move(r));
return status.GetValueSync();
};

auto status = tableClient.RetryOperationSync(bulkUpsertOperation, retrySettings);
return status;
}

static TStatus ScanQuerySelect(TTableClient& client, const std::string& path, std::vector <TResultSet>& vectorResultSet) {
std::filesystem::path filesystemPath(path);
auto query = std::format(R"(
--!syntax_v1
PRAGMA TablePathPrefix("{}");

SELECT *
FROM {}
)", filesystemPath.parent_path().c_str(), filesystemPath.filename().c_str());

auto resultScanQuery = client.StreamExecuteScanQuery(query).GetValueSync();

if (!resultScanQuery.IsSuccess()) {
return resultScanQuery;
}

bool eos = false;

while (!eos) {
auto streamPart = resultScanQuery.ReadNext().ExtractValueSync();

if (!streamPart.IsSuccess()) {
eos = true;
if (!streamPart.EOS()) {
return streamPart;
}
continue;
}

if (streamPart.HasResultSet()) {
auto rs = streamPart.ExtractResultSet();
vectorResultSet.push_back(rs);
}
}
return TStatus(EStatus::SUCCESS, NYql::TIssues());
}

TStatistic ScanQuerySelect(TTableClient& client, const std::string& path) {
std::vector <TResultSet> vectorResultSet;
ThrowOnError(client.RetryOperationSync([path, &vectorResultSet](TTableClient& client) {
return ScanQuerySelect(client, path, vectorResultSet);
}));

uint32_t sumApp = 0;
uint32_t sumHost = 0;
uint32_t rowCount = 0;

for (TResultSet& resultSet : vectorResultSet) {
TResultSetParser parser(resultSet);

while (parser.TryNextRow()) {

++rowCount;
sumApp += ToString(parser.ColumnParser("App").GetOptionalUtf8()).back() - '0';
std::string strHost = ToString(parser.ColumnParser("Host").GetOptionalUtf8());
char penCharStrHost = strHost[strHost.size() - 2];
sumHost += strHost.back() - '0' + (penCharStrHost == '.' ? 0 : (penCharStrHost - '0') * 10);
}

}

return {sumApp, sumHost, rowCount};
}

void DropTable(TTableClient& client, const std::string& path) {
ThrowOnError(client.RetryOperationSync([path](TSession session) {
return session.DropTable(path).ExtractValueSync();
}));
}
49 changes: 49 additions & 0 deletions tests/integration/bulk_upsert_simple_it/bulk_upsert.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <ydb-cpp-sdk/client/driver/driver.h>
#include <ydb-cpp-sdk/client/table/table.h>

using namespace NYdb;
using namespace NYdb::NTable;

struct TRunArgs {
TDriver driver;
std::string path;
};

class TLogMessage {
struct TPrimaryKeyLogMessage {
std::string App;
std::string Host;
TInstant Timestamp;
auto operator<=>(const TPrimaryKeyLogMessage&) const = default;
};

public:
TPrimaryKeyLogMessage pk;
uint32_t HttpCode;
std::string Message;
auto operator<=>(const TLogMessage& o) const {return pk <=> o.pk;};
};

class TYdbErrorException : public yexception {
public:
TYdbErrorException(const NYdb::TStatus& status)
: Status(status) {}

NYdb::TStatus Status;
};

struct TStatistic {
uint32_t sumApp;
uint32_t sumHost;
uint32_t rowCount;
};

TRunArgs GetRunArgs();
TStatus CreateLogTable(TTableClient& client, const std::string& table);
TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, std::set<TLogMessage>& setMessage);
TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
const TRetryOperationSettings& retrySettings);
TStatistic ScanQuerySelect(TTableClient& client, const std::string& path);
void DropTable(TTableClient& client, const std::string& path);
49 changes: 49 additions & 0 deletions tests/integration/bulk_upsert_simple_it/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "bulk_upsert.h"

#include <gtest/gtest.h>

#include <set>

TEST(Integration, BulkUpsert) {

uint32_t correctSumApp = 0;
uint32_t correctSumHost = 0;
std::set <TLogMessage> setMessage;

auto [driver, path] = GetRunArgs();

TTableClient client(driver);
uint32_t count = 1000;
TStatus statusCreate = CreateLogTable(client, path);
if (!statusCreate.IsSuccess()) {
FAIL() << "Create table failed with status: " << statusCreate << std::endl;
}

TRetryOperationSettings writeRetrySettings;
writeRetrySettings
.Idempotent(true)
.MaxRetries(20);

std::vector<TLogMessage> logBatch;
for (uint32_t offset = 0; offset < count; ++offset) {

auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, setMessage);
correctSumApp += batchSumApp;
correctSumHost += batchSumHost;

TStatus statusWrite = WriteLogBatch(client, path, logBatch, writeRetrySettings);
if (!statusWrite.IsSuccess()) {
FAIL() << "Write failed with status: " << statusWrite << std::endl;
}
}

auto [sumApp, sumHost, rowCount] = ScanQuerySelect(client, path);

EXPECT_EQ(rowCount, setMessage.size());
EXPECT_EQ(sumApp, correctSumApp);
EXPECT_EQ(sumHost, correctSumHost);

DropTable(client, path);
driver.Stop(true);

}
Loading