From 06b2c928aa9e035848e769a76436ac8df6dfcce1 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Wed, 31 Jul 2024 17:30:29 +0300 Subject: [PATCH 1/7] bulk upsert test --- .../bulk_upsert_simple_it/CMakeLists.txt | 13 ++ .../bulk_upsert_simple_it/bulk_upsert.cpp | 179 ++++++++++++++++++ .../bulk_upsert_simple_it/bulk_upsert.h | 49 +++++ .../bulk_upsert_simple_it/main.cpp | 49 +++++ 4 files changed, 290 insertions(+) create mode 100644 tests/integration/bulk_upsert_simple_it/CMakeLists.txt create mode 100644 tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp create mode 100644 tests/integration/bulk_upsert_simple_it/bulk_upsert.h create mode 100644 tests/integration/bulk_upsert_simple_it/main.cpp diff --git a/tests/integration/bulk_upsert_simple_it/CMakeLists.txt b/tests/integration/bulk_upsert_simple_it/CMakeLists.txt new file mode 100644 index 0000000000..027cf05071 --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/CMakeLists.txt @@ -0,0 +1,13 @@ +add_ydb_test(NAME bulk_upsert_simple_it + SOURCES + main.cpp + bulk_upsert.cpp + bulk_upsert.h + LINK_LIBRARIES + yutil + YDB-CPP-SDK::Table + library-getopt + GTest::gtest_main + LABELS + integration +) diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp new file mode 100644 index 0000000000..2016de903d --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp @@ -0,0 +1,179 @@ +#include "bulk_upsert.h" + +#include + +#include + +static constexpr size_t BATCH_SIZE = 1000; + +static void ThrowOnError(const TStatus& status) { + if (!status.IsSuccess()) { + throw TYdbErrorException(status) << status; + } +} + +static std::string JoinPath(const std::string& basePath, const std::string& path) { + if (basePath.empty()) { + return path; + } + + std::filesystem::path prefixPathSplit(basePath); + prefixPathSplit /= path; + + return prefixPathSplit; +} + +TRunArgs GetRunArgs() { + + std::string database = std::getenv("YDB_DATABASE"); + std::string endpoint = std::getenv("YDB_ENDPOINT"); + + auto driverConfig = TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + .SetAuthToken(std::getenv("YDB_TOKEN") ? std::getenv("YDB_TOKEN") : ""); + + TDriver driver(driverConfig); + return {driver, JoinPath(database, "bulk")}; +} + +TStatus CreateLogTable(TTableClient& client, const std::string& table) { + std::cerr << "Create table " << table << "\n"; + + TRetryOperationSettings settings; + auto status = client.RetryOperationSync([&table](TSession session) { + auto tableDesc = TTableBuilder() + .AddNullableColumn("App", EPrimitiveType::Utf8) + .AddNullableColumn("Timestamp", EPrimitiveType::Timestamp) + .AddNullableColumn("Host", EPrimitiveType::Utf8) + .AddNullableColumn("HttpCode", EPrimitiveType::Uint32) + .AddNullableColumn("Message", EPrimitiveType::Utf8) + .SetPrimaryKeyColumns({"App", "Host", "Timestamp"}) + .Build(); + + return session.CreateTable(table, std::move(tableDesc)).GetValueSync(); + }, settings); + + return status; +} + +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage) { + logBatch.clear(); + uint32_t correctSumApp = 0; + uint32_t correctSumHost = 0; + uint32_t correctRowCount = 0; + + for (size_t i = 0; i < BATCH_SIZE; ++i) { + TLogMessage message; + message.pk.App = "App_" + ToString(logOffset % 10); + message.pk.Host = "192.168.0." + ToString(logOffset % 11); + message.pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000); + message.HttpCode = 200; + message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1"; + logBatch.emplace_back(message); + + if (!setMessage.contains(message)) { + correctSumApp += logOffset % 10; + correctSumHost += logOffset % 11; + ++correctRowCount; + setMessage.insert(message); + } + + } + return {correctSumApp, correctSumHost, correctRowCount}; +} + +TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector& logBatch, + const TRetryOperationSettings& retrySettings) { + TValueBuilder rows; + rows.BeginList(); + for (const auto& message : logBatch) { + rows.AddListItem() + .BeginStruct() + .AddMember("App").Utf8(message.pk.App) + .AddMember("Host").Utf8(message.pk.Host) + .AddMember("Timestamp").Timestamp(message.pk.Timestamp) + .AddMember("HttpCode").Uint32(message.HttpCode) + .AddMember("Message").Utf8(message.Message) + .EndStruct(); + } + rows.EndList(); + auto bulkUpsertOperation = [table, rowsValue = rows.Build()](TTableClient& tableClient) { + TValue r = rowsValue; + auto status = tableClient.BulkUpsert(table, std::move(r)); + return status.GetValueSync(); + }; + + auto status = tableClient.RetryOperationSync(bulkUpsertOperation, retrySettings); + return status; +} + +static TStatus ScanQuerySelect(TTableClient& client, const std::string& path, std::vector & vectorResultSet) { + std::filesystem::path filesystemPath(path); + auto query = std::format(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("{}"); + + SELECT * + FROM {} + )", filesystemPath.parent_path().c_str(), filesystemPath.filename().c_str()); + + auto resultScanQuery = client.StreamExecuteScanQuery(query).GetValueSync(); + + if (!resultScanQuery.IsSuccess()) { + return resultScanQuery; + } + + bool eos = false; + + while (!eos) { + auto streamPart = resultScanQuery.ReadNext().ExtractValueSync(); + + if (!streamPart.IsSuccess()) { + eos = true; + if (!streamPart.EOS()) { + return streamPart; + } + continue; + } + + if (streamPart.HasResultSet()) { + auto rs = streamPart.ExtractResultSet(); + vectorResultSet.push_back(rs); + } + } + return TStatus(EStatus::SUCCESS, NYql::TIssues()); +} + +TStatistic ScanQuerySelect(TTableClient& client, const std::string& path) { + std::vector vectorResultSet; + ThrowOnError(client.RetryOperationSync([path, &vectorResultSet](TTableClient& client) { + return ScanQuerySelect(client, path, vectorResultSet); + })); + + uint32_t sumApp = 0; + uint32_t sumHost = 0; + uint32_t rowCount = 0; + + for (TResultSet& resultSet : vectorResultSet) { + TResultSetParser parser(resultSet); + + while (parser.TryNextRow()) { + + ++rowCount; + sumApp += ToString(parser.ColumnParser("App").GetOptionalUtf8()).back() - '0'; + std::string strHost = ToString(parser.ColumnParser("Host").GetOptionalUtf8()); + char penCharStrHost = strHost[strHost.size() - 2]; + sumHost += strHost.back() - '0' + (penCharStrHost == '.' ? 0 : (penCharStrHost - '0') * 10); + } + + } + + return {sumApp, sumHost, rowCount}; +} + +void DropTable(TTableClient& client, const std::string& path) { + ThrowOnError(client.RetryOperationSync([path](TSession session) { + return session.DropTable(path).ExtractValueSync(); + })); +} diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h new file mode 100644 index 0000000000..aafd21394f --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include + +using namespace NYdb; +using namespace NYdb::NTable; + +struct TRunArgs { + TDriver driver; + std::string path; +}; + +class TLogMessage { + struct TPrimaryKeyLogMessage { + std::string App; + std::string Host; + TInstant Timestamp; + auto operator<=>(const TPrimaryKeyLogMessage&) const = default; + }; + +public: + TPrimaryKeyLogMessage pk; + uint32_t HttpCode; + std::string Message; + auto operator<=>(const TLogMessage& o) const {return pk <=> o.pk;}; +}; + +class TYdbErrorException : public yexception { +public: + TYdbErrorException(const NYdb::TStatus& status) + : Status(status) {} + + NYdb::TStatus Status; +}; + +struct TStatistic { + uint32_t sumApp; + uint32_t sumHost; + uint32_t rowCount; +}; + +TRunArgs GetRunArgs(); +TStatus CreateLogTable(TTableClient& client, const std::string& table); +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage); +TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector& logBatch, + const TRetryOperationSettings& retrySettings); +TStatistic ScanQuerySelect(TTableClient& client, const std::string& path); +void DropTable(TTableClient& client, const std::string& path); diff --git a/tests/integration/bulk_upsert_simple_it/main.cpp b/tests/integration/bulk_upsert_simple_it/main.cpp new file mode 100644 index 0000000000..d924340c8f --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/main.cpp @@ -0,0 +1,49 @@ +#include "bulk_upsert.h" + +#include + +#include + +TEST(Integration, BulkUpsert) { + + uint32_t correctSumApp = 0; + uint32_t correctSumHost = 0; + std::set setMessage; + + auto [driver, path] = GetRunArgs(); + + TTableClient client(driver); + uint32_t count = 1000; + TStatus statusCreate = CreateLogTable(client, path); + if (!statusCreate.IsSuccess()) { + FAIL() << "Create table failed with status: " << statusCreate << std::endl; + } + + TRetryOperationSettings writeRetrySettings; + writeRetrySettings + .Idempotent(true) + .MaxRetries(20); + + std::vector logBatch; + for (uint32_t offset = 0; offset < count; ++offset) { + + auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, setMessage); + correctSumApp += batchSumApp; + correctSumHost += batchSumHost; + + TStatus statusWrite = WriteLogBatch(client, path, logBatch, writeRetrySettings); + if (!statusWrite.IsSuccess()) { + FAIL() << "Write failed with status: " << statusWrite << std::endl; + } + } + + auto [sumApp, sumHost, rowCount] = ScanQuerySelect(client, path); + + EXPECT_EQ(rowCount, setMessage.size()); + EXPECT_EQ(sumApp, correctSumApp); + EXPECT_EQ(sumHost, correctSumHost); + + DropTable(client, path); + driver.Stop(true); + +} From 68f0d4e326e1005f858063278b478d573e6d6309 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Wed, 31 Jul 2024 17:43:55 +0300 Subject: [PATCH 2/7] spaceship & added directory --- include/ydb-cpp-sdk/util/datetime/base.h | 32 ++---------------------- tests/integration/CMakeLists.txt | 1 + 2 files changed, 3 insertions(+), 30 deletions(-) diff --git a/include/ydb-cpp-sdk/util/datetime/base.h b/include/ydb-cpp-sdk/util/datetime/base.h index 389e39e46a..2b89d9350a 100644 --- a/include/ydb-cpp-sdk/util/datetime/base.h +++ b/include/ydb-cpp-sdk/util/datetime/base.h @@ -167,6 +167,8 @@ class TTimeBase { return Value_; } + auto operator<=>(const TTimeBase&) const = default; + protected: TValue Value_; // microseconds count }; @@ -581,36 +583,6 @@ ::NPrivate::TPrintableLocalTime FormatIsoLocalUpToSeconds(TInstant i ::NPrivate::TPrintableLocalTime FormatLocalUpToSeconds(TInstant instant); ///@} -template -static constexpr bool operator<(const TTimeBase& l, const TTimeBase& r) noexcept { - return l.GetValue() < r.GetValue(); -} - -template -static constexpr bool operator<=(const TTimeBase& l, const TTimeBase& r) noexcept { - return l.GetValue() <= r.GetValue(); -} - -template -static constexpr bool operator==(const TTimeBase& l, const TTimeBase& r) noexcept { - return l.GetValue() == r.GetValue(); -} - -template -static constexpr bool operator!=(const TTimeBase& l, const TTimeBase& r) noexcept { - return l.GetValue() != r.GetValue(); -} - -template -static constexpr bool operator>(const TTimeBase& l, const TTimeBase& r) noexcept { - return l.GetValue() > r.GetValue(); -} - -template -static constexpr bool operator>=(const TTimeBase& l, const TTimeBase& r) noexcept { - return l.GetValue() >= r.GetValue(); -} - namespace NDateTimeHelpers { template static constexpr T SumWithSaturation(T a, T b) { diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index a7a4e7c971..510ce5332f 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(basic_example_it) +add_subdirectory(bulk_upsert_simple_it) From 96a28114d9c7196dc9c561a1f34e2c736780d207 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 1 Aug 2024 13:30:21 +0300 Subject: [PATCH 3/7] fix after review --- include/ydb-cpp-sdk/util/datetime/base.h | 32 ++++++- .../bulk_upsert_simple_it/bulk_upsert.cpp | 86 ++++++++----------- .../bulk_upsert_simple_it/bulk_upsert.h | 17 ++-- .../bulk_upsert_simple_it/main.cpp | 8 +- 4 files changed, 80 insertions(+), 63 deletions(-) diff --git a/include/ydb-cpp-sdk/util/datetime/base.h b/include/ydb-cpp-sdk/util/datetime/base.h index 2b89d9350a..389e39e46a 100644 --- a/include/ydb-cpp-sdk/util/datetime/base.h +++ b/include/ydb-cpp-sdk/util/datetime/base.h @@ -167,8 +167,6 @@ class TTimeBase { return Value_; } - auto operator<=>(const TTimeBase&) const = default; - protected: TValue Value_; // microseconds count }; @@ -583,6 +581,36 @@ ::NPrivate::TPrintableLocalTime FormatIsoLocalUpToSeconds(TInstant i ::NPrivate::TPrintableLocalTime FormatLocalUpToSeconds(TInstant instant); ///@} +template +static constexpr bool operator<(const TTimeBase& l, const TTimeBase& r) noexcept { + return l.GetValue() < r.GetValue(); +} + +template +static constexpr bool operator<=(const TTimeBase& l, const TTimeBase& r) noexcept { + return l.GetValue() <= r.GetValue(); +} + +template +static constexpr bool operator==(const TTimeBase& l, const TTimeBase& r) noexcept { + return l.GetValue() == r.GetValue(); +} + +template +static constexpr bool operator!=(const TTimeBase& l, const TTimeBase& r) noexcept { + return l.GetValue() != r.GetValue(); +} + +template +static constexpr bool operator>(const TTimeBase& l, const TTimeBase& r) noexcept { + return l.GetValue() > r.GetValue(); +} + +template +static constexpr bool operator>=(const TTimeBase& l, const TTimeBase& r) noexcept { + return l.GetValue() >= r.GetValue(); +} + namespace NDateTimeHelpers { template static constexpr T SumWithSaturation(T a, T b) { diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp index 2016de903d..158bb0ae79 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp @@ -23,6 +23,10 @@ static std::string JoinPath(const std::string& basePath, const std::string& path return prefixPathSplit; } +bool TLogMessage::TPrimaryKeyLogMessage::operator<(const TLogMessage::TPrimaryKeyLogMessage& o) const { + return App < o.App || App == o.App && Host < o.Host || App == o.App && Host == o.Host && Timestamp < o.Timestamp; +} + TRunArgs GetRunArgs() { std::string database = std::getenv("YDB_DATABASE"); @@ -57,7 +61,7 @@ TStatus CreateLogTable(TTableClient& client, const std::string& table) { return status; } -TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage) { +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage) { logBatch.clear(); uint32_t correctSumApp = 0; uint32_t correctSumHost = 0; @@ -65,18 +69,18 @@ TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, s for (size_t i = 0; i < BATCH_SIZE; ++i) { TLogMessage message; - message.pk.App = "App_" + ToString(logOffset % 10); - message.pk.Host = "192.168.0." + ToString(logOffset % 11); + message.pk.App = "App_" + std::to_string(logOffset % 10); + message.pk.Host = "192.168.0." + std::to_string(logOffset % 11); message.pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000); message.HttpCode = 200; message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1"; logBatch.emplace_back(message); - if (!setMessage.contains(message)) { + if (!setMessage.contains(message.pk)) { correctSumApp += logOffset % 10; correctSumHost += logOffset % 11; ++correctRowCount; - setMessage.insert(message); + setMessage.insert(message.pk); } } @@ -108,65 +112,49 @@ TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const return status; } -static TStatus ScanQuerySelect(TTableClient& client, const std::string& path, std::vector & vectorResultSet) { +static TStatus SelectTransaction(TSession session, const std::string& path, + std::optional& resultSet) { std::filesystem::path filesystemPath(path); auto query = std::format(R"( - --!syntax_v1 PRAGMA TablePathPrefix("{}"); - SELECT * + SELECT + SUM(CAST(SUBSTRING(CAST(App as string), 4) as Int32)), + SUM(CAST(SUBSTRING(CAST(Host as string), 10) as Int32)), + COUNT(*) FROM {} - )", filesystemPath.parent_path().c_str(), filesystemPath.filename().c_str()); + )", filesystemPath.parent_path().string(), filesystemPath.filename().string()); - auto resultScanQuery = client.StreamExecuteScanQuery(query).GetValueSync(); - - if (!resultScanQuery.IsSuccess()) { - return resultScanQuery; - } + auto txControl = + TTxControl::BeginTx(TTxSettings::SerializableRW()) + .CommitTx(); - bool eos = false; - - while (!eos) { - auto streamPart = resultScanQuery.ReadNext().ExtractValueSync(); - - if (!streamPart.IsSuccess()) { - eos = true; - if (!streamPart.EOS()) { - return streamPart; - } - continue; - } + auto result = session.ExecuteDataQuery(query, txControl).GetValueSync(); - if (streamPart.HasResultSet()) { - auto rs = streamPart.ExtractResultSet(); - vectorResultSet.push_back(rs); - } + if (result.IsSuccess()) { + resultSet = result.GetResultSet(0); } - return TStatus(EStatus::SUCCESS, NYql::TIssues()); + + return result; } -TStatistic ScanQuerySelect(TTableClient& client, const std::string& path) { - std::vector vectorResultSet; - ThrowOnError(client.RetryOperationSync([path, &vectorResultSet](TTableClient& client) { - return ScanQuerySelect(client, path, vectorResultSet); +TStatistic Select(TTableClient& client, const std::string& path) { + std::optional resultSet; + ThrowOnError(client.RetryOperationSync([path, &resultSet](TSession session) { + return SelectTransaction(session, path, resultSet); })); - uint32_t sumApp = 0; - uint32_t sumHost = 0; - uint32_t rowCount = 0; + TResultSetParser parser(*resultSet); - for (TResultSet& resultSet : vectorResultSet) { - TResultSetParser parser(resultSet); - - while (parser.TryNextRow()) { + uint64_t sumApp = 0; + uint64_t sumHost = 0; + uint64_t rowCount = 0; - ++rowCount; - sumApp += ToString(parser.ColumnParser("App").GetOptionalUtf8()).back() - '0'; - std::string strHost = ToString(parser.ColumnParser("Host").GetOptionalUtf8()); - char penCharStrHost = strHost[strHost.size() - 2]; - sumHost += strHost.back() - '0' + (penCharStrHost == '.' ? 0 : (penCharStrHost - '0') * 10); - } - + if (parser.TryNextRow()) { + + sumApp = *parser.ColumnParser("column0").GetOptionalInt64(); + sumHost = *parser.ColumnParser("column1").GetOptionalInt64(); + rowCount = parser.ColumnParser("column2").GetUint64(); } return {sumApp, sumHost, rowCount}; diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h index aafd21394f..b82836c64f 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h @@ -11,19 +11,18 @@ struct TRunArgs { std::string path; }; -class TLogMessage { +struct TLogMessage { struct TPrimaryKeyLogMessage { std::string App; std::string Host; TInstant Timestamp; - auto operator<=>(const TPrimaryKeyLogMessage&) const = default; + bool operator<(const TPrimaryKeyLogMessage& o) const; }; -public: TPrimaryKeyLogMessage pk; uint32_t HttpCode; std::string Message; - auto operator<=>(const TLogMessage& o) const {return pk <=> o.pk;}; + bool operator<(const TLogMessage& o) const {return pk < o.pk;}; }; class TYdbErrorException : public yexception { @@ -35,15 +34,15 @@ class TYdbErrorException : public yexception { }; struct TStatistic { - uint32_t sumApp; - uint32_t sumHost; - uint32_t rowCount; + uint64_t sumApp; + uint64_t sumHost; + uint64_t rowCount; }; TRunArgs GetRunArgs(); TStatus CreateLogTable(TTableClient& client, const std::string& table); -TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage); +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage); TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector& logBatch, const TRetryOperationSettings& retrySettings); -TStatistic ScanQuerySelect(TTableClient& client, const std::string& path); +TStatistic Select(TTableClient& client, const std::string& path); void DropTable(TTableClient& client, const std::string& path); diff --git a/tests/integration/bulk_upsert_simple_it/main.cpp b/tests/integration/bulk_upsert_simple_it/main.cpp index d924340c8f..fe12e2a63c 100644 --- a/tests/integration/bulk_upsert_simple_it/main.cpp +++ b/tests/integration/bulk_upsert_simple_it/main.cpp @@ -8,7 +8,8 @@ TEST(Integration, BulkUpsert) { uint32_t correctSumApp = 0; uint32_t correctSumHost = 0; - std::set setMessage; + uint32_t correctRowCount = 0; + std::set setMessage; auto [driver, path] = GetRunArgs(); @@ -30,6 +31,7 @@ TEST(Integration, BulkUpsert) { auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, setMessage); correctSumApp += batchSumApp; correctSumHost += batchSumHost; + correctRowCount += batchRowCount; TStatus statusWrite = WriteLogBatch(client, path, logBatch, writeRetrySettings); if (!statusWrite.IsSuccess()) { @@ -37,9 +39,9 @@ TEST(Integration, BulkUpsert) { } } - auto [sumApp, sumHost, rowCount] = ScanQuerySelect(client, path); + auto [sumApp, sumHost, rowCount] = Select(client, path); - EXPECT_EQ(rowCount, setMessage.size()); + EXPECT_EQ(rowCount, correctRowCount); EXPECT_EQ(sumApp, correctSumApp); EXPECT_EQ(sumHost, correctSumHost); From 16b6d5da47f84c9edebb7c28480a21fda1f8a8c1 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 1 Aug 2024 19:16:45 +0300 Subject: [PATCH 4/7] remove set --- .../bulk_upsert_simple_it/bulk_upsert.cpp | 36 +++++++++---------- .../bulk_upsert_simple_it/bulk_upsert.h | 17 ++++----- .../bulk_upsert_simple_it/main.cpp | 7 ++-- 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp index 158bb0ae79..19d50933bf 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp @@ -23,10 +23,6 @@ static std::string JoinPath(const std::string& basePath, const std::string& path return prefixPathSplit; } -bool TLogMessage::TPrimaryKeyLogMessage::operator<(const TLogMessage::TPrimaryKeyLogMessage& o) const { - return App < o.App || App == o.App && Host < o.Host || App == o.App && Host == o.Host && Timestamp < o.Timestamp; -} - TRunArgs GetRunArgs() { std::string database = std::getenv("YDB_DATABASE"); @@ -41,18 +37,19 @@ TRunArgs GetRunArgs() { return {driver, JoinPath(database, "bulk")}; } -TStatus CreateLogTable(TTableClient& client, const std::string& table) { +TStatus CreateTable(TTableClient& client, const std::string& table) { std::cerr << "Create table " << table << "\n"; TRetryOperationSettings settings; auto status = client.RetryOperationSync([&table](TSession session) { auto tableDesc = TTableBuilder() + .AddNonNullableColumn("pk", EPrimitiveType::Uint64) .AddNullableColumn("App", EPrimitiveType::Utf8) .AddNullableColumn("Timestamp", EPrimitiveType::Timestamp) .AddNullableColumn("Host", EPrimitiveType::Utf8) .AddNullableColumn("HttpCode", EPrimitiveType::Uint32) .AddNullableColumn("Message", EPrimitiveType::Utf8) - .SetPrimaryKeyColumns({"App", "Host", "Timestamp"}) + .SetPrimaryKeyColumns({"pk"}) .Build(); return session.CreateTable(table, std::move(tableDesc)).GetValueSync(); @@ -61,27 +58,25 @@ TStatus CreateLogTable(TTableClient& client, const std::string& table) { return status; } -TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage) { +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, uint32_t lastNumber) { logBatch.clear(); uint32_t correctSumApp = 0; uint32_t correctSumHost = 0; uint32_t correctRowCount = 0; for (size_t i = 0; i < BATCH_SIZE; ++i) { - TLogMessage message; - message.pk.App = "App_" + std::to_string(logOffset % 10); - message.pk.Host = "192.168.0." + std::to_string(logOffset % 11); - message.pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000); + TLogMessage message; + message.pk = correctRowCount + lastNumber; + message.App = "App_" + std::to_string(logOffset % 10); + message.Host = "192.168.0." + std::to_string(logOffset % 11); + message.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000); message.HttpCode = 200; message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1"; logBatch.emplace_back(message); - if (!setMessage.contains(message.pk)) { - correctSumApp += logOffset % 10; - correctSumHost += logOffset % 11; - ++correctRowCount; - setMessage.insert(message.pk); - } + correctSumApp += logOffset % 10; + correctSumHost += logOffset % 11; + ++correctRowCount; } return {correctSumApp, correctSumHost, correctRowCount}; @@ -94,9 +89,10 @@ TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const for (const auto& message : logBatch) { rows.AddListItem() .BeginStruct() - .AddMember("App").Utf8(message.pk.App) - .AddMember("Host").Utf8(message.pk.Host) - .AddMember("Timestamp").Timestamp(message.pk.Timestamp) + .AddMember("pk").Uint64(message.pk) + .AddMember("App").Utf8(message.App) + .AddMember("Host").Utf8(message.Host) + .AddMember("Timestamp").Timestamp(message.Timestamp) .AddMember("HttpCode").Uint32(message.HttpCode) .AddMember("Message").Utf8(message.Message) .EndStruct(); diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h index b82836c64f..fb0f92c116 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h @@ -12,17 +12,12 @@ struct TRunArgs { }; struct TLogMessage { - struct TPrimaryKeyLogMessage { - std::string App; - std::string Host; - TInstant Timestamp; - bool operator<(const TPrimaryKeyLogMessage& o) const; - }; - - TPrimaryKeyLogMessage pk; + uint64_t pk; + std::string App; + std::string Host; + TInstant Timestamp; uint32_t HttpCode; std::string Message; - bool operator<(const TLogMessage& o) const {return pk < o.pk;}; }; class TYdbErrorException : public yexception { @@ -40,8 +35,8 @@ struct TStatistic { }; TRunArgs GetRunArgs(); -TStatus CreateLogTable(TTableClient& client, const std::string& table); -TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, std::set& setMessage); +TStatus CreateTable(TTableClient& client, const std::string& table); +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, uint32_t lastNumber); TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector& logBatch, const TRetryOperationSettings& retrySettings); TStatistic Select(TTableClient& client, const std::string& path); diff --git a/tests/integration/bulk_upsert_simple_it/main.cpp b/tests/integration/bulk_upsert_simple_it/main.cpp index fe12e2a63c..23ab59c0fd 100644 --- a/tests/integration/bulk_upsert_simple_it/main.cpp +++ b/tests/integration/bulk_upsert_simple_it/main.cpp @@ -2,20 +2,17 @@ #include -#include - TEST(Integration, BulkUpsert) { uint32_t correctSumApp = 0; uint32_t correctSumHost = 0; uint32_t correctRowCount = 0; - std::set setMessage; auto [driver, path] = GetRunArgs(); TTableClient client(driver); uint32_t count = 1000; - TStatus statusCreate = CreateLogTable(client, path); + TStatus statusCreate = CreateTable(client, path); if (!statusCreate.IsSuccess()) { FAIL() << "Create table failed with status: " << statusCreate << std::endl; } @@ -28,7 +25,7 @@ TEST(Integration, BulkUpsert) { std::vector logBatch; for (uint32_t offset = 0; offset < count; ++offset) { - auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, setMessage); + auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, correctRowCount); correctSumApp += batchSumApp; correctSumHost += batchSumHost; correctRowCount += batchRowCount; From 9f3244bc7fdcc7d4afcfdda12ee155308d6cfd7e Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 1 Aug 2024 23:18:13 +0300 Subject: [PATCH 5/7] fix after review --- .../bulk_upsert_simple_it/bulk_upsert.cpp | 23 ++++++++----------- .../bulk_upsert_simple_it/bulk_upsert.h | 23 +++++++++++-------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp index 19d50933bf..2d24cf8388 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp @@ -38,18 +38,16 @@ TRunArgs GetRunArgs() { } TStatus CreateTable(TTableClient& client, const std::string& table) { - std::cerr << "Create table " << table << "\n"; - TRetryOperationSettings settings; auto status = client.RetryOperationSync([&table](TSession session) { auto tableDesc = TTableBuilder() - .AddNonNullableColumn("pk", EPrimitiveType::Uint64) .AddNullableColumn("App", EPrimitiveType::Utf8) .AddNullableColumn("Timestamp", EPrimitiveType::Timestamp) .AddNullableColumn("Host", EPrimitiveType::Utf8) + .AddNonNullableColumn("Id", EPrimitiveType::Uint64) .AddNullableColumn("HttpCode", EPrimitiveType::Uint32) .AddNullableColumn("Message", EPrimitiveType::Utf8) - .SetPrimaryKeyColumns({"pk"}) + .SetPrimaryKeyColumns({"App", "Timestamp", "Host", "Id"}) .Build(); return session.CreateTable(table, std::move(tableDesc)).GetValueSync(); @@ -66,10 +64,10 @@ TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, u for (size_t i = 0; i < BATCH_SIZE; ++i) { TLogMessage message; - message.pk = correctRowCount + lastNumber; - message.App = "App_" + std::to_string(logOffset % 10); - message.Host = "192.168.0." + std::to_string(logOffset % 11); - message.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000); + message.Pk.Id = correctRowCount + lastNumber; + message.Pk.App = "App_" + std::to_string(logOffset % 10); + message.Pk.Host = "192.168.0." + std::to_string(logOffset % 11); + message.Pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000); message.HttpCode = 200; message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1"; logBatch.emplace_back(message); @@ -89,10 +87,10 @@ TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const for (const auto& message : logBatch) { rows.AddListItem() .BeginStruct() - .AddMember("pk").Uint64(message.pk) - .AddMember("App").Utf8(message.App) - .AddMember("Host").Utf8(message.Host) - .AddMember("Timestamp").Timestamp(message.Timestamp) + .AddMember("Id").Uint64(message.Pk.Id) + .AddMember("App").Utf8(message.Pk.App) + .AddMember("Host").Utf8(message.Pk.Host) + .AddMember("Timestamp").Timestamp(message.Pk.Timestamp) .AddMember("HttpCode").Uint32(message.HttpCode) .AddMember("Message").Utf8(message.Message) .EndStruct(); @@ -147,7 +145,6 @@ TStatistic Select(TTableClient& client, const std::string& path) { uint64_t rowCount = 0; if (parser.TryNextRow()) { - sumApp = *parser.ColumnParser("column0").GetOptionalInt64(); sumHost = *parser.ColumnParser("column1").GetOptionalInt64(); rowCount = parser.ColumnParser("column2").GetUint64(); diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h index fb0f92c116..5589126f15 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h @@ -7,15 +7,20 @@ using namespace NYdb; using namespace NYdb::NTable; struct TRunArgs { - TDriver driver; - std::string path; + TDriver Driver; + std::string Path; }; struct TLogMessage { - uint64_t pk; - std::string App; - std::string Host; - TInstant Timestamp; + struct TPrimaryKeyLogMessage { + std::string App; + std::string Host; + TInstant Timestamp; + uint64_t Id; + bool operator<(const TPrimaryKeyLogMessage& o) const; + }; + + TPrimaryKeyLogMessage Pk; uint32_t HttpCode; std::string Message; }; @@ -29,9 +34,9 @@ class TYdbErrorException : public yexception { }; struct TStatistic { - uint64_t sumApp; - uint64_t sumHost; - uint64_t rowCount; + uint64_t SumApp; + uint64_t SumHost; + uint64_t RowCount; }; TRunArgs GetRunArgs(); From b0aecbd57f1686d49e149c707ce5bc95917d1d8e Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 1 Aug 2024 23:45:39 +0300 Subject: [PATCH 6/7] valid query --- .../bulk_upsert_simple_it/bulk_upsert.cpp | 7 +++++-- .../bulk_upsert_simple_it/bulk_upsert.h | 1 + tests/integration/bulk_upsert_simple_it/main.cpp | 14 +++++++++----- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp index 2d24cf8388..90bf26207a 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp @@ -1,7 +1,5 @@ #include "bulk_upsert.h" -#include - #include static constexpr size_t BATCH_SIZE = 1000; @@ -144,6 +142,11 @@ TStatistic Select(TTableClient& client, const std::string& path) { uint64_t sumHost = 0; uint64_t rowCount = 0; + if (parser.ColumnsCount() != 3 || parser.RowsCount() != 1) { + throw TYdbErrorException(TStatus(EStatus::GENERIC_ERROR, + {NYql::TIssue("The number of columns should be: 3.\nThe number of rows should be: 1")})); + } + if (parser.TryNextRow()) { sumApp = *parser.ColumnParser("column0").GetOptionalInt64(); sumHost = *parser.ColumnParser("column1").GetOptionalInt64(); diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h index 5589126f15..180fefb08c 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h @@ -2,6 +2,7 @@ #include #include +#include using namespace NYdb; using namespace NYdb::NTable; diff --git a/tests/integration/bulk_upsert_simple_it/main.cpp b/tests/integration/bulk_upsert_simple_it/main.cpp index 23ab59c0fd..3fee4a942a 100644 --- a/tests/integration/bulk_upsert_simple_it/main.cpp +++ b/tests/integration/bulk_upsert_simple_it/main.cpp @@ -36,11 +36,15 @@ TEST(Integration, BulkUpsert) { } } - auto [sumApp, sumHost, rowCount] = Select(client, path); - - EXPECT_EQ(rowCount, correctRowCount); - EXPECT_EQ(sumApp, correctSumApp); - EXPECT_EQ(sumHost, correctSumHost); + try { + auto [sumApp, sumHost, rowCount] = Select(client, path); + EXPECT_EQ(rowCount, correctRowCount); + EXPECT_EQ(sumApp, correctSumApp); + EXPECT_EQ(sumHost, correctSumHost); + } catch (const TYdbErrorException& e) { + driver.Stop(true); + FAIL() << "Execution failed due to fatal error:\nStatus: " << ToString(e.Status.GetStatus()) << std::endl << e.Status.GetIssues().ToString(); + } DropTable(client, path); driver.Stop(true); From ffcb4ce8f241daadb5f19fa25b6cbeeebd5d0be4 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Fri, 2 Aug 2024 22:39:04 +0300 Subject: [PATCH 7/7] remove operator< --- tests/integration/bulk_upsert_simple_it/bulk_upsert.h | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h index 180fefb08c..3fcaf7d634 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h @@ -18,7 +18,6 @@ struct TLogMessage { std::string Host; TInstant Timestamp; uint64_t Id; - bool operator<(const TPrimaryKeyLogMessage& o) const; }; TPrimaryKeyLogMessage Pk;