@@ -114,9 +114,11 @@ class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few-
114
114
115
115
Deprecated: This converter is deprecated and will be removed in a future release.
116
116
"""
117
-
117
+
118
118
def __init__ (self ) -> None :
119
- warnings .warn ("JSON converter is deprecated and will be removed in a future release" )
119
+ warnings .warn (
120
+ "JSON converter is deprecated and will be removed in a future release"
121
+ )
120
122
super ().__init__ ()
121
123
122
124
def _find_json_object_offsets (self , data : str ) -> List [Tuple [int , int ]]:
@@ -397,7 +399,9 @@ class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods
397
399
"""
398
400
399
401
def __init__ (self ):
400
- warnings .warn ("_MultiGCSFileReader is deprecated and will be removed in a future release" )
402
+ warnings .warn (
403
+ "_MultiGCSFileReader is deprecated and will be removed in a future release"
404
+ )
401
405
super ().__init__ ()
402
406
self ._retrieval_strategy = None
403
407
@@ -414,6 +418,7 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
414
418
yield file_info , raw_data
415
419
result = self ._retrieval_strategy .get_next_chunk ()
416
420
421
+
417
422
class Stream (Generic [OutputT ]):
418
423
"""Streams data from a Reader."""
419
424
@@ -481,7 +486,7 @@ def __init__(
481
486
) -> None :
482
487
super ().__init__ (ctx )
483
488
self ._current_offset = offset
484
- self ._current_line : Optional [ int ] = None
489
+ self ._current_line = 0
485
490
if self ._current_offset >= self ._ctx .metadata_header .total_size :
486
491
raise ValueError (
487
492
f"offset is out of range, max offset is { self ._ctx .metadata_header .total_size - 1 } "
@@ -523,7 +528,8 @@ def __init__(
523
528
self ._ctx = ctx
524
529
self ._reader = _BufferedGCSFileReader ()
525
530
self ._converter = _BufferedJsonConverter ()
526
- self ._reader .set_retrieval_strategy (_BufferedFileRetrieverByOffset (self ._ctx , 0 ))
531
+ self ._reader .set_retrieval_strategy (
532
+ _BufferedFileRetrieverByOffset (self ._ctx , 0 ))
527
533
528
534
def __iter__ (self ):
529
535
yield from self ._fetch ()
@@ -564,13 +570,14 @@ class BufferedJsonConverterOutput:
564
570
class _BufferedJsonConverter (Converter [BufferedJsonConverterOutput ]):
565
571
"""Converts JSON data in a buffered manner
566
572
"""
573
+
567
574
def convert (
568
575
self , input_args : Converter .ConverterInputArgs
569
576
) -> Iterator [BufferedJsonConverterOutput ]:
570
577
yield BufferedJsonConverterOutput (json = json .loads (input_args .raw_data ))
571
578
572
579
573
- class _BufferedGCSFileReader (_Reader ):
580
+ class _BufferedGCSFileReader (_Reader ):
574
581
"""Reads data from multiple GCS files and buffer them to disk"""
575
582
576
583
def __init__ (self ):
@@ -599,10 +606,10 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
599
606
# read buffer
600
607
with open (temp_file .name , 'r' ) as temp_file_reopened :
601
608
for idx , line in enumerate (temp_file_reopened ):
602
- yield _MetadataFileInfo (
603
- offsets = Range ( start = 0 , end = len (line ) - 1 ),
604
- lines = Range (start = idx , end = idx + 1 ),
605
- file = temp_file .name ), line
609
+ yield _MetadataFileInfo (offsets = Range ( start = 0 ,
610
+ end = len (line ) - 1 ),
611
+ lines = Range (start = idx , end = idx + 1 ),
612
+ file = temp_file .name ), line
606
613
# manually delete buffer
607
614
os .unlink (temp_file .name )
608
615
@@ -733,9 +740,10 @@ def errors(self):
733
740
if metadata_header is None :
734
741
return None
735
742
BufferedStream (
736
- _TaskContext (self ._task .client , self ._task .uid , StreamType .ERRORS ,
737
- metadata_header ),
738
- ).start (stream_handler = lambda output : data .append (output .json ))
743
+ _TaskContext (
744
+ self ._task .client , self ._task .uid , StreamType .ERRORS ,
745
+ metadata_header ),).start (
746
+ stream_handler = lambda output : data .append (output .json ))
739
747
return data
740
748
741
749
@property
@@ -753,9 +761,10 @@ def result(self):
753
761
if metadata_header is None :
754
762
return []
755
763
BufferedStream (
756
- _TaskContext (self ._task .client , self ._task .uid ,
757
- StreamType .RESULT , metadata_header ),
758
- ).start (stream_handler = lambda output : data .append (output .json ))
764
+ _TaskContext (
765
+ self ._task .client , self ._task .uid , StreamType .RESULT ,
766
+ metadata_header ),).start (
767
+ stream_handler = lambda output : data .append (output .json ))
759
768
return data
760
769
return self ._task .result_url
761
770
@@ -830,26 +839,10 @@ def has_errors(self) -> bool:
830
839
total_size = self .get_total_file_size (StreamType .ERRORS )
831
840
return total_size is not None and total_size > 0
832
841
833
- @overload
834
- def get_stream (
835
- self ,
836
- converter : JsonConverter ,
837
- stream_type : StreamType = StreamType .RESULT ,
838
- ) -> Stream [JsonConverterOutput ]:
839
- """Overload for getting the right typing hints when using a JsonConverter."""
840
-
841
- @overload
842
- def get_stream (
843
- self ,
844
- converter : FileConverter ,
845
- stream_type : StreamType = StreamType .RESULT ,
846
- ) -> Stream [FileConverterOutput ]:
847
- """Overload for getting the right typing hints when using a FileConverter."""
848
-
849
842
def get_buffered_stream (
850
843
self ,
851
844
stream_type : StreamType = StreamType .RESULT ,
852
- ) -> Stream :
845
+ ) -> BufferedStream :
853
846
"""
854
847
Returns the result of the task.
855
848
@@ -876,15 +869,32 @@ def get_buffered_stream(
876
869
)
877
870
return BufferedStream (
878
871
_TaskContext (self ._task .client , self ._task .uid , stream_type ,
879
- metadata_header ),
880
- )
872
+ metadata_header ),)
873
+
874
+ @overload
875
+ def get_stream (
876
+ self ,
877
+ converter : JsonConverter ,
878
+ stream_type : StreamType = StreamType .RESULT ,
879
+ ) -> Stream [JsonConverterOutput ]:
880
+ """Overload for getting the right typing hints when using a JsonConverter."""
881
+
882
+ @overload
883
+ def get_stream (
884
+ self ,
885
+ converter : FileConverter ,
886
+ stream_type : StreamType = StreamType .RESULT ,
887
+ ) -> Stream [FileConverterOutput ]:
888
+ """Overload for getting the right typing hints when using a FileConverter."""
881
889
882
890
def get_stream (
883
891
self ,
884
892
converter : Optional [Converter ] = None ,
885
893
stream_type : StreamType = StreamType .RESULT ,
886
894
) -> Stream :
887
- warnings .warn ("get_stream is deprecated and will be removed in a future release, use get_buffered_stream" )
895
+ warnings .warn (
896
+ "get_stream is deprecated and will be removed in a future release, use get_buffered_stream"
897
+ )
888
898
if converter is None :
889
899
converter = JsonConverter ()
890
900
"""Returns the result of the task."""
0 commit comments