Skip to content

Commit 667a30d

Browse files
authored
remove result channel proxies (#11382)
1 parent 6ed35ea commit 667a30d

21 files changed

+47
-391
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@ struct TProducerState {
3131
ui64 ChannelId = 0;
3232

3333
void SendAck(const NActors::TActorIdentity& actor) const {
34-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
35-
resp->Record.SetSeqNo(*LastSeqNo);
34+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
3635
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
37-
resp->Record.SetChannelId(ChannelId);
3836

3937
actor.Send(ActorId, resp.Release());
4038
}

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -266,12 +266,10 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
266266
<< ", freeSpace: " << freeSpaceBytes
267267
<< ", to: " << ExecuterActorId_);
268268

269-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
270-
resp->Record.SetSeqNo(*LastSeqNo_);
269+
// scan query has single result set, so it's ok to put zero as channelId here.
270+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
271271
resp->Record.SetFreeSpace(freeSpaceBytes);
272-
273272
ctx.Send(ExecuterActorId_, resp.Release());
274-
275273
AckedFreeSpaceBytes_ = freeSpaceBytes;
276274
}
277275
}
@@ -320,6 +318,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
320318
}
321319

322320
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
321+
323322
auto& record = ev->Get()->Record;
324323
NYql::TIssues issues = ev->Get()->GetIssues();
325324

@@ -355,8 +354,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
355354
<< ", to: " << ev->Sender
356355
<< ", queue: " << FlowControl_.QueueSize());
357356

358-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
359-
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
357+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
360358
resp->Record.SetFreeSpace(freeSpaceBytes);
361359

362360
ctx.Send(ev->Sender, resp.Release());

ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,7 @@ class TStreamExecuteYqlScriptRPC
281281
<< ", to: " << ev->Sender
282282
<< ", queue: " << FlowControl_.QueueSize());
283283

284-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
285-
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
284+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
286285
resp->Record.SetFreeSpace(freeSpaceBytes);
287286

288287
ctx.Send(ev->Sender, resp.Release());
@@ -322,8 +321,7 @@ class TStreamExecuteYqlScriptRPC
322321
<< ", freeSpace: " << freeSpaceBytes
323322
<< ", to: " << GatewayRequestHandlerActorId_);
324323

325-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
326-
resp->Record.SetSeqNo(*LastSeqNo_);
324+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
327325
resp->Record.SetFreeSpace(freeSpaceBytes);
328326

329327
ctx.Send(GatewayRequestHandlerActorId_, resp.Release());

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2036,10 +2036,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20362036
THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
20372037
TVector<ui64> computeTasks;
20382038

2039-
if (StreamResult) {
2040-
InitializeChannelProxies();
2041-
}
2042-
20432039
for (auto& task : TasksGraph.GetTasks()) {
20442040
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
20452041
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,19 @@ struct TEvKqpExecuter {
7171
TKqpExecuterEvents::EvStreamData> {};
7272

7373
struct TEvStreamDataAck : public TEventPB<TEvStreamDataAck, NKikimrKqp::TEvExecuterStreamDataAck,
74-
TKqpExecuterEvents::EvStreamDataAck> {};
74+
TKqpExecuterEvents::EvStreamDataAck>
75+
{
76+
friend class TEventPBBase;
77+
explicit TEvStreamDataAck(ui64 seqno, ui64 channelId)
78+
{
79+
Record.SetSeqNo(seqno);
80+
Record.SetChannelId(channelId);
81+
}
82+
83+
private:
84+
// using a little hack to hide default empty constructor
85+
TEvStreamDataAck() = default;
86+
};
7587

7688
// deprecated event, remove in the future releases.
7789
struct TEvExecuterProgress : public TEventPB<TEvExecuterProgress, NKikimrKqp::TEvExecuterProgress,

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include "kqp_executer_stats.h"
55
#include "kqp_planner.h"
66
#include "kqp_partition_helper.h"
7-
#include "kqp_result_channel.h"
87
#include "kqp_table_resolver.h"
98

109
#include <ydb/core/kqp/common/kqp_ru_calc.h>
@@ -1855,22 +1854,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18551854
return true;
18561855
}
18571856

1858-
void InitializeChannelProxies() {
1859-
// notice: forward all respones to executer if
1860-
// trailing results are allowed.
1861-
// temporary, will be removed in the next pr.
1862-
if (Request.IsTrailingResultsAllowed())
1863-
return;
1864-
1865-
for(const auto& channel: TasksGraph.GetChannels()) {
1866-
if (channel.DstTask) {
1867-
continue;
1868-
}
1869-
1870-
CreateChannelProxy(channel);
1871-
}
1872-
}
1873-
18741857
const IKqpGateway::TKqpSnapshot& GetSnapshot() const {
18751858
return TasksGraph.GetMeta().Snapshot;
18761859
}
@@ -1879,31 +1862,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18791862
TasksGraph.GetMeta().SetSnapshot(step, txId);
18801863
}
18811864

1882-
IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) {
1883-
auto channelIt = ResultChannelProxies.find(channel.Id);
1884-
if (channelIt != ResultChannelProxies.end()) {
1885-
return channelIt->second;
1886-
}
1887-
1888-
YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize());
1889-
const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex];
1890-
1891-
IActor* proxy;
1892-
if (txResult.IsStream && txResult.QueryResultIndex.Defined()) {
1893-
proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
1894-
txResult.ColumnOrder, txResult.ColumnHints, *txResult.QueryResultIndex, Target, this->SelfId(), StatementResultIndex);
1895-
} else {
1896-
proxy = CreateResultDataChannelProxy(TxId, channel.Id, this->SelfId(),
1897-
channel.DstInputIndex, ResponseEv.get());
1898-
}
1899-
1900-
this->RegisterWithSameMailbox(proxy);
1901-
ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy));
1902-
TasksGraph.GetMeta().ResultChannelProxies.emplace(channel.Id, proxy->SelfId());
1903-
1904-
return proxy;
1905-
}
1906-
19071865
protected:
19081866
// Introduced separate method from `PassAway()` - to not get confused with expectations from other actors,
19091867
// that `PassAway()` should kill actor immediately.

0 commit comments

Comments
 (0)