Skip to content

async ca/task runner: get rid of channel/etc id lists in events #21546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 3 additions & 34 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,8 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC

void DoBootstrap() {
NActors::IActor* actor;
THashSet<ui32> inputWithDisabledCheckpointing;
for (const auto&[idx, inputInfo]: InputChannelsMap) {
if (inputInfo.CheckpointingMode == NDqProto::CHECKPOINTING_MODE_DISABLED) {
inputWithDisabledCheckpointing.insert(idx);
}
}
std::tie(TaskRunnerActor, actor) = TaskRunnerActorFactory->Create(
this, TBase::GetAllocatorPtr(), GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota());
this, TBase::GetAllocatorPtr(), GetTxId(), Task.GetId(), InitMemoryQuota());
TaskRunnerActorId = RegisterWithSameMailbox(actor);

TDqTaskRunnerMemoryLimits limits;
Expand Down Expand Up @@ -199,13 +193,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), CheckpointRequest, .Defined());
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), WatermarkRequest, .Defined());
DUMP_PREFIXED("ContinueRunEvent.", (*ContinueRunEvent), MemLimit);
for (const auto& sinkId: ContinueRunEvent->SinkIds) {
html << "ContinueRunEvent.SinkIds: " << sinkId << "<br />";
}

for (const auto& inputTransformId: ContinueRunEvent->InputTransformIds) {
html << "ContinueRunEvent.InputTransformIds: " << inputTransformId << "<br />";
}
}

DUMP((*this), ContinueRunStartWaitTime, .ToString());
Expand Down Expand Up @@ -444,7 +431,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
void OnStateRequest(TEvDqCompute::TEvStateRequest::TPtr& ev) {
CA_LOG_T("Got TEvStateRequest from actor " << ev->Sender << " PingCookie: " << ev->Cookie);
if (!SentStatsRequest) {
Send(TaskRunnerActorId, new NTaskRunnerActor::TEvStatistics(GetIds(SinksMap), GetIds(InputTransformsMap)));
Send(TaskRunnerActorId, new NTaskRunnerActor::TEvStatistics());
SentStatsRequest = true;
}
WaitingForStateResponse.push_back({ev->Sender, ev->Cookie});
Expand Down Expand Up @@ -1066,15 +1053,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
return nullptr;
}

// Returns the keys of `collection` as a vector, in the map's iteration order.
// The map is keyed by ui64 while the result holds ui32, so every key is
// narrowed; the cast below makes that (pre-existing) narrowing explicit.
template<typename TSecond>
TVector<ui32> GetIds(const THashMap<ui64, TSecond>& collection) {
    TVector<ui32> ids;
    // The final size is known up front: allocate once instead of growing repeatedly.
    ids.reserve(collection.size());
    for (const auto& entry : collection) {
        ids.push_back(static_cast<ui32>(entry.first));
    }
    return ids;
}

void InjectBarrierToOutputs(const NDqProto::TCheckpoint&) override {
Y_ABORT_UNLESS(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
// already done in task_runner_actor
Expand Down Expand Up @@ -1139,28 +1117,19 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
if (!ContinueRunEvent) {
ContinueRunStartWaitTime = TInstant::Now();
ContinueRunEvent = std::make_unique<NTaskRunnerActor::TEvContinueRun>();
ContinueRunEvent->SinkIds = GetIds(SinksMap);
ContinueRunEvent->InputTransformIds = GetIds(InputTransformsMap);
}
ContinueRunEvent->CheckpointOnly = checkpointOnly;
if (TMaybe<TInstant> watermarkRequest = GetWatermarkRequest()) {
if (!ContinueRunEvent->WatermarkRequest) {
ContinueRunEvent->WatermarkRequest.ConstructInPlace();
ContinueRunEvent->WatermarkRequest->Watermark = *watermarkRequest;

ContinueRunEvent->WatermarkRequest->ChannelIds.reserve(OutputChannelsMap.size());
for (const auto& [channelId, info] : OutputChannelsMap) {
if (info.WatermarksMode != NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
ContinueRunEvent->WatermarkRequest->ChannelIds.emplace_back(channelId);
}
}
} else {
ContinueRunEvent->WatermarkRequest->Watermark = Max(ContinueRunEvent->WatermarkRequest->Watermark, *watermarkRequest);
}
}
if (checkpointRequest) {
if (!ContinueRunEvent->CheckpointRequest) {
ContinueRunEvent->CheckpointRequest.ConstructInPlace(GetIds(OutputChannelsMap), GetIds(SinksMap), *checkpointRequest);
ContinueRunEvent->CheckpointRequest.ConstructInPlace(*checkpointRequest);
} else {
Y_ABORT_UNLESS(ContinueRunEvent->CheckpointRequest->Checkpoint.GetGeneration() == checkpointRequest->GetGeneration());
Y_ABORT_UNLESS(ContinueRunEvent->CheckpointRequest->Checkpoint.GetId() == checkpointRequest->GetId());
Expand Down
28 changes: 8 additions & 20 deletions ydb/library/yql/dq/actors/task_runner/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,25 +299,19 @@ struct TEvOutputChannelData
struct TWatermarkRequest {
TWatermarkRequest() = default;

TWatermarkRequest(TVector<ui32>&& channelIds, TInstant watermark)
: ChannelIds(std::move(channelIds))
, Watermark(watermark) {
explicit TWatermarkRequest(TInstant watermark)
: Watermark(watermark) {
}

TVector<ui32> ChannelIds;
TInstant Watermark;
};

// Holds info required to inject barriers to outputs
struct TCheckpointRequest {
TCheckpointRequest(TVector<ui32>&& channelIds, TVector<ui32>&& sinkIds, const NDqProto::TCheckpoint& checkpoint)
: ChannelIds(std::move(channelIds))
, SinkIds(std::move(sinkIds))
, Checkpoint(checkpoint) {
explicit TCheckpointRequest(const NDqProto::TCheckpoint& checkpoint)
: Checkpoint(checkpoint) {
}

TVector<ui32> ChannelIds;
TVector<ui32> SinkIds;
NDqProto::TCheckpoint Checkpoint;
};

Expand All @@ -337,20 +331,18 @@ struct TEvContinueRun
, CheckpointOnly(checkpointOnly)
{ }

TEvContinueRun(THashSet<ui32>&& inputChannels, ui64 memLimit)
TEvContinueRun(TVector<ui32>&& inputChannels, ui64 memLimit)
: AskFreeSpace(false)
, InputChannels(std::move(inputChannels))
, MemLimit(memLimit)
{ }

bool AskFreeSpace = true;
const THashSet<ui32> InputChannels;
const TVector<ui32> InputChannels;
ui64 MemLimit;
TMaybe<TWatermarkRequest> WatermarkRequest = Nothing();
TMaybe<TCheckpointRequest> CheckpointRequest = Nothing();
bool CheckpointOnly = false;
TVector<ui32> SinkIds;
TVector<ui32> InputTransformIds;
};

// Sent by TaskRunnerActor to ComputeActor as an acknowledgement of an AsyncInputPush method call
Expand Down Expand Up @@ -427,14 +419,10 @@ struct TEvLoadTaskRunnerFromStateDone : NActors::TEventLocal<TEvLoadTaskRunnerFr

struct TEvStatistics : NActors::TEventLocal<TEvStatistics, TTaskRunnerEvents::EvStatistics>
{
explicit TEvStatistics(TVector<ui32>&& sinkIds, TVector<ui32>&& inputTransformIds)
: SinkIds(std::move(sinkIds))
, InputTransformIds(std::move(inputTransformIds))
, Stats() {
explicit TEvStatistics()
: Stats() {
}

TVector<ui32> SinkIds;
TVector<ui32> InputTransformIds;
NDq::TDqTaskRunnerStatsView Stats;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct ITaskRunnerActorFactory {
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints = {},
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0;
};

Expand Down
70 changes: 44 additions & 26 deletions ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ class TLocalTaskRunnerActor
public:
static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER";

TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
: TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler)
, Alloc(alloc)
, Parent(parent)
, Factory(factory)
, TxId(txId)
, TaskId(taskId)
, InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints))
, MemoryQuota(std::move(memoryQuota))
{
}
Expand Down Expand Up @@ -91,12 +90,12 @@ class TLocalTaskRunnerActor
void OnStatisticsRequest(TEvStatistics::TPtr& ev) {

THashMap<ui32, const IDqAsyncOutputBuffer*> sinks;
for (const auto sinkId : ev->Get()->SinkIds) {
for (const auto sinkId : Sinks) {
sinks[sinkId] = TaskRunner->GetSink(sinkId).Get();
}

THashMap<ui32, const IDqAsyncInputBuffer*> inputTransforms;
for (const auto inputTransformId : ev->Get()->InputTransformIds) {
for (const auto inputTransformId : InputTransforms) {
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId)->second.Get();
}

Expand Down Expand Up @@ -132,11 +131,7 @@ class TLocalTaskRunnerActor
}

bool ReadyToCheckpoint() {
for (const auto inputChannelId: Inputs) {
if (InputChannelsWithDisabledCheckpoints.contains(inputChannelId)) {
continue;
}

for (const auto inputChannelId: InputsWithCheckpoints) {
const auto input = TaskRunner->GetInputChannel(inputChannelId);
if (!input->IsPaused()) {
return false;
Expand All @@ -162,7 +157,7 @@ class TLocalTaskRunnerActor

void OnContinueRun(TEvContinueRun::TPtr& ev) {
auto guard = TaskRunner->BindAllocator(MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : ev->Get()->MemLimit);
auto inputMap = ev->Get()->AskFreeSpace
const auto& inputMap = ev->Get()->AskFreeSpace
? Inputs
: ev->Get()->InputChannels;

Expand Down Expand Up @@ -198,9 +193,9 @@ class TLocalTaskRunnerActor
if (shouldHandleWatermark) {
const auto watermarkRequested = ev->Get()->WatermarkRequest->Watermark;
LOG_T("Task runner. Watermarks. Injecting requested watermark " << watermarkRequested
<< " to " << ev->Get()->WatermarkRequest->ChannelIds.size() << " outputs ");
<< " to " << Outputs.size() << " outputs ");

for (const auto& channelId : ev->Get()->WatermarkRequest->ChannelIds) {
for (const auto& channelId : Outputs) {
NDqProto::TWatermark watermark;
watermark.SetTimestampUs(watermarkRequested.MicroSeconds());
TaskRunner->GetOutputChannel(channelId)->Push(std::move(watermark));
Expand All @@ -218,10 +213,10 @@ class TLocalTaskRunnerActor
data.Blob = TaskRunner->Save();
// inject barriers
// todo:(whcrc) barriers are injected even if source state save failed
for (const auto& channelId : ev->Get()->CheckpointRequest->ChannelIds) {
for (const auto& channelId : Outputs) {
TaskRunner->GetOutputChannel(channelId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint));
}
for (const auto& sinkId : ev->Get()->CheckpointRequest->SinkIds) {
for (const auto& sinkId : Sinks) {
TaskRunner->GetSink(sinkId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint));
}
} catch (const std::exception& e) {
Expand All @@ -236,15 +231,15 @@ class TLocalTaskRunnerActor
}

{
auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds), std::move(ev->Get()->InputTransformIds));
auto st = MakeHolder<TEvStatistics>();

THashMap<ui32, const IDqAsyncOutputBuffer*> sinks;
for (const auto sinkId : st->SinkIds) {
for (const auto sinkId : Sinks) {
sinks[sinkId] = TaskRunner->GetSink(sinkId).Get();
}

THashMap<ui32, const IDqAsyncInputBuffer*> inputTransforms;
for (const auto inputTransformId : st->InputTransformIds) { // TODO
for (const auto inputTransformId : InputTransforms) { // TODO
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId)->second.Get();
}

Expand Down Expand Up @@ -428,13 +423,32 @@ class TLocalTaskRunnerActor
for (auto inputId = 0; inputId < inputs.size(); inputId++) {
auto& input = inputs[inputId];
if (input.HasSource()) {
Sources.emplace(inputId);
Sources.emplace_back(inputId);
} else {
for (auto& channel : input.GetChannels()) {
Inputs.emplace(channel.GetId());
Inputs.emplace_back(channel.GetId());
if (channel.GetCheckpointingMode() != NDqProto::CHECKPOINTING_MODE_DISABLED) {
InputsWithCheckpoints.emplace_back(channel.GetId());
}
}
}
}
std::sort(Inputs.begin(), Inputs.end());
Y_ENSURE(std::unique(Inputs.begin(), Inputs.end()) == Inputs.end());

auto& outputs = settings.GetOutputs();
for (auto outputId = 0; outputId < outputs.size(); outputId++) {
auto& output = outputs[outputId];
if (output.HasSink()) {
Sinks.emplace_back(outputId);
} else {
for (auto& channel : output.GetChannels()) {
Outputs.emplace_back(channel.GetId());
}
}
}
std::sort(Outputs.begin(), Outputs.end());
Y_ENSURE(std::unique(Outputs.begin(), Outputs.end()) == Outputs.end());

auto guard = TaskRunner->BindAllocator(MemoryQuota ? TMaybe<ui64>(MemoryQuota->GetMkqlMemoryLimit()) : Nothing());
if (MemoryQuota) {
Expand All @@ -456,8 +470,11 @@ class TLocalTaskRunnerActor
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> inputTransforms;
for (auto i = 0; i != inputs.size(); ++i) {
if (auto t = TaskRunner->GetInputTransform(i)) {
Y_ENSURE(inputs[i].HasTransform());
inputTransforms[i] = *t;
InputTransforms.emplace(i);
InputTransforms.emplace_back(i);
} else {
Y_ENSURE(!inputs[i].HasTransform());
}
}

Expand Down Expand Up @@ -504,11 +521,13 @@ class TLocalTaskRunnerActor
TTaskRunnerFactory Factory;
const TTxId TxId;
const ui64 TaskId;
THashSet<ui32> Inputs;
THashSet<ui32> InputTransforms;
THashSet<ui32> Sources;
TVector<ui32> Inputs;
TVector<ui32> InputsWithCheckpoints;
TVector<ui32> InputTransforms;
TVector<ui32> Sources;
TVector<ui32> Sinks;
TVector<ui32> Outputs;
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
THashSet<ui32> InputChannelsWithDisabledCheckpoints;
THolder<TDqMemoryQuota> MemoryQuota;
ui64 ActorElapsedTicks = 0;
};
Expand All @@ -523,10 +542,9 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints,
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override
{
auto* actor = new TLocalTaskRunnerActor(parent, Factory, alloc, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
auto* actor = new TLocalTaskRunnerActor(parent, Factory, alloc, txId, taskId, std::move(memoryQuota));
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/providers/dq/actors/worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,11 @@ class TDqWorker: public TRichActor<TDqWorker>
return;
}

THashSet<ui32> inputChannels;
TVector<ui32> inputChannels;
for (auto& input : InputMap) {
auto& channel = input.second;
if (!channel.Requested && !channel.Finished) {
inputChannels.insert(channel.ChannelId);
inputChannels.push_back(channel.ChannelId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,10 +589,10 @@ class TTaskRunnerActor
for (auto inputId = 0; inputId < inputs.size(); inputId++) {
auto& input = inputs[inputId];
if (input.HasSource()) {
Sources.emplace(inputId);
Sources.emplace_back(inputId);
} else {
for (auto& channel : input.GetChannels()) {
Inputs.emplace(channel.GetId());
Inputs.emplace_back(channel.GetId());
}
}
}
Expand Down Expand Up @@ -756,8 +756,8 @@ class TTaskRunnerActor
NTaskRunnerProxy::ITaskRunner::TPtr TaskRunner;
ITaskRunnerInvoker::TPtr Invoker;
bool Local;
THashSet<ui32> Inputs;
THashSet<ui32> Sources;
TVector<ui32> Inputs;
TVector<ui32> Sources;
TIntrusivePtr<TDqConfiguration> Settings;
NDqProto::EDataTransportVersion DataTransportVersion;
ui64 StageId;
Expand All @@ -784,7 +784,6 @@ class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&&,
THolder<NYql::NDq::TDqMemoryQuota>&&) override
{
auto* actor = new TTaskRunnerActor(parent, alloc, ProxyFactory, InvokerFactory->Create(), txId, taskId, RuntimeData);
Expand Down
Loading