From 2ce728f978b5f8b1229ccb39b6be2ead04b31564 Mon Sep 17 00:00:00 2001 From: Gabefire <33893811+Gabefire@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:51:56 -0500 Subject: [PATCH 1/7] get_stream removed from sdk --- libs/labelbox/src/labelbox/__init__.py | 4 - .../src/labelbox/schema/export_task.py | 399 +----------------- 2 files changed, 1 insertion(+), 402 deletions(-) diff --git a/libs/labelbox/src/labelbox/__init__.py b/libs/labelbox/src/labelbox/__init__.py index f9b82b422..08655e833 100644 --- a/libs/labelbox/src/labelbox/__init__.py +++ b/libs/labelbox/src/labelbox/__init__.py @@ -25,10 +25,6 @@ from labelbox.schema.export_task import ( StreamType, ExportTask, - JsonConverter, - JsonConverterOutput, - FileConverter, - FileConverterOutput, BufferedJsonConverterOutput, ) from labelbox.schema.labeling_frontend import ( diff --git a/libs/labelbox/src/labelbox/schema/export_task.py b/libs/labelbox/src/labelbox/schema/export_task.py index a144f4c76..b2e1d054f 100644 --- a/libs/labelbox/src/labelbox/schema/export_task.py +++ b/libs/labelbox/src/labelbox/schema/export_task.py @@ -2,31 +2,26 @@ from dataclasses import dataclass from enum import Enum from functools import lru_cache -from io import TextIOWrapper import json -from pathlib import Path from typing import ( Callable, Generic, Iterator, - List, Optional, Tuple, TypeVar, Union, TYPE_CHECKING, - overload, Any, ) import requests -import warnings import tempfile import os from labelbox.schema.task import Task from labelbox.utils import _CamelCaseMixin -from pydantic import BaseModel, Field, AliasChoices +from pydantic import BaseModel if TYPE_CHECKING: from labelbox import Client @@ -100,122 +95,6 @@ def convert(self, input_args: ConverterInputArgs) -> Iterator[OutputT]: """ -@dataclass -class JsonConverterOutput: - """Output with the JSON string.""" - - current_offset: int - current_line: int - json_str: str - - -class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few-public-methods - """Converts JSON data. - - Deprecated: This converter is deprecated and will be removed in a future release. 
- """ - - def __init__(self) -> None: - warnings.warn( - "JSON converter is deprecated and will be removed in a future release" - ) - super().__init__() - - def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]: - object_offsets: List[Tuple[int, int]] = [] - stack = [] - current_object_start = None - - for index, char in enumerate(data): - if char == "{": - stack.append(char) - if len(stack) == 1: - current_object_start = index - # we need to account for scenarios where data lands in the middle of an object - # and the object is not the last one in the data - if ( - index > 0 - and data[index - 1] == "\n" - and not object_offsets - ): - object_offsets.append((0, index - 1)) - elif char == "}" and stack: - stack.pop() - # this covers cases where the last object is either followed by a newline or - # it is missing - if ( - len(stack) == 0 - and (len(data) == index + 1 or data[index + 1] == "\n") - and current_object_start is not None - ): - object_offsets.append((current_object_start, index + 1)) - current_object_start = None - - # we also need to account for scenarios where data lands in the middle of the last object - return object_offsets if object_offsets else [(0, len(data) - 1)] - - def convert( - self, input_args: Converter.ConverterInputArgs - ) -> Iterator[JsonConverterOutput]: - current_offset, current_line, raw_data = ( - input_args.file_info.offsets.start, - input_args.file_info.lines.start, - input_args.raw_data, - ) - offsets = self._find_json_object_offsets(raw_data) - for line, (offset_start, offset_end) in enumerate(offsets): - yield JsonConverterOutput( - current_offset=current_offset + offset_start, - current_line=current_line + line, - json_str=raw_data[offset_start : offset_end + 1].strip(), - ) - - -@dataclass -class FileConverterOutput: - """Output with statistics about the written file.""" - - file_path: Path - total_size: int - total_lines: int - current_offset: int - current_line: int - bytes_written: int - - -class FileConverter(Converter[FileConverterOutput]): - """Converts data to a file.""" - - def __init__(self, file_path: str) -> None: - super().__init__() - self._file: Optional[TextIOWrapper] = None - self._file_path = file_path - - def __enter__(self): - self._file = open(self._file_path, "w", encoding="utf-8") - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self._file: - self._file.close() - return False - - def convert( - self, input_args: Converter.ConverterInputArgs - ) -> Iterator[FileConverterOutput]: - # appends data to the file - assert self._file is not None - self._file.write(input_args.raw_data) - yield FileConverterOutput( - file_path=Path(self._file_path), - total_size=input_args.ctx.metadata_header.total_size, - total_lines=input_args.ctx.metadata_header.total_lines, - current_offset=input_args.file_info.offsets.start, - current_line=input_args.file_info.lines.start, - bytes_written=len(input_args.raw_data), - ) - - class FileRetrieverStrategy(ABC): # pylint: disable=too-few-public-methods """Abstract class for retrieving files.""" @@ -251,147 +130,6 @@ def _get_file_content( return file_info, response.text -class FileRetrieverByOffset(FileRetrieverStrategy): # pylint: disable=too-few-public-methods - """Retrieves files by offset.""" - - def __init__( - self, - ctx: _TaskContext, - offset: int, - ) -> None: - super().__init__(ctx) - self._current_offset = offset - self._current_line: Optional[int] = None - if self._current_offset >= self._ctx.metadata_header.total_size: - raise ValueError( - f"offset is 
out of range, max offset is {self._ctx.metadata_header.total_size - 1}" - ) - - def _find_line_at_offset( - self, file_content: str, target_offset: int - ) -> int: - # TODO: Remove this, incorrect parsing of JSON to find braces - stack = [] - line_number = 0 - - for index, char in enumerate(file_content): - if char == "{": - stack.append(char) - if len(stack) == 1 and index > 0: - line_number += 1 - elif char == "}" and stack: - stack.pop() - - if index == target_offset: - break - - return line_number - - def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: - if self._current_offset >= self._ctx.metadata_header.total_size: - return None - query = ( - f"query GetExportFileFromOffsetPyApi" - f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)" - f"{{task(where: $where)" - f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)" - f"{{offsets {{start end}} lines {{start end}} file}}" - f"}}}}" - ) - variables = { - "where": {"id": self._ctx.task_id}, - "streamType": self._ctx.stream_type.value, - "offset": str(self._current_offset), - } - file_info, file_content = self._get_file_content( - query, variables, "exportFileFromOffset" - ) - if self._current_line is None: - self._current_line = self._find_line_at_offset( - file_content, self._current_offset - file_info.offsets.start - ) - self._current_line += file_info.lines.start - file_content = file_content[ - self._current_offset - file_info.offsets.start : - ] - file_info.offsets.start = self._current_offset - file_info.lines.start = self._current_line - self._current_offset = file_info.offsets.end + 1 - self._current_line = file_info.lines.end + 1 - return file_info, file_content - - -class FileRetrieverByLine(FileRetrieverStrategy): # pylint: disable=too-few-public-methods - """Retrieves files by line.""" - - def __init__( - self, - ctx: _TaskContext, - line: int, - ) -> None: - super().__init__(ctx) - self._current_line = line - self._current_offset: Optional[int] = None - if self._current_line >= self._ctx.metadata_header.total_lines: - raise ValueError( - f"line is out of range, max line is {self._ctx.metadata_header.total_lines - 1}" - ) - - def _find_offset_of_line(self, file_content: str, target_line: int): - # TODO: Remove this, incorrect parsing of JSON to find braces - start_offset = None - stack = [] - line_number = 0 - - for index, char in enumerate(file_content): - if char == "{": - stack.append(char) - if len(stack) == 1: - if line_number == target_line: - start_offset = index - line_number += 1 - elif char == "}" and stack: - stack.pop() - - if line_number > target_line: - break - - return start_offset - - def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: - if self._current_line >= self._ctx.metadata_header.total_lines: - return None - query = ( - f"query GetExportFileFromLinePyApi" - f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $line: UInt64!)" - f"{{task(where: $where)" - f"{{{'exportFileFromLine'}(streamType: $streamType, line: $line)" - f"{{offsets {{start end}} lines {{start end}} file}}" - f"}}}}" - ) - variables = { - "where": {"id": self._ctx.task_id}, - "streamType": self._ctx.stream_type.value, - "line": self._current_line, - } - file_info, file_content = self._get_file_content( - query, variables, "exportFileFromLine" - ) - if self._current_offset is None: - self._current_offset = self._find_offset_of_line( - file_content, self._current_line - file_info.lines.start - ) - self._current_offset += file_info.offsets.start - 
file_content = file_content[ - self._current_offset - file_info.offsets.start : - ] - file_info.offsets.start = self._current_offset - file_info.lines.start = self._current_line - self._current_offset = file_info.offsets.end + 1 - self._current_line = file_info.lines.end + 1 - return file_info, file_content - - class _Reader(ABC): # pylint: disable=too-few-public-methods """Abstract class for reading data from a source.""" @@ -404,94 +142,6 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: """Reads data from the source.""" -class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods - """Reads data from multiple GCS files in a seamless way. - - Deprecated: This reader is deprecated and will be removed in a future release. - """ - - def __init__(self): - warnings.warn( - "_MultiGCSFileReader is deprecated and will be removed in a future release" - ) - super().__init__() - self._retrieval_strategy = None - - def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: - """Sets the retrieval strategy.""" - self._retrieval_strategy = strategy - - def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: - if not self._retrieval_strategy: - raise ValueError("retrieval strategy not set") - result = self._retrieval_strategy.get_next_chunk() - while result: - file_info, raw_data = result - yield file_info, raw_data - result = self._retrieval_strategy.get_next_chunk() - - -class Stream(Generic[OutputT]): - """Streams data from a Reader.""" - - def __init__( - self, - ctx: _TaskContext, - reader: _Reader, - converter: Converter, - ): - self._ctx = ctx - self._reader = reader - self._converter = converter - # default strategy is to retrieve files by offset, starting from 0 - self.with_offset(0) - - def __iter__(self): - yield from self._fetch() - - def _fetch( - self, - ) -> Iterator[OutputT]: - """Fetches the result data. - Returns an iterator that yields the offset and the data. - """ - if self._ctx.metadata_header.total_size is None: - return - - stream = self._reader.read() - with self._converter as converter: - for file_info, raw_data in stream: - for output in converter.convert( - Converter.ConverterInputArgs(self._ctx, file_info, raw_data) - ): - yield output - - def with_offset(self, offset: int) -> "Stream[OutputT]": - """Sets the offset for the stream.""" - self._reader.set_retrieval_strategy( - FileRetrieverByOffset(self._ctx, offset) - ) - return self - - def with_line(self, line: int) -> "Stream[OutputT]": - """Sets the line number for the stream.""" - self._reader.set_retrieval_strategy( - FileRetrieverByLine(self._ctx, line) - ) - return self - - def start( - self, stream_handler: Optional[Callable[[OutputT], None]] = None - ) -> None: - """Starts streaming the result data. - Calls the stream_handler for each result. 
- """ - # this calls the __iter__ method, which in turn calls the _fetch method - for output in self: - if stream_handler: - stream_handler(output) - - class _BufferedFileRetrieverByOffset(FileRetrieverStrategy): # pylint: disable=too-few-public-methods """Retrieves files by offset.""" @@ -925,53 +575,6 @@ def get_buffered_stream( ), ) - @overload - def get_stream( - self, - converter: JsonConverter, - stream_type: StreamType = StreamType.RESULT, - ) -> Stream[JsonConverterOutput]: - """Overload for getting the right typing hints when using a JsonConverter.""" - - @overload - def get_stream( - self, - converter: FileConverter, - stream_type: StreamType = StreamType.RESULT, - ) -> Stream[FileConverterOutput]: - """Overload for getting the right typing hints when using a FileConverter.""" - - def get_stream( - self, - converter: Optional[Converter] = None, - stream_type: StreamType = StreamType.RESULT, - ) -> Stream: - warnings.warn( - "get_stream is deprecated and will be removed in a future release, use get_buffered_stream" - ) - if converter is None: - converter = JsonConverter() - """Returns the result of the task.""" - if self._task.status == "FAILED": - raise ExportTask.ExportTaskException("Task failed") - if self._task.status != "COMPLETE": - raise ExportTask.ExportTaskException("Task is not ready yet") - - metadata_header = self._get_metadata_header( - self._task.client, self._task.uid, stream_type - ) - if metadata_header is None: - raise ValueError( - f"Task {self._task.uid} does not have a {stream_type.value} stream" - ) - return Stream( - _TaskContext( - self._task.client, self._task.uid, stream_type, metadata_header - ), - _MultiGCSFileReader(), - converter, - ) - @staticmethod def get_task(client, task_id): """Returns the task with the given id.""" From 6254f9955c30e40540eb395446184e570adab2e2 Mon Sep 17 00:00:00 2001 From: Gabefire <33893811+Gabefire@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:54:18 -0500 Subject: [PATCH 2/7] Removed from tests --- .../test_export_embeddings_streamable.py | 16 ++++------------ .../streamable/test_export_video_streamable.py | 4 +++- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py index 25e58e2dc..125f4d392 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py @@ -23,12 +23,8 @@ def test_export_embeddings_precomputed( assert export_task.has_errors() is False results = [] - export_task.get_stream( - converter=JsonConverter(), stream_type=StreamType.RESULT - ).start( - stream_handler=lambda output: results.append( - json.loads(output.json_str) - ) + export_task.get_buffered_stream(stream_type=StreamType.RESULT).start( + stream_handler=lambda output: results.append(output.json) ) assert len(results) == len(data_row_specs) @@ -69,12 +65,8 @@ def test_export_embeddings_custom( assert export_task.has_errors() is False results = [] - export_task.get_stream( - converter=JsonConverter(), stream_type=StreamType.RESULT - ).start( - stream_handler=lambda output: results.append( - json.loads(output.json_str) - ) + export_task.get_buffered_stream(stream_type=StreamType.RESULT).start( + stream_handler=lambda output: results.append(output.json) ) assert len(results) == 1 diff --git a/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py 
b/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py index 28ef6e0cf..33a6b494c 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py @@ -71,7 +71,9 @@ def test_export( export_task.get_total_file_size(stream_type=StreamType.RESULT) > 0 ) - export_data = json.loads(list(export_task.get_stream())[0].json_str) + export_data = json.loads( + list(export_task.get_buffered_stream())[0].json + ) data_row_export = export_data["data_row"] assert data_row_export["global_key"] == video_data_row["global_key"] assert data_row_export["row_data"] == video_data_row["row_data"] From b1abe80bcb8d083ab56c63b9efc10c5cd9309869 Mon Sep 17 00:00:00 2001 From: Gabefire <33893811+Gabefire@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:56:47 -0500 Subject: [PATCH 3/7] fixed bug --- .../data/export/streamable/test_export_video_streamable.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py index 33a6b494c..56ce4c646 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_video_streamable.py @@ -71,9 +71,8 @@ def test_export( export_task.get_total_file_size(stream_type=StreamType.RESULT) > 0 ) - export_data = json.loads( - list(export_task.get_buffered_stream())[0].json - ) + export_data = list(export_task.get_buffered_stream())[0].json + data_row_export = export_data["data_row"] assert data_row_export["global_key"] == video_data_row["global_key"] assert data_row_export["row_data"] == video_data_row["row_data"] From 1d6fc3b81b967f8eece66699098a6b9f69bd6188 Mon Sep 17 00:00:00 2001 From: Gabefire <33893811+Gabefire@users.noreply.github.com> Date: Mon, 23 Sep 2024 20:08:47 -0500 Subject: [PATCH 4/7] removed unit tests --- .../export_task/test_unit_file_converter.py | 77 ----------- .../test_unit_file_retriever_by_line.py | 126 ------------------ .../test_unit_file_retriever_by_offset.py | 87 ------------ .../export_task/test_unit_json_converter.py | 112 ---------------- 4 files changed, 402 deletions(-) delete mode 100644 libs/labelbox/tests/unit/export_task/test_unit_file_converter.py delete mode 100644 libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_line.py delete mode 100644 libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_offset.py delete mode 100644 libs/labelbox/tests/unit/export_task/test_unit_json_converter.py diff --git a/libs/labelbox/tests/unit/export_task/test_unit_file_converter.py b/libs/labelbox/tests/unit/export_task/test_unit_file_converter.py deleted file mode 100644 index 81e9eb60f..000000000 --- a/libs/labelbox/tests/unit/export_task/test_unit_file_converter.py +++ /dev/null @@ -1,77 +0,0 @@ -from unittest.mock import MagicMock - -from labelbox.schema.export_task import ( - Converter, - FileConverter, - Range, - StreamType, - _MetadataFileInfo, - _MetadataHeader, - _TaskContext, -) - - -class TestFileConverter: - def test_with_correct_ndjson(self, tmp_path, generate_random_ndjson): - directory = tmp_path / "file-converter" - directory.mkdir() - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - input_args = Converter.ConverterInputArgs( - ctx=_TaskContext( - client=MagicMock(), - task_id="task-id", - 
stream_type=StreamType.RESULT, - metadata_header=_MetadataHeader( - total_size=len(file_content), total_lines=line_count - ), - ), - file_info=_MetadataFileInfo( - offsets=Range(start=0, end=len(file_content) - 1), - lines=Range(start=0, end=line_count - 1), - file="file.ndjson", - ), - raw_data=file_content, - ) - path = directory / "output.ndjson" - with FileConverter(file_path=path) as converter: - for output in converter.convert(input_args): - assert output.current_line == 0 - assert output.current_offset == 0 - assert output.file_path == path - assert output.total_lines == line_count - assert output.total_size == len(file_content) - assert output.bytes_written == len(file_content) - - def test_with_no_newline_at_end(self, tmp_path, generate_random_ndjson): - directory = tmp_path / "file-converter" - directory.mkdir() - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) - input_args = Converter.ConverterInputArgs( - ctx=_TaskContext( - client=MagicMock(), - task_id="task-id", - stream_type=StreamType.RESULT, - metadata_header=_MetadataHeader( - total_size=len(file_content), total_lines=line_count - ), - ), - file_info=_MetadataFileInfo( - offsets=Range(start=0, end=len(file_content) - 1), - lines=Range(start=0, end=line_count - 1), - file="file.ndjson", - ), - raw_data=file_content, - ) - path = directory / "output.ndjson" - with FileConverter(file_path=path) as converter: - for output in converter.convert(input_args): - assert output.current_line == 0 - assert output.current_offset == 0 - assert output.file_path == path - assert output.total_lines == line_count - assert output.total_size == len(file_content) - assert output.bytes_written == len(file_content) diff --git a/libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_line.py b/libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_line.py deleted file mode 100644 index 37c93647e..000000000 --- a/libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_line.py +++ /dev/null @@ -1,126 +0,0 @@ -from unittest.mock import MagicMock, patch -from labelbox.schema.export_task import ( - FileRetrieverByLine, - _TaskContext, - _MetadataHeader, - StreamType, -) - - -class TestFileRetrieverByLine: - def test_by_line_from_start(self, generate_random_ndjson, mock_response): - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - - mock_client = MagicMock() - mock_client.execute = MagicMock( - return_value={ - "task": { - "exportFileFromLine": { - "offsets": {"start": "0", "end": len(file_content) - 1}, - "lines": {"start": "0", "end": str(line_count - 1)}, - "file": "http://some-url.com/file.ndjson", - } - } - } - ) - - mock_ctx = _TaskContext( - client=mock_client, - task_id="task-id", - stream_type=StreamType.RESULT, - metadata_header=_MetadataHeader( - total_size=len(file_content), total_lines=line_count - ), - ) - - with patch("requests.get", return_value=mock_response(file_content)): - retriever = FileRetrieverByLine(mock_ctx, 0) - info, content = retriever.get_next_chunk() - assert info.offsets.start == 0 - assert info.offsets.end == len(file_content) - 1 - assert info.lines.start == 0 - assert info.lines.end == line_count - 1 - assert info.file == "http://some-url.com/file.ndjson" - assert content == file_content - - def test_by_line_from_middle(self, generate_random_ndjson, mock_response): - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - - mock_client 
= MagicMock() - mock_client.execute = MagicMock( - return_value={ - "task": { - "exportFileFromLine": { - "offsets": {"start": "0", "end": len(file_content) - 1}, - "lines": {"start": "0", "end": str(line_count - 1)}, - "file": "http://some-url.com/file.ndjson", - } - } - } - ) - - mock_ctx = _TaskContext( - client=mock_client, - task_id="task-id", - stream_type=StreamType.RESULT, - metadata_header=_MetadataHeader( - total_size=len(file_content), total_lines=line_count - ), - ) - - line_start = 5 - current_offset = file_content.find(ndjson[line_start]) - - with patch("requests.get", return_value=mock_response(file_content)): - retriever = FileRetrieverByLine(mock_ctx, line_start) - info, content = retriever.get_next_chunk() - assert info.offsets.start == current_offset - assert info.offsets.end == len(file_content) - 1 - assert info.lines.start == line_start - assert info.lines.end == line_count - 1 - assert info.file == "http://some-url.com/file.ndjson" - assert content == file_content[current_offset:] - - def test_by_line_from_last(self, generate_random_ndjson, mock_response): - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - - mock_client = MagicMock() - mock_client.execute = MagicMock( - return_value={ - "task": { - "exportFileFromLine": { - "offsets": {"start": "0", "end": len(file_content) - 1}, - "lines": {"start": "0", "end": str(line_count - 1)}, - "file": "http://some-url.com/file.ndjson", - } - } - } - ) - - mock_ctx = _TaskContext( - client=mock_client, - task_id="task-id", - stream_type=StreamType.RESULT, - metadata_header=_MetadataHeader( - total_size=len(file_content), total_lines=line_count - ), - ) - - line_start = 9 - current_offset = file_content.find(ndjson[line_start]) - - with patch("requests.get", return_value=mock_response(file_content)): - retriever = FileRetrieverByLine(mock_ctx, line_start) - info, content = retriever.get_next_chunk() - assert info.offsets.start == current_offset - assert info.offsets.end == len(file_content) - 1 - assert info.lines.start == line_start - assert info.lines.end == line_count - 1 - assert info.file == "http://some-url.com/file.ndjson" - assert content == file_content[current_offset:] diff --git a/libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_offset.py b/libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_offset.py deleted file mode 100644 index 870e03307..000000000 --- a/libs/labelbox/tests/unit/export_task/test_unit_file_retriever_by_offset.py +++ /dev/null @@ -1,87 +0,0 @@ -from unittest.mock import MagicMock, patch -from labelbox.schema.export_task import ( - FileRetrieverByOffset, - _TaskContext, - _MetadataHeader, - StreamType, -) - - -class TestFileRetrieverByOffset: - def test_by_offset_from_start(self, generate_random_ndjson, mock_response): - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - - mock_client = MagicMock() - mock_client.execute = MagicMock( - return_value={ - "task": { - "exportFileFromOffset": { - "offsets": {"start": "0", "end": len(file_content) - 1}, - "lines": {"start": "0", "end": str(line_count - 1)}, - "file": "http://some-url.com/file.ndjson", - } - } - } - ) - - mock_ctx = _TaskContext( - client=mock_client, - task_id="task-id", - stream_type=StreamType.RESULT, - metadata_header=_MetadataHeader( - total_size=len(file_content), total_lines=line_count - ), - ) - - with patch("requests.get", return_value=mock_response(file_content)): - retriever = 
FileRetrieverByOffset(mock_ctx, 0) - info, content = retriever.get_next_chunk() - assert info.offsets.start == 0 - assert info.offsets.end == len(file_content) - 1 - assert info.lines.start == 0 - assert info.lines.end == line_count - 1 - assert info.file == "http://some-url.com/file.ndjson" - assert content == file_content - - def test_by_offset_from_middle(self, generate_random_ndjson, mock_response): - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - - mock_client = MagicMock() - mock_client.execute = MagicMock( - return_value={ - "task": { - "exportFileFromOffset": { - "offsets": {"start": "0", "end": len(file_content) - 1}, - "lines": {"start": "0", "end": str(line_count - 1)}, - "file": "http://some-url.com/file.ndjson", - } - } - } - ) - - mock_ctx = _TaskContext( - client=mock_client, - task_id="task-id", - stream_type=StreamType.RESULT, - metadata_header=_MetadataHeader( - total_size=len(file_content), total_lines=line_count - ), - ) - - line_start = 5 - skipped_bytes = 15 - current_offset = file_content.find(ndjson[line_start]) + skipped_bytes - - with patch("requests.get", return_value=mock_response(file_content)): - retriever = FileRetrieverByOffset(mock_ctx, current_offset) - info, content = retriever.get_next_chunk() - assert info.offsets.start == current_offset - assert info.offsets.end == len(file_content) - 1 - assert info.lines.start == 5 - assert info.lines.end == line_count - 1 - assert info.file == "http://some-url.com/file.ndjson" - assert content == file_content[current_offset:] diff --git a/libs/labelbox/tests/unit/export_task/test_unit_json_converter.py b/libs/labelbox/tests/unit/export_task/test_unit_json_converter.py deleted file mode 100644 index f5ccf26fb..000000000 --- a/libs/labelbox/tests/unit/export_task/test_unit_json_converter.py +++ /dev/null @@ -1,112 +0,0 @@ -from unittest.mock import MagicMock - -from labelbox.schema.export_task import ( - Converter, - JsonConverter, - Range, - _MetadataFileInfo, -) - - -class TestJsonConverter: - def test_with_correct_ndjson(self, generate_random_ndjson): - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - input_args = Converter.ConverterInputArgs( - ctx=MagicMock(), - file_info=_MetadataFileInfo( - offsets=Range(start=0, end=len(file_content) - 1), - lines=Range(start=0, end=line_count - 1), - file="file.ndjson", - ), - raw_data=file_content, - ) - with JsonConverter() as converter: - current_offset = 0 - for idx, output in enumerate(converter.convert(input_args)): - assert output.current_line == idx - assert output.current_offset == current_offset - assert output.json_str == ndjson[idx] - current_offset += len(output.json_str) + 1 - - def test_with_no_newline_at_end(self, generate_random_ndjson): - line_count = 10 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) - input_args = Converter.ConverterInputArgs( - ctx=MagicMock(), - file_info=_MetadataFileInfo( - offsets=Range(start=0, end=len(file_content) - 1), - lines=Range(start=0, end=line_count - 1), - file="file.ndjson", - ), - raw_data=file_content, - ) - with JsonConverter() as converter: - current_offset = 0 - for idx, output in enumerate(converter.convert(input_args)): - assert output.current_line == idx - assert output.current_offset == current_offset - assert output.json_str == ndjson[idx] - current_offset += len(output.json_str) + 1 - - def test_from_offset(self, generate_random_ndjson): - # testing middle of a 
JSON string, but not the last line - line_count = 10 - line_start = 5 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - offset_end = len(file_content) - skipped_bytes = 15 - current_offset = file_content.find(ndjson[line_start]) + skipped_bytes - file_content = file_content[current_offset:] - - input_args = Converter.ConverterInputArgs( - ctx=MagicMock(), - file_info=_MetadataFileInfo( - offsets=Range(start=current_offset, end=offset_end), - lines=Range(start=line_start, end=line_count - 1), - file="file.ndjson", - ), - raw_data=file_content, - ) - with JsonConverter() as converter: - for idx, output in enumerate(converter.convert(input_args)): - assert output.current_line == line_start + idx - assert output.current_offset == current_offset - assert ( - output.json_str == ndjson[line_start + idx][skipped_bytes:] - ) - current_offset += len(output.json_str) + 1 - skipped_bytes = 0 - - def test_from_offset_last_line(self, generate_random_ndjson): - # testing middle of a JSON string, but not the last line - line_count = 10 - line_start = 9 - ndjson = generate_random_ndjson(line_count) - file_content = "\n".join(ndjson) + "\n" - offset_end = len(file_content) - skipped_bytes = 15 - current_offset = file_content.find(ndjson[line_start]) + skipped_bytes - file_content = file_content[current_offset:] - - input_args = Converter.ConverterInputArgs( - ctx=MagicMock(), - file_info=_MetadataFileInfo( - offsets=Range(start=current_offset, end=offset_end), - lines=Range(start=line_start, end=line_count - 1), - file="file.ndjson", - ), - raw_data=file_content, - ) - with JsonConverter() as converter: - for idx, output in enumerate(converter.convert(input_args)): - assert output.current_line == line_start + idx - assert output.current_offset == current_offset - assert ( - output.json_str == ndjson[line_start + idx][skipped_bytes:] - ) - current_offset += len(output.json_str) + 1 - skipped_bytes = 0 From 7ee90d194bfce4e491eda3657bb3196d8a5904fc Mon Sep 17 00:00:00 2001 From: Gabefire <33893811+Gabefire@users.noreply.github.com> Date: Mon, 23 Sep 2024 20:46:21 -0500 Subject: [PATCH 5/7] Removed get_stream from data tests --- .../test_export_data_rows_streamable.py | 14 ++-------- .../test_export_dataset_streamable.py | 12 ++++---- .../test_export_model_run_streamable.py | 4 +-- .../test_export_project_streamable.py | 28 +++++++++---------- 4 files changed, 25 insertions(+), 33 deletions(-) diff --git a/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py index 3e4efbc46..8dfa1df72 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py @@ -1,7 +1,5 @@ -import json import time -import pytest from labelbox import DataRow, ExportTask, StreamType @@ -27,9 +25,7 @@ def test_with_data_row_object( ) assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1 assert ( - json.loads(list(export_task.get_stream())[0].json_str)["data_row"][ - "id" - ] + list(export_task.get_buffered_stream())[0].json["data_row"]["id"] == data_row.uid ) @@ -75,9 +71,7 @@ def test_with_id(self, client, data_row, wait_for_data_row_processing): ) assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1 assert ( - json.loads(list(export_task.get_stream())[0].json_str)["data_row"][ - "id" - ] + 
list(export_task.get_buffered_stream())[0].json["data_row"]["id"] == data_row.uid ) @@ -101,9 +95,7 @@ def test_with_global_key( ) assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1 assert ( - json.loads(list(export_task.get_stream())[0].json_str)["data_row"][ - "id" - ] + list(export_task.get_buffered_stream())[0].json["data_row"]["id"] == data_row.uid ) diff --git a/libs/labelbox/tests/data/export/streamable/test_export_dataset_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_dataset_streamable.py index 57f617a00..0d34a40b1 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_dataset_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_dataset_streamable.py @@ -25,8 +25,8 @@ def test_export(self, dataset, data_rows): ) == len(expected_data_row_ids) data_row_ids = list( map( - lambda x: json.loads(x.json_str)["data_row"]["id"], - export_task.get_stream(), + lambda x: x.json["data_row"]["id"], + export_task.get_buffered_stream(), ) ) assert data_row_ids.sort() == expected_data_row_ids.sort() @@ -58,8 +58,8 @@ def test_with_data_row_filter(self, dataset, data_rows): ) data_row_ids = list( map( - lambda x: json.loads(x.json_str)["data_row"]["id"], - export_task.get_stream(), + lambda x: x.json["data_row"]["id"], + export_task.get_buffered_stream(), ) ) assert data_row_ids.sort() == expected_data_row_ids.sort() @@ -91,8 +91,8 @@ def test_with_global_key_filter(self, dataset, data_rows): ) global_keys = list( map( - lambda x: json.loads(x.json_str)["data_row"]["global_key"], - export_task.get_stream(), + lambda x: x.json["data_row"]["global_key"], + export_task.get_buffered_stream(), ) ) assert global_keys.sort() == expected_global_keys.sort() diff --git a/libs/labelbox/tests/data/export/streamable/test_export_model_run_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_model_run_streamable.py index ada493fc3..7a583198b 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_model_run_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_model_run_streamable.py @@ -27,8 +27,8 @@ def test_export(self, model_run_with_data_rows): stream_type=StreamType.RESULT ) == len(expected_data_rows) - for data in export_task.get_stream(): - obj = json.loads(data.json_str) + for data in export_task.get_buffered_stream(): + obj = data.json assert ( "media_attributes" in obj and obj["media_attributes"] is not None diff --git a/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py index 818a0178c..bf6df3363 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py @@ -56,8 +56,8 @@ def test_export( ) assert export_task.get_total_lines(stream_type=StreamType.RESULT) > 0 - for data in export_task.get_stream(): - obj = json.loads(data.json_str) + for data in export_task.get_buffered_stream(): + obj = data.json task_media_attributes = obj["media_attributes"] task_project = obj["projects"][project.uid] task_project_label_ids_set = set( @@ -139,8 +139,8 @@ def test_with_date_filters( ) assert export_task.get_total_lines(stream_type=StreamType.RESULT) > 0 - for data in export_task.get_stream(): - obj = json.loads(data.json_str) + for data in export_task.get_buffered_stream(): + obj = data.json task_project = obj["projects"][project.uid] task_project_label_ids_set = set( 
map(lambda prediction: prediction["id"], task_project["labels"]) @@ -181,9 +181,9 @@ def test_with_iso_date_filters( assert export_task.get_total_lines(stream_type=StreamType.RESULT) > 0 assert ( label_id - == json.loads(list(export_task.get_stream())[0].json_str)[ - "projects" - ][project.uid]["labels"][0]["id"] + == export_task.get_buffered_stream()[0].json["projects"][ + project.uid + ]["labels"][0]["id"] ) def test_with_iso_date_filters_no_start_date( @@ -207,9 +207,9 @@ def test_with_iso_date_filters_no_start_date( assert export_task.get_total_lines(stream_type=StreamType.RESULT) > 0 assert ( label_id - == json.loads(list(export_task.get_stream())[0].json_str)[ - "projects" - ][project.uid]["labels"][0]["id"] + == export_task.get_buffered_stream()[0].json["projects"][ + project.uid + ]["labels"][0]["id"] ) def test_with_iso_date_filters_and_future_start_date( @@ -270,8 +270,8 @@ def test_with_data_row_filter( ) data_row_ids = list( map( - lambda x: json.loads(x.json_str)["data_row"]["id"], - export_task.get_stream(), + lambda x: x.json["data_row"]["id"], + export_task.get_buffered_stream(), ) ) assert data_row_ids.sort() == expected_data_row_ids.sort() @@ -310,8 +310,8 @@ def test_with_global_key_filter( ) global_keys = list( map( - lambda x: json.loads(x.json_str)["data_row"]["global_key"], - export_task.get_stream(), + lambda x: x.json["data_row"]["global_key"], + export_task.get_buffered_stream(), ) ) assert global_keys.sort() == expected_global_keys.sort() From a83b0031c72dd913fe0dabfcdd26e5d91b938bb2 Mon Sep 17 00:00:00 2001 From: Gabefire <33893811+Gabefire@users.noreply.github.com> Date: Mon, 23 Sep 2024 21:02:31 -0500 Subject: [PATCH 6/7] Fixed bug --- .../data/export/streamable/test_export_project_streamable.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py index bf6df3363..63423202a 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_project_streamable.py @@ -181,7 +181,7 @@ def test_with_iso_date_filters( assert export_task.get_total_lines(stream_type=StreamType.RESULT) > 0 assert ( label_id - == export_task.get_buffered_stream()[0].json["projects"][ + == list(export_task.get_buffered_stream())[0].json["projects"][ project.uid ]["labels"][0]["id"] ) @@ -207,7 +207,7 @@ def test_with_iso_date_filters_no_start_date( assert export_task.get_total_lines(stream_type=StreamType.RESULT) > 0 assert ( label_id - == export_task.get_buffered_stream()[0].json["projects"][ + == list(export_task.get_buffered_stream())[0].json["projects"][ project.uid ]["labels"][0]["id"] ) From 38fb0b30f44106ff400c5219ac7095dd04cfdef7 Mon Sep 17 00:00:00 2001 From: Gabefire <33893811+Gabefire@users.noreply.github.com> Date: Mon, 23 Sep 2024 21:23:21 -0500 Subject: [PATCH 7/7] Removed bad import --- .../data/export/streamable/test_export_embeddings_streamable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py index 125f4d392..803b5994a 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_embeddings_streamable.py @@ -1,7 +1,7 @@ import json import random 
-from labelbox import StreamType, JsonConverter +from labelbox import StreamType class TestExportEmbeddings:
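
Migration note for SDK consumers: this series removes the deprecated JsonConverter/FileConverter/Stream path and the ExportTask.get_stream() overloads, leaving get_buffered_stream() as the single streaming entry point. The consumer-side change is small. Below is a minimal sketch, not a definitive implementation: the API key, project id, and export parameters are placeholders, and it assumes project.export() returns an ExportTask as in recent SDK versions; only the get_buffered_stream() usage is taken directly from the tests updated in this series.

    import json
    import labelbox as lb
    from labelbox import StreamType

    client = lb.Client(api_key="YOUR_API_KEY")      # placeholder credentials
    project = client.get_project("PROJECT_ID")      # placeholder project id
    export_task = project.export(params={"data_row_details": True})
    export_task.wait_till_done()

    # Before (removed by this series):
    #   export_task.get_stream(
    #       converter=JsonConverter(), stream_type=StreamType.RESULT
    #   ).start(stream_handler=lambda output: results.append(json.loads(output.json_str)))
    #
    # After: get_buffered_stream() yields BufferedJsonConverterOutput objects whose
    # .json attribute is already-parsed JSON, so the json.loads(...) step goes away.
    results = []
    export_task.get_buffered_stream(stream_type=StreamType.RESULT).start(
        stream_handler=lambda output: results.append(output.json)
    )

    # The buffered stream is also a plain iterable:
    first_row = list(export_task.get_buffered_stream())[0].json

    # There is no direct replacement for the removed FileConverter; writing NDJSON
    # from the handler is one possible substitute (an assumption, not part of this PR):
    with open("export.ndjson", "w", encoding="utf-8") as f:
        export_task.get_buffered_stream().start(
            stream_handler=lambda output: f.write(json.dumps(output.json) + "\n")
        )

The buffered path also drops the need to track offsets and line numbers on the caller side, which is why FileRetrieverByOffset, FileRetrieverByLine, and their unit tests are removed wholesale rather than ported.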