Skip to content

Commit cfe8084

Browse files
authored
YQL: Skip Hybrid at Dq timeout (#12487)
1 parent 9af6b6f commit cfe8084

File tree

7 files changed

+25
-2
lines changed

7 files changed

+25
-2
lines changed

ydb/library/yql/providers/dq/actors/executer_actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
9393
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
9494
Issues.AddIssues({issue});
9595
*ExecutionTimeoutCounter += 1;
96-
Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED);
96+
Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, true);
9797
})
9898
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
9999
})
@@ -279,7 +279,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
279279
Send(ev->Sender, response.Release());
280280
}
281281

282-
void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode)
282+
void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool timeout = false)
283283
{
284284
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " with status=" << static_cast<int>(statusCode) << " issues=" << Issues.ToString();
285285
if (Finished) {
@@ -292,6 +292,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
292292
}
293293
IssuesToMessage(Issues, result.MutableIssues());
294294
result.SetStatusCode(statusCode);
295+
result.SetTimeout(timeout);
295296
Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result)));
296297
Finished = true;
297298
}

ydb/library/yql/providers/dq/api/protos/dqs.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ message TQueryResponse {
184184
uint64 RowsCount = 8;
185185
NYql.NDqProto.StatusIds.StatusCode StatusCode = 9;
186186
repeated NDqProto.TData Sample = 10;
187+
bool Timeout = 11;
187188
}
188189

189190
message TDqFailure {

ydb/library/yql/providers/dq/api/protos/service.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ message ExecuteGraphResponse {
9898
Ydb.Operations.Operation Operation = 1;
9999
repeated ResponseMetric Metric = 2;
100100
bool Truncated = 3;
101+
bool Timeout = 4;
101102
}
102103

103104
message SvnRevisionRequest {

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,6 +1062,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
10621062
state->Statistics[state->MetricId++] = res.Statistics;
10631063

10641064
if (res.Fallback) {
1065+
if (res.Timeout) {
1066+
NotifyDqTimeout(state);
1067+
}
10651068
if (state->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || state->TypeCtx->ForceDq) {
10661069
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
10671070
issues.AddIssues(res.Issues());
@@ -1496,6 +1499,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
14961499
state->Metrics->IncCounter("dq", "Fallback");
14971500
}
14981501
state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
1502+
if (res.Timeout) {
1503+
NotifyDqTimeout(state);
1504+
}
14991505
// never fallback will be captured in yql_facade
15001506
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
15011507
issues.AddIssues(res.Issues());
@@ -1998,6 +2004,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
19982004
state->Metrics->IncCounter("dq", "Fallback");
19992005
}
20002006
state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
2007+
if (res.Timeout) {
2008+
NotifyDqTimeout(state);
2009+
}
20012010
}
20022011

20032012
CompleteNode(execState, node, [resIssues = res.Issues(), fallback = res.Fallback](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus {
@@ -2086,6 +2095,14 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
20862095
return status;
20872096
}
20882097

2098+
static void NotifyDqTimeout(const TDqStatePtr& state) {
2099+
auto integrations = GetUniqueIntegrations(*state->TypeCtx);
2100+
std::for_each(integrations.cbegin(), integrations.cend(), std::bind(&IDqIntegration::NotifyDqTimeout, std::placeholders::_1));
2101+
if (state->Metrics) {
2102+
state->Metrics->IncCounter("dq", "Timeout");
2103+
}
2104+
}
2105+
20892106
private:
20902107
TDqStatePtr State;
20912108
ISkiffConverterPtr SkiffConverter;

ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class TDqGatewaySession: public std::enable_shared_from_this<TDqGatewaySession>
176176

177177
bool error = false;
178178
bool fallback = false;
179+
result.Timeout = resp.GetTimeout();
179180

180181
if (status.Ok()) {
181182
YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::Ok";

ydb/library/yql/providers/dq/provider/yql_dq_gateway.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class IDqGateway : public TThrRefBase {
7878
bool Retriable = false;
7979
bool Truncated = false;
8080
ui64 RowsCount = 0;
81+
bool Timeout = false;
8182

8283
TOperationStatistics Statistics;
8384

ydb/library/yql/providers/dq/service/grpc_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ namespace NYql::NDqs {
178178
operation.Mutableresult()->PackFrom(queryResult);
179179
*operation.Mutableissues() = result.GetIssues();
180180
ResponseBuffer.SetTruncated(result.GetTruncated());
181+
ResponseBuffer.SetTimeout(result.GetTimeout());
181182

182183
Reply(Ydb::StatusIds::SUCCESS, statusCode > 1 || result.GetIssues().size() > 0);
183184
}

0 commit comments

Comments
 (0)