Skip to content

Commit ffb9e1c

Browse files
Olap to S3 data transfer fix (#13753)
1 parent f6e89b1 commit ffb9e1c

File tree

2 files changed

+141
-10
lines changed

2 files changed

+141
-10
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2113,6 +2113,125 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
21132113
}
21142114
}
21152115

2116+
// Checks that script queries of the form `INSERT INTO <external table> SELECT ... FROM <column table>`
// finish with EExecStatus::Completed. Four variants are run: non-partitioned and partitioned
// (by `year`) external tables, each with and without `LIMIT 1`.
// NOTE(review): nothing visible in this test inserts rows into the column table or reads back
// what was written to the bucket, so only the execution status of each script is verified.
Y_UNIT_TEST(TestOlapToS3Insert) {
2117+
const TString root = "/Root/";
2118+
const TString source = "source";
2119+
const TString table1 = "table1";
2120+
const TString table2 = "table2";
2121+
const TString bucket = "bucket";
2122+
2123+
// NOTE(review): presumably creates the bucket in the test S3 endpoint -- helper is defined outside this view.
CreateBucket(bucket);
2124+
2125+
auto kikimr = NTestUtils::MakeKikimrRunner();
2126+
2127+
auto tc = kikimr->GetTableClient();
2128+
auto session = tc.CreateSession().GetValueSync().GetSession();
2129+
2130+
// Despite the "Destination" in its name, this table is the SELECT source of every INSERT below.
const TString olapTable = "DestinationOlap";
2131+
2132+
// One scheme query creates everything the test needs: an ObjectStorage external data source,
// two csv_with_names external tables (the second one partitioned by `year`), and a table
// declared WITH (STORE = COLUMN).
const TString query = fmt::format(R"(
2133+
CREATE EXTERNAL DATA SOURCE `{source}` WITH (
2134+
SOURCE_TYPE="ObjectStorage",
2135+
LOCATION="{location}",
2136+
AUTH_METHOD="NONE"
2137+
);
2138+
CREATE EXTERNAL TABLE `{table1}` (
2139+
key Int64 NOT NULL,
2140+
value String NOT NULL,
2141+
) WITH (
2142+
DATA_SOURCE="{source}",
2143+
LOCATION="/{location_table1}/",
2144+
FORMAT="csv_with_names"
2145+
);
2146+
CREATE EXTERNAL TABLE `{table2}` (
2147+
key Int64 NOT NULL,
2148+
value String NOT NULL,
2149+
year String NOT NULL
2150+
) WITH (
2151+
DATA_SOURCE="{source}",
2152+
LOCATION="/{location_table2}/",
2153+
FORMAT="csv_with_names",
2154+
PARTITIONED_BY="['year']"
2155+
);
2156+
CREATE TABLE `{olap_table}` (
2157+
key Int64 NOT NULL,
2158+
value String NOT NULL,
2159+
PRIMARY KEY (key)
2160+
)
2161+
WITH (STORE = COLUMN);)",
2162+
"location"_a = GetBucketLocation(bucket),
2163+
"source"_a = root + source,
2164+
"table1"_a = root + table1,
2165+
"table2"_a = root + table2,
2166+
"location_table1"_a = table1,
2167+
"location_table2"_a = table2,
2168+
"olap_table"_a = olapTable
2169+
);
2170+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2171+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2172+
2173+
auto db = kikimr->GetQueryClient();
2174+
2175+
// Case 1: plain SELECT from the column table into the non-partitioned external table.
// The destination is referenced by its bare name (no `root` prefix), unlike in the DDL above.
// NOTE(review): presumably resolved relative to the database root -- confirm.
{
2176+
const TString sql = fmt::format(R"(
2177+
INSERT INTO {destination}
2178+
SELECT key, value FROM {source};)",
2179+
"destination"_a = table1,
2180+
"source"_a = olapTable);
2181+
2182+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2183+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2184+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
2185+
2186+
// The script must end in the Completed state; issues are printed on failure.
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2187+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
2188+
}
2189+
2190+
// Case 2: same as case 1, but the SELECT carries `LIMIT 1`.
{
2191+
const TString sql = fmt::format(R"(
2192+
INSERT INTO {destination}
2193+
SELECT key, value FROM {source} LIMIT 1;)",
2194+
"destination"_a = table1,
2195+
"source"_a = olapTable);
2196+
2197+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2198+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2199+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
2200+
2201+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2202+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
2203+
}
2204+
2205+
// Case 3: insert into the external table partitioned by `year`; the partition column is
// supplied as the constant "2024" in the SELECT list.
{
2206+
const TString sql = fmt::format(R"(
2207+
INSERT INTO {destination}
2208+
SELECT key, value, "2024" AS year FROM {source};)",
2209+
"destination"_a = table2,
2210+
"source"_a = olapTable);
2211+
2212+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2213+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2214+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
2215+
2216+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2217+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
2218+
}
2219+
2220+
// Case 4: same as case 3, but the SELECT carries `LIMIT 1`.
{
2221+
const TString sql = fmt::format(R"(
2222+
INSERT INTO {destination}
2223+
SELECT key, value, "2024" AS year FROM {source} LIMIT 1;)",
2224+
"destination"_a = table2,
2225+
"source"_a = olapTable);
2226+
2227+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2228+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2229+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
2230+
2231+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2232+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
2233+
}
2234+
}
21162235
}
21172236

21182237
} // namespace NKikimr::NKqp

ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,15 +201,29 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
201201

202202
if (!FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
203203
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as stage with sink.";
204+
auto shouldBePassedAsInput = FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TDqStage::CallableName()); });
205+
206+
auto stageInputs = Build<TExprList>(ctx, writePos);
207+
auto toFlow = Build<TCoToFlow>(ctx, writePos);
208+
TVector<TCoArgument> args;
209+
210+
if (shouldBePassedAsInput) {
211+
auto arg = Build<TCoArgument>(ctx, writePos).Name("in").Done();
212+
stageInputs.Add(input);
213+
args.push_back(arg);
214+
toFlow.Input(arg);
215+
}
216+
else {
217+
toFlow.Input(input);
218+
}
219+
204220
return keys.empty() ?
205221
Build<TDqStage>(ctx, writePos)
206-
.Inputs().Build()
222+
.Inputs(stageInputs.Done())
207223
.Program<TCoLambda>()
208-
.Args({})
224+
.Args(args)
209225
.Body<TS3SinkOutput>()
210-
.Input<TCoToFlow>()
211-
.Input(input)
212-
.Build()
226+
.Input(toFlow.Done())
213227
.Format(target.Format())
214228
.KeyColumns().Build()
215229
.Settings(sinkOutputSettingsBuilder.Done())
@@ -237,12 +251,10 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
237251
.Add<TDqCnHashShuffle>()
238252
.Output<TDqOutput>()
239253
.Stage<TDqStage>()
240-
.Inputs().Build()
254+
.Inputs(stageInputs.Done())
241255
.Program<TCoLambda>()
242-
.Args({})
243-
.Body<TCoToFlow>()
244-
.Input(input)
245-
.Build()
256+
.Args(args)
257+
.Body(toFlow.Done())
246258
.Build()
247259
.Settings().Build()
248260
.Build()

0 commit comments

Comments
 (0)