Skip to content

Commit ef55dcd

Browse files
authored
merge to ydb stable YQ-4053 support parallel s3 import (#14174)
1 parent c0d6909 commit ef55dcd

File tree

3 files changed

+56
-53
lines changed

3 files changed

+56
-53
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,15 @@ Y_UNIT_TEST_SUITE(KqpS3PlanTest) {
143143
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
144144

145145
const auto& writeStagePlan = plan["Plan"]["Plans"][0]["Plans"][0];
146-
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Limit-Sink");
147-
UNIT_ASSERT(writeStagePlan["Operators"].GetArraySafe().size() >= 2);
148-
const auto& sinkOp = writeStagePlan["Operators"].GetArraySafe()[1];
146+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage-Sink");
147+
UNIT_ASSERT(writeStagePlan["Operators"].GetArraySafe().size() >= 1);
148+
const auto& sinkOp = writeStagePlan["Operators"].GetArraySafe()[0];
149149
UNIT_ASSERT_VALUES_EQUAL(sinkOp["ExternalDataSource"].GetStringSafe(), "write_data_source");
150150
UNIT_ASSERT_VALUES_EQUAL(sinkOp["Compression"].GetStringSafe(), "gzip");
151151

152-
const auto& readStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0];
152+
const auto& readStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0];
153153
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Node Type"].GetStringSafe(), "Source");
154+
UNIT_ASSERT(readStagePlan["Operators"].GetArraySafe().size() >= 1);
154155
const auto& sourceOp = readStagePlan["Operators"].GetArraySafe()[0];
155156
UNIT_ASSERT_VALUES_EQUAL(sourceOp["ExternalDataSource"].GetStringSafe(), "read_data_source");
156157

ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -573,15 +573,15 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
573573
const auto& key = MakePartitionKey(row);
574574
const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
575575
if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) {
576-
auto fileWrite = std::make_unique<TS3FileWriteActor>(
577-
TxId,
578-
Gateway,
579-
Credentials,
580-
key,
581-
NS3Util::UrlEscapeRet(Url + Path + key + MakeOutputName() + Extension),
582-
Compression,
583-
RetryPolicy, DirtyWrite, Token);
584-
keyIt->second.emplace_back(fileWrite.get());
576+
auto fileWrite = std::make_unique<TS3FileWriteActor>(
577+
TxId,
578+
Gateway,
579+
Credentials,
580+
key,
581+
NS3Util::UrlEscapeRet(Url + Path + key + MakeOutputName() + Extension),
582+
Compression,
583+
RetryPolicy, DirtyWrite, Token);
584+
keyIt->second.emplace_back(fileWrite.get());
585585
RegisterWithSameMailbox(fileWrite.release());
586586
}
587587

@@ -619,6 +619,10 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
619619
NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode;
620620
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
621621
statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode);
622+
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
623+
statusCode = NDqProto::StatusIds::INTERNAL_ERROR;
624+
result->Get()->Issues.AddIssue("Got upload error with unspecified error code.");
625+
}
622626
}
623627

624628
Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode);
@@ -640,10 +644,15 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
640644
if (const auto ft = std::find_if(it->second.cbegin(), it->second.cend(), [&](TS3FileWriteActor* actor){ return result->Get()->Url == actor->GetUrl(); }); it->second.cend() != ft) {
641645
(*ft)->PassAway();
642646
it->second.erase(ft);
643-
if (it->second.empty())
647+
if (it->second.empty()) {
644648
FileWriteActors.erase(it);
649+
}
645650
}
646651
}
652+
if (!Finished && GetFreeSpace() > 0) {
653+
LOG_D("TS3WriteActor", "Has free space, notify owner");
654+
Callbacks->ResumeExecution();
655+
}
647656
FinishIfNeeded();
648657
}
649658

ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,15 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
124124
auto keys = GetPartitionKeys(partBy);
125125

126126
auto sinkSettingsBuilder = Build<TExprList>(ctx, target.Pos());
127-
if (partBy)
127+
if (partBy) {
128128
sinkSettingsBuilder.Add(std::move(partBy));
129+
}
129130

130131
auto compression = GetCompression(settings);
131132
const auto& extension = GetExtension(target.Format().Value(), compression ? compression->Tail().Content() : ""sv);
132-
if (compression)
133+
if (compression) {
133134
sinkSettingsBuilder.Add(std::move(compression));
135+
}
134136

135137
auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, target.Pos());
136138
if (auto csvDelimiter = GetCsvDelimiter(settings)) {
@@ -199,31 +201,17 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
199201
}
200202
}
201203

202-
if (!FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
204+
if (IsDqPureExpr(input)) {
203205
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as stage with sink.";
204-
auto shouldBePassedAsInput = FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TDqStage::CallableName()); });
205-
206-
auto stageInputs = Build<TExprList>(ctx, writePos);
207-
auto toFlow = Build<TCoToFlow>(ctx, writePos);
208-
TVector<TCoArgument> args;
209-
210-
if (shouldBePassedAsInput) {
211-
auto arg = Build<TCoArgument>(ctx, writePos).Name("in").Done();
212-
stageInputs.Add(input);
213-
args.push_back(arg);
214-
toFlow.Input(arg);
215-
}
216-
else {
217-
toFlow.Input(input);
218-
}
219-
220206
return keys.empty() ?
221207
Build<TDqStage>(ctx, writePos)
222-
.Inputs(stageInputs.Done())
208+
.Inputs().Build()
223209
.Program<TCoLambda>()
224-
.Args(args)
210+
.Args({})
225211
.Body<TS3SinkOutput>()
226-
.Input(toFlow.Done())
212+
.Input<TCoToFlow>()
213+
.Input(input)
214+
.Build()
227215
.Format(target.Format())
228216
.KeyColumns().Build()
229217
.Settings(sinkOutputSettingsBuilder.Done())
@@ -251,10 +239,12 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
251239
.Add<TDqCnHashShuffle>()
252240
.Output<TDqOutput>()
253241
.Stage<TDqStage>()
254-
.Inputs(stageInputs.Done())
242+
.Inputs().Build()
255243
.Program<TCoLambda>()
256-
.Args(args)
257-
.Body(toFlow.Done())
244+
.Args({})
245+
.Body<TCoToFlow>()
246+
.Input(input)
247+
.Build()
258248
.Build()
259249
.Settings().Build()
260250
.Build()
@@ -317,23 +307,26 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
317307
.Build()
318308
.Done();
319309

320-
auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos());
321-
if (inputStage.Outputs() && keys.empty()) {
322-
outputsBuilder.InitFrom(inputStage.Outputs().Cast());
323-
}
324-
outputsBuilder.Add(sink);
310+
auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos())
311+
.Add(sink);
325312

326313
if (keys.empty()) {
327-
const auto outputBuilder = Build<TS3SinkOutput>(ctx, target.Pos())
328-
.Input(inputStage.Program().Body().Ptr())
329-
.Format(target.Format())
330-
.KeyColumns().Add(std::move(keys)).Build()
331-
.Settings(sinkOutputSettingsBuilder.Done())
332-
.Done();
333-
334314
return Build<TDqStage>(ctx, writePos)
335-
.InitFrom(inputStage)
336-
.Program(ctx.DeepCopyLambda(inputStage.Program().Ref(), outputBuilder.Ptr()))
315+
.Inputs()
316+
.Add<TDqCnMap>()
317+
.Output(dqUnion.Output())
318+
.Build()
319+
.Build()
320+
.Program<TCoLambda>()
321+
.Args({"in"})
322+
.Body<TS3SinkOutput>()
323+
.Input("in")
324+
.Format(target.Format())
325+
.KeyColumns().Add(std::move(keys)).Build()
326+
.Settings(sinkOutputSettingsBuilder.Done())
327+
.Build()
328+
.Build()
329+
.Settings().Build()
337330
.Outputs(outputsBuilder.Done())
338331
.Done();
339332
} else {

0 commit comments

Comments
 (0)