From 718ac7f4ab98777034d21ee0b2f1096fe3440234 Mon Sep 17 00:00:00 2001 From: Adrian Chang Date: Thu, 23 May 2024 17:22:56 -0700 Subject: [PATCH 1/6] ignore meta file partials --- libs/labelbox/src/labelbox/schema/export_task.py | 8 ++++++-- .../tests/unit/export_task/test_export_task.py | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/export_task.py b/libs/labelbox/src/labelbox/schema/export_task.py index e30551713..2d4e56f9f 100644 --- a/libs/labelbox/src/labelbox/schema/export_task.py +++ b/libs/labelbox/src/labelbox/schema/export_task.py @@ -448,8 +448,12 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file: result = self._retrieval_strategy.get_next_chunk() while result: - file_info, raw_data = result - temp_file.seek(file_info.offsets.start) + _, raw_data = result + # there is something wrong with the way the offsets are being calculated + # so just write all of the chunks as is too the file, with pointer initially + # pointed to the start of the file (like what is in GCS) and do not + # rely on offsets for file location + # temp_file.seek(file_info.offsets.start) temp_file.write(raw_data) result = self._retrieval_strategy.get_next_chunk() # read buffer diff --git a/libs/labelbox/tests/unit/export_task/test_export_task.py b/libs/labelbox/tests/unit/export_task/test_export_task.py index b147400b1..69c81500a 100644 --- a/libs/labelbox/tests/unit/export_task/test_export_task.py +++ b/libs/labelbox/tests/unit/export_task/test_export_task.py @@ -24,4 +24,18 @@ def test_export_task(self): mock_requests_get.return_value.text = json.dumps(data) mock_requests_get.return_value.content = "b" export_task = ExportTask(mock_task, is_export_v2=True) + assert export_task.result[0] == data + + def test_export_task_bad_offsets(self): + with patch('requests.get') as mock_requests_get: + mock_task = MagicMock() + mock_task.client.execute.side_effect = [ + {"task": {"exportMetadataHeader": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } }, + {"task": {"exportFileFromOffset": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } }, + ] + mock_task.status = "COMPLETE" + data = {"data_row": {"id": "clwb6wvpv3mpx0712aafl9m00", "external_id": "43cdad5e-1fcf-450d-ad72-df4460edf973", "global_key": "9ab56c5a-5c2f-45ae-8e21-e53eb415cefe", "row_data": "{\"type\":\"application/vnd.labelbox.conversational\",\"version\":1,\"messages\":[{\"messageId\":\"message-0\",\"timestampUsec\":1530718491,\"content\":\"The minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is\",\"user\":{\"userId\":\"prompt\",\"name\":\"prompt\"},\"align\":\"left\",\"canLabel\":true}],\"modelOutputs\":[{\"title\":\"Response 1\",\"content\":\"To find the minimum value of the expression $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x+b\\\\sin x=\\\\sqrt{a^2+b^2}\\\\left(\\\\frac{a}{\\\\sqrt{a^2+b^2}}\\\\cos x+\\\\frac{b}{\\\\sqrt{a^2+b^2}}\\\\sin x\\\\right)$. This allows us to rewrite the expression as:\\n\\n$3\\\\cos x+4\\\\sin x+8=\\\\sqrt{3^2+4^2}\\\\left(\\\\frac{3}{\\\\sqrt{3^2+4^2}}\\\\cos x+\\\\frac{4}{\\\\sqrt{3^2+4^2}}\\\\sin x\\\\right)+8=5\\\\left(\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x\\\\right)+8$\\n\\nNow, let's consider the expression $\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x$. Since $\\\\left(\\\\frac{3}{5}\\\\right)^2+\\\\left(\\\\frac{4}{5}\\\\right)^2=1$, we can write $\\\\frac{3}{5}=\\\\cos\\\\theta$ and $\\\\frac{4}{5}=\\\\sin\\\\theta$ for some angle $\\\\theta$. Then:\\n\\n$\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x=\\\\cos\\\\theta\\\\cos x+\\\\sin\\\\theta\\\\sin x=\\\\cos(x-\\\\theta)$\\n\\nSo, the original expression can be written as:\\n\\n$5\\\\cos(x-\\\\theta)+8$\\n\\nSince the minimum value of $\\\\cos(x-\\\\theta)$ is $-1$, the minimum value of the original expression is:\\n\\n$5(-1)+8=-5+8=3$\\n\\nTherefore, the minimum value of $3\\\\cos x + 4\\\\sin x + 8$ is $\\\\boxed{3}$.\",\"modelConfigName\":\"null\"},{\"title\":\"Response 2\",\"content\":\"A nice math question!\\n\\nTo find the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x + b\\\\sin x = \\\\sqrt{a^2 + b^2} \\\\cos(x - \\\\alpha)$, where $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{b}{a}\\\\right)$.\\n\\nIn this case, $a = 3$ and $b = 4$, so $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{4}{3}\\\\right)$.\\n\\nSo, we have:\\n\\n$$3 \\\\cos x + 4 \\\\sin x + 8 = \\\\sqrt{3^2 + 4^2} \\\\cos(x - \\\\alpha) + 8 = 5 \\\\cos(x - \\\\alpha) + 8$$\\n\\nNow, the minimum value of $\\\\cos(x - \\\\alpha)$ is $-1$, so the minimum value of $5 \\\\cos(x - \\\\alpha) + 8$ is:\\n\\n$$5(-1) + 8 = -5 + 8 = 3$$\\n\\nTherefore, the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is $\\\\boxed{3}$.\\n\\nLet me know if you have any questions or need further clarification!\",\"modelConfigName\":\"null\"}]}"}, "media_attributes": {"asset_type": "conversational", "mime_type": "application/vnd.labelbox.conversational", "labelable_ids": ["message-0"], "message_count": 1}} + mock_requests_get.return_value.text = json.dumps(data) + mock_requests_get.return_value.content = "b" + export_task = ExportTask(mock_task, is_export_v2=True) assert export_task.result[0] == data \ No newline at end of file From f9e76b17d4475246eb67136ab790659308c18516 Mon Sep 17 00:00:00 2001 From: Adrian Chang Date: Thu, 23 May 2024 17:57:00 -0700 Subject: [PATCH 2/6] Buffered stream --- .../src/labelbox/schema/export_task.py | 219 +++++++++++++----- 1 file changed, 160 insertions(+), 59 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/export_task.py b/libs/labelbox/src/labelbox/schema/export_task.py index 2d4e56f9f..6916f1b74 100644 --- a/libs/labelbox/src/labelbox/schema/export_task.py +++ b/libs/labelbox/src/labelbox/schema/export_task.py @@ -414,59 +414,6 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: yield file_info, raw_data result = self._retrieval_strategy.get_next_chunk() - -@dataclass -class BufferedJsonConverterOutput: - """Output with the JSON object""" - json: Any - - -class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]): - """Converts JSON data in a buffered manner - """ - def convert( - self, input_args: Converter.ConverterInputArgs - ) -> Iterator[BufferedJsonConverterOutput]: - yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data)) - - -class _BufferedGCSFileReader(_Reader): - """Reads data from multiple GCS files and buffer them to disk""" - - def __init__(self): - super().__init__() - self._retrieval_strategy = None - - def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: - """Sets the retrieval strategy.""" - self._retrieval_strategy = strategy - - def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: - if not self._retrieval_strategy: - raise ValueError("retrieval strategy not set") - # create a buffer - with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file: - result = self._retrieval_strategy.get_next_chunk() - while result: - _, raw_data = result - # there is something wrong with the way the offsets are being calculated - # so just write all of the chunks as is too the file, with pointer initially - # pointed to the start of the file (like what is in GCS) and do not - # rely on offsets for file location - # temp_file.seek(file_info.offsets.start) - temp_file.write(raw_data) - result = self._retrieval_strategy.get_next_chunk() - # read buffer - with open(temp_file.name, 'r') as temp_file_reopened: - for idx, line in enumerate(temp_file_reopened): - yield _MetadataFileInfo( - offsets=Range(start=0, end=len(line) - 1), - lines=Range(start=idx, end=idx + 1), - file=temp_file.name), line - # manually delete buffer - os.unlink(temp_file.name) - - class Stream(Generic[OutputT]): """Streams data from a Reader.""" @@ -524,6 +471,142 @@ def start( stream_handler(output) +class _BufferedFileRetrieverByOffset(FileRetrieverStrategy): # pylint: disable=too-few-public-methods + """Retrieves files by offset.""" + + def __init__( + self, + ctx: _TaskContext, + offset: int, + ) -> None: + super().__init__(ctx) + self._current_offset = offset + self._current_line: Optional[int] = None + if self._current_offset >= self._ctx.metadata_header.total_size: + raise ValueError( + f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}" + ) + + def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: + if self._current_offset >= self._ctx.metadata_header.total_size: + return None + query = ( + f"query GetExportFileFromOffsetPyApi" + f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)" + f"{{task(where: $where)" + f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)" + f"{{offsets {{start end}} lines {{start end}} file}}" + f"}}}}") + variables = { + "where": { + "id": self._ctx.task_id + }, + "streamType": self._ctx.stream_type.value, + "offset": str(self._current_offset), + } + file_info, file_content = self._get_file_content( + query, variables, "exportFileFromOffset") + file_info.offsets.start = self._current_offset + file_info.lines.start = self._current_line + self._current_offset = file_info.offsets.end + 1 + self._current_line = file_info.lines.end + 1 + return file_info, file_content + + +class BufferedStream(Generic[OutputT]): + """Streams data from a Reader.""" + + def __init__( + self, + ctx: _TaskContext, + ): + self._ctx = ctx + self._reader = _BufferedGCSFileReader() + self._converter = _BufferedJsonConverter() + self._reader.set_retrieval_strategy(_BufferedFileRetrieverByOffset(self._ctx, 0)) + + def __iter__(self): + yield from self._fetch() + + def _fetch(self,) -> Iterator[OutputT]: + """Fetches the result data. + Returns an iterator that yields the offset and the data. + """ + if self._ctx.metadata_header.total_size is None: + return + + stream = self._reader.read() + with self._converter as converter: + for file_info, raw_data in stream: + for output in converter.convert( + Converter.ConverterInputArgs(self._ctx, file_info, + raw_data)): + yield output + + def start( + self, + stream_handler: Optional[Callable[[OutputT], None]] = None) -> None: + """Starts streaming the result data. + Calls the stream_handler for each result. + """ + # this calls the __iter__ method, which in turn calls the _fetch method + for output in self: + if stream_handler: + stream_handler(output) + + +@dataclass +class BufferedJsonConverterOutput: + """Output with the JSON object""" + json: Any + + +class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]): + """Converts JSON data in a buffered manner + """ + def convert( + self, input_args: Converter.ConverterInputArgs + ) -> Iterator[BufferedJsonConverterOutput]: + yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data)) + + +class _BufferedGCSFileReader(_Reader): + """Reads data from multiple GCS files and buffer them to disk""" + + def __init__(self): + super().__init__() + self._retrieval_strategy = None + + def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: + """Sets the retrieval strategy.""" + self._retrieval_strategy = strategy + + def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: + if not self._retrieval_strategy: + raise ValueError("retrieval strategy not set") + # create a buffer + with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file: + result = self._retrieval_strategy.get_next_chunk() + while result: + _, raw_data = result + # there is something wrong with the way the offsets are being calculated + # so just write all of the chunks as is too the file, with pointer initially + # pointed to the start of the file (like what is in GCS) and do not + # rely on offsets for file location + # temp_file.seek(file_info.offsets.start) + temp_file.write(raw_data) + result = self._retrieval_strategy.get_next_chunk() + # read buffer + with open(temp_file.name, 'r') as temp_file_reopened: + for idx, line in enumerate(temp_file_reopened): + yield _MetadataFileInfo( + offsets=Range(start=0, end=len(line) - 1), + lines=Range(start=idx, end=idx + 1), + file=temp_file.name), line + # manually delete buffer + os.unlink(temp_file.name) + + class ExportTask: """ An adapter class for working with task objects, providing extended functionality @@ -649,11 +732,9 @@ def errors(self): self._task.client, self._task.uid, StreamType.ERRORS) if metadata_header is None: return None - Stream( + BufferedStream( _TaskContext(self._task.client, self._task.uid, StreamType.ERRORS, metadata_header), - _BufferedGCSFileReader(), - _BufferedJsonConverter(), ).start(stream_handler=lambda output: data.append(output.json)) return data @@ -671,11 +752,9 @@ def result(self): self._task.client, self._task.uid, StreamType.RESULT) if metadata_header is None: return [] - Stream( + BufferedStream( _TaskContext(self._task.client, self._task.uid, StreamType.RESULT, metadata_header), - _BufferedGCSFileReader(), - _BufferedJsonConverter(), ).start(stream_handler=lambda output: data.append(output.json)) return data return self._task.result_url @@ -767,11 +846,33 @@ def get_stream( ) -> Stream[FileConverterOutput]: """Overload for getting the right typing hints when using a FileConverter.""" + def get_buffered_stream( + self, + stream_type: StreamType = StreamType.RESULT, + ) -> Stream: + """Returns the result of the task.""" + if self._task.status == "FAILED": + raise ExportTask.ExportTaskException("Task failed") + if self._task.status != "COMPLETE": + raise ExportTask.ExportTaskException("Task is not ready yet") + + metadata_header = self._get_metadata_header(self._task.client, + self._task.uid, stream_type) + if metadata_header is None: + raise ValueError( + f"Task {self._task.uid} does not have a {stream_type.value} stream" + ) + return BufferedStream( + _TaskContext(self._task.client, self._task.uid, stream_type, + metadata_header), + ) + def get_stream( self, converter: Optional[Converter] = None, stream_type: StreamType = StreamType.RESULT, ) -> Stream: + warnings.warn("get_stream is deprecated and will be removed in a future release, use get_buffered_stream") if converter is None: converter = JsonConverter() """Returns the result of the task.""" From 10cbf6653b8caee4ba46a6b2007cdd0b796e9840 Mon Sep 17 00:00:00 2001 From: Adrian Chang Date: Thu, 23 May 2024 17:58:49 -0700 Subject: [PATCH 3/6] Buffered stream code --- .../src/labelbox/schema/export_task.py | 94 +++++++++++------- libs/labelbox/tests/conftest.py | 7 +- .../unit/export_task/test_export_task.py | 97 +++++++++++++++++-- 3 files changed, 154 insertions(+), 44 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/export_task.py b/libs/labelbox/src/labelbox/schema/export_task.py index 6916f1b74..473333d1f 100644 --- a/libs/labelbox/src/labelbox/schema/export_task.py +++ b/libs/labelbox/src/labelbox/schema/export_task.py @@ -114,9 +114,11 @@ class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few- Deprecated: This converter is deprecated and will be removed in a future release. """ - + def __init__(self) -> None: - warnings.warn("JSON converter is deprecated and will be removed in a future release") + warnings.warn( + "JSON converter is deprecated and will be removed in a future release" + ) super().__init__() def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]: @@ -397,7 +399,9 @@ class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods """ def __init__(self): - warnings.warn("_MultiGCSFileReader is deprecated and will be removed in a future release") + warnings.warn( + "_MultiGCSFileReader is deprecated and will be removed in a future release" + ) super().__init__() self._retrieval_strategy = None @@ -414,6 +418,7 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: yield file_info, raw_data result = self._retrieval_strategy.get_next_chunk() + class Stream(Generic[OutputT]): """Streams data from a Reader.""" @@ -481,7 +486,7 @@ def __init__( ) -> None: super().__init__(ctx) self._current_offset = offset - self._current_line: Optional[int] = None + self._current_line = 0 if self._current_offset >= self._ctx.metadata_header.total_size: raise ValueError( f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}" @@ -523,7 +528,8 @@ def __init__( self._ctx = ctx self._reader = _BufferedGCSFileReader() self._converter = _BufferedJsonConverter() - self._reader.set_retrieval_strategy(_BufferedFileRetrieverByOffset(self._ctx, 0)) + self._reader.set_retrieval_strategy( + _BufferedFileRetrieverByOffset(self._ctx, 0)) def __iter__(self): yield from self._fetch() @@ -564,13 +570,14 @@ class BufferedJsonConverterOutput: class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]): """Converts JSON data in a buffered manner """ + def convert( self, input_args: Converter.ConverterInputArgs ) -> Iterator[BufferedJsonConverterOutput]: yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data)) -class _BufferedGCSFileReader(_Reader): +class _BufferedGCSFileReader(_Reader): """Reads data from multiple GCS files and buffer them to disk""" def __init__(self): @@ -599,10 +606,10 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: # read buffer with open(temp_file.name, 'r') as temp_file_reopened: for idx, line in enumerate(temp_file_reopened): - yield _MetadataFileInfo( - offsets=Range(start=0, end=len(line) - 1), - lines=Range(start=idx, end=idx + 1), - file=temp_file.name), line + yield _MetadataFileInfo(offsets=Range(start=0, + end=len(line) - 1), + lines=Range(start=idx, end=idx + 1), + file=temp_file.name), line # manually delete buffer os.unlink(temp_file.name) @@ -733,9 +740,10 @@ def errors(self): if metadata_header is None: return None BufferedStream( - _TaskContext(self._task.client, self._task.uid, StreamType.ERRORS, - metadata_header), - ).start(stream_handler=lambda output: data.append(output.json)) + _TaskContext( + self._task.client, self._task.uid, StreamType.ERRORS, + metadata_header),).start( + stream_handler=lambda output: data.append(output.json)) return data @property @@ -753,9 +761,10 @@ def result(self): if metadata_header is None: return [] BufferedStream( - _TaskContext(self._task.client, self._task.uid, - StreamType.RESULT, metadata_header), - ).start(stream_handler=lambda output: data.append(output.json)) + _TaskContext( + self._task.client, self._task.uid, StreamType.RESULT, + metadata_header),).start( + stream_handler=lambda output: data.append(output.json)) return data return self._task.result_url @@ -830,27 +839,23 @@ def has_errors(self) -> bool: total_size = self.get_total_file_size(StreamType.ERRORS) return total_size is not None and total_size > 0 - @overload - def get_stream( + def get_buffered_stream( self, - converter: JsonConverter, stream_type: StreamType = StreamType.RESULT, - ) -> Stream[JsonConverterOutput]: - """Overload for getting the right typing hints when using a JsonConverter.""" + ) -> BufferedStream: + """ + Returns the result of the task. - @overload - def get_stream( - self, - converter: FileConverter, - stream_type: StreamType = StreamType.RESULT, - ) -> Stream[FileConverterOutput]: - """Overload for getting the right typing hints when using a FileConverter.""" + Args: + stream_type (StreamType, optional): The type of stream to retrieve. Defaults to StreamType.RESULT. - def get_buffered_stream( - self, - stream_type: StreamType = StreamType.RESULT, - ) -> Stream: - """Returns the result of the task.""" + Returns: + Stream: The buffered stream object. + + Raises: + ExportTask.ExportTaskException: If the task has failed or is not ready yet. + ValueError: If the task does not have the specified stream type. + """ if self._task.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": @@ -864,15 +869,32 @@ def get_buffered_stream( ) return BufferedStream( _TaskContext(self._task.client, self._task.uid, stream_type, - metadata_header), - ) + metadata_header),) + + @overload + def get_stream( + self, + converter: JsonConverter, + stream_type: StreamType = StreamType.RESULT, + ) -> Stream[JsonConverterOutput]: + """Overload for getting the right typing hints when using a JsonConverter.""" + + @overload + def get_stream( + self, + converter: FileConverter, + stream_type: StreamType = StreamType.RESULT, + ) -> Stream[FileConverterOutput]: + """Overload for getting the right typing hints when using a FileConverter.""" def get_stream( self, converter: Optional[Converter] = None, stream_type: StreamType = StreamType.RESULT, ) -> Stream: - warnings.warn("get_stream is deprecated and will be removed in a future release, use get_buffered_stream") + warnings.warn( + "get_stream is deprecated and will be removed in a future release, use get_buffered_stream" + ) if converter is None: converter = JsonConverter() """Returns the result of the task.""" diff --git a/libs/labelbox/tests/conftest.py b/libs/labelbox/tests/conftest.py index edb165539..a5a0d0f1f 100644 --- a/libs/labelbox/tests/conftest.py +++ b/libs/labelbox/tests/conftest.py @@ -430,10 +430,14 @@ def consensus_project(client, rand_gen): @pytest.fixture def model_config(client, rand_gen, valid_model_id): - model_config = client.create_model_config(name=rand_gen(str), model_id=valid_model_id, inference_params = {"param": "value"}) + model_config = client.create_model_config( + name=rand_gen(str), + model_id=valid_model_id, + inference_params={"param": "value"}) yield model_config client.delete_model_config(model_config.uid) + @pytest.fixture def consensus_project_with_batch(consensus_project, initial_dataset, rand_gen, image_url): @@ -1050,6 +1054,7 @@ def embedding(client: Client): yield embedding embedding.delete() + @pytest.fixture def valid_model_id(): return "2c903542-d1da-48fd-9db1-8c62571bd3d2" diff --git a/libs/labelbox/tests/unit/export_task/test_export_task.py b/libs/labelbox/tests/unit/export_task/test_export_task.py index 69c81500a..3a1335d00 100644 --- a/libs/labelbox/tests/unit/export_task/test_export_task.py +++ b/libs/labelbox/tests/unit/export_task/test_export_task.py @@ -3,19 +3,53 @@ from unittest.mock import MagicMock, patch from labelbox.schema.export_task import ExportTask + class TestExportTask: def test_export_task(self): with patch('requests.get') as mock_requests_get: mock_task = MagicMock() mock_task.client.execute.side_effect = [ - {"task": {"exportMetadataHeader": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } }, - {"task": {"exportFileFromOffset": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } }, + { + "task": { + "exportMetadataHeader": { + "total_size": 1, + "total_lines": 1, + "lines": { + "start": 0, + "end": 1 + }, + "offsets": { + "start": 0, + "end": 0 + }, + "file": "file" + } + } + }, + { + "task": { + "exportFileFromOffset": { + "total_size": 1, + "total_lines": 1, + "lines": { + "start": 0, + "end": 1 + }, + "offsets": { + "start": 0, + "end": 0 + }, + "file": "file" + } + } + }, ] mock_task.status = "COMPLETE" data = { "data_row": { - "raw_data": """ + "raw_data": + """ {"raw_text":"}{"} {"raw_text":"\\nbad"} """ @@ -30,12 +64,61 @@ def test_export_task_bad_offsets(self): with patch('requests.get') as mock_requests_get: mock_task = MagicMock() mock_task.client.execute.side_effect = [ - {"task": {"exportMetadataHeader": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } }, - {"task": {"exportFileFromOffset": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } }, + { + "task": { + "exportMetadataHeader": { + "total_size": 1, + "total_lines": 1, + "lines": { + "start": 0, + "end": 1 + }, + "offsets": { + "start": 0, + "end": 0 + }, + "file": "file" + } + } + }, + { + "task": { + "exportFileFromOffset": { + "total_size": 1, + "total_lines": 1, + "lines": { + "start": 0, + "end": 1 + }, + "offsets": { + "start": 0, + "end": 0 + }, + "file": "file" + } + } + }, ] mock_task.status = "COMPLETE" - data = {"data_row": {"id": "clwb6wvpv3mpx0712aafl9m00", "external_id": "43cdad5e-1fcf-450d-ad72-df4460edf973", "global_key": "9ab56c5a-5c2f-45ae-8e21-e53eb415cefe", "row_data": "{\"type\":\"application/vnd.labelbox.conversational\",\"version\":1,\"messages\":[{\"messageId\":\"message-0\",\"timestampUsec\":1530718491,\"content\":\"The minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is\",\"user\":{\"userId\":\"prompt\",\"name\":\"prompt\"},\"align\":\"left\",\"canLabel\":true}],\"modelOutputs\":[{\"title\":\"Response 1\",\"content\":\"To find the minimum value of the expression $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x+b\\\\sin x=\\\\sqrt{a^2+b^2}\\\\left(\\\\frac{a}{\\\\sqrt{a^2+b^2}}\\\\cos x+\\\\frac{b}{\\\\sqrt{a^2+b^2}}\\\\sin x\\\\right)$. This allows us to rewrite the expression as:\\n\\n$3\\\\cos x+4\\\\sin x+8=\\\\sqrt{3^2+4^2}\\\\left(\\\\frac{3}{\\\\sqrt{3^2+4^2}}\\\\cos x+\\\\frac{4}{\\\\sqrt{3^2+4^2}}\\\\sin x\\\\right)+8=5\\\\left(\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x\\\\right)+8$\\n\\nNow, let's consider the expression $\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x$. Since $\\\\left(\\\\frac{3}{5}\\\\right)^2+\\\\left(\\\\frac{4}{5}\\\\right)^2=1$, we can write $\\\\frac{3}{5}=\\\\cos\\\\theta$ and $\\\\frac{4}{5}=\\\\sin\\\\theta$ for some angle $\\\\theta$. Then:\\n\\n$\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x=\\\\cos\\\\theta\\\\cos x+\\\\sin\\\\theta\\\\sin x=\\\\cos(x-\\\\theta)$\\n\\nSo, the original expression can be written as:\\n\\n$5\\\\cos(x-\\\\theta)+8$\\n\\nSince the minimum value of $\\\\cos(x-\\\\theta)$ is $-1$, the minimum value of the original expression is:\\n\\n$5(-1)+8=-5+8=3$\\n\\nTherefore, the minimum value of $3\\\\cos x + 4\\\\sin x + 8$ is $\\\\boxed{3}$.\",\"modelConfigName\":\"null\"},{\"title\":\"Response 2\",\"content\":\"A nice math question!\\n\\nTo find the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x + b\\\\sin x = \\\\sqrt{a^2 + b^2} \\\\cos(x - \\\\alpha)$, where $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{b}{a}\\\\right)$.\\n\\nIn this case, $a = 3$ and $b = 4$, so $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{4}{3}\\\\right)$.\\n\\nSo, we have:\\n\\n$$3 \\\\cos x + 4 \\\\sin x + 8 = \\\\sqrt{3^2 + 4^2} \\\\cos(x - \\\\alpha) + 8 = 5 \\\\cos(x - \\\\alpha) + 8$$\\n\\nNow, the minimum value of $\\\\cos(x - \\\\alpha)$ is $-1$, so the minimum value of $5 \\\\cos(x - \\\\alpha) + 8$ is:\\n\\n$$5(-1) + 8 = -5 + 8 = 3$$\\n\\nTherefore, the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is $\\\\boxed{3}$.\\n\\nLet me know if you have any questions or need further clarification!\",\"modelConfigName\":\"null\"}]}"}, "media_attributes": {"asset_type": "conversational", "mime_type": "application/vnd.labelbox.conversational", "labelable_ids": ["message-0"], "message_count": 1}} + data = { + "data_row": { + "id": + "clwb6wvpv3mpx0712aafl9m00", + "external_id": + "43cdad5e-1fcf-450d-ad72-df4460edf973", + "global_key": + "9ab56c5a-5c2f-45ae-8e21-e53eb415cefe", + "row_data": + "{\"type\":\"application/vnd.labelbox.conversational\",\"version\":1,\"messages\":[{\"messageId\":\"message-0\",\"timestampUsec\":1530718491,\"content\":\"The minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is\",\"user\":{\"userId\":\"prompt\",\"name\":\"prompt\"},\"align\":\"left\",\"canLabel\":true}],\"modelOutputs\":[{\"title\":\"Response 1\",\"content\":\"To find the minimum value of the expression $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x+b\\\\sin x=\\\\sqrt{a^2+b^2}\\\\left(\\\\frac{a}{\\\\sqrt{a^2+b^2}}\\\\cos x+\\\\frac{b}{\\\\sqrt{a^2+b^2}}\\\\sin x\\\\right)$. This allows us to rewrite the expression as:\\n\\n$3\\\\cos x+4\\\\sin x+8=\\\\sqrt{3^2+4^2}\\\\left(\\\\frac{3}{\\\\sqrt{3^2+4^2}}\\\\cos x+\\\\frac{4}{\\\\sqrt{3^2+4^2}}\\\\sin x\\\\right)+8=5\\\\left(\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x\\\\right)+8$\\n\\nNow, let's consider the expression $\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x$. Since $\\\\left(\\\\frac{3}{5}\\\\right)^2+\\\\left(\\\\frac{4}{5}\\\\right)^2=1$, we can write $\\\\frac{3}{5}=\\\\cos\\\\theta$ and $\\\\frac{4}{5}=\\\\sin\\\\theta$ for some angle $\\\\theta$. Then:\\n\\n$\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x=\\\\cos\\\\theta\\\\cos x+\\\\sin\\\\theta\\\\sin x=\\\\cos(x-\\\\theta)$\\n\\nSo, the original expression can be written as:\\n\\n$5\\\\cos(x-\\\\theta)+8$\\n\\nSince the minimum value of $\\\\cos(x-\\\\theta)$ is $-1$, the minimum value of the original expression is:\\n\\n$5(-1)+8=-5+8=3$\\n\\nTherefore, the minimum value of $3\\\\cos x + 4\\\\sin x + 8$ is $\\\\boxed{3}$.\",\"modelConfigName\":\"null\"},{\"title\":\"Response 2\",\"content\":\"A nice math question!\\n\\nTo find the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x + b\\\\sin x = \\\\sqrt{a^2 + b^2} \\\\cos(x - \\\\alpha)$, where $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{b}{a}\\\\right)$.\\n\\nIn this case, $a = 3$ and $b = 4$, so $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{4}{3}\\\\right)$.\\n\\nSo, we have:\\n\\n$$3 \\\\cos x + 4 \\\\sin x + 8 = \\\\sqrt{3^2 + 4^2} \\\\cos(x - \\\\alpha) + 8 = 5 \\\\cos(x - \\\\alpha) + 8$$\\n\\nNow, the minimum value of $\\\\cos(x - \\\\alpha)$ is $-1$, so the minimum value of $5 \\\\cos(x - \\\\alpha) + 8$ is:\\n\\n$$5(-1) + 8 = -5 + 8 = 3$$\\n\\nTherefore, the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is $\\\\boxed{3}$.\\n\\nLet me know if you have any questions or need further clarification!\",\"modelConfigName\":\"null\"}]}" + }, + "media_attributes": { + "asset_type": "conversational", + "mime_type": "application/vnd.labelbox.conversational", + "labelable_ids": ["message-0"], + "message_count": 1 + } + } mock_requests_get.return_value.text = json.dumps(data) mock_requests_get.return_value.content = "b" export_task = ExportTask(mock_task, is_export_v2=True) - assert export_task.result[0] == data \ No newline at end of file + assert export_task.result[0] == data From 4ccf197e24170a4381cfad40d8c6e0ceb81c1ec6 Mon Sep 17 00:00:00 2001 From: Adrian Chang Date: Sat, 25 May 2024 00:05:13 -0700 Subject: [PATCH 4/6] Fixup unit tests --- .../unit/export_task/test_export_task.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/libs/labelbox/tests/unit/export_task/test_export_task.py b/libs/labelbox/tests/unit/export_task/test_export_task.py index 3a1335d00..50f08191b 100644 --- a/libs/labelbox/tests/unit/export_task/test_export_task.py +++ b/libs/labelbox/tests/unit/export_task/test_export_task.py @@ -1,4 +1,5 @@ import json +import pytest from unittest.mock import MagicMock, patch from labelbox.schema.export_task import ExportTask @@ -60,6 +61,76 @@ def test_export_task(self): export_task = ExportTask(mock_task, is_export_v2=True) assert export_task.result[0] == data + def test_get_buffered_stream_complete(self): + with pytest.raises(ExportTask.ExportTaskException): + mock_task = MagicMock() + mock_task.status = "FAILED" + export_task = ExportTask(mock_task, is_export_v2=True) + export_task.get_buffered_stream() + + def test_get_buffered_stream_failed(self): + with pytest.raises(ExportTask.ExportTaskException): + mock_task = MagicMock() + mock_task.status = "INPROGRESS" + export_task = ExportTask(mock_task, is_export_v2=True) + export_task.get_buffered_stream() + + def test_get_buffered_stream(self): + with patch('requests.get') as mock_requests_get: + mock_task = MagicMock() + mock_task.client.execute.side_effect = [ + { + "task": { + "exportMetadataHeader": { + "total_size": 1, + "total_lines": 1, + "lines": { + "start": 0, + "end": 1 + }, + "offsets": { + "start": 0, + "end": 0 + }, + "file": "file" + } + } + }, + { + "task": { + "exportFileFromOffset": { + "total_size": 1, + "total_lines": 1, + "lines": { + "start": 0, + "end": 1 + }, + "offsets": { + "start": 0, + "end": 0 + }, + "file": "file" + } + } + }, + ] + mock_task.status = "COMPLETE" + data = { + "data_row": { + "raw_data": + """ + {"raw_text":"}{"} + {"raw_text":"\\nbad"} + """ + } + } + mock_requests_get.return_value.text = json.dumps(data) + mock_requests_get.return_value.content = "b" + export_task = ExportTask(mock_task, is_export_v2=True) + output_data = [] + export_task.get_buffered_stream().start(stream_handler=lambda x: output_data.append(x.json)) + assert data == output_data[0] + def test_export_task_bad_offsets(self): with patch('requests.get') as mock_requests_get: mock_task = MagicMock() From 40168a48ed1b5ea2fda362964bc4828050d6fdf8 Mon Sep 17 00:00:00 2001 From: Adrian Chang Date: Sat, 25 May 2024 00:10:57 -0700 Subject: [PATCH 5/6] Add integration test for buffered --- .../test_export_data_rows_streamable.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py index 8da08a833..0d98d8a89 100644 --- a/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py +++ b/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py @@ -27,6 +27,25 @@ def test_with_data_row_object(self, client, data_row, assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1 assert (json.loads(list(export_task.get_stream())[0].json_str) ["data_row"]["id"] == data_row.uid) + + def test_with_data_row_object_buffered(self, client, data_row, + wait_for_data_row_processing): + data_row = wait_for_data_row_processing(client, data_row) + time.sleep(7) # temp fix for ES indexing delay + export_task = DataRow.export( + client=client, + data_rows=[data_row], + task_name="TestExportDataRow:test_with_data_row_object_buffered", + ) + export_task.wait_till_done() + assert export_task.status == "COMPLETE" + assert isinstance(export_task, ExportTask) + assert export_task.has_result() + assert export_task.has_errors() is False + assert export_task.get_total_file_size( + stream_type=StreamType.RESULT) > 0 + assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1 + assert list(export_task.get_buffered_stream())[0].json["data_row"]["id"] == data_row.uid def test_with_id(self, client, data_row, wait_for_data_row_processing): data_row = wait_for_data_row_processing(client, data_row) From 705275b141518cfade1720429e4f748e2d0cffb8 Mon Sep 17 00:00:00 2001 From: Adrian Chang Date: Sat, 25 May 2024 00:14:30 -0700 Subject: [PATCH 6/6] buffered result --- libs/labelbox/src/labelbox/schema/data_row.py | 1 - libs/labelbox/src/labelbox/schema/project.py | 2 -- libs/labelbox/tests/integration/conftest.py | 2 +- requirements-dev.lock | 1 + 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/data_row.py b/libs/labelbox/src/labelbox/schema/data_row.py index d9b7fd204..b7c9b324d 100644 --- a/libs/labelbox/src/labelbox/schema/data_row.py +++ b/libs/labelbox/src/labelbox/schema/data_row.py @@ -356,7 +356,6 @@ def _export( res = client.execute(create_task_query_str, query_params, error_log_key="errors") - print(res) res = res[mutation_name] task_id = res["taskId"] is_streamable = res["isStreamable"] diff --git a/libs/labelbox/src/labelbox/schema/project.py b/libs/labelbox/src/labelbox/schema/project.py index 9ffed2816..8f4b26aee 100644 --- a/libs/labelbox/src/labelbox/schema/project.py +++ b/libs/labelbox/src/labelbox/schema/project.py @@ -1022,8 +1022,6 @@ def create_batches_from_dataset( consensus_settings = ConsensusSettings(**consensus_settings).dict( by_alias=True) - print("Creating batches from dataset %s", dataset_id) - method = 'createBatchesFromDataset' mutation_str = """mutation %sPyApi($projectId: ID!, $input: CreateBatchesFromDatasetInput!) { project(where: {id: $projectId}) { diff --git a/libs/labelbox/tests/integration/conftest.py b/libs/labelbox/tests/integration/conftest.py index be07bb09e..7a1d2d8cc 100644 --- a/libs/labelbox/tests/integration/conftest.py +++ b/libs/labelbox/tests/integration/conftest.py @@ -453,4 +453,4 @@ def print_perf_summary(): num_of_entries = 10 if len(sorted_dict) >= 10 else len(sorted_dict) slowest_fixtures = [(aaa, sorted_dict[aaa]) for aaa in islice(sorted_dict, num_of_entries)] - print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr) + print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr) \ No newline at end of file diff --git a/requirements-dev.lock b/requirements-dev.lock index a90e24e44..68d7650ab 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -6,6 +6,7 @@ # features: [] # all-features: false # with-sources: false +# generate-hashes: false -e file:libs/labelbox alabaster==0.7.13