diff --git a/ydb/library/workload/tpcc/import.cpp b/ydb/library/workload/tpcc/import.cpp index d5ee3533d15a..93e88533d734 100644 --- a/ydb/library/workload/tpcc/import.cpp +++ b/ydb/library/workload/tpcc/import.cpp @@ -18,6 +18,8 @@ #include #include +#include + #include #include #include @@ -120,10 +122,15 @@ int GetRandomCount(int warehouseId, int customerId, int districtId) { //----------------------------------------------------------------------------- -NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString& tableFullPath, TLog* Log) { +NTable::TBulkUpsertResult LoadItems( + NTable::TTableClient& client, + const TString& tableFullPath, + google::protobuf::Arena& arena, + TLog* Log) +{ LOG_T("Loading " << ITEM_COUNT << " items..."); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int i = 1; i <= ITEM_COUNT; ++i) { @@ -151,7 +158,12 @@ NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString& } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -161,11 +173,12 @@ NTable::TBulkUpsertResult LoadWarehouses( const TString& tableFullPath, int startId, int lastId, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading warehouses " << startId << " to " << lastId); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) { @@ -184,7 +197,12 @@ NTable::TBulkUpsertResult LoadWarehouses( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -195,11 +213,12 @@ NTable::TBulkUpsertResult LoadStock( int wh, int itemId, int itemsToLoad, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading stock for warehouse " << wh << " items " << itemId << " to " << (itemId + itemsToLoad - 1)); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int i = 0; i < itemsToLoad; ++i) { @@ -242,7 +261,12 @@ NTable::TBulkUpsertResult LoadStock( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -252,11 +276,12 @@ NTable::TBulkUpsertResult LoadDistricts( const TString& tableFullPath, int startId, int lastId, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading districts for warehouses " << startId << " to " << lastId); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) { @@ -279,7 +304,12 @@ NTable::TBulkUpsertResult LoadDistricts( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -289,11 +319,12 @@ NTable::TBulkUpsertResult LoadCustomers( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading customers for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int customerId = 1; customerId <= CUSTOMERS_PER_DISTRICT; ++customerId) { @@ -333,7 +364,12 @@ NTable::TBulkUpsertResult LoadCustomers( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -343,11 +379,12 @@ NTable::TBulkUpsertResult LoadCustomerHistory( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading customer history for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); i64 prevTs = 0; @@ -375,7 +412,12 @@ NTable::TBulkUpsertResult LoadCustomerHistory( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -385,11 +427,12 @@ NTable::TBulkUpsertResult LoadOpenOrders( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading open orders for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); // TPC-C 4.3.3.1: o_c_id must be a permutation of [1, customersPerDistrict] @@ -419,7 +462,12 @@ NTable::TBulkUpsertResult LoadOpenOrders( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -429,11 +477,12 @@ NTable::TBulkUpsertResult LoadNewOrders( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading new orders for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); static_assert(FIRST_UNPROCESSED_O_ID < CUSTOMERS_PER_DISTRICT, @@ -450,7 +499,12 @@ NTable::TBulkUpsertResult LoadNewOrders( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -460,11 +514,12 @@ NTable::TBulkUpsertResult LoadOrderLines( const TString& tableFullPath, int wh, int district, + google::protobuf::Arena& arena, TLog* Log) { LOG_T("Loading order lines for warehouse " << wh << " district " << district); - auto valueBuilder = TValueBuilder(); + auto valueBuilder = TValueBuilder(&arena); valueBuilder.BeginList(); for (int orderId = 1; orderId <= CUSTOMERS_PER_DISTRICT; ++orderId) { @@ -503,7 +558,12 @@ NTable::TBulkUpsertResult LoadOrderLines( } valueBuilder.EndList(); - return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync(); + + NTable::TBulkUpsertSettings bulkSettings; + bulkSettings.Arena(&arena); + auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync(); + arena.Reset(); + return result; } //----------------------------------------------------------------------------- @@ -545,7 +605,13 @@ void ExecuteWithRetry(const TString& operationName, LoadFunc loadFunc, TLog* Log //----------------------------------------------------------------------------- -void LoadSmallTables(TDriver& driver, const TString& path, int warehouseCount, TLog* Log) { +void LoadSmallTables( + TDriver& driver, + const TString& path, + int warehouseCount, + google::protobuf::Arena& arena, + TLog* Log) +{ NTable::TTableClient tableClient(driver); TString itemTablePath = path + "/" + TABLE_ITEM; @@ -553,13 +619,13 @@ void LoadSmallTables(TDriver& driver, const TString& path, int warehouseCount, T TString districtTablePath = path + "/" + TABLE_DISTRICT; ExecuteWithRetry("LoadItems", [&]() { - return LoadItems(tableClient, itemTablePath, Log); + return LoadItems(tableClient, itemTablePath, arena, Log); }, Log); ExecuteWithRetry("LoadWarehouses", [&]() { - return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, Log); + return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, arena, Log); }, Log); ExecuteWithRetry("LoadDistricts", [&]() { - return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, Log); + return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, arena, Log); }, Log); } @@ -609,7 +675,15 @@ struct TLoadState { //----------------------------------------------------------------------------- -void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLoadState& state, TLog* Log) { +void LoadRange( + TDriver& driver, + const TString& path, + int whStart, + int whEnd, + TLoadState& state, + google::protobuf::Arena& arena, + TLog* Log) +{ NTable::TTableClient tableClient(driver); static_assert(ITEM_COUNT % 10 == 0, "ITEM_COUNT must be divisible by 10"); @@ -638,10 +712,10 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) { ExecuteWithRetry("LoadCustomers", [&]() { - return LoadCustomers(tableClient, customerTablePath, wh, district, Log); + return LoadCustomers(tableClient, customerTablePath, wh, district, arena, Log); }, Log); ExecuteWithRetry("LoadOpenOrders", [&]() { - return LoadOpenOrders(tableClient, oorderTablePath, wh, district, Log); + return LoadOpenOrders(tableClient, oorderTablePath, wh, district, arena, Log); }, Log); } state.DataSizeLoaded.fetch_add(indexedPerWh, std::memory_order_relaxed); @@ -661,19 +735,19 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo int startItemId = batch * itemBatchSize + 1; int itemsToLoad = itemBatchSize; ExecuteWithRetry("LoadStock", [&]() { - return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, Log); + return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, arena, Log); }, Log); } for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) { ExecuteWithRetry("LoadOrderLines", [&]() { - return LoadOrderLines(tableClient, orderLineTablePath, wh, district, Log); + return LoadOrderLines(tableClient, orderLineTablePath, wh, district, arena, Log); }, Log); ExecuteWithRetry("LoadCustomerHistory", [&]() { - return LoadCustomerHistory(tableClient, historyTablePath, wh, district, Log); + return LoadCustomerHistory(tableClient, historyTablePath, wh, district, arena, Log); }, Log); ExecuteWithRetry("LoadNewOrders", [&]() { - return LoadNewOrders(tableClient, newOrderTablePath, wh, district, Log); + return LoadNewOrders(tableClient, newOrderTablePath, wh, district, arena, Log); }, Log); } @@ -856,13 +930,14 @@ class TPCCLoader { int whEnd = (threadId + 1) * Config.WarehouseCount / threadCount; threads.emplace_back([threadId, &drivers, driverCount, this, whStart, whEnd]() { + google::protobuf::Arena arena; auto& driver = drivers[threadId % driverCount]; if (threadId == 0) { - LoadSmallTables(driver, Config.Path, Config.WarehouseCount, Log.get()); + LoadSmallTables(driver, Config.Path, Config.WarehouseCount, arena, Log.get()); } else { std::this_thread::sleep_for(std::chrono::milliseconds(threadId)); } - LoadRange(driver, Config.Path, whStart, whEnd, LoadState, Log.get()); + LoadRange(driver, Config.Path, whStart, whEnd, LoadState, arena, Log.get()); }); } diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h index 3b24aaca9047..797cba3b42f4 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h @@ -4,6 +4,8 @@ #include +#include + #include #include @@ -528,7 +530,7 @@ class TValueBuilderBase : public TMoveOnly { protected: TValueBuilderBase(TValueBuilderBase&&); - TValueBuilderBase(); + TValueBuilderBase(google::protobuf::Arena* arena = nullptr); TValueBuilderBase(const TType& type); @@ -544,7 +546,7 @@ class TValueBuilderBase : public TMoveOnly { class TValueBuilder : public TValueBuilderBase { public: - TValueBuilder(); + TValueBuilder(google::protobuf::Arena* arena = nullptr); TValueBuilder(const TType& type); diff --git a/ydb/public/sdk/cpp/src/client/value/value.cpp b/ydb/public/sdk/cpp/src/client/value/value.cpp index 47ae325b35ff..fab21bb6b305 100644 --- a/ydb/public/sdk/cpp/src/client/value/value.cpp +++ b/ydb/public/sdk/cpp/src/client/value/value.cpp @@ -2061,12 +2061,24 @@ class TValueBuilderImpl { public: TValueBuilderImpl() : TypeBuilder_() + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) + { + PushPath(ProtoValue_); + } + + TValueBuilderImpl(google::protobuf::Arena* arena) + : TypeBuilder_() + , Arena(arena) + , ProtoValue_(*google::protobuf::Arena::CreateMessage(Arena)) { PushPath(ProtoValue_); } TValueBuilderImpl(const TType& type) : TypeBuilder_() + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) { PushPath(ProtoValue_); GetType().CopyFrom(type.GetProto()); @@ -2074,6 +2086,8 @@ class TValueBuilderImpl { TValueBuilderImpl(Ydb::Type& type, Ydb::Value& value) : TypeBuilder_(type) + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) { PushPath(value); } @@ -2088,10 +2102,15 @@ class TValueBuilderImpl { TValue BuildValue() { CheckValue(); - Ydb::Value value; - value.Swap(&ProtoValue_); - - return TValue(TypeBuilder_.Build(), std::move(value)); + if (Arena) { + auto* value = google::protobuf::Arena::CreateMessage(Arena); + value->Swap(&ProtoValue_); + return TValue(TypeBuilder_.Build(), value); + } else { + Ydb::Value value; + value.Swap(&ProtoValue_); + return TValue(TypeBuilder_.Build(), std::move(value)); + } } void Bool(bool value) { @@ -2803,7 +2822,12 @@ class TValueBuilderImpl { private: //TTypeBuilder TypeBuilder_; TTypeBuilder::TImpl TypeBuilder_; - Ydb::Value ProtoValue_; + google::protobuf::Arena* Arena; + Ydb::Value ProtoValueHeap; + + // either ProtoValueHeap or a reference to the arena allocated protobuf + Ydb::Value& ProtoValue_; + std::map StructsMap_; TStackVec Path_; @@ -2819,8 +2843,8 @@ template TValueBuilderBase::~TValueBuilderBase() = default; template -TValueBuilderBase::TValueBuilderBase() - : Impl_(new TValueBuilderImpl()) {} +TValueBuilderBase::TValueBuilderBase(google::protobuf::Arena* arena) + : Impl_(new TValueBuilderImpl(arena)) {} template TValueBuilderBase::TValueBuilderBase(const TType& type) @@ -3382,8 +3406,8 @@ template class TValueBuilderBase; //////////////////////////////////////////////////////////////////////////////// -TValueBuilder::TValueBuilder() - : TValueBuilderBase() {} +TValueBuilder::TValueBuilder(google::protobuf::Arena* arena) + : TValueBuilderBase(arena) {} TValueBuilder::TValueBuilder(const TType& type) : TValueBuilderBase(type) {}