Skip to content

Commit f2d4aa3

Browse files
committed
Added blocks splitting into s3 source
1 parent 0ce9793 commit f2d4aa3

23 files changed

+248
-45
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -721,14 +721,15 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
721721
const auto& readRanges = ev->Get()->ReadRanges;
722722
const auto& typeEnv = ev->Get()->TypeEnv;
723723
const auto& holderFactory = ev->Get()->HolderFactory;
724+
const auto& memoryLimits = ev->Get()->MemoryLimits;
724725
if (Stat) {
725726
Stat->AddCounters2(ev->Get()->Sensors);
726727
}
727728
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
728729
for (auto& [inputIndex, transform] : this->InputTransformsMap) {
729730
std::tie(transform.Input, transform.Buffer) = ev->Get()->InputTransforms.at(inputIndex);
730731
}
731-
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr);
732+
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, memoryLimits, nullptr);
732733

733734
{
734735
// say "Hello" to executer

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TTaskOutput;
2222
namespace NYql::NDq {
2323
struct TSourceState;
2424
struct TSinkState;
25+
struct TDqTaskRunnerMemoryLimits;
2526
} // namespace NYql::NDq
2627

2728
namespace NActors {
@@ -262,6 +263,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
262263
const google::protobuf::Message* SourceSettings = nullptr; // used only in case if we execute compute actor locally
263264
TIntrusivePtr<NActors::TProtoArenaHolder> Arena; // Arena for SourceSettings
264265
NWilson::TTraceId TraceId;
266+
const TDqTaskRunnerMemoryLimits& MemoryLimits;
265267
};
266268

267269
struct TLookupSourceArguments {

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13381338
const THashMap<TString, TString>& secureParams,
13391339
const THashMap<TString, TString>& taskParams,
13401340
const TVector<TString>& readRanges,
1341+
const TDqTaskRunnerMemoryLimits& memoryLimits,
13411342
IRandomProvider* randomProvider
13421343
)
13431344
{
@@ -1371,7 +1372,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13711372
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
13721373
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
13731374
.Arena = Task.GetArena(),
1374-
.TraceId = ComputeActorSpan.GetTraceId()
1375+
.TraceId = ComputeActorSpan.GetTraceId(),
1376+
.MemoryLimits = memoryLimits
13751377
});
13761378
} catch (const std::exception& ex) {
13771379
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
249249
TaskRunner->GetSecureParams(),
250250
TaskRunner->GetTaskParams(),
251251
TaskRunner->GetReadRanges(),
252+
TaskRunner->GetMemoryLimits(),
252253
TaskRunner->GetRandomProvider()
253254
);
254255
}

ydb/library/yql/dq/actors/task_runner/events.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ struct TEvTaskRunnerCreateFinished
179179
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
180180
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
181181
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>>&& inputTransforms,
182+
const TDqTaskRunnerMemoryLimits& memoryLimits,
182183
const TTaskRunnerActorSensors& sensors = {}
183184
)
184185
: Sensors(sensors)
@@ -189,6 +190,7 @@ struct TEvTaskRunnerCreateFinished
189190
, HolderFactory(holderFactory)
190191
, Alloc(alloc)
191192
, InputTransforms(std::move(inputTransforms))
193+
, MemoryLimits(memoryLimits)
192194
{
193195
Y_ABORT_UNLESS(inputTransforms.empty() || Alloc);
194196
}
@@ -210,6 +212,7 @@ struct TEvTaskRunnerCreateFinished
210212
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
211213
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
212214
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> InputTransforms; //can'not be const, because we need to explicitly clear it in destructor
215+
const TDqTaskRunnerMemoryLimits MemoryLimits;
213216
};
214217

215218
struct TEvTaskRunFinished

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,8 @@ class TLocalTaskRunnerActor
467467
TaskRunner->GetTypeEnv(),
468468
TaskRunner->GetHolderFactory(),
469469
Alloc,
470-
std::move(inputTransforms)
470+
std::move(inputTransforms),
471+
TaskRunner->GetMemoryLimits()
471472
);
472473

473474
Send(

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ class TDqTaskRunner : public IDqTaskRunner {
539539
void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
540540
const IDqTaskRunnerExecutionContext& execCtx) override
541541
{
542+
MemoryLimits = memoryLimits;
542543
TaskId = task.GetId();
543544
auto entry = BuildTask(task);
544545

@@ -901,6 +902,10 @@ class TDqTaskRunner : public IDqTaskRunner {
901902
return Stats.get();
902903
}
903904

905+
const TDqTaskRunnerMemoryLimits& GetMemoryLimits() const override {
906+
return MemoryLimits;
907+
}
908+
904909
TString Save() const override {
905910
return AllocatedHolder->ProgramParsed.CompGraph->SaveGraphState();
906911
}
@@ -998,6 +1003,7 @@ class TDqTaskRunner : public IDqTaskRunner {
9981003
ui64 TaskId = 0;
9991004
TDqTaskRunnerContext Context;
10001005
TDqTaskRunnerSettings Settings;
1006+
TDqTaskRunnerMemoryLimits MemoryLimits;
10011007
TLogFunc LogFunc;
10021008
std::unique_ptr<NUdf::ISecureParamsProvider> SecureParamsProvider;
10031009
TDqTaskCountersProvider CountersProvider;

ydb/library/yql/dq/runtime/dq_tasks_runner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyabl
437437
virtual const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const = 0;
438438
virtual const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const = 0;
439439
virtual NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const = 0;
440+
virtual const TDqTaskRunnerMemoryLimits& GetMemoryLimits() const = 0;
440441

441442
virtual const THashMap<TString, TString>& GetSecureParams() const = 0;
442443
virtual const THashMap<TString, TString>& GetTaskParams() const = 0;

ydb/library/yql/providers/dq/actors/worker_actor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ class TDqWorker: public TRichActor<TDqWorker>
300300
const auto& readRanges = ev->Get()->ReadRanges;
301301
const auto& typeEnv = ev->Get()->TypeEnv;
302302
const auto& holderFactory = ev->Get()->HolderFactory;
303+
const auto& memoryLimits = ev->Get()->MemoryLimits;
303304

304305
Stat.Measure<void>("PrepareChannels", [&](){
305306
auto& inputs = Task.GetInputs();
@@ -323,7 +324,8 @@ class TDqWorker: public TRichActor<TDqWorker>
323324
.TypeEnv = typeEnv,
324325
.HolderFactory = holderFactory,
325326
.ProgramBuilder = *source.ProgramBuilder,
326-
.MemoryQuotaManager = MemoryQuotaManager
327+
.MemoryQuotaManager = MemoryQuotaManager,
328+
.MemoryLimits = memoryLimits
327329
});
328330
RegisterLocalChild(source.Actor);
329331
} else {

ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ class TLocalTaskRunner: public ITaskRunner {
181181
return {0, ""};
182182
}
183183

184+
const NDq::TDqTaskRunnerMemoryLimits& GetMemoryLimits() const override {
185+
return Runner->GetMemoryLimits();
186+
}
187+
184188
private:
185189
void UpdateStats() {
186190
QueryStat.AddTaskRunnerStats(*Runner->GetStats(), Task.GetId(), Task.GetStageId());

ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,7 +1367,7 @@ class TTaskRunner: public IPipeTaskRunner {
13671367
}
13681368

13691369
NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits) override {
1370-
ChannelBufferSize = limits.ChannelBufferSize;
1370+
MemoryLimits = limits;
13711371
NDqProto::TCommandHeader header;
13721372
header.SetVersion(1);
13731373
header.SetCommand(NDqProto::TCommandHeader::PREPARE);
@@ -1462,6 +1462,10 @@ class TTaskRunner: public IPipeTaskRunner {
14621462
return ReadRanges;
14631463
}
14641464

1465+
const TDqTaskRunnerMemoryLimits& GetMemoryLimits() const override {
1466+
return MemoryLimits;
1467+
}
1468+
14651469
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit) override {
14661470
auto guard = AllocatedHolder->TypeEnv.BindAllocator();
14671471
if (memoryLimit) {
@@ -1596,11 +1600,11 @@ class TTaskRunner: public IPipeTaskRunner {
15961600
for (ui32 i = 0; i < Task.InputsSize(); ++i) {
15971601
auto& inputDesc = Task.GetInputs(i);
15981602
if (inputDesc.HasSource()) {
1599-
Sources[i] = new TDqSource(Task.GetId(), i, InputTypes.at(i), ChannelBufferSize, this);
1603+
Sources[i] = new TDqSource(Task.GetId(), i, InputTypes.at(i), MemoryLimits.ChannelBufferSize, this);
16001604
} else {
16011605
for (auto& inputChannelDesc : inputDesc.GetChannels()) {
16021606
ui64 channelId = inputChannelDesc.GetId();
1603-
InputChannels[channelId] = new TInputChannel(this, Task.GetId(), channelId, Input, Output, ChannelBufferSize);
1607+
InputChannels[channelId] = new TInputChannel(this, Task.GetId(), channelId, Input, Output, MemoryLimits.ChannelBufferSize);
16041608
}
16051609
}
16061610
}
@@ -1615,7 +1619,7 @@ class TTaskRunner: public IPipeTaskRunner {
16151619
TVector<TString> ReadRanges;
16161620
THashMap<ui64, TIntrusivePtr<TInputChannel>> InputChannels;
16171621
THashMap<ui64, TIntrusivePtr<TDqSource>> Sources;
1618-
i64 ChannelBufferSize = 0;
1622+
TDqTaskRunnerMemoryLimits MemoryLimits;
16191623

16201624
std::shared_ptr <NKikimr::NMiniKQL::TScopedAlloc> Alloc;
16211625

@@ -1771,6 +1775,10 @@ class TDqTaskRunner: public NDq::IDqTaskRunner {
17711775
return Delegate->GetReadRanges();
17721776
}
17731777

1778+
const TDqTaskRunnerMemoryLimits& GetMemoryLimits() const override {
1779+
return Delegate->GetMemoryLimits();
1780+
}
1781+
17741782
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit) override {
17751783
return Delegate->BindAllocator(memoryLimit);
17761784
}

ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class ITaskRunner: public TThrRefBase, private TNonCopyable {
6262
virtual const THashMap<TString,TString>& GetSecureParams() const = 0;
6363
virtual const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const = 0;
6464
virtual const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const = 0;
65+
virtual const NDq::TDqTaskRunnerMemoryLimits& GetMemoryLimits() const = 0;
6566

6667
// if memoryLimit = Nothing() then don't set memory limit, use existing one (if any)
6768
// if memoryLimit = 0 then set unlimited

ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,7 @@ class TTaskRunnerActor
643643
taskRunner->GetHolderFactory(),
644644
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc>{},
645645
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>>{},
646+
taskRunner->GetMemoryLimits(),
646647
sensors);
647648

648649
actorSystem->Send(

ydb/library/yql/providers/s3/actors/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ PEERDIR(
2222
contrib/libs/fmt
2323
contrib/libs/poco/Util
2424
ydb/library/actors/http
25+
ydb/library/formats/arrow
2526
library/cpp/protobuf/util
2627
library/cpp/string_utils/base64
2728
library/cpp/string_utils/quote

ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ namespace NYql::NDq {
7373
[credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
7474
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
7575
std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams,
76-
args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
76+
args.TaskParams, args.ReadRanges, args.MemoryLimits, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
7777
counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles);
7878
});
7979
#else

0 commit comments

Comments
 (0)