Skip to content

Commit 38a0fb6

Browse files
Yq 3428 fix (#7450)
1 parent 0adcc3d commit 38a0fb6

File tree

3 files changed

+68
-6
lines changed

3 files changed

+68
-6
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,15 +307,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
307307
structuredTokenBuilder.SetNoAuth();
308308
}
309309

310+
auto effectiveFilePattern = meta->TableLocation;
311+
if (meta->TableLocation.EndsWith('/')) {
312+
effectiveFilePattern += '*';
313+
}
314+
310315
const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
311316
auto httpGateway = NYql::IHTTPGateway::Make();
312317
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
313318
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
314319
.Url = meta->DataSourceLocation,
315320
.Credentials = credentials,
316-
.Pattern = meta->TableLocation,
321+
.Pattern = effectiveFilePattern,
317322
}, Nothing(), false);
318-
auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
323+
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
319324
auto& listRes = listResFut.GetValue();
320325
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
321326
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);

ydb/core/kqp/gateway/kqp_metadata_loader.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,6 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
281281
tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
282282
tableMeta->ExternalSource.Properties = description.GetProperties();
283283
tableMeta->ExternalSource.DataSourcePath = tableName;
284-
tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path);
285284
return result;
286285
}
287286

@@ -831,14 +830,14 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
831830

832831
switch (entry.Kind) {
833832
case EKind::KindExternalDataSource: {
834-
if (externalPath) {
835-
entry.Path = SplitPath(*externalPath);
836-
}
837833
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
838834
if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) {
839835
promise.SetValue(externalDataSourceMetadata);
840836
return;
841837
}
838+
if (externalPath) {
839+
externalDataSourceMetadata.Metadata->ExternalSource.TableLocation = *externalPath;
840+
}
842841
LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem)
843842
.Subscribe([promise, externalDataSourceMetadata, settings](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
844843
{

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,64 @@ def test_inference_optional_types(self, kikimr, s3, client, unique_prefix):
252252
assert result_set.rows[2].items[2].int64_value == 15
253253
assert result_set.rows[2].items[3].int64_value == 33
254254

255+
@yq_v2
256+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
257+
def test_inference_multiple_files(self, kikimr, s3, client, unique_prefix):
258+
resource = boto3.resource(
259+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
260+
)
261+
262+
bucket = resource.Bucket("fbucket")
263+
bucket.create(ACL='public-read')
264+
bucket.objects.all().delete()
265+
266+
s3_client = boto3.client(
267+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
268+
)
269+
270+
read_data_1 = '''a,b,c
271+
1,2,3
272+
1,2,3'''
273+
read_data_2 = '''a,b,c
274+
1,2,3'''
275+
276+
s3_client.put_object(Body=read_data_1, Bucket='fbucket', Key='/1.csv', ContentType='text/plain')
277+
s3_client.put_object(Body=read_data_2, Bucket='fbucket', Key='/2.csv', ContentType='text/plain')
278+
kikimr.control_plane.wait_bootstrap(1)
279+
storage_connection_name = unique_prefix + "multiple_files_bucket"
280+
client.create_storage_connection(storage_connection_name, "fbucket")
281+
282+
sql = f'''
283+
SELECT *
284+
FROM `{storage_connection_name}`.`/`
285+
WITH (format=csv_with_names, with_infer='true');
286+
'''
287+
288+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
289+
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
290+
291+
data = client.get_result_data(query_id)
292+
result_set = data.result.result_set
293+
logging.debug(str(result_set))
294+
assert len(result_set.columns) == 3
295+
assert result_set.columns[0].name == "a"
296+
assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64
297+
assert result_set.columns[1].name == "b"
298+
assert result_set.columns[1].type.optional_type.item.type_id == ydb.Type.INT64
299+
assert result_set.columns[2].name == "c"
300+
assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.INT64
301+
assert len(result_set.rows) == 3
302+
assert result_set.rows[0].items[0].int64_value == 1
303+
assert result_set.rows[0].items[1].int64_value == 2
304+
assert result_set.rows[0].items[2].int64_value == 3
305+
assert result_set.rows[1].items[0].int64_value == 1
306+
assert result_set.rows[1].items[1].int64_value == 2
307+
assert result_set.rows[1].items[2].int64_value == 3
308+
assert result_set.rows[2].items[0].int64_value == 1
309+
assert result_set.rows[2].items[1].int64_value == 2
310+
assert result_set.rows[2].items[2].int64_value == 3
311+
assert sum(kikimr.control_plane.get_metering(1)) == 10
312+
255313
@yq_all
256314
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
257315
def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):

0 commit comments

Comments
 (0)