From 557909f42acaeb67cce0a996ed8677264def5830 Mon Sep 17 00:00:00 2001
From: Adrian Chang
Date: Wed, 9 Oct 2024 15:54:53 -0700
Subject: [PATCH] force utf8

---
 .../src/labelbox/schema/export_task.py       | 231 ++++++++++--------
 1 file changed, 131 insertions(+), 100 deletions(-)

diff --git a/libs/labelbox/src/labelbox/schema/export_task.py b/libs/labelbox/src/labelbox/schema/export_task.py
index 06715748c..0ae9ce89d 100644
--- a/libs/labelbox/src/labelbox/schema/export_task.py
+++ b/libs/labelbox/src/labelbox/schema/export_task.py
@@ -111,7 +111,7 @@ class JsonConverterOutput:
 
 class JsonConverter(Converter[JsonConverterOutput]):  # pylint: disable=too-few-public-methods
     """Converts JSON data.
-    
+
     Deprecated: This converter is deprecated and will be removed in a future release.
     """
 
@@ -133,16 +133,17 @@ def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]:
                     current_object_start = index
                     # we need to account for scenarios where data lands in the middle of an object
                     # and the object is not the last one in the data
-                    if index > 0 and data[index -
-                                          1] == "\n" and not object_offsets:
+                    if index > 0 and data[index - 1] == "\n" and not object_offsets:
                         object_offsets.append((0, index - 1))
             elif char == "}" and stack:
                 stack.pop()
                 # this covers cases where the last object is either followed by a newline or
                 # it is missing
-                if len(stack) == 0 and (len(data) == index + 1 or
-                                        data[index + 1] == "\n"
-                                       ) and current_object_start is not None:
+                if (
+                    len(stack) == 0
+                    and (len(data) == index + 1 or data[index + 1] == "\n")
+                    and current_object_start is not None
+                ):
                     object_offsets.append((current_object_start, index + 1))
                     current_object_start = None
 
@@ -162,7 +163,7 @@ def convert(
             yield JsonConverterOutput(
                 current_offset=current_offset + offset_start,
                 current_line=current_line + line,
-                json_str=raw_data[offset_start:offset_end + 1].strip(),
+                json_str=raw_data[offset_start : offset_end + 1].strip(),
             )
 
 
@@ -179,8 +180,7 @@ class FileConverterOutput:
 
 
 class FileConverter(Converter[FileConverterOutput]):
-    """Converts data to a file.
- """ + """Converts data to a file.""" def __init__(self, file_path: str) -> None: super().__init__() @@ -224,8 +224,8 @@ def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: """Retrieves the file.""" def _get_file_content( - self, query: str, variables: dict, - result_field_name: str) -> Tuple[_MetadataFileInfo, str]: + self, query: str, variables: dict, result_field_name: str + ) -> Tuple[_MetadataFileInfo, str]: """Runs the query.""" res = self._ctx.client.execute(query, variables, error_log_key="errors") res = res["task"][result_field_name] @@ -233,14 +233,17 @@ def _get_file_content( if not file_info: raise ValueError( f"Task {self._ctx.task_id} does not have a metadata file for the " - f"{self._ctx.stream_type.value} stream") + f"{self._ctx.stream_type.value} stream" + ) response = requests.get(file_info.file, timeout=30) response.raise_for_status() - assert len( - response.content - ) == file_info.offsets.end - file_info.offsets.start + 1, ( + response.encoding = "utf-8" + assert ( + len(response.content) == file_info.offsets.end - file_info.offsets.start + 1 + ), ( f"expected {file_info.offsets.end - file_info.offsets.start + 1} bytes, " - f"got {len(response.content)} bytes") + f"got {len(response.content)} bytes" + ) return file_info, response.text @@ -260,8 +263,7 @@ def __init__( f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}" ) - def _find_line_at_offset(self, file_content: str, - target_offset: int) -> int: + def _find_line_at_offset(self, file_content: str, target_offset: int) -> int: # TODO: Remove this, incorrect parsing of JSON to find braces stack = [] line_number = 0 @@ -288,22 +290,22 @@ def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: f"{{task(where: $where)" f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)" f"{{offsets {{start end}} lines {{start end}} file}}" - f"}}}}") + f"}}}}" + ) variables = { - "where": { - "id": self._ctx.task_id - }, + "where": {"id": self._ctx.task_id}, "streamType": self._ctx.stream_type.value, "offset": str(self._current_offset), } file_info, file_content = self._get_file_content( - query, variables, "exportFileFromOffset") + query, variables, "exportFileFromOffset" + ) if self._current_line is None: self._current_line = self._find_line_at_offset( - file_content, self._current_offset - file_info.offsets.start) + file_content, self._current_offset - file_info.offsets.start + ) self._current_line += file_info.lines.start - file_content = file_content[self._current_offset - - file_info.offsets.start:] + file_content = file_content[self._current_offset - file_info.offsets.start :] file_info.offsets.start = self._current_offset file_info.lines.start = self._current_line self._current_offset = file_info.offsets.end + 1 @@ -357,22 +359,22 @@ def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: f"{{task(where: $where)" f"{{{'exportFileFromLine'}(streamType: $streamType, line: $line)" f"{{offsets {{start end}} lines {{start end}} file}}" - f"}}}}") + f"}}}}" + ) variables = { - "where": { - "id": self._ctx.task_id - }, + "where": {"id": self._ctx.task_id}, "streamType": self._ctx.stream_type.value, "line": self._current_line, } file_info, file_content = self._get_file_content( - query, variables, "exportFileFromLine") + query, variables, "exportFileFromLine" + ) if self._current_offset is None: self._current_offset = self._find_offset_of_line( - file_content, self._current_line - file_info.lines.start) + file_content, self._current_line - 
file_info.lines.start + ) self._current_offset += file_info.offsets.start - file_content = file_content[self._current_offset - - file_info.offsets.start:] + file_content = file_content[self._current_offset - file_info.offsets.start :] file_info.offsets.start = self._current_offset file_info.lines.start = self._current_line self._current_offset = file_info.offsets.end + 1 @@ -394,7 +396,7 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods """Reads data from multiple GCS files in a seamless way. - + Deprecated: This reader is deprecated and will be removed in a future release. """ @@ -437,7 +439,9 @@ def __init__( def __iter__(self): yield from self._fetch() - def _fetch(self,) -> Iterator[OutputT]: + def _fetch( + self, + ) -> Iterator[OutputT]: """Fetches the result data. Returns an iterator that yields the offset and the data. """ @@ -448,25 +452,21 @@ def _fetch(self,) -> Iterator[OutputT]: with self._converter as converter: for file_info, raw_data in stream: for output in converter.convert( - Converter.ConverterInputArgs(self._ctx, file_info, - raw_data)): + Converter.ConverterInputArgs(self._ctx, file_info, raw_data) + ): yield output def with_offset(self, offset: int) -> "Stream[OutputT]": """Sets the offset for the stream.""" - self._reader.set_retrieval_strategy( - FileRetrieverByOffset(self._ctx, offset)) + self._reader.set_retrieval_strategy(FileRetrieverByOffset(self._ctx, offset)) return self def with_line(self, line: int) -> "Stream[OutputT]": """Sets the line number for the stream.""" - self._reader.set_retrieval_strategy(FileRetrieverByLine( - self._ctx, line)) + self._reader.set_retrieval_strategy(FileRetrieverByLine(self._ctx, line)) return self - def start( - self, - stream_handler: Optional[Callable[[OutputT], None]] = None) -> None: + def start(self, stream_handler: Optional[Callable[[OutputT], None]] = None) -> None: """Starts streaming the result data. Calls the stream_handler for each result. """ @@ -501,16 +501,16 @@ def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: f"{{task(where: $where)" f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)" f"{{offsets {{start end}} lines {{start end}} file}}" - f"}}}}") + f"}}}}" + ) variables = { - "where": { - "id": self._ctx.task_id - }, + "where": {"id": self._ctx.task_id}, "streamType": self._ctx.stream_type.value, "offset": str(self._current_offset), } file_info, file_content = self._get_file_content( - query, variables, "exportFileFromOffset") + query, variables, "exportFileFromOffset" + ) file_info.offsets.start = self._current_offset file_info.lines.start = self._current_line self._current_offset = file_info.offsets.end + 1 @@ -529,12 +529,15 @@ def __init__( self._reader = _BufferedGCSFileReader() self._converter = _BufferedJsonConverter() self._reader.set_retrieval_strategy( - _BufferedFileRetrieverByOffset(self._ctx, 0)) + _BufferedFileRetrieverByOffset(self._ctx, 0) + ) def __iter__(self): yield from self._fetch() - def _fetch(self,) -> Iterator[OutputT]: + def _fetch( + self, + ) -> Iterator[OutputT]: """Fetches the result data. Returns an iterator that yields the offset and the data. 
""" @@ -545,13 +548,11 @@ def _fetch(self,) -> Iterator[OutputT]: with self._converter as converter: for file_info, raw_data in stream: for output in converter.convert( - Converter.ConverterInputArgs(self._ctx, file_info, - raw_data)): + Converter.ConverterInputArgs(self._ctx, file_info, raw_data) + ): yield output - def start( - self, - stream_handler: Optional[Callable[[OutputT], None]] = None) -> None: + def start(self, stream_handler: Optional[Callable[[OutputT], None]] = None) -> None: """Starts streaming the result data. Calls the stream_handler for each result. """ @@ -564,12 +565,12 @@ def start( @dataclass class BufferedJsonConverterOutput: """Output with the JSON object""" + json: Any class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]): - """Converts JSON data in a buffered manner - """ + """Converts JSON data in a buffered manner""" def convert( self, input_args: Converter.ConverterInputArgs @@ -592,7 +593,7 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: if not self._retrieval_strategy: raise ValueError("retrieval strategy not set") # create a buffer - with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file: + with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file: result = self._retrieval_strategy.get_next_chunk() while result: _, raw_data = result @@ -604,12 +605,16 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: temp_file.write(raw_data) result = self._retrieval_strategy.get_next_chunk() # read buffer - with open(temp_file.name, 'r') as temp_file_reopened: + with open(temp_file.name, "r") as temp_file_reopened: for idx, line in enumerate(temp_file_reopened): - yield _MetadataFileInfo(offsets=Range(start=0, - end=len(line) - 1), - lines=Range(start=idx, end=idx + 1), - file=temp_file.name), line + yield ( + _MetadataFileInfo( + offsets=Range(start=0, end=len(line) - 1), + lines=Range(start=idx, end=idx + 1), + file=temp_file.name, + ), + line, + ) # manually delete buffer os.unlink(temp_file.name) @@ -632,8 +637,11 @@ def __init__(self, task: Task, is_export_v2: bool = False) -> None: self._task = task def __repr__(self): - return f"" if getattr( - self, "uid", None) else "" + return ( + f"" + if getattr(self, "uid", None) + else "" + ) def __str__(self): properties_to_include = [ @@ -702,8 +710,13 @@ def result_url(self): "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead" ) base_url = self._task.client.rest_endpoint - return base_url + '/export-results/' + self._task.uid + '/' + self._task.client.get_organization( - ).uid + return ( + base_url + + "/export-results/" + + self._task.uid + + "/" + + self._task.client.get_organization().uid + ) @property def errors_url(self): @@ -715,8 +728,13 @@ def errors_url(self): if not self.has_errors(): return None base_url = self._task.client.rest_endpoint - return base_url + '/export-errors/' + self._task.uid + '/' + self._task.client.get_organization( - ).uid + return ( + base_url + + "/export-errors/" + + self._task.uid + + "/" + + self._task.client.get_organization().uid + ) @property def errors(self): @@ -736,14 +754,15 @@ def errors(self): data = [] metadata_header = ExportTask._get_metadata_header( - self._task.client, self._task.uid, StreamType.ERRORS) + self._task.client, self._task.uid, StreamType.ERRORS + ) if metadata_header is None: return None BufferedStream( _TaskContext( - self._task.client, self._task.uid, StreamType.ERRORS, - metadata_header),).start( - stream_handler=lambda 
output: data.append(output.json)) + self._task.client, self._task.uid, StreamType.ERRORS, metadata_header + ), + ).start(stream_handler=lambda output: data.append(output.json)) return data @property @@ -757,14 +776,18 @@ def result(self): data = [] metadata_header = ExportTask._get_metadata_header( - self._task.client, self._task.uid, StreamType.RESULT) + self._task.client, self._task.uid, StreamType.RESULT + ) if metadata_header is None: return [] BufferedStream( _TaskContext( - self._task.client, self._task.uid, StreamType.RESULT, - metadata_header),).start( - stream_handler=lambda output: data.append(output.json)) + self._task.client, + self._task.uid, + StreamType.RESULT, + metadata_header, + ), + ).start(stream_handler=lambda output: data.append(output.json)) return data return self._task.result_url @@ -798,15 +821,17 @@ def wait_till_done(self, timeout_seconds: int = 7200) -> None: @staticmethod @lru_cache(maxsize=5) def _get_metadata_header( - client, task_id: str, - stream_type: StreamType) -> Union[_MetadataHeader, None]: + client, task_id: str, stream_type: StreamType + ) -> Union[_MetadataHeader, None]: """Returns the total file size for a specific task.""" - query = (f"query GetExportMetadataHeaderPyApi" - f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!)" - f"{{task(where: $where)" - f"{{{'exportMetadataHeader'}(streamType: $streamType)" - f"{{totalSize totalLines}}" - f"}}}}") + query = ( + f"query GetExportMetadataHeaderPyApi" + f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!)" + f"{{task(where: $where)" + f"{{{'exportMetadataHeader'}(streamType: $streamType)" + f"{{totalSize totalLines}}" + f"}}}}" + ) variables = {"where": {"id": task_id}, "streamType": stream_type.value} res = client.execute(query, variables, error_log_key="errors") res = res["task"]["exportMetadataHeader"] @@ -818,8 +843,9 @@ def get_total_file_size(self, stream_type: StreamType) -> Union[int, None]: raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") - header = ExportTask._get_metadata_header(self._task.client, - self._task.uid, stream_type) + header = ExportTask._get_metadata_header( + self._task.client, self._task.uid, stream_type + ) return header.total_size if header else None def get_total_lines(self, stream_type: StreamType) -> Union[int, None]: @@ -828,8 +854,9 @@ def get_total_lines(self, stream_type: StreamType) -> Union[int, None]: raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") - header = ExportTask._get_metadata_header(self._task.client, - self._task.uid, stream_type) + header = ExportTask._get_metadata_header( + self._task.client, self._task.uid, stream_type + ) return header.total_lines if header else None def has_result(self) -> bool: @@ -864,15 +891,18 @@ def get_buffered_stream( if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") - metadata_header = self._get_metadata_header(self._task.client, - self._task.uid, stream_type) + metadata_header = self._get_metadata_header( + self._task.client, self._task.uid, stream_type + ) if metadata_header is None: raise ValueError( f"Task {self._task.uid} does not have a {stream_type.value} stream" ) return BufferedStream( - _TaskContext(self._task.client, self._task.uid, stream_type, - metadata_header),) + _TaskContext( + self._task.client, self._task.uid, stream_type, metadata_header + 
+            ),
+        )
 
     @overload
     def get_stream(
@@ -906,15 +936,17 @@ def get_stream(
 
         if self._task.status != "COMPLETE":
             raise ExportTask.ExportTaskException("Task is not ready yet")
-        metadata_header = self._get_metadata_header(self._task.client,
-                                                    self._task.uid, stream_type)
+        metadata_header = self._get_metadata_header(
+            self._task.client, self._task.uid, stream_type
+        )
         if metadata_header is None:
             raise ValueError(
                 f"Task {self._task.uid} does not have a {stream_type.value} stream"
             )
         return Stream(
-            _TaskContext(self._task.client, self._task.uid, stream_type,
-                         metadata_header),
+            _TaskContext(
+                self._task.client, self._task.uid, stream_type, metadata_header
+            ),
             _MultiGCSFileReader(),
             converter,
         )
@@ -923,4 +955,3 @@ def get_stream(
     def get_task(client, task_id):
         """Returns the task with the given id."""
        return ExportTask(Task.get_task(client, task_id))
-
\ No newline at end of file
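
Note on the change itself: the only behavioral edit in this patch is the added
`response.encoding = "utf-8"` in `_get_file_content`; every other hunk is
formatter churn. A minimal sketch of why forcing the encoding matters with
`requests` (the URL below is hypothetical, purely for illustration):

    import requests

    # If the server omits the charset from its Content-Type header, requests
    # guesses: ISO-8859-1 for text/* bodies, or charset detection via
    # apparent_encoding otherwise. Either guess can garble UTF-8 export data
    # once .text is decoded.
    response = requests.get("https://example.com/export-chunk.ndjson", timeout=30)
    response.raise_for_status()
    response.encoding = "utf-8"  # force the decoder before reading .text
    print(response.text)  # decoded as UTF-8 regardless of response headers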