Skip to content

Commit f1686eb

Browse files
authored
Stable 25 1 analytics comp scan fixes (#19410)
2 parents 90d1a21 + ffa0d7b commit f1686eb

File tree

85 files changed

+811
-433
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+811
-433
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,16 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArrayTrivial() con
127127
}
128128

129129
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray(const TColumnConstructionContext& context) const {
130+
if (context.GetStartIndex() || context.GetRecordsCount()) {
131+
const ui32 start = context.GetStartIndex().value_or(0);
132+
const ui32 count = context.GetRecordsCount().value_or(GetRecordsCount() - start);
133+
AFL_VERIFY(start + count <= GetRecordsCount())("start", start)("count", count)("records_count", GetRecordsCount());
134+
}
135+
136+
return DoGetChunkedArray(context);
137+
}
138+
139+
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::DoGetChunkedArray(const TColumnConstructionContext& context) const {
130140
if (context.GetStartIndex() || context.GetRecordsCount()) {
131141
const ui32 start = context.GetStartIndex().value_or(0);
132142
const ui32 count = context.GetRecordsCount().value_or(GetRecordsCount() - start);

ydb/core/formats/arrow/accessor/abstract/accessor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,8 @@ class IChunkedArray {
338338
AFL_VERIFY(false)("pos", position)("count", GetRecordsCount())("chunks_map", sb)("chunk_current", chunkCurrentInfo);
339339
}
340340

341+
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray(const TColumnConstructionContext& context) const;
342+
341343
public:
342344
std::shared_ptr<IChunkedArray> ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const;
343345

@@ -453,7 +455,7 @@ class IChunkedArray {
453455
return *result;
454456
}
455457

456-
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray(
458+
std::shared_ptr<arrow::ChunkedArray> GetChunkedArray(
457459
const TColumnConstructionContext& context = Default<TColumnConstructionContext>()) const;
458460
virtual ~IChunkedArray() = default;
459461

ydb/core/formats/arrow/accessor/composite/accessor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ std::optional<bool> TCompositeChunkedArray::DoCheckOneValueAccessor(std::shared_
110110
return true;
111111
}
112112

113-
std::shared_ptr<arrow::ChunkedArray> TCompositeChunkedArray::GetChunkedArray(const TColumnConstructionContext& context) const {
113+
std::shared_ptr<arrow::ChunkedArray> TCompositeChunkedArray::DoGetChunkedArray(const TColumnConstructionContext& context) const {
114114
ui32 pos = 0;
115115
std::vector<std::shared_ptr<arrow::Array>> chunks;
116116
for (auto&& i : Chunks) {

ydb/core/formats/arrow/accessor/composite/accessor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
2222
private:
2323
YDB_READONLY_DEF(std::vector<std::shared_ptr<IChunkedArray>>, Chunks);
2424

25-
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray(const TColumnConstructionContext& context) const override;
25+
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray(const TColumnConstructionContext& context) const override;
2626

2727
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const override {
2828
for (auto&& i : Chunks) {

ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ std::shared_ptr<arrow::Array> TSubColumnsArray::BuildBJsonArray(const TColumnCon
257257
return NArrow::FinishBuilder(std::move(builder));
258258
}
259259

260-
std::shared_ptr<arrow::ChunkedArray> TSubColumnsArray::GetChunkedArray(const TColumnConstructionContext& context) const {
260+
std::shared_ptr<arrow::ChunkedArray> TSubColumnsArray::DoGetChunkedArray(const TColumnConstructionContext& context) const {
261261
auto chunk = BuildBJsonArray(context);
262262
if (chunk->length()) {
263263
return std::make_shared<arrow::ChunkedArray>(chunk);

ydb/core/formats/arrow/accessor/sub_columns/accessor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ class TSubColumnsArray: public IChunkedArray {
6464

6565
std::shared_ptr<arrow::Array> BuildBJsonArray(const TColumnConstructionContext& context) const;
6666

67-
public:
68-
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray(
67+
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray(
6968
const TColumnConstructionContext& context = Default<TColumnConstructionContext>()) const override;
69+
70+
public:
7071
virtual void DoVisitValues(const std::function<void(std::shared_ptr<arrow::Array>)>& /*visitor*/) const override {
7172
AFL_VERIFY(false);
7273
}

ydb/core/formats/arrow/save_load/loader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ std::optional<NSplitter::TSimpleSerializationStat> TColumnLoader::TryBuildColumn
7070
if constexpr (switcher.IsCType) {
7171
using CType = typename decltype(switcher)::ValueType;
7272
result = NSplitter::TSimpleSerializationStat(std::max<ui32>(1, sizeof(CType) / 2), 1, sizeof(CType));
73+
result->SetOrigination(NSplitter::TSimpleSerializationStat::EOrigination::Loader);
7374
}
7475
return true;
7576
});

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3302,7 +3302,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
33023302
auto alterQuery =
33033303
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
33043304
{"levels" : [{"class_name" : "Zero", "expected_blobs_size" : 1, "portions_count_available" : 3},
3305-
{"class_name" : "Zero"}]}`);
3305+
{"class_name" : "Zero", "expected_blobs_size" : 1}]}`);
33063306
)";
33073307
auto result = session.ExecuteQuery(alterQuery, NQuery::TTxControl::NoTx()).GetValueSync();
33083308
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToOneLineString());

ydb/core/tx/columnshard/columnshard__write_index.cpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@ class TDiskResourcesRequest: public NLimiter::IResourceRequest {
1717
private:
1818
using TBase = NLimiter::IResourceRequest;
1919
std::shared_ptr<NOlap::TCompactedWriteController> WriteController;
20+
const std::shared_ptr<NOlap::TColumnEngineChanges> Changes;
2021
const ui64 TabletId;
2122

2223
private:
2324
virtual void DoOnResourceAllocated() override {
25+
Changes->SetStage(NOlap::NChanges::EStage::Writing);
2426
NActors::TActivationContext::AsActorContext().Register(CreateWriteActor(TabletId, WriteController, TInstant::Max()));
2527
}
2628

2729
public:
28-
TDiskResourcesRequest(const std::shared_ptr<NOlap::TCompactedWriteController>& writeController, const ui64 tabletId)
30+
TDiskResourcesRequest(const std::shared_ptr<NOlap::TCompactedWriteController>& writeController, const ui64 tabletId,
31+
const std::shared_ptr<NOlap::TColumnEngineChanges>& changes)
2932
: TBase(writeController->GetWriteVolume())
3033
, WriteController(writeController)
34+
, Changes(changes)
3135
, TabletId(tabletId)
3236
{
3337

@@ -38,29 +42,33 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
3842
auto putStatus = ev->Get()->GetPutStatus();
3943

4044
if (putStatus == NKikimrProto::UNKNOWN) {
45+
const auto change = ev->Get()->IndexChanges;
4146
if (IsAnyChannelYellowStop()) {
4247
ACFL_ERROR("event", "TEvWriteIndex failed")("reason", "channel yellow stop");
4348

4449
Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE);
4550
ev->Get()->SetPutStatus(NKikimrProto::TRYLATER);
4651
NOlap::TChangesFinishContext context("out of disk space");
47-
ev->Get()->IndexChanges->Abort(*this, context);
52+
change->Abort(*this, context);
4853
ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
4954
} else {
50-
ACFL_DEBUG("event", "TEvWriteIndex")("count", ev->Get()->IndexChanges->GetWritePortionsCount());
51-
AFL_VERIFY(ev->Get()->IndexChanges->GetWritePortionsCount());
52-
const bool needDiskLimiter = ev->Get()->IndexChanges->NeedDiskWriteLimiter();
55+
ACFL_DEBUG("event", "TEvWriteIndex")("count", change->GetWritePortionsCount());
56+
AFL_VERIFY(change->GetWritePortionsCount());
57+
const bool needDiskLimiter = change->NeedDiskWriteLimiter();
5358
auto writeController = std::make_shared<NOlap::TCompactedWriteController>(ctx.SelfID, ev->Release());
5459
const TConclusion<bool> needDraftTransaction = writeController->GetBlobsAction().NeedDraftWritingTransaction();
5560
AFL_VERIFY(needDraftTransaction.IsSuccess())("error", needDraftTransaction.GetErrorMessage());
5661
if (*needDraftTransaction) {
5762
ACFL_DEBUG("event", "TTxWriteDraft");
63+
change->SetStage(NOlap::NChanges::EStage::WriteDraft);
5864
Execute(new TTxWriteDraft(this, writeController));
5965
} else if (needDiskLimiter) {
6066
ACFL_DEBUG("event", "Limiter");
61-
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID()));
67+
change->SetStage(NOlap::NChanges::EStage::AskDiskQuota);
68+
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID(), change));
6269
} else {
6370
ACFL_DEBUG("event", "WriteActor");
71+
change->SetStage(NOlap::NChanges::EStage::Writing);
6472
Register(CreateWriteActor(TabletID(), writeController, TInstant::Max()));
6573
}
6674
}

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,9 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
592592
}
593593
TxEvent->IndexChanges->Blobs = ExtractBlobsData();
594594
const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges);
595-
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId, LastCompletedTx);
595+
TxEvent->IndexChanges->SetStage(NOlap::NChanges::EStage::ReadyForConstruct);
596+
std::shared_ptr<NConveyor::ITask> task =
597+
std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId, LastCompletedTx);
596598
if (isInsert) {
597599
NConveyor::TInsertServiceOperator::SendTaskToExecute(task);
598600
} else {
@@ -849,21 +851,27 @@ void TColumnShard::SetupCompaction(const std::set<TInternalPathId>& pathIds) {
849851
class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITask {
850852
private:
851853
using TBase = NOlap::NResourceBroker::NSubscribe::ITask;
854+
std::shared_ptr<NOlap::TColumnEngineChanges> ChangeTask;
852855
std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
853856
std::shared_ptr<TDataAccessorsSubscriberBase> Subscriber;
854857
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
855858

856859
virtual void DoOnAllocationSuccess(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
857860
Subscriber->SetResourcesGuard(guard);
858861
Request->RegisterSubscriber(Subscriber);
862+
if (ChangeTask) {
863+
ChangeTask->SetStage(NOlap::NChanges::EStage::AskAccessors);
864+
}
859865
DataAccessorsManager->AskData(Request);
860866
}
861867

862868
public:
863869
TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
864870
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriberBase>& subscriber,
865-
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
871+
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
872+
const std::shared_ptr<NOlap::TColumnEngineChanges>& changeTask)
866873
: TBase(0, memory, externalTaskId, context)
874+
, ChangeTask(changeTask)
867875
, Request(std::move(request))
868876
, Subscriber(subscriber)
869877
, DataAccessorsManager(dataAccessorsManager) {
@@ -879,6 +887,7 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea
879887
const TString externalTaskId = Changes->GetTaskIdentifier();
880888
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId);
881889

890+
Changes->SetStage(NOlap::NChanges::EStage::ReadBlobs);
882891
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, CacheDataAfterWrite);
883892
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
884893
std::make_shared<TCompactChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
@@ -898,21 +907,23 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
898907
return;
899908
}
900909

901-
auto compaction = dynamic_pointer_cast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges>(indexChanges);
902-
compaction->SetActivityFlag(GetTabletActivity());
903-
compaction->SetQueueGuard(guard);
904-
compaction->Start(*this);
910+
auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get());
911+
compaction.SetActivityFlag(GetTabletActivity());
912+
compaction.SetQueueGuard(guard);
913+
compaction.Start(*this);
905914

906915
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
907-
auto request = compaction->ExtractDataAccessorsRequest();
916+
auto request = compaction.ExtractDataAccessorsRequest();
908917
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) +
909918
indexChanges->CalcMemoryForUsage();
910919
const auto subscriber = std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
911920
Settings.CacheDataAfterCompaction, SelfId(), TabletID(), Counters.GetCompactionCounters(), GetLastCompletedTx(),
912921
CompactTaskSubscription);
913-
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
914-
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier(),
915-
CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
922+
compaction.SetStage(NOlap::NChanges::EStage::AskResources);
923+
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor,
924+
std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier(), CompactTaskSubscription,
925+
std::move(request),
926+
subscriber, DataAccessorsManager.GetObjectPtrVerified(), indexChanges));
916927
}
917928

918929
class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRead {
@@ -981,7 +992,7 @@ void TColumnShard::SetupMetadata() {
981992
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor,
982993
std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i.GetRequest()->GetTaskId(), TTLTaskSubscription,
983994
std::shared_ptr<NOlap::TDataAccessorsRequest>(i.GetRequest()),
984-
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified()));
995+
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified(), nullptr));
985996
}
986997
}
987998

@@ -1020,7 +1031,7 @@ bool TColumnShard::SetupTtl() {
10201031
request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) + memoryUsage;
10211032
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
10221033
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i->GetTaskIdentifier(), TTLTaskSubscription,
1023-
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
1034+
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), i));
10241035
}
10251036
return true;
10261037
}
@@ -1069,7 +1080,7 @@ void TColumnShard::SetupCleanupPortions() {
10691080

10701081
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
10711082
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, changes->GetTaskIdentifier(), TTLTaskSubscription,
1072-
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
1083+
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), changes));
10731084
}
10741085

10751086
void TColumnShard::SetupCleanupTables() {

0 commit comments

Comments
 (0)