Skip to content

Commit 5dad85d

Browse files
authored
Stop wide combiner state from growing unlimited (#10997)
1 parent 93c21b6 commit 5dad85d

File tree

1 file changed

+32
-5
lines changed

1 file changed

+32
-5
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ class TState : public TComputationValue<TState> {
239239
return KeyWidth + StateWidth;
240240
}
241241
public:
242-
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal)
243-
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), States(hash, equal, CountRowsOnPage) {
242+
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false)
243+
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) {
244244
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
245245
CurrentPosition = 0;
246246
Tongue = CurrentPage->data();
@@ -275,11 +275,28 @@ class TState : public TComputationValue<TState> {
275275
}
276276
Throat = States.GetKey(itInsert) + KeyWidth;
277277
if (isNew) {
278-
States.CheckGrow();
278+
GrowStates();
279279
}
280280
return isNew;
281281
}
282282

283+
void GrowStates() {
284+
try {
285+
States.CheckGrow();
286+
} catch (TMemoryLimitExceededException) {
287+
YQL_LOG(INFO) << "State failed to grow";
288+
if (IsOutOfMemory || !AllowOutOfMemory) {
289+
throw;
290+
} else {
291+
IsOutOfMemory = true;
292+
}
293+
}
294+
}
295+
296+
bool CheckIsOutOfMemory() const {
297+
return IsOutOfMemory;
298+
}
299+
283300
template<bool SkipYields>
284301
bool ReadMore() {
285302
if constexpr (SkipYields) {
@@ -331,6 +348,8 @@ class TState : public TComputationValue<TState> {
331348
private:
332349
std::optional<TStorageIterator> ExtractIt;
333350
const ui32 KeyWidth, StateWidth;
351+
const bool AllowOutOfMemory;
352+
bool IsOutOfMemory = false;
334353
ui64 CurrentPosition = 0;
335354
TRow* CurrentPage = nullptr;
336355
TStorage Storage;
@@ -386,7 +405,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
386405
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
387406
)
388407
: TBase(memInfo)
389-
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
408+
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, allowSpilling && ctx.SpillerFactory)
390409
, UsedInputItemType(usedInputItemType)
391410
, KeyAndStateType(keyAndStateType)
392411
, KeyWidth(keyWidth)
@@ -451,6 +470,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
451470
ETasteResult TasteIt() {
452471
if (GetMode() == EOperatingMode::InMemory) {
453472
bool isNew = InMemoryProcessingState.TasteIt();
473+
if (InMemoryProcessingState.CheckIsOutOfMemory()) {
474+
StateWantsToSpill = true;
475+
}
454476
Throat = InMemoryProcessingState.Throat;
455477
return isNew ? ETasteResult::Init : ETasteResult::Update;
456478
}
@@ -649,7 +671,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
649671
}
650672

651673
bool CheckMemoryAndSwitchToSpilling() {
652-
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
674+
if (!(AllowSpilling && Ctx.SpillerFactory)) {
675+
return false;
676+
}
677+
if (StateWantsToSpill || IsSwitchToSpillingModeCondition()) {
678+
StateWantsToSpill = false;
653679
LogMemoryUsage();
654680

655681
SwitchMode(EOperatingMode::SplittingState);
@@ -840,6 +866,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
840866
NUdf::TUnboxedValuePod* Throat = nullptr;
841867

842868
private:
869+
bool StateWantsToSpill = false;
843870
bool IsEverythingExtracted = false;
844871

845872
TState InMemoryProcessingState;

0 commit comments

Comments
 (0)