55
55
stream_unique_updates_only ,
56
56
)
57
57
58
+ # The allowed file formats.
59
+ # You could modify this if you wanted to add new...
60
+ g_allowed_formats = ["parquet" , "root-file" ]
61
+
58
62
59
63
class StreamInfoBase :
60
64
"""Contains base information about results that are streamed back from
@@ -320,8 +324,8 @@ def first_supported_datatype(
320
324
return None
321
325
322
326
def ignore_cache (self ):
323
- """Return a context manager that, as long as it is held, will cause any queries against just
324
- this dataset to ignore any locally cached data.
327
+ """Return a context manager that, as long as it is held, will cause any queries against
328
+ just this dataset to ignore any locally cached data.
325
329
326
330
Returns:
327
331
ContextManager: As long as this is held, the local query cache will be ignored.
@@ -352,7 +356,7 @@ async def get_data_rootfiles_stream(
352
356
file locally.
353
357
"""
354
358
async for f_info in self ._stream_local_files (
355
- selection_query , title , "root-files "
359
+ selection_query , title , "root-file "
356
360
): # type: ignore
357
361
yield f_info
358
362
@@ -462,7 +466,7 @@ async def get_data_rootfiles_uri_stream(
462
466
as_signed_url (bool): Return the uri as a presigned http url?
463
467
"""
464
468
async for f_info in self ._stream_url_buckets (
465
- selection_query , "root-files " , title , as_signed_url
469
+ selection_query , "root-file " , title , as_signed_url
466
470
): # type: ignore
467
471
yield f_info
468
472
@@ -591,7 +595,6 @@ async def _stream_url_buckets(
591
595
query = self ._build_json_query (selection_query , data_format , title )
592
596
593
597
async with aiohttp .ClientSession () as client :
594
-
595
598
# Get a request id - which might be cached, but if not, submit it.
596
599
request_id = await self ._get_request_id (client , query )
597
600
@@ -768,7 +771,7 @@ async def _stream_local_files(
768
771
async def _get_files (
769
772
self ,
770
773
selection_query : str ,
771
- data_type : str ,
774
+ data_format : str ,
772
775
notifier : _status_update_wrapper ,
773
776
title : Optional [str ],
774
777
) -> AsyncIterator [Tuple [str , Awaitable [Path ]]]:
@@ -787,7 +790,7 @@ async def _get_files(
787
790
Arguments:
788
791
789
792
selection_query The query string to send to ServiceX
790
- data_type The type of data that we want to come back.
793
+ data_format The type of data that we want to come back.
791
794
notifier Status callback to let our progress be advertised
792
795
title Title to pass to servicex backend.
793
796
@@ -797,10 +800,9 @@ async def _get_files(
797
800
This is returned this way so a number of downloads can run
798
801
simultaneously.
799
802
"""
800
- query = self ._build_json_query (selection_query , data_type , title )
803
+ query = self ._build_json_query (selection_query , data_format , title )
801
804
802
805
async with aiohttp .ClientSession () as client :
803
-
804
806
# Get a request id - which might be cached, but if not, submit it.
805
807
request_id = await self ._get_request_id (client , query )
806
808
@@ -950,7 +952,6 @@ async def _get_files_from_servicex(
950
952
start_time = time .monotonic ()
951
953
good = True
952
954
try :
953
-
954
955
# Get the stream of minio bucket new files.
955
956
stream_new_object = self ._get_minio_bucket_files_from_servicex (
956
957
request_id , client , minio_adaptor , notifier
@@ -1007,7 +1008,6 @@ async def _get_minio_bucket_files_from_servicex(
1007
1008
"""
1008
1009
start_time = time .monotonic ()
1009
1010
try :
1010
-
1011
1011
# Setup the status sequence from servicex
1012
1012
stream_status = transform_status_stream (
1013
1013
self ._servicex_adaptor , client , request_id
@@ -1034,23 +1034,25 @@ async def _get_minio_bucket_files_from_servicex(
1034
1034
)
1035
1035
1036
1036
def _build_json_query (
1037
- self , selection_query : str , data_type : str , title : Optional [str ]
1037
+ self , selection_query : str , data_format : str , title : Optional [str ]
1038
1038
) -> Dict [str , Union [str , Iterable [str ]]]:
1039
1039
"""
1040
1040
Returns a list of locally written files for a given selection query.
1041
1041
1042
1042
Arguments:
1043
1043
selection_query The query to be send into the ServiceX API
1044
- data_type What is the output data type (parquet, root-file, etc.)
1044
+ data_format What is the output data type (parquet, root-file, etc.)
1045
1045
1046
1046
Notes:
1047
1047
- Internal routine.
1048
1048
"""
1049
+ assert data_format in g_allowed_formats
1050
+
1049
1051
# Items that must always be present
1050
1052
json_query : Dict [str , Union [str , Iterable [str ]]] = {
1051
1053
"selection" : selection_query ,
1052
1054
"result-destination" : self ._result_destination ,
1053
- "result-format" : "parquet" if data_type == "parquet" else "root-file" ,
1055
+ "result-format" : "parquet" if data_format == "parquet" else "root-file" ,
1054
1056
"chunk-size" : "1000" ,
1055
1057
"workers" : str (self ._max_workers ),
1056
1058
}
0 commit comments