Skip to content

Commit 7e9ca33

Browse files
Split portion and chunks (#11386)
1 parent de53af3 commit 7e9ca33

File tree

127 files changed

+2890
-1325
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

127 files changed

+2890
-1325
lines changed

.github/config/muted_ya.txt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
1818
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
1919
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite
2020
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertViaLegacyScripting-Streaming
21+
ydb/core/tx/columnshard/engines/ut TColumnEngineTestLogs.IndexTtl
2122
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
2223
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
2324
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts
@@ -31,10 +32,7 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait
3132
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
3233
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait
3334
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait
34-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
35-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
36-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
37-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesModificationError
35+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
3836
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
3937
ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
4038
ydb/core/kqp/ut/olap [*/*] chunk chunk

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_opt.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,24 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm
2929
}
3030
}
3131

32+
if (const auto className = features.Extract<TString>("METADATA_MEMORY_MANAGER.CLASS_NAME")) {
33+
if (!MetadataManagerConstructor.Initialize(*className)) {
34+
return TConclusionStatus::Fail("incorrect class name for metadata manager:" + *className);
35+
}
36+
37+
NJson::TJsonValue jsonData = NJson::JSON_MAP;
38+
auto fValue = features.Extract("METADATA_MEMORY_MANAGER.FEATURES");
39+
if (fValue) {
40+
if (!NJson::ReadJsonFastTree(*fValue, &jsonData)) {
41+
return TConclusionStatus::Fail("incorrect json in request METADATA_MEMORY_MANAGER.FEATURES parameter");
42+
}
43+
}
44+
auto result = MetadataManagerConstructor->DeserializeFromJson(jsonData);
45+
if (result.IsFail()) {
46+
return result;
47+
}
48+
}
49+
3250
return TConclusionStatus::Success();
3351
}
3452

@@ -40,6 +58,9 @@ void TUpsertOptionsOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTab
4058
if (CompactionPlannerConstructor.HasObject()) {
4159
CompactionPlannerConstructor.SerializeToProto(*schemaData.MutableOptions()->MutableCompactionPlannerConstructor());
4260
}
61+
if (MetadataManagerConstructor.HasObject()) {
62+
MetadataManagerConstructor.SerializeToProto(*schemaData.MutableOptions()->MutableMetadataManagerConstructor());
63+
}
4364
}
4465

4566
}

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_opt.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#include "abstract.h"
22
#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
3+
#include <ydb/core/tx/columnshard/data_accessor/abstract/constructor.h>
34

45
namespace NKikimr::NKqp {
56

6-
class TUpsertOptionsOperation : public ITableStoreOperation {
7+
class TUpsertOptionsOperation: public ITableStoreOperation {
78
private:
89
static TString GetTypeName() {
910
return "UPSERT_OPTIONS";
@@ -14,6 +15,7 @@ class TUpsertOptionsOperation : public ITableStoreOperation {
1415
bool SchemeNeedActualization = false;
1516
std::optional<bool> ExternalGuaranteeExclusivePK;
1617
NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer CompactionPlannerConstructor;
18+
NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer MetadataManagerConstructor;
1719
public:
1820
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
1921

ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ PEERDIR(
1515
ydb/services/metadata/manager
1616
ydb/core/formats/arrow/serializer
1717
ydb/core/tx/columnshard/engines/storage/optimizer/abstract
18+
ydb/core/tx/columnshard/data_accessor/abstract
1819
ydb/core/kqp/gateway/utils
1920
ydb/core/protos
2021
)

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2719,6 +2719,64 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
27192719

27202720
}
27212721

2722+
Y_UNIT_TEST(MetadataMemoryManager) {
2723+
auto settings = TKikimrSettings().SetWithSampleTables(false);
2724+
TKikimrRunner kikimr(settings);
2725+
2726+
TLocalHelper(kikimr).CreateTestOlapTable();
2727+
auto tableClient = kikimr.GetTableClient();
2728+
2729+
// Tests::NCommon::TLoggerInit(kikimr).Initialize();
2730+
2731+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
2732+
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
2733+
2734+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
2735+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
2736+
{
2737+
auto it = tableClient
2738+
.StreamExecuteScanQuery(R"(
2739+
--!syntax_v1
2740+
2741+
SELECT
2742+
COUNT(*)
2743+
FROM `/Root/olapStore/olapTable`
2744+
)")
2745+
.GetValueSync();
2746+
2747+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
2748+
TString result = StreamResultToYson(it);
2749+
Cout << result << Endl;
2750+
CompareYson(result, R"([[20000u;]])");
2751+
}
2752+
{
2753+
auto alterQuery =
2754+
TStringBuilder() <<
2755+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `METADATA_MEMORY_MANAGER.CLASS_NAME`=`local_db`,
2756+
`METADATA_MEMORY_MANAGER.FEATURES`=`{"memory_cache_size" : 0}`);
2757+
)";
2758+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
2759+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
2760+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
2761+
}
2762+
{
2763+
auto it = tableClient
2764+
.StreamExecuteScanQuery(R"(
2765+
--!syntax_v1
2766+
2767+
SELECT
2768+
COUNT(*)
2769+
FROM `/Root/olapStore/olapTable`
2770+
)")
2771+
.GetValueSync();
2772+
2773+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
2774+
TString result = StreamResultToYson(it);
2775+
Cout << result << Endl;
2776+
CompareYson(result, R"([[20000u;]])");
2777+
}
2778+
}
2779+
27222780
Y_UNIT_TEST(NormalizeAbsentColumn) {
27232781
auto settings = TKikimrSettings().SetWithSampleTables(false);
27242782
TKikimrRunner kikimr(settings);

ydb/core/protos/counters_columnshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,4 +202,5 @@ enum ETxTypes {
202202
TXTYPE_APPLY_NORMALIZER = 35 [(TxTypeOpts) = {Name: "TxApplyNormalizer"}];
203203
TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}];
204204
TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}];
205+
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
205206
}

ydb/core/protos/flat_scheme_op.proto

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,10 +465,28 @@ message TCompactionPlannerConstructorContainer {
465465
}
466466
}
467467

468+
message TMetadataManagerConstructorContainer {
469+
optional string ClassName = 1;
470+
471+
message TInMem {
472+
}
473+
474+
message TLocalDB {
475+
optional uint64 MemoryCacheSize = 1 [default = 128000000];
476+
optional bool FetchOnStart = 2 [default = false];
477+
}
478+
479+
oneof Implementation {
480+
TInMem InMem = 20;
481+
TLocalDB LocalDB = 21;
482+
}
483+
}
484+
468485
message TColumnTableSchemeOptions {
469486
optional bool SchemeNeedActualization = 1 [default = false];
470487
optional bool ExternalGuaranteeExclusivePK = 2 [default = false];
471488
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
489+
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
472490
}
473491

474492
message TColumnTableSchema {
@@ -508,6 +526,7 @@ message TColumnTableRequestedOptions {
508526
optional bool SchemeNeedActualization = 1 [default = false];
509527
optional bool ExternalGuaranteeExclusivePK = 2;
510528
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
529+
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
511530
}
512531

513532
message TAlterColumnTableSchema {

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
2525
for (auto&& portion : pack.MutablePortions()) {
2626
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
2727
static TAtomicCounter Counter = 0;
28-
portion.GetPortionInfoConstructor()->SetInsertWriteId((TInsertWriteId)Counter.Inc());
28+
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId((TInsertWriteId)Counter.Inc());
2929
} else {
30-
portion.GetPortionInfoConstructor()->SetInsertWriteId(Self->InsertTable->BuildNextWriteId(txc));
30+
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId(Self->InsertTable->BuildNextWriteId(txc));
3131
}
32-
pack.AddInsertWriteId(portion.GetPortionInfoConstructor()->GetInsertWriteIdVerified());
32+
pack.AddInsertWriteId(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetInsertWriteIdVerified());
3333
portion.Finalize(Self, txc);
3434
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
3535
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
@@ -99,7 +99,7 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
9999
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
100100
}
101101
}
102-
granule.InsertPortionOnComplete(portion.GetPortionInfo().MutablePortionInfoPtr());
102+
granule.InsertPortionOnComplete(portion.GetPortionInfo(), index);
103103
}
104104
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
105105
AFL_VERIFY(CommitSnapshot);

ydb/core/tx/columnshard/columnshard.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
5757
void TColumnShard::SwitchToWork(const TActorContext& ctx) {
5858
{
5959
const TLogContextGuard gLogging =
60-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId());
60+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId())("process", "SwitchToWork");
6161
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork");
6262

6363
for (auto&& i : TablesManager.GetTables()) {
@@ -109,7 +109,7 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
109109
ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId()));
110110
BufferizationWriteActorId = ctx.Register(new NColumnShard::NWriting::TActor(TabletID(), SelfId()));
111111
DataAccessorsControlActorId = ctx.Register(new NOlap::NDataAccessorControl::TActor(TabletID(), SelfId()));
112-
DataAccessorsManager = std::make_shared<NOlap::NDataAccessorControl::TActorAccessorsManager>(DataAccessorsControlActorId),
112+
DataAccessorsManager = std::make_shared<NOlap::NDataAccessorControl::TActorAccessorsManager>(DataAccessorsControlActorId, SelfId()),
113113

114114
PrioritizationClientId = NPrioritiesQueue::TCompServiceOperator::RegisterClient();
115115
Execute(CreateTxInitSchema(), ctx);

0 commit comments

Comments
 (0)