Skip to content

Commit 21ff1db

Browse files
authored
query service: optimize result passing over trailing results (#10690)
1 parent fc6864d commit 21ff1db

File tree

4 files changed

+19
-24
lines changed

4 files changed

+19
-24
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -298,10 +298,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
298298
batch.Proto = std::move(*computeData.Proto.MutableChannelData()->MutableData());
299299
batch.Payload = std::move(computeData.Payload);
300300

301-
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
302-
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder, txResult.ColumnHints);
303-
304301
if (!trailingResults) {
302+
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
303+
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder, txResult.ColumnHints);
305304
auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
306305
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
307306
streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex + StatementResultIndex);
@@ -319,10 +318,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
319318
ackEv->Record.SetChannelId(channel.Id);
320319
ackEv->Record.SetFreeSpace(50_MB);
321320
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
322-
txResult.TrailingResult.Swap(&resultSet);
321+
ui64 rowCount = batch.RowCount();
322+
ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch));
323323
txResult.HasTrailingResult = true;
324324
LOG_D("staging TEvStreamData to " << Target << ", seqNo: " << computeData.Proto.GetSeqNo()
325-
<< ", nRows: " << txResult.TrailingResult.rows().size());
325+
<< ", nRows: " << rowCount);
326326
}
327327

328328
return;
@@ -828,7 +828,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
828828
std::map<ui32, TStageScheduleInfo> result;
829829
if (!resourceSnapshot.empty()) // can't schedule w/o node count
830830
{
831-
// collect costs and schedule stages with external sources only
831+
// collect costs and schedule stages with external sources only
832832
double totalCost = 0.0;
833833
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
834834
auto& stage = tx.Body->GetStages(stageIdx);
@@ -843,11 +843,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
843843
if (!result.empty()) {
844844
// allow use 2/3 of threads in single stage
845845
ui32 maxStageTaskCount = (TStagePredictor::GetUsableThreads() * 2 + 2) / 3;
846-
// total limit per mode is x2
846+
// total limit per mode is x2
847847
ui32 maxTotalTaskCount = maxStageTaskCount * 2;
848848
for (auto& [_, stageInfo] : result) {
849849
// schedule tasks evenly between nodes
850-
stageInfo.TaskCount =
850+
stageInfo.TaskCount =
851851
std::max<ui32>(
852852
std::min(static_cast<ui32>(maxTotalTaskCount * stageInfo.StageCost / totalCost), maxStageTaskCount)
853853
, 1

ydb/core/kqp/query_data/kqp_query_data.cpp

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,8 @@ Ydb::ResultSet* TKqpExecuterTxResult::GetYdb(google::protobuf::Arena* arena, TMa
7777
return ydbResult;
7878
}
7979

80-
Ydb::ResultSet* TKqpExecuterTxResult::ExtractTrailingYdb(google::protobuf::Arena* arena) {
81-
if (!HasTrailingResult)
82-
return nullptr;
83-
84-
Ydb::ResultSet* ydbResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(arena);
85-
ydbResult->Swap(&TrailingResult);
86-
87-
return ydbResult;
80+
bool TKqpExecuterTxResult::HasTrailingResults() {
81+
return HasTrailingResult;
8882
}
8983

9084

@@ -235,10 +229,10 @@ NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyRes
235229
return TxResults[txIndex][resultIndex].GetMkql(arena);
236230
}
237231

238-
Ydb::ResultSet* TQueryData::ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) {
232+
bool TQueryData::HasTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb) {
239233
auto txIndex = rb.GetTxResultBinding().GetTxIndex();
240234
auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
241-
return TxResults[txIndex][resultIndex].ExtractTrailingYdb(arena);
235+
return TxResults[txIndex][resultIndex].HasTrailingResults();
242236
}
243237

244238

ydb/core/kqp/query_data/kqp_query_data.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ struct TKqpExecuterTxResult {
8282
const TVector<TString>* ColumnHints = nullptr;
8383
TMaybe<ui32> QueryResultIndex = 0;
8484
NKikimr::NMiniKQL::TUnboxedValueBatch Rows;
85-
Ydb::ResultSet TrailingResult;
8685
bool HasTrailingResult = false;
8786

8887
explicit TKqpExecuterTxResult(
@@ -103,7 +102,7 @@ struct TKqpExecuterTxResult {
103102
NKikimrMiniKQL::TResult* GetMkql(google::protobuf::Arena* arena);
104103
NKikimrMiniKQL::TResult GetMkql();
105104
Ydb::ResultSet* GetYdb(google::protobuf::Arena* arena, TMaybe<ui64> rowsLimitPerWrite);
106-
Ydb::ResultSet* ExtractTrailingYdb(google::protobuf::Arena* arena);
105+
bool HasTrailingResults();
107106

108107
void FillMkql(NKikimrMiniKQL::TResult* mkqlResult);
109108
void FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsLimitPerWrite);
@@ -256,7 +255,7 @@ class TQueryData : NMiniKQL::ITerminator {
256255
TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex);
257256
NKikimrMiniKQL::TResult* GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
258257
Ydb::ResultSet* GetYdbTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena, TMaybe<ui64> rowsLimitPerWrite);
259-
Ydb::ResultSet* ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
258+
bool HasTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb);
260259

261260
std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding);
262261
TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name);

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1780,10 +1780,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17801780
size_t trailingResultsCount = 0;
17811781
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
17821782
if (QueryState->IsStreamResult()) {
1783-
auto ydbResult = QueryState->QueryData->ExtractTrailingTxResult(
1784-
phyQuery.GetResultBindings(i), response->GetArena());
17851783

1786-
if (ydbResult) {
1784+
if (QueryState->QueryData->HasTrailingTxResult(phyQuery.GetResultBindings(i))) {
1785+
auto ydbResult = QueryState->QueryData->GetYdbTxResult(
1786+
phyQuery.GetResultBindings(i), response->GetArena(), {});
1787+
1788+
YQL_ENSURE(ydbResult);
17871789
++trailingResultsCount;
17881790
YQL_ENSURE(trailingResultsCount <= 1);
17891791
response->AddYdbResults()->Swap(ydbResult);

0 commit comments

Comments
 (0)