From e4398a692717cdd9d631e1464dc3642cdb5dde98 Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Mon, 7 Jul 2025 17:49:03 +0200 Subject: [PATCH 1/2] Use arenas when BulkUpsert --- ydb/library/workload/tpcc/import.cpp | 121 ++++++++++++++++++++++----- 1 file changed, 98 insertions(+), 23 deletions(-) diff --git a/ydb/library/workload/tpcc/import.cpp b/ydb/library/workload/tpcc/import.cpp index d5ee3533d15a..49b732504bdb 100644 --- a/ydb/library/workload/tpcc/import.cpp +++ b/ydb/library/workload/tpcc/import.cpp @@ -18,6 +18,8 @@ #include #include +#include + #include #include #include @@ -120,7 +122,12 @@ int GetRandomCount(int warehouseId, int customerId, int districtId) { //----------------------------------------------------------------------------- -NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString& tableFullPath, TLog* Log) { +NTable::TBulkUpsertResult LoadItems( + NTable::TTableClient& client, + const TString& tableFullPath, + google::protobuf::Arena& arena, + TLog* Log) +{ LOG_T("Loading " << ITEM_COUNT << " items..."); auto valueBuilder = TValueBuilder(); @@ -151,7 +158,12 @@ NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString& } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -161,6 +173,7 @@ NTable::TBulkUpsertResult LoadWarehouses( const TString& tableFullPath, int startId, int lastId, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading warehouses " << startId << " to " << lastId); @@ -184,7 +197,12 @@ NTable::TBulkUpsertResult LoadWarehouses( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -195,6 +213,7 @@ NTable::TBulkUpsertResult LoadStock( int wh, int itemId, int itemsToLoad, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading stock for warehouse " << wh << " items " << itemId << " to " << (itemId + itemsToLoad - 1)); @@ -242,7 +261,12 @@ NTable::TBulkUpsertResult LoadStock( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -252,6 +276,7 @@ NTable::TBulkUpsertResult LoadDistricts( const TString& tableFullPath, int startId, int lastId, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading districts for warehouses " << startId << " to " << lastId); @@ -279,7 +304,12 @@ NTable::TBulkUpsertResult LoadDistricts( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -289,6 +319,7 @@ NTable::TBulkUpsertResult LoadCustomers( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading customers for warehouse " << wh << " district " << district); @@ -333,7 +364,12 @@ NTable::TBulkUpsertResult LoadCustomers( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -343,6 +379,7 @@ NTable::TBulkUpsertResult LoadCustomerHistory( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading customer history for warehouse " << wh << " district " << district); @@ -375,7 +412,12 @@ NTable::TBulkUpsertResult LoadCustomerHistory( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -385,6 +427,7 @@ NTable::TBulkUpsertResult LoadOpenOrders( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading open orders for warehouse " << wh << " district " << district); @@ -419,7 +462,12 @@ NTable::TBulkUpsertResult LoadOpenOrders( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -429,6 +477,7 @@ NTable::TBulkUpsertResult LoadNewOrders( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading new orders for warehouse " << wh << " district " << district); @@ -450,7 +499,12 @@ NTable::TBulkUpsertResult LoadNewOrders( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -460,6 +514,7 @@ NTable::TBulkUpsertResult LoadOrderLines( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading order lines for warehouse " << wh << " district " << district); @@ -503,7 +558,12 @@ NTable::TBulkUpsertResult LoadOrderLines( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -545,7 +605,13 @@ void ExecuteWithRetry(const TString& operationName, LoadFunc loadFunc, TLog* Log //----------------------------------------------------------------------------- -void LoadSmallTables(TDriver& driver, const TString& path, int warehouseCount, TLog* Log) { +void LoadSmallTables( + TDriver& driver, + const TString& path, + int warehouseCount, + google::protobuf::Arena& arena, + TLog* Log) +{ NTable::TTableClient tableClient(driver); TString itemTablePath = path + "/" + TABLE_ITEM; @@ -553,13 +619,13 @@ void LoadSmallTables(TDriver& driver, const TString& path, int warehouseCount, T TString districtTablePath = path + "/" + TABLE_DISTRICT; ExecuteWithRetry("LoadItems", [&]() { - return LoadItems(tableClient, itemTablePath, Log); + return LoadItems(tableClient, itemTablePath, arena, Log); }, Log); ExecuteWithRetry("LoadWarehouses", [&]() { - return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, Log); + return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, arena, Log); }, Log); ExecuteWithRetry("LoadDistricts", [&]() { - return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, Log); + return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, arena, Log); }, Log); } @@ -609,7 +675,15 @@ struct TLoadState { //----------------------------------------------------------------------------- -void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLoadState& state, TLog* Log) { +void LoadRange( + TDriver& driver, + const TString& path, + int whStart, + int whEnd, + TLoadState& state, + google::protobuf::Arena& arena, + TLog* Log) +{ NTable::TTableClient tableClient(driver); static_assert(ITEM_COUNT % 10 == 0, "ITEM_COUNT must be divisible by 10"); @@ -638,10 +712,10 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) { ExecuteWithRetry("LoadCustomers", [&]() { - return LoadCustomers(tableClient, customerTablePath, wh, district, Log); + return LoadCustomers(tableClient, customerTablePath, wh, district, arena, Log); }, Log); ExecuteWithRetry("LoadOpenOrders", [&]() { - return LoadOpenOrders(tableClient, oorderTablePath, wh, district, Log); + return LoadOpenOrders(tableClient, oorderTablePath, wh, district, arena, Log); }, Log); } state.DataSizeLoaded.fetch_add(indexedPerWh, std::memory_order_relaxed); @@ -661,19 +735,19 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo int startItemId = batch * itemBatchSize + 1; int itemsToLoad = itemBatchSize; ExecuteWithRetry("LoadStock", [&]() { - return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, Log); + return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, arena, Log); }, Log); } for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) { ExecuteWithRetry("LoadOrderLines", [&]() { - return LoadOrderLines(tableClient, orderLineTablePath, wh, district, Log); + return LoadOrderLines(tableClient, orderLineTablePath, wh, district, arena, Log); }, Log); ExecuteWithRetry("LoadCustomerHistory", [&]() { - return LoadCustomerHistory(tableClient, historyTablePath, wh, district, Log); + return LoadCustomerHistory(tableClient, historyTablePath, wh, district, arena, Log); }, Log); ExecuteWithRetry("LoadNewOrders", [&]() { - return LoadNewOrders(tableClient, newOrderTablePath, wh, district, Log); + return LoadNewOrders(tableClient, newOrderTablePath, wh, district, arena, Log); }, Log); } @@ -856,13 +930,14 @@ class TPCCLoader { int whEnd = (threadId + 1) * Config.WarehouseCount / threadCount; threads.emplace_back([threadId, &drivers, driverCount, this, whStart, whEnd]() { + google::protobuf::Arena arena; auto& driver = drivers[threadId % driverCount]; if (threadId == 0) { - LoadSmallTables(driver, Config.Path, Config.WarehouseCount, Log.get()); + LoadSmallTables(driver, Config.Path, Config.WarehouseCount, arena, Log.get()); } else { std::this_thread::sleep_for(std::chrono::milliseconds(threadId)); } - LoadRange(driver, Config.Path, whStart, whEnd, LoadState, Log.get()); + LoadRange(driver, Config.Path, whStart, whEnd, LoadState, arena, Log.get()); }); } From 62faf2fbc7406926da5b99079878afab174a07ae Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Mon, 7 Jul 2025 19:57:49 +0200 Subject: [PATCH 2/2] Add arena to value builder --- ydb/library/workload/tpcc/import.cpp | 18 ++++---- .../include/ydb-cpp-sdk/client/value/value.h | 6 ++- ydb/public/sdk/cpp/src/client/value/value.cpp | 42 +++++++++++++++---- 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/ydb/library/workload/tpcc/import.cpp b/ydb/library/workload/tpcc/import.cpp index 49b732504bdb..93e88533d734 100644 --- a/ydb/library/workload/tpcc/import.cpp +++ b/ydb/library/workload/tpcc/import.cpp @@ -130,7 +130,7 @@ NTable::TBulkUpsertResult LoadItems( { LOG_T("Loading " << ITEM_COUNT << " items..."); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int i = 1; i <= ITEM_COUNT; ++i) { @@ -178,7 +178,7 @@ NTable::TBulkUpsertResult LoadWarehouses( { LOG_T("Loading warehouses " << startId << " to " << lastId); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) { @@ -218,7 +218,7 @@ NTable::TBulkUpsertResult LoadStock( { LOG_T("Loading stock for warehouse " << wh << " items " << itemId << " to " << (itemId + itemsToLoad - 1)); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int i = 0; i < itemsToLoad; ++i) { @@ -281,7 +281,7 @@ NTable::TBulkUpsertResult LoadDistricts( { LOG_T("Loading districts for warehouses " << startId << " to " << lastId); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) { @@ -324,7 +324,7 @@ NTable::TBulkUpsertResult LoadCustomers( { LOG_T("Loading customers for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int customerId = 1; customerId <= CUSTOMERS_PER_DISTRICT; ++customerId) { @@ -384,7 +384,7 @@ NTable::TBulkUpsertResult LoadCustomerHistory( { LOG_T("Loading customer history for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); i64 prevTs = 0; @@ -432,7 +432,7 @@ NTable::TBulkUpsertResult LoadOpenOrders( { LOG_T("Loading open orders for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); // TPC-C 4.3.3.1: o_c_id must be a permutation of [1, customersPerDistrict] @@ -482,7 +482,7 @@ NTable::TBulkUpsertResult LoadNewOrders( { LOG_T("Loading new orders for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); static_assert(FIRST_UNPROCESSED_O_ID < CUSTOMERS_PER_DISTRICT, @@ -519,7 +519,7 @@ NTable::TBulkUpsertResult LoadOrderLines( { LOG_T("Loading order lines for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int orderId = 1; orderId <= CUSTOMERS_PER_DISTRICT; ++orderId) { diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h index 3b24aaca9047..797cba3b42f4 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h @@ -4,6 +4,8 @@ #include +#include + #include #include @@ -528,7 +530,7 @@ class TValueBuilderBase : public TMoveOnly { protected: TValueBuilderBase(TValueBuilderBase&&); - TValueBuilderBase(); + TValueBuilderBase(google::protobuf::Arena* arena = nullptr); TValueBuilderBase(const TType& type); @@ -544,7 +546,7 @@ class TValueBuilderBase : public TMoveOnly { class TValueBuilder : public TValueBuilderBase { public: - TValueBuilder(); + TValueBuilder(google::protobuf::Arena* arena = nullptr); TValueBuilder(const TType& type); diff --git a/ydb/public/sdk/cpp/src/client/value/value.cpp b/ydb/public/sdk/cpp/src/client/value/value.cpp index 47ae325b35ff..fab21bb6b305 100644 --- a/ydb/public/sdk/cpp/src/client/value/value.cpp +++ b/ydb/public/sdk/cpp/src/client/value/value.cpp @@ -2061,12 +2061,24 @@ class TValueBuilderImpl { public: TValueBuilderImpl() : TypeBuilder_() + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) + { + PushPath(ProtoValue_); + } + + TValueBuilderImpl(google::protobuf::Arena* arena) + : TypeBuilder_() + , Arena(arena) + , ProtoValue_(*google::protobuf::Arena::CreateMessage(Arena)) { PushPath(ProtoValue_); } TValueBuilderImpl(const TType& type) : TypeBuilder_() + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) { PushPath(ProtoValue_); GetType().CopyFrom(type.GetProto()); @@ -2074,6 +2086,8 @@ class TValueBuilderImpl { TValueBuilderImpl(Ydb::Type& type, Ydb::Value& value) : TypeBuilder_(type) + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) { PushPath(value); } @@ -2088,10 +2102,15 @@ class TValueBuilderImpl { TValue BuildValue() { CheckValue(); - Ydb::Value value; - value.Swap(&ProtoValue_); - - return TValue(TypeBuilder_.Build(), std::move(value)); + if (Arena) { + auto* value = google::protobuf::Arena::CreateMessage(Arena); + value->Swap(&ProtoValue_); + return TValue(TypeBuilder_.Build(), value); + } else { + Ydb::Value value; + value.Swap(&ProtoValue_); + return TValue(TypeBuilder_.Build(), std::move(value)); + } } void Bool(bool value) { @@ -2803,7 +2822,12 @@ class TValueBuilderImpl { private: //TTypeBuilder TypeBuilder_; TTypeBuilder::TImpl TypeBuilder_; - Ydb::Value ProtoValue_; + google::protobuf::Arena* Arena; + Ydb::Value ProtoValueHeap; + + // either ProtoValueHeap or a reference to the arena allocated protobuf + Ydb::Value& ProtoValue_; + std::map StructsMap_; TStackVec Path_; @@ -2819,8 +2843,8 @@ template TValueBuilderBase::~TValueBuilderBase() = default; template -TValueBuilderBase::TValueBuilderBase() - : Impl_(new TValueBuilderImpl()) {} +TValueBuilderBase::TValueBuilderBase(google::protobuf::Arena* arena) + : Impl_(new TValueBuilderImpl(arena)) {} template TValueBuilderBase::TValueBuilderBase(const TType& type) @@ -3382,8 +3406,8 @@ template class TValueBuilderBase; //////////////////////////////////////////////////////////////////////////////// -TValueBuilder::TValueBuilder() - : TValueBuilderBase() {} +TValueBuilder::TValueBuilder(google::protobuf::Arena* arena) + : TValueBuilderBase(arena) {} TValueBuilder::TValueBuilder(const TType& type) : TValueBuilderBase(type) {}