Skip to content

Commit 456154d

Browse files
scan policy sql control (#12400)
1 parent d299ffe commit 456154d

File tree

13 files changed

+73
-31
lines changed

13 files changed

+73
-31
lines changed

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_opt.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm
1010
return TConclusionStatus::Fail("Incorrect value for SCHEME_NEED_ACTUALIZATION: cannot parse as boolean");
1111
}
1212
SchemeNeedActualization = *value;
13-
ExternalGuaranteeExclusivePK = features.Extract<bool>("EXTERNAL_GUARANTEE_EXCLUSIVE_PK");
13+
ScanReaderPolicyName = features.Extract<TString>("SCAN_READER_POLICY_NAME");
14+
if (ScanReaderPolicyName) {
15+
if (*ScanReaderPolicyName != "PLAIN" && *ScanReaderPolicyName != "SIMPLE") {
16+
return TConclusionStatus::Fail("SCAN_READER_POLICY_NAME have to be in ['PLAIN', 'SIMPLE']");
17+
}
18+
}
1419
if (const auto className = features.Extract<TString>("COMPACTION_PLANNER.CLASS_NAME")) {
1520
if (!CompactionPlannerConstructor.Initialize(*className)) {
1621
return TConclusionStatus::Fail("incorrect class name for compaction planner:" + *className);
@@ -52,8 +57,8 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm
5257

5358
void TUpsertOptionsOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
5459
schemaData.MutableOptions()->SetSchemeNeedActualization(SchemeNeedActualization);
55-
if (ExternalGuaranteeExclusivePK) {
56-
schemaData.MutableOptions()->SetExternalGuaranteeExclusivePK(*ExternalGuaranteeExclusivePK);
60+
if (ScanReaderPolicyName) {
61+
schemaData.MutableOptions()->SetScanReaderPolicyName(*ScanReaderPolicyName);
5762
}
5863
if (CompactionPlannerConstructor.HasObject()) {
5964
CompactionPlannerConstructor.SerializeToProto(*schemaData.MutableOptions()->MutableCompactionPlannerConstructor());

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_opt.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class TUpsertOptionsOperation: public ITableStoreOperation {
1313
static inline const auto Registrator = TFactory::TRegistrator<TUpsertOptionsOperation>(GetTypeName());
1414
private:
1515
bool SchemeNeedActualization = false;
16-
std::optional<bool> ExternalGuaranteeExclusivePK;
16+
std::optional<TString> ScanReaderPolicyName;
1717
NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer CompactionPlannerConstructor;
1818
NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer MetadataManagerConstructor;
1919
public:

ydb/core/kqp/ut/olap/aggregations_ut.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,33 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
7575
Cout << result << Endl;
7676
CompareYson(result, R"([[23000u;]])");
7777
}
78+
79+
{
80+
auto alterQuery = TStringBuilder() <<
81+
R"(
82+
ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`)
83+
)";
84+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
85+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
86+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
87+
}
88+
89+
{
90+
auto it = tableClient
91+
.StreamExecuteScanQuery(R"(
92+
--!syntax_v1
93+
94+
SELECT
95+
COUNT(*)
96+
FROM `/Root/olapStore/olapTable`
97+
)")
98+
.GetValueSync();
99+
100+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
101+
TString result = StreamResultToYson(it);
102+
Cout << result << Endl;
103+
CompareYson(result, R"([[23000u;]])");
104+
}
78105
}
79106

80107
Y_UNIT_TEST(AggregationCountPushdown) {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,9 @@ message TMetadataManagerConstructorContainer {
516516

517517
message TColumnTableSchemeOptions {
518518
optional bool SchemeNeedActualization = 1 [default = false];
519-
optional bool ExternalGuaranteeExclusivePK = 2 [default = false];
520519
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
521520
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
521+
optional string ScanReaderPolicyName = 5;
522522
}
523523

524524
message TColumnTableSchema {
@@ -560,9 +560,9 @@ message TColumnTableSchemaDiff {
560560

561561
message TColumnTableRequestedOptions {
562562
optional bool SchemeNeedActualization = 1 [default = false];
563-
optional bool ExternalGuaranteeExclusivePK = 2;
564563
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
565564
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
565+
optional string ScanReaderPolicyName = 5;
566566
}
567567

568568
message TAlterColumnTableSchema {

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,6 @@ class TReadMetadataBase {
116116
return ResultIndexSchema;
117117
}
118118

119-
bool HasGuaranteeExclusivePK() const {
120-
return GetIndexInfo().GetExternalGuaranteeExclusivePK();
121-
}
122-
123119
ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;
124120

125121
const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
8585
break;
8686
}
8787
}
88-
if ((MergingContext->IsExclusiveInterval() || Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK()) &&
88+
if ((MergingContext->IsExclusiveInterval()) &&
8989
sourcesInMemory) {
9090
TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
9191
auto& container = Sources.begin()->second->GetStageResult().GetBatch();

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
7070
}
7171

7272
TConclusionStatus TScanHead::Start() {
73-
const bool guaranteeExclusivePK = Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK();
7473
TScanContext context;
7574
for (auto itPoint = BorderPoints.begin(); itPoint != BorderPoints.end(); ++itPoint) {
7675
auto& point = itPoint->second;
@@ -82,8 +81,7 @@ TConclusionStatus TScanHead::Start() {
8281
}
8382
const bool isExclusive = context.GetCurrentSources().size() == 1;
8483
for (auto&& i : context.GetCurrentSources()) {
85-
i.second->SetExclusiveIntervalOnly(
86-
(isExclusive && i.second->GetExclusiveIntervalOnly() && !context.GetIsSpecialPoint()) || guaranteeExclusivePK);
84+
i.second->SetExclusiveIntervalOnly((isExclusive && i.second->GetExclusiveIntervalOnly() && !context.GetIsSpecialPoint()));
8785
}
8886

8987
for (auto&& i : point.GetFinishSources()) {

ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,22 @@ void TTxScan::Complete(const TActorContext& ctx) {
6363
read.PathId = request.GetLocalPathId();
6464
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
6565
read.TableName = table;
66+
6667
const TString defaultReader =
67-
AppDataVerified().ColumnShardConfig.GetReaderClassName() ? AppDataVerified().ColumnShardConfig.GetReaderClassName() : "PLAIN";
68+
[&]() {
69+
const TString defGlobal =
70+
AppDataVerified().ColumnShardConfig.GetReaderClassName() ? AppDataVerified().ColumnShardConfig.GetReaderClassName() : "PLAIN";
71+
if (Self->HasIndex()) {
72+
return Self->GetIndexAs<TColumnEngineForLogs>()
73+
.GetVersionedIndex()
74+
.GetLastSchema()
75+
->GetIndexInfo()
76+
.GetScanReaderPolicyName()
77+
.value_or(defGlobal);
78+
} else {
79+
return defGlobal;
80+
}
81+
}();
6882
std::unique_ptr<IScannerConstructor> scannerConstructor = [&]() {
6983
auto sysViewPolicy = NSysView::NAbstract::ISysViewPolicy::BuildByPath(read.TableName);
7084
if (!sysViewPolicy) {

ydb/core/tx/columnshard/engines/scheme/index_info.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId)
181181
void TIndexInfo::DeserializeOptionsFromProto(const NKikimrSchemeOp::TColumnTableSchemeOptions& optionsProto) {
182182
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Options");
183183
SchemeNeedActualization = optionsProto.GetSchemeNeedActualization();
184-
ExternalGuaranteeExclusivePK = optionsProto.GetExternalGuaranteeExclusivePK();
184+
if (optionsProto.HasScanReaderPolicyName()) {
185+
ScanReaderPolicyName = optionsProto.GetScanReaderPolicyName();
186+
}
185187
if (optionsProto.HasCompactionPlannerConstructor()) {
186188
auto container =
187189
NStorageOptimizer::TOptimizerPlannerConstructorContainer::BuildFromProto(optionsProto.GetCompactionPlannerConstructor());

ydb/core/tx/columnshard/engines/scheme/index_info.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ struct TIndexInfo: public IIndexInfo {
104104
bool SchemeNeedActualization = false;
105105
std::shared_ptr<NStorageOptimizer::IOptimizerPlannerConstructor> CompactionPlannerConstructor;
106106
std::shared_ptr<NDataAccessorControl::IManagerConstructor> MetadataManagerConstructor;
107-
bool ExternalGuaranteeExclusivePK = false;
107+
std::optional<TString> ScanReaderPolicyName;
108108

109109
ui64 Version = 0;
110110
std::vector<ui32> SchemaColumnIds;
@@ -215,8 +215,8 @@ struct TIndexInfo: public IIndexInfo {
215215
std::shared_ptr<arrow::Scalar> GetColumnExternalDefaultValueVerified(const ui32 colId) const;
216216
std::shared_ptr<arrow::Scalar> GetColumnExternalDefaultValueByIndexVerified(const ui32 colIndex) const;
217217

218-
bool GetExternalGuaranteeExclusivePK() const {
219-
return ExternalGuaranteeExclusivePK;
218+
const std::optional<TString>& GetScanReaderPolicyName() const {
219+
return ScanReaderPolicyName;
220220
}
221221

222222
const TColumnFeatures& GetColumnFeaturesVerified(const ui32 columnId) const {

0 commit comments

Comments
 (0)