Skip to content

cs merge new changes from main #20757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 62 commits into
base: stable-25-1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
05ca37f
application of duplicate filters in simple reader (#18608)
swalrus1 May 28, 2025
3cb6663
duplicate filter construction task (#19120)
swalrus1 Jun 3, 2025
bf94acd
revert version normalizer to template... (#19332)
xyliganSereja Jun 6, 2025
67a71fa
Fix Missing Step in EvReadSetAck (#19424)
Vladilen Jun 6, 2025
e9b0051
one conveyor for insert and compaction. draft. (#18776)
ivanmorozov333 May 25, 2025
dd97321
Add RAM for test with sanitizers (#19326)
iddqdex Jun 4, 2025
a7d09cd
composite conveyor usage (#19462)
ivanmorozov333 Jun 9, 2025
3c67670
Fix categories usage with scope limits (#19665)
ivanmorozov333 Jun 15, 2025
49e482e
filter iterator corrections (#19359)
swalrus1 Jun 10, 2025
a059296
aborted scan counters (#19353)
swalrus1 Jun 10, 2025
db504fc
Experimental tiling compaction (#19499)
snaury Jun 10, 2025
24d1e43
fix deletion usage on compaction (#19576)
ivanmorozov333 Jun 10, 2025
f1f4a4e
Fix optimizer comparison when protobuf serializations are the same (#…
snaury Jun 11, 2025
cd18c9e
Add basic validation for tiling compaction settings (#19599)
snaury Jun 11, 2025
7b6f163
clean deprecated immediate write option (#19597)
ivanmorozov333 Jun 11, 2025
856e143
Remap path (#15991)
zverevgeny Jun 12, 2025
86ece5a
Add InternalPathId column to primary_index_granule_stats (#19644)
zverevgeny Jun 13, 2025
f912d95
remove insert table v1 (#19638)
ivanmorozov333 Jun 13, 2025
b39d884
add view for errors #18396 (#19468)
xyliganSereja Jun 13, 2025
0720c83
generate internal path id (#19660)
zverevgeny Jun 14, 2025
b8440b0
Add ColumnShard write statistics (#19642)
Vladilen Jun 16, 2025
b6756fa
special portions data fetcher (#19666)
ivanmorozov333 Jun 17, 2025
8dbab39
additional signals (#19745)
ivanmorozov333 Jun 18, 2025
0e1008e
Add operation increment in datashard (#19567)
r314-git Jun 18, 2025
3d2d4fb
Rename column table (#16961)
zverevgeny Jun 19, 2025
820f81f
add format for slice_limiter (#19870)
xyliganSereja Jun 19, 2025
287e48f
correct cpu conveyor (#19838)
ivanmorozov333 Jun 19, 2025
47f09bc
fix deadlock on special config (#19912)
ivanmorozov333 Jun 19, 2025
cad1adc
move slide_limiter to library #19682 (#19906)
xyliganSereja Jun 20, 2025
2764561
Add CS compaction ResourceBroker queues configuration to MemoryContro…
Vladilen Jun 20, 2025
682c4ce
Initial internal pathid based on TabletId (#19902)
zverevgeny Jun 21, 2025
6ab24e9
default portions count level limit (#19969)
ivanmorozov333 Jun 21, 2025
c82b9ee
clarify TLockFeatures (#20017)
zverevgeny Jun 23, 2025
376e436
Shared metadata cache (#18544)
XJIE6 Jun 23, 2025
17acab1
fix check MaxInternalPathId (#20071)
zverevgeny Jun 24, 2025
5105040
Fix use after free in MoveTableProgress (#20134)
zverevgeny Jun 25, 2025
d027f3c
general cache (#20004)
ivanmorozov333 Jun 25, 2025
0690efd
general cache requests abortion (#20190)
ivanmorozov333 Jun 26, 2025
9c26641
correct metadata control (#20070)
ivanmorozov333 Jun 26, 2025
2f5ac5c
fix portion accessor fetching for cleanup (#20252)
swalrus1 Jun 27, 2025
3e43f75
blobs portion data into portion accessor (#20281)
ivanmorozov333 Jun 27, 2025
54e5eb9
fix problem portions processing (#20297)
ivanmorozov333 Jun 27, 2025
6754d8d
fix signals and separate cache by source to kill processing (#20330)
ivanmorozov333 Jun 30, 2025
54274c8
cleaning and correct backgrounds inflight control (#20344)
ivanmorozov333 Jun 30, 2025
d96eb41
special different fixes (#20369)
ivanmorozov333 Jun 30, 2025
b6909b1
fix cursors usage (#20394)
ivanmorozov333 Jul 1, 2025
96262dd
Fix bulk CS stats (#20253)
Vladilen Jul 1, 2025
10b069f
wait time has been added (#16415)
dorooleg Jul 1, 2025
01b2ed5
Correct Scan Generation in case of retries (#20191) (#20347)
Hor911 Jun 30, 2025
4669b0b
tests for shards resolving (#20419)
ivanmorozov333 Jul 1, 2025
eed1c41
Add compaction page (#20274)
XJIE6 Jul 2, 2025
6bec071
cpu time has been fixed (#20495)
dorooleg Jul 2, 2025
e39fd0c
increasing wait time (#20676)
dorooleg Jul 7, 2025
3ce08ad
fix (#20686)
XJIE6 Jul 7, 2025
54a7110
Intersection tree with union-copy sets (#19427)
snaury Jun 9, 2025
27d464e
Use Y_ENSURE for usage errors in intersection tree (#19590)
snaury Jun 11, 2025
7186bd1
memory distribution control (#20173)
ivanmorozov333 Jun 26, 2025
1196e82
unused mark
dorooleg Jul 8, 2025
25f5f1d
cleanup
dorooleg Jul 8, 2025
1d4f6fb
is stopped
dorooleg Jul 8, 2025
d4f33a3
YQ-4222 added nested protos validation (#16975)
GrigoriyPA Apr 10, 2025
8a403ae
include has been fixed
dorooleg Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 4 additions & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictActualization
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization
ydb/core/kqp/ut/olap KqpOlapJson.DuplicationCompactionVariants
ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
ydb/core/kqp/ut/batch_operations [*/*] chunk chunk
ydb/core/kqp/ut/federated_query/large_results KqpScriptExecResults.ExecuteScriptWithLargeFile
ydb/core/kqp/ut/federated_query/s3 sole chunk chunk
ydb/core/kqp/ut/indexes KqpMultishardIndex.WriteIntoRenamingSyncIndex
ydb/core/kqp/ut/olap [*/*] chunk chunk
ydb/core/kqp/ut/join KqpIndexLookupJoin.LeftJoinRightNullFilter+StreamLookup
ydb/core/kqp/ut/join KqpIndexLookupJoin.LeftJoinRightNullFilter-StreamLookup
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ struct TKikimrEvents : TEvents {
ES_FEATURE_FLAGS = 4262,
ES_PRIORITY_QUEUE = 4263,
ES_SOLOMON_PROVIDER = 4264,
ES_CONVEYOR_COMPOSITE = 4265,
ES_GENERAL_CACHE_PUBLIC = 4266,
ES_GENERAL_CACHE_SOURCE = 4267,

};
};

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/base/localdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,8 @@ const TString KqpResourceManagerTaskName = "kqp_query";
const TString KqpResourceManagerQueue = "queue_kqp_resource_manager";
const TString LegacyQueueIdTaskNamePrefix = "compaction_gen";

// Queue-name string constants for ColumnShard compaction, one per kind of work:
// indexation, TTL, general and normalizer.
// NOTE(review): presumably consumed as resource broker queue names, like the
// "queue_*" constants defined earlier in this file — confirm against the users.
const TString ColumnShardCompactionIndexationQueue = "queue_cs_indexation";
const TString ColumnShardCompactionTtlQueue = "queue_cs_ttl";
const TString ColumnShardCompactionGeneralQueue = "queue_cs_general";
const TString ColumnShardCompactionNormalizerQueue = "queue_cs_normalizer";
}}
4 changes: 4 additions & 0 deletions ydb/core/base/localdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,8 @@ extern const TString KqpResourceManagerTaskName;
extern const TString KqpResourceManagerQueue;
extern const TString LegacyQueueIdTaskNamePrefix;

// Declarations of the ColumnShard compaction queue-name constants
// (indexation, TTL, general, normalizer); the values are defined in localdb.cpp.
extern const TString ColumnShardCompactionIndexationQueue;
extern const TString ColumnShardCompactionTtlQueue;
extern const TString ColumnShardCompactionGeneralQueue;
extern const TString ColumnShardCompactionNormalizerQueue;
}}
196 changes: 140 additions & 56 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,33 +404,33 @@ class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
class TScanGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
public:
TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
TScanGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompPrioritiesInitializer: public IKikimrServicesInitializer {
class TCompGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
public:
TCompPrioritiesInitializer(const TKikimrRunConfig& runConfig);
TCompGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompConveyorInitializer: public IKikimrServicesInitializer {
class TCompPrioritiesInitializer: public IKikimrServicesInitializer {
public:
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
TCompPrioritiesInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TScanConveyorInitializer: public IKikimrServicesInitializer {
class TCompositeConveyorInitializer : public IKikimrServicesInitializer {
public:
TScanConveyorInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
TCompositeConveyorInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TInsertConveyorInitializer: public IKikimrServicesInitializer {
class TGeneralCachePortionsMetadataInitializer: public IKikimrServicesInitializer {
public:
TInsertConveyorInitializer(const TKikimrRunConfig& runConfig);
TGeneralCachePortionsMetadataInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

Expand Down
15 changes: 6 additions & 9 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1700,23 +1700,20 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
}

if (serviceMask.EnableGroupedMemoryLimiter) {
sil->AddServiceInitializer(new TGroupedMemoryLimiterInitializer(runConfig));
}

if (serviceMask.EnableScanConveyor) {
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
sil->AddServiceInitializer(new TScanGroupedMemoryLimiterInitializer(runConfig));
sil->AddServiceInitializer(new TCompGroupedMemoryLimiterInitializer(runConfig));
}

if (serviceMask.EnableCompPriorities) {
sil->AddServiceInitializer(new TCompPrioritiesInitializer(runConfig));
}

if (serviceMask.EnableCompConveyor) {
sil->AddServiceInitializer(new TCompConveyorInitializer(runConfig));
if (serviceMask.EnableCompConveyor || serviceMask.EnableInsertConveyor || serviceMask.EnableScanConveyor) {
sil->AddServiceInitializer(new TCompositeConveyorInitializer(runConfig));
}

if (serviceMask.EnableInsertConveyor) {
sil->AddServiceInitializer(new TInsertConveyorInitializer(runConfig));
if (serviceMask.EnableGeneralCachePortionsMetadata) {
sil->AddServiceInitializer(new TGeneralCachePortionsMetadataInitializer(runConfig));
}

if (serviceMask.EnableCms) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/service_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ union TBasicKikimrServicesMask {
bool EnableGroupedMemoryLimiter:1;
bool EnableAwsService:1;
bool EnableCompPriorities : 1;
bool EnableGeneralCachePortionsMetadata: 1;
};

struct {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ PEERDIR(
ydb/core/tx
ydb/core/tx/columnshard
ydb/core/tx/conveyor/service
ydb/core/tx/general_cache
ydb/core/tx/columnshard/data_accessor/cache_policy
ydb/core/tx/coordinator
ydb/core/tx/datashard
ydb/core/tx/limiter/grouped_memory/usage
ydb/core/tx/limiter/service
ydb/core/tx/long_tx_service
ydb/core/tx/long_tx_service/public
ydb/core/tx/mediator
Expand Down Expand Up @@ -142,6 +143,7 @@ PEERDIR(
ydb/library/security
ydb/library/signal_backtrace
ydb/library/yql/providers/pq/cm_client
ydb/library/slide_limiter/service
ydb/library/yql/providers/s3/actors
ydb/public/lib/base
ydb/public/lib/deprecated/client
Expand Down
70 changes: 63 additions & 7 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ const std::vector<bool>& TColumnFilter::BuildSimpleFilter() const {
class TMergePolicyAnd {
private:
public:
static bool HasValue(const bool /*a*/, const bool /*b*/) {
return true;
}
static bool Calc(const bool a, const bool b) {
return a && b;
}
Expand All @@ -307,11 +310,17 @@ class TMergePolicyAnd {
return TColumnFilter::BuildDenyFilter();
}
}
static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) {
return MergeWithSimple(filter, simpleValue);
}
};

class TMergePolicyOr {
private:
public:
static bool HasValue(const bool /*a*/, const bool /*b*/) {
return true;
}
static bool Calc(const bool a, const bool b) {
return a || b;
}
Expand All @@ -322,6 +331,43 @@ class TMergePolicyOr {
return filter;
}
}
static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) {
return MergeWithSimple(filter, simpleValue);
}
};

// Merge policy in which the second operand acts as a selector over the first:
// a pair of values contributes to the result only when the second value is true,
// and the contributed value is the first operand's value.
class TMergePolicyApplyFilter {
public:
    // A pair of overlapping values produces output only when the selector `b` is set.
    static bool HasValue(const bool /*a*/, const bool b) {
        return b;
    }

    // The produced value is always the first operand's value; the selector is ignored here.
    static bool Calc(const bool a, const bool /*b*/) {
        return a;
    }

    // First operand is a detailed filter, the selector is the constant `simpleValue`:
    // a true selector returns a copy of `filter`, a false selector returns the deny filter.
    static TColumnFilter MergeWithSimple(const TColumnFilter& filter, const bool simpleValue) {
        if (!simpleValue) {
            return TColumnFilter::BuildDenyFilter();
        }
        return filter;
    }

    // First operand is the constant `simpleValue`, the selector is a detailed filter:
    // starts from the allow/deny filter matching `simpleValue` and, when the selector
    // reports a record count, appends that many entries of `simpleValue`.
    static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) {
        const auto recordsCount = filter.GetRecordsCount();
        TColumnFilter merged = simpleValue ? TColumnFilter::BuildAllowFilter() : TColumnFilter::BuildDenyFilter();
        if (recordsCount) {
            merged.Add(simpleValue, *recordsCount);
        }
        return merged;
    }
};

class TColumnFilter::TMergerImpl {
Expand All @@ -340,7 +386,7 @@ class TColumnFilter::TMergerImpl {
if (Filter1.empty() && Filter2.empty()) {
return TColumnFilter(TMergePolicy::Calc(Filter1.DefaultFilterValue, Filter2.DefaultFilterValue));
} else if (Filter1.empty()) {
return TMergePolicy::MergeWithSimple(Filter2, Filter1.DefaultFilterValue);
return TMergePolicy::MergeWithSimple(Filter1.DefaultFilterValue, Filter2);
} else if (Filter2.empty()) {
return TMergePolicy::MergeWithSimple(Filter1, Filter2.DefaultFilterValue);
} else {
Expand All @@ -359,7 +405,7 @@ class TColumnFilter::TMergerImpl {

while (it1 != Filter1.Filter.cend() && it2 != Filter2.Filter.cend()) {
const ui32 delta = TColumnFilter::CrossSize(pos2, pos2 + *it2, pos1, pos1 + *it1);
if (delta) {
if (delta && TMergePolicy::HasValue(curValue1, curValue2)) {
if (!count || curCurrent != TMergePolicy::Calc(curValue1, curValue2)) {
resultFilter.emplace_back(delta);
curCurrent = TMergePolicy::Calc(curValue1, curValue2);
Expand Down Expand Up @@ -405,10 +451,15 @@ TColumnFilter TColumnFilter::Or(const TColumnFilter& extFilter) const {
return TMergerImpl(*this, extFilter).Merge<TMergePolicyOr>();
}

// Merges this filter with extFilter under TMergePolicyApplyFilter and returns the result.
// Under that policy extFilter acts as a selector: in the general merge path, ranges where
// extFilter is true carry over this filter's value and ranges where it is false emit nothing.
// ResetCaches() is called before the merge.
TColumnFilter TColumnFilter::ApplyFilterFrom(const TColumnFilter& extFilter) const {
ResetCaches();
return TMergerImpl(*this, extFilter).Merge<TMergePolicyApplyFilter>();
}

TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter) const {
ResetCaches();
if (Filter.empty()) {
return TMergePolicyAnd::MergeWithSimple(extFilter, DefaultFilterValue);
return TMergePolicyAnd::MergeWithSimple(DefaultFilterValue, extFilter);
} else if (extFilter.Filter.empty()) {
return TMergePolicyAnd::MergeWithSimple(*this, extFilter.DefaultFilterValue);
} else {
Expand Down Expand Up @@ -467,14 +518,19 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
}
}

TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const {
// Convenience wrapper: returns GetIterator(reverse, expectedSize, 0), i.e. an iterator
// created with a start offset of zero.
TColumnFilter::TIterator TColumnFilter::GetBegin(const bool reverse, const ui32 expectedSize) const {
return GetIterator(reverse, expectedSize, 0);
}

TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize, const ui64 startOffset) const {
AFL_VERIFY(expectedSize >= startOffset);
if (IsTotalAllowFilter()) {
return TIterator(reverse, expectedSize, true);
return TIterator(reverse, expectedSize, true, startOffset);
} else if (IsTotalDenyFilter()) {
return TIterator(reverse, expectedSize, false);
return TIterator(reverse, expectedSize, false, startOffset);
} else {
AFL_VERIFY(expectedSize == GetRecordsCountVerified())("expected", expectedSize)("count", GetRecordsCountVerified())("reverse", reverse);
return TIterator(reverse, Filter, GetStartValue(reverse));
return TIterator(reverse, Filter, GetStartValue(reverse), startOffset);
}
}

Expand Down
18 changes: 13 additions & 5 deletions ydb/core/formats/arrow/arrow_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class TColumnFilter {
public:
TString DebugString() const;

TIterator(const bool reverse, const std::vector<ui32>& filter, const bool startValue)
TIterator(const bool reverse, const std::vector<ui32>& filter, const bool startValue, const ui64 startOffset)
: FilterPointer(&filter)
, CurrentValue(startValue)
, FinishPosition(reverse ? -1 : FilterPointer->size())
Expand All @@ -163,9 +163,12 @@ class TColumnFilter {
}
CurrentRemainVolume = (*FilterPointer)[Position];
}
if (startOffset) {
Next(startOffset);
}
}

TIterator(const bool reverse, const ui32 size, const bool startValue)
TIterator(const bool reverse, const ui32 size, const bool startValue, const ui64 startOffset)
: CurrentValue(startValue)
, FinishPosition(reverse ? -1 : 1)
, DeltaPosition(reverse ? -1 : 1) {
Expand All @@ -177,6 +180,9 @@ class TColumnFilter {
}
CurrentRemainVolume = size;
}
if (startOffset) {
Next(startOffset);
}
}

bool GetCurrentAcceptance() const {
Expand All @@ -194,7 +200,8 @@ class TColumnFilter {

TString DebugString() const;

TIterator GetIterator(const bool reverse, const ui32 expectedSize) const;
TIterator GetBegin(const bool reverse, const ui32 expectedSize) const;
TIterator GetIterator(const bool reverse, const ui32 expectedSize, const ui64 startOffset) const;

bool CheckSlice(const ui32 offset, const ui32 count) const;

Expand Down Expand Up @@ -263,8 +270,9 @@ class TColumnFilter {
return TColumnFilter(false);
}

TColumnFilter And(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
[[nodiscard]] TColumnFilter And(const TColumnFilter& extFilter) const;
[[nodiscard]] TColumnFilter Or(const TColumnFilter& extFilter) const;
[[nodiscard]] TColumnFilter ApplyFilterFrom(const TColumnFilter& filter) const;

class TApplyContext {
private:
Expand Down
Loading