Skip to content

Commit 22ba6e9

Browse files
rvu1024maximyurchuk
authored andcommitted
Apply GH commits
Apply GH: Stop wide combiner state from growing unlimited (#10997) Apply GH: Reconnect session has been supported (#9862) Apply GH: Handle unexpected future exception (#11091) commit_hash:c2597abeae3a153692a80cc13446423769765ae1
1 parent 3c382c6 commit 22ba6e9

File tree

6 files changed

+51
-12
lines changed

6 files changed

+51
-12
lines changed

yql/essentials/core/facade/yql_facade.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ TProgram::TStatus SyncExecution(
8080
(program->*method)(std::forward<Params2>(params)...);
8181
YQL_ENSURE(future.Initialized());
8282
future.Wait();
83-
YQL_ENSURE(!future.HasException());
83+
HandleFutureException(future);
8484

8585
TProgram::TStatus status = future.GetValue();
8686
while (status == TProgram::TStatus::Async) {
8787
auto continueFuture = program->ContinueAsync();
8888
continueFuture.Wait();
89-
YQL_ENSURE(!continueFuture.HasException());
89+
HandleFutureException(continueFuture);
9090
status = continueFuture.GetValue();
9191
}
9292

yql/essentials/core/yql_execution.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,12 +525,12 @@ class TExecutionTransformer : public TGraphTransformerBase {
525525

526526
if (DeterministicMode) {
527527
future.Subscribe([state](const NThreading::TFuture<void>& future) {
528-
YQL_ENSURE(!future.HasException());
528+
HandleFutureException(future);
529529
ProcessFutureResultQueue(state);
530530
});
531531
} else {
532532
future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) {
533-
YQL_ENSURE(!future.HasException());
533+
HandleFutureException(future);
534534

535535
TAutoPtr<TState::TItem> item = new TState::TItem;
536536
item->Node = node; item->DataProvider = dataProvider;

yql/essentials/core/yql_graph_transformer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo
249249

250250
auto future = transformer.GetAsyncFuture(*root);
251251
future.Wait();
252-
YQL_ENSURE(!future.HasException());
252+
HandleFutureException(future);
253253

254254
status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
255255
if (newRoot) {
@@ -377,7 +377,7 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
377377
NThreading::TFuture<IGraphTransformer::TStatus> status = AsyncTransform(transformer, root, ctx, applyAsyncChanges);
378378
status.Subscribe(
379379
[asyncCallback](const NThreading::TFuture<IGraphTransformer::TStatus>& status) mutable -> void {
380-
YQL_ENSURE(!status.HasException());
380+
HandleFutureException(status);
381381
asyncCallback(status.GetValue());
382382
});
383383
}

yql/essentials/core/yql_graph_transformer.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,17 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
236236
IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
237237
TExprContext& ctx, bool applyAsyncChanges);
238238

239+
template <typename T>
240+
void HandleFutureException(const NThreading::TFuture<T>& future) {
241+
if (future.HasException()) {
242+
try {
243+
future.TryRethrow();
244+
} catch (...) {
245+
throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage();
246+
}
247+
}
248+
}
249+
239250
class TSyncTransformerBase : public TGraphTransformerBase {
240251
public:
241252
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {

yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ class TState : public TComputationValue<TState> {
240240
return KeyWidth + StateWidth;
241241
}
242242
public:
243-
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal)
244-
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), States(hash, equal, CountRowsOnPage) {
243+
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false)
244+
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) {
245245
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
246246
CurrentPosition = 0;
247247
Tongue = CurrentPage->data();
@@ -276,11 +276,28 @@ class TState : public TComputationValue<TState> {
276276
}
277277
Throat = States.GetKey(itInsert) + KeyWidth;
278278
if (isNew) {
279-
States.CheckGrow();
279+
GrowStates();
280280
}
281281
return isNew;
282282
}
283283

284+
void GrowStates() {
285+
try {
286+
States.CheckGrow();
287+
} catch (TMemoryLimitExceededException) {
288+
YQL_LOG(INFO) << "State failed to grow";
289+
if (IsOutOfMemory || !AllowOutOfMemory) {
290+
throw;
291+
} else {
292+
IsOutOfMemory = true;
293+
}
294+
}
295+
}
296+
297+
bool CheckIsOutOfMemory() const {
298+
return IsOutOfMemory;
299+
}
300+
284301
template<bool SkipYields>
285302
bool ReadMore() {
286303
if constexpr (SkipYields) {
@@ -332,6 +349,8 @@ class TState : public TComputationValue<TState> {
332349
private:
333350
std::optional<TStorageIterator> ExtractIt;
334351
const ui32 KeyWidth, StateWidth;
352+
const bool AllowOutOfMemory;
353+
bool IsOutOfMemory = false;
335354
ui64 CurrentPosition = 0;
336355
TRow* CurrentPage = nullptr;
337356
TStorage Storage;
@@ -387,7 +406,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
387406
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
388407
)
389408
: TBase(memInfo)
390-
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
409+
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, allowSpilling && ctx.SpillerFactory)
391410
, UsedInputItemType(usedInputItemType)
392411
, KeyAndStateType(keyAndStateType)
393412
, KeyWidth(keyWidth)
@@ -452,6 +471,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
452471
ETasteResult TasteIt() {
453472
if (GetMode() == EOperatingMode::InMemory) {
454473
bool isNew = InMemoryProcessingState.TasteIt();
474+
if (InMemoryProcessingState.CheckIsOutOfMemory()) {
475+
StateWantsToSpill = true;
476+
}
455477
Throat = InMemoryProcessingState.Throat;
456478
return isNew ? ETasteResult::Init : ETasteResult::Update;
457479
}
@@ -650,7 +672,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
650672
}
651673

652674
bool CheckMemoryAndSwitchToSpilling() {
653-
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
675+
if (!(AllowSpilling && Ctx.SpillerFactory)) {
676+
return false;
677+
}
678+
if (StateWantsToSpill || IsSwitchToSpillingModeCondition()) {
679+
StateWantsToSpill = false;
654680
LogMemoryUsage();
655681

656682
SwitchMode(EOperatingMode::SplittingState);
@@ -841,6 +867,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
841867
NUdf::TUnboxedValuePod* Throat = nullptr;
842868

843869
private:
870+
bool StateWantsToSpill = false;
844871
bool IsEverythingExtracted = false;
845872

846873
TState InMemoryProcessingState;

yql/essentials/providers/common/proto/gateways_config.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,8 @@ message TPqClusterConfig {
326326
optional bool AddBearerToToken = 11; // whether to use prefix "Bearer " in token
327327
optional string DatabaseId = 12;
328328
repeated TAttr Settings = 100;
329-
optional bool SharedReading = 101;
329+
optional bool SharedReading = 101;
330+
optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m
330331
}
331332

332333
message TPqGatewayConfig {

0 commit comments

Comments
 (0)