diff --git a/libs/labelbox/src/labelbox/schema/data_row.py b/libs/labelbox/src/labelbox/schema/data_row.py
index d9b7fd204..b7c9b324d 100644
--- a/libs/labelbox/src/labelbox/schema/data_row.py
+++ b/libs/labelbox/src/labelbox/schema/data_row.py
@@ -356,7 +356,6 @@ def _export(
         res = client.execute(create_task_query_str,
                              query_params,
                              error_log_key="errors")
-        print(res)
         res = res[mutation_name]
         task_id = res["taskId"]
         is_streamable = res["isStreamable"]
diff --git a/libs/labelbox/src/labelbox/schema/export_task.py b/libs/labelbox/src/labelbox/schema/export_task.py
index e30551713..473333d1f 100644
--- a/libs/labelbox/src/labelbox/schema/export_task.py
+++ b/libs/labelbox/src/labelbox/schema/export_task.py
@@ -114,9 +114,11 @@ class JsonConverter(Converter[JsonConverterOutput]):  # pylint: disable=too-few-
     Deprecated: This converter is deprecated and will be removed in a future release.
     """
-
+
     def __init__(self) -> None:
-        warnings.warn("JSON converter is deprecated and will be removed in a future release")
+        warnings.warn(
+            "JSON converter is deprecated and will be removed in a future release"
+        )
         super().__init__()
 
     def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]:
@@ -397,7 +399,9 @@ class _MultiGCSFileReader(_Reader):  # pylint: disable=too-few-public-methods
     """
 
     def __init__(self):
-        warnings.warn("_MultiGCSFileReader is deprecated and will be removed in a future release")
+        warnings.warn(
+            "_MultiGCSFileReader is deprecated and will be removed in a future release"
+        )
         super().__init__()
         self._retrieval_strategy = None
@@ -415,54 +419,6 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
             result = self._retrieval_strategy.get_next_chunk()
 
 
-@dataclass
-class BufferedJsonConverterOutput:
-    """Output with the JSON object"""
-    json: Any
-
-
-class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
-    """Converts JSON data in a buffered manner
-    """
-    def convert(
-        self, input_args: Converter.ConverterInputArgs
-    ) -> Iterator[BufferedJsonConverterOutput]:
-        yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data))
-
-
-class _BufferedGCSFileReader(_Reader):
-    """Reads data from multiple GCS files and buffer them to disk"""
-
-    def __init__(self):
-        super().__init__()
-        self._retrieval_strategy = None
-
-    def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
-        """Sets the retrieval strategy."""
-        self._retrieval_strategy = strategy
-
-    def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
-        if not self._retrieval_strategy:
-            raise ValueError("retrieval strategy not set")
-        # create a buffer
-        with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
-            result = self._retrieval_strategy.get_next_chunk()
-            while result:
-                file_info, raw_data = result
-                temp_file.seek(file_info.offsets.start)
-                temp_file.write(raw_data)
-                result = self._retrieval_strategy.get_next_chunk()
-        # read buffer
-        with open(temp_file.name, 'r') as temp_file_reopened:
-            for idx, line in enumerate(temp_file_reopened):
-                yield _MetadataFileInfo(
-                    offsets=Range(start=0, end=len(line) - 1),
-                    lines=Range(start=idx, end=idx + 1),
-                    file=temp_file.name), line
-        # manually delete buffer
-        os.unlink(temp_file.name)
-
-
 class Stream(Generic[OutputT]):
     """Streams data from a Reader."""
 
@@ -520,6 +476,144 @@ def start(
             stream_handler(output)
 
 
+class _BufferedFileRetrieverByOffset(FileRetrieverStrategy):  # pylint: disable=too-few-public-methods
+    """Retrieves files by offset."""
+
+    def __init__(
+        self,
+        ctx: _TaskContext,
+        offset: int,
+    ) -> None:
+        super().__init__(ctx)
+        self._current_offset = offset
+        self._current_line = 0
+        if self._current_offset >= self._ctx.metadata_header.total_size:
+            raise ValueError(
+                f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}"
+            )
+
+    def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]:
+        if self._current_offset >= self._ctx.metadata_header.total_size:
+            return None
+        query = (
+            f"query GetExportFileFromOffsetPyApi"
+            f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)"
+            f"{{task(where: $where)"
+            f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)"
+            f"{{offsets {{start end}} lines {{start end}} file}}"
+            f"}}}}")
+        variables = {
+            "where": {
+                "id": self._ctx.task_id
+            },
+            "streamType": self._ctx.stream_type.value,
+            "offset": str(self._current_offset),
+        }
+        file_info, file_content = self._get_file_content(
+            query, variables, "exportFileFromOffset")
+        file_info.offsets.start = self._current_offset
+        file_info.lines.start = self._current_line
+        self._current_offset = file_info.offsets.end + 1
+        self._current_line = file_info.lines.end + 1
+        return file_info, file_content
+
+
+class BufferedStream(Generic[OutputT]):
+    """Streams data from a Reader."""
+
+    def __init__(
+        self,
+        ctx: _TaskContext,
+    ):
+        self._ctx = ctx
+        self._reader = _BufferedGCSFileReader()
+        self._converter = _BufferedJsonConverter()
+        self._reader.set_retrieval_strategy(
+            _BufferedFileRetrieverByOffset(self._ctx, 0))
+
+    def __iter__(self):
+        yield from self._fetch()
+
+    def _fetch(self,) -> Iterator[OutputT]:
+        """Fetches the result data.
+        Returns an iterator that yields the offset and the data.
+        """
+        if self._ctx.metadata_header.total_size is None:
+            return
+
+        stream = self._reader.read()
+        with self._converter as converter:
+            for file_info, raw_data in stream:
+                for output in converter.convert(
+                        Converter.ConverterInputArgs(self._ctx, file_info,
+                                                     raw_data)):
+                    yield output
+
+    def start(
+            self,
+            stream_handler: Optional[Callable[[OutputT], None]] = None) -> None:
+        """Starts streaming the result data.
+        Calls the stream_handler for each result.
+        """
+        # this calls the __iter__ method, which in turn calls the _fetch method
+        for output in self:
+            if stream_handler:
+                stream_handler(output)
+
+
+@dataclass
+class BufferedJsonConverterOutput:
+    """Output with the JSON object"""
+    json: Any
+
+
+class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
+    """Converts JSON data in a buffered manner
+    """
+
+    def convert(
+        self, input_args: Converter.ConverterInputArgs
+    ) -> Iterator[BufferedJsonConverterOutput]:
+        yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data))
+
+
+class _BufferedGCSFileReader(_Reader):
+    """Reads data from multiple GCS files and buffers them to disk"""
+
+    def __init__(self):
+        super().__init__()
+        self._retrieval_strategy = None
+
+    def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
+        """Sets the retrieval strategy."""
+        self._retrieval_strategy = strategy
+
+    def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
+        if not self._retrieval_strategy:
+            raise ValueError("retrieval strategy not set")
+        # create a buffer
+        with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
+            result = self._retrieval_strategy.get_next_chunk()
+            while result:
+                _, raw_data = result
+                # the offsets reported by the API are unreliable, so write the
+                # chunks to the buffer sequentially from the start of the file
+                # (matching their order in GCS) instead of seeking to each
+                # chunk's reported offset
+                # temp_file.seek(file_info.offsets.start)
+                temp_file.write(raw_data)
+                result = self._retrieval_strategy.get_next_chunk()
+        # read buffer
+        with open(temp_file.name, 'r') as temp_file_reopened:
+            for idx, line in enumerate(temp_file_reopened):
+                yield _MetadataFileInfo(offsets=Range(start=0,
+                                                      end=len(line) - 1),
+                                        lines=Range(start=idx, end=idx + 1),
+                                        file=temp_file.name), line
+        # manually delete buffer
+        os.unlink(temp_file.name)
+
+
 class ExportTask:
     """
     An adapter class for working with task objects, providing extended functionality
@@ -645,12 +739,11 @@ def errors(self):
                 self._task.client, self._task.uid, StreamType.ERRORS)
             if metadata_header is None:
                 return None
-            Stream(
-                _TaskContext(self._task.client, self._task.uid, StreamType.ERRORS,
-                             metadata_header),
-                _BufferedGCSFileReader(),
-                _BufferedJsonConverter(),
-            ).start(stream_handler=lambda output: data.append(output.json))
+            BufferedStream(
+                _TaskContext(
+                    self._task.client, self._task.uid, StreamType.ERRORS,
+                    metadata_header),).start(
+                        stream_handler=lambda output: data.append(output.json))
             return data
 
     @property
@@ -667,12 +760,11 @@ def result(self):
                 self._task.client, self._task.uid, StreamType.RESULT)
             if metadata_header is None:
                 return []
-            Stream(
-                _TaskContext(self._task.client, self._task.uid,
-                             StreamType.RESULT, metadata_header),
-                _BufferedGCSFileReader(),
-                _BufferedJsonConverter(),
-            ).start(stream_handler=lambda output: data.append(output.json))
+            BufferedStream(
+                _TaskContext(
+                    self._task.client, self._task.uid, StreamType.RESULT,
+                    metadata_header),).start(
+                        stream_handler=lambda output: data.append(output.json))
             return data
         return self._task.result_url
@@ -747,6 +839,38 @@ def has_errors(self) -> bool:
         total_size = self.get_total_file_size(StreamType.ERRORS)
         return total_size is not None and total_size > 0
 
+    def get_buffered_stream(
+        self,
+        stream_type: StreamType = StreamType.RESULT,
+    ) -> BufferedStream:
+        """
+        Returns a buffered stream of the task's results.
+
+        Args:
+            stream_type (StreamType, optional): The type of stream to retrieve.
+                Defaults to StreamType.RESULT.
+
+        Returns:
+            BufferedStream: The buffered stream object.
+
+        Raises:
+            ExportTask.ExportTaskException: If the task has failed or is not ready yet.
+            ValueError: If the task does not have the specified stream type.
+        """
+        if self._task.status == "FAILED":
+            raise ExportTask.ExportTaskException("Task failed")
+        if self._task.status != "COMPLETE":
+            raise ExportTask.ExportTaskException("Task is not ready yet")
+
+        metadata_header = self._get_metadata_header(self._task.client,
+                                                    self._task.uid, stream_type)
+        if metadata_header is None:
+            raise ValueError(
+                f"Task {self._task.uid} does not have a {stream_type.value} stream"
+            )
+        return BufferedStream(
+            _TaskContext(self._task.client, self._task.uid, stream_type,
+                         metadata_header),)
+
     @overload
     def get_stream(
         self,
@@ -768,6 +892,9 @@ def get_stream(
         converter: Optional[Converter] = None,
         stream_type: StreamType = StreamType.RESULT,
     ) -> Stream:
+        warnings.warn(
+            "get_stream is deprecated and will be removed in a future release, use get_buffered_stream"
+        )
         if converter is None:
             converter = JsonConverter()
         """Returns the result of the task."""
diff --git a/libs/labelbox/src/labelbox/schema/project.py b/libs/labelbox/src/labelbox/schema/project.py
index 9ffed2816..8f4b26aee 100644
--- a/libs/labelbox/src/labelbox/schema/project.py
+++ b/libs/labelbox/src/labelbox/schema/project.py
@@ -1022,8 +1022,6 @@ def create_batches_from_dataset(
         consensus_settings = ConsensusSettings(**consensus_settings).dict(
             by_alias=True)
 
-        print("Creating batches from dataset %s", dataset_id)
-
         method = 'createBatchesFromDataset'
         mutation_str = """mutation %sPyApi($projectId: ID!, $input: CreateBatchesFromDatasetInput!) {
                     project(where: {id: $projectId}) {
diff --git a/libs/labelbox/tests/conftest.py b/libs/labelbox/tests/conftest.py
index edb165539..a5a0d0f1f 100644
--- a/libs/labelbox/tests/conftest.py
+++ b/libs/labelbox/tests/conftest.py
@@ -430,10 +430,14 @@ def consensus_project(client, rand_gen):
 
 @pytest.fixture
 def model_config(client, rand_gen, valid_model_id):
-    model_config = client.create_model_config(name=rand_gen(str), model_id=valid_model_id, inference_params = {"param": "value"})
+    model_config = client.create_model_config(
+        name=rand_gen(str),
+        model_id=valid_model_id,
+        inference_params={"param": "value"})
     yield model_config
     client.delete_model_config(model_config.uid)
 
+
 @pytest.fixture
 def consensus_project_with_batch(consensus_project, initial_dataset, rand_gen,
                                  image_url):
@@ -1050,6 +1054,7 @@ def embedding(client: Client):
     yield embedding
     embedding.delete()
 
+
 @pytest.fixture
 def valid_model_id():
     return "2c903542-d1da-48fd-9db1-8c62571bd3d2"
diff --git a/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py b/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py
index 8da08a833..0d98d8a89 100644
--- a/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py
+++ b/libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py
@@ -27,6 +27,25 @@ def test_with_data_row_object(self, client, data_row,
         assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1
         assert (json.loads(list(export_task.get_stream())[0].json_str)
                 ["data_row"]["id"] == data_row.uid)
+
+    def test_with_data_row_object_buffered(self, client, data_row,
+                                           wait_for_data_row_processing):
+        data_row = wait_for_data_row_processing(client, data_row)
+        time.sleep(7)  # temp fix for ES indexing delay
+        export_task = DataRow.export(
+            client=client,
+            data_rows=[data_row],
+            task_name="TestExportDataRow:test_with_data_row_object_buffered",
+        )
+        export_task.wait_till_done()
+        assert export_task.status == "COMPLETE"
+        assert isinstance(export_task, ExportTask)
+        assert export_task.has_result()
+        assert export_task.has_errors() is False
+        assert export_task.get_total_file_size(
+            stream_type=StreamType.RESULT) > 0
+        assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1
+        assert list(export_task.get_buffered_stream())[0].json["data_row"]["id"] == data_row.uid
 
     def test_with_id(self, client, data_row, wait_for_data_row_processing):
         data_row = wait_for_data_row_processing(client, data_row)
diff --git a/libs/labelbox/tests/integration/conftest.py b/libs/labelbox/tests/integration/conftest.py
index be07bb09e..7a1d2d8cc 100644
--- a/libs/labelbox/tests/integration/conftest.py
+++ b/libs/labelbox/tests/integration/conftest.py
@@ -453,4 +453,4 @@ def print_perf_summary():
         num_of_entries = 10 if len(sorted_dict) >= 10 else len(sorted_dict)
         slowest_fixtures = [(aaa, sorted_dict[aaa])
                             for aaa in islice(sorted_dict, num_of_entries)]
-    print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr)
+    print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr)
\ No newline at end of file
diff --git a/libs/labelbox/tests/unit/export_task/test_export_task.py b/libs/labelbox/tests/unit/export_task/test_export_task.py
index b147400b1..50f08191b 100644
--- a/libs/labelbox/tests/unit/export_task/test_export_task.py
+++ b/libs/labelbox/tests/unit/export_task/test_export_task.py
@@ -1,21 +1,56 @@
 import json
+import pytest
 from unittest.mock import MagicMock, patch
 
 from labelbox.schema.export_task import ExportTask
 
+
 class TestExportTask:
 
     def test_export_task(self):
         with patch('requests.get') as mock_requests_get:
             mock_task = MagicMock()
             mock_task.client.execute.side_effect = [
-                {"task": {"exportMetadataHeader": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } },
-                {"task": {"exportFileFromOffset": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } },
+                {
+                    "task": {
+                        "exportMetadataHeader": {
+                            "total_size": 1,
+                            "total_lines": 1,
+                            "lines": {
+                                "start": 0,
+                                "end": 1
+                            },
+                            "offsets": {
+                                "start": 0,
+                                "end": 0
+                            },
+                            "file": "file"
+                        }
+                    }
+                },
+                {
+                    "task": {
+                        "exportFileFromOffset": {
+                            "total_size": 1,
+                            "total_lines": 1,
+                            "lines": {
+                                "start": 0,
+                                "end": 1
+                            },
+                            "offsets": {
+                                "start": 0,
+                                "end": 0
+                            },
+                            "file": "file"
+                        }
+                    }
+                },
             ]
             mock_task.status = "COMPLETE"
             data = {
                 "data_row": {
-                    "raw_data": """
+                    "raw_data":
+                        """
 {"raw_text":"}{"}
 {"raw_text":"\\nbad"}
 """
@@ -24,4 +59,137 @@ def test_export_task(self):
             mock_requests_get.return_value.text = json.dumps(data)
             mock_requests_get.return_value.content = "b"
             export_task = ExportTask(mock_task, is_export_v2=True)
-            assert export_task.result[0] == data
\ No newline at end of file
+            assert export_task.result[0] == data
+
+    def test_get_buffered_stream_failed(self):
+        with pytest.raises(ExportTask.ExportTaskException):
+            mock_task = MagicMock()
+            mock_task.status = "FAILED"
+            export_task = ExportTask(mock_task, is_export_v2=True)
+            export_task.get_buffered_stream()
+
+    def test_get_buffered_stream_in_progress(self):
+        with pytest.raises(ExportTask.ExportTaskException):
+            mock_task = MagicMock()
+            mock_task.status = "INPROGRESS"
+            export_task = ExportTask(mock_task, is_export_v2=True)
+            export_task.get_buffered_stream()
+
+    def test_get_buffered_stream(self):
+        with patch('requests.get') as mock_requests_get:
+            mock_task = MagicMock()
+            mock_task.client.execute.side_effect = [
+                {
+                    "task": {
+                        "exportMetadataHeader": {
+                            "total_size": 1,
+                            "total_lines": 1,
+                            "lines": {
+                                "start": 0,
+                                "end": 1
+                            },
+                            "offsets": {
+                                "start": 0,
+                                "end": 0
+                            },
+                            "file": "file"
+                        }
+                    }
+                },
+                {
+                    "task": {
+                        "exportFileFromOffset": {
+                            "total_size": 1,
+                            "total_lines": 1,
+                            "lines": {
+                                "start": 0,
+                                "end": 1
+                            },
+                            "offsets": {
+                                "start": 0,
+                                "end": 0
+                            },
+                            "file": "file"
+                        }
+                    }
+                },
+            ]
+            mock_task.status = "COMPLETE"
+            data = {
+                "data_row": {
+                    "raw_data":
+                        """
+            {"raw_text":"}{"}
+            {"raw_text":"\\nbad"}
+            """
+                }
+            }
+            mock_requests_get.return_value.text = json.dumps(data)
+            mock_requests_get.return_value.content = "b"
+            export_task = ExportTask(mock_task, is_export_v2=True)
+            output_data = []
+            export_task.get_buffered_stream().start(
+                stream_handler=lambda x: output_data.append(x.json))
+            assert data == output_data[0]
+
+    def test_export_task_bad_offsets(self):
+        with patch('requests.get') as mock_requests_get:
+            mock_task = MagicMock()
+            mock_task.client.execute.side_effect = [
+                {
+                    "task": {
+                        "exportMetadataHeader": {
+                            "total_size": 1,
+                            "total_lines": 1,
+                            "lines": {
+                                "start": 0,
+                                "end": 1
+                            },
+                            "offsets": {
+                                "start": 0,
+                                "end": 0
+                            },
+                            "file": "file"
+                        }
+                    }
+                },
+                {
+                    "task": {
+                        "exportFileFromOffset": {
+                            "total_size": 1,
+                            "total_lines": 1,
+                            "lines": {
+                                "start": 0,
+                                "end": 1
+                            },
+                            "offsets": {
+                                "start": 0,
+                                "end": 0
+                            },
+                            "file": "file"
+                        }
+                    }
+                },
+            ]
+            mock_task.status = "COMPLETE"
+            data = {
+                "data_row": {
+                    "id":
+                        "clwb6wvpv3mpx0712aafl9m00",
+                    "external_id":
+                        "43cdad5e-1fcf-450d-ad72-df4460edf973",
+                    "global_key":
+                        "9ab56c5a-5c2f-45ae-8e21-e53eb415cefe",
+                    "row_data":
+                        "{\"type\":\"application/vnd.labelbox.conversational\",\"version\":1,\"messages\":[{\"messageId\":\"message-0\",\"timestampUsec\":1530718491,\"content\":\"The minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is\",\"user\":{\"userId\":\"prompt\",\"name\":\"prompt\"},\"align\":\"left\",\"canLabel\":true}],\"modelOutputs\":[{\"title\":\"Response 1\",\"content\":\"To find the minimum value of the expression $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x+b\\\\sin x=\\\\sqrt{a^2+b^2}\\\\left(\\\\frac{a}{\\\\sqrt{a^2+b^2}}\\\\cos x+\\\\frac{b}{\\\\sqrt{a^2+b^2}}\\\\sin x\\\\right)$. This allows us to rewrite the expression as:\\n\\n$3\\\\cos x+4\\\\sin x+8=\\\\sqrt{3^2+4^2}\\\\left(\\\\frac{3}{\\\\sqrt{3^2+4^2}}\\\\cos x+\\\\frac{4}{\\\\sqrt{3^2+4^2}}\\\\sin x\\\\right)+8=5\\\\left(\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x\\\\right)+8$\\n\\nNow, let's consider the expression $\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x$. Since $\\\\left(\\\\frac{3}{5}\\\\right)^2+\\\\left(\\\\frac{4}{5}\\\\right)^2=1$, we can write $\\\\frac{3}{5}=\\\\cos\\\\theta$ and $\\\\frac{4}{5}=\\\\sin\\\\theta$ for some angle $\\\\theta$. Then:\\n\\n$\\\\frac{3}{5}\\\\cos x+\\\\frac{4}{5}\\\\sin x=\\\\cos\\\\theta\\\\cos x+\\\\sin\\\\theta\\\\sin x=\\\\cos(x-\\\\theta)$\\n\\nSo, the original expression can be written as:\\n\\n$5\\\\cos(x-\\\\theta)+8$\\n\\nSince the minimum value of $\\\\cos(x-\\\\theta)$ is $-1$, the minimum value of the original expression is:\\n\\n$5(-1)+8=-5+8=3$\\n\\nTherefore, the minimum value of $3\\\\cos x + 4\\\\sin x + 8$ is $\\\\boxed{3}$.\",\"modelConfigName\":\"null\"},{\"title\":\"Response 2\",\"content\":\"A nice math question!\\n\\nTo find the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$, we can use the fact that $a\\\\cos x + b\\\\sin x = \\\\sqrt{a^2 + b^2} \\\\cos(x - \\\\alpha)$, where $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{b}{a}\\\\right)$.\\n\\nIn this case, $a = 3$ and $b = 4$, so $\\\\alpha = \\\\tan^{-1}\\\\left(\\\\frac{4}{3}\\\\right)$.\\n\\nSo, we have:\\n\\n$$3 \\\\cos x + 4 \\\\sin x + 8 = \\\\sqrt{3^2 + 4^2} \\\\cos(x - \\\\alpha) + 8 = 5 \\\\cos(x - \\\\alpha) + 8$$\\n\\nNow, the minimum value of $\\\\cos(x - \\\\alpha)$ is $-1$, so the minimum value of $5 \\\\cos(x - \\\\alpha) + 8$ is:\\n\\n$$5(-1) + 8 = -5 + 8 = 3$$\\n\\nTherefore, the minimum value of $3 \\\\cos x + 4 \\\\sin x + 8$ is $\\\\boxed{3}$.\\n\\nLet me know if you have any questions or need further clarification!\",\"modelConfigName\":\"null\"}]}"
+                },
+                "media_attributes": {
+                    "asset_type": "conversational",
+                    "mime_type": "application/vnd.labelbox.conversational",
+                    "labelable_ids": ["message-0"],
+                    "message_count": 1
+                }
+            }
+            mock_requests_get.return_value.text = json.dumps(data)
+            mock_requests_get.return_value.content = "b"
+            export_task = ExportTask(mock_task, is_export_v2=True)
+            assert export_task.result[0] == data
diff --git a/requirements-dev.lock b/requirements-dev.lock
index a90e24e44..68d7650ab 100644
--- a/requirements-dev.lock
+++ b/requirements-dev.lock
@@ -6,6 +6,7 @@
 # features: []
 # all-features: false
 # with-sources: false
+# generate-hashes: false
 -e file:libs/labelbox
 alabaster==0.7.13
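
Usage note (illustrative, not part of the diff): a minimal sketch of how the new get_buffered_stream API replaces the deprecated get_stream/JsonConverter path, based on the calls exercised in the tests above. It assumes a configured labelbox.Client; the API key and data-row id are placeholders.

    import labelbox as lb
    from labelbox import DataRow
    from labelbox.schema.export_task import StreamType

    client = lb.Client(api_key="YOUR_API_KEY")  # placeholder key
    data_row = client.get_data_row("DATA_ROW_ID")  # placeholder id

    export_task = DataRow.export(client=client, data_rows=[data_row],
                                 task_name="buffered-stream-example")
    export_task.wait_till_done()

    # BufferedStream is iterable; each output exposes the parsed JSON dict
    # via .json, so no converter argument is needed (unlike get_stream()).
    for output in export_task.get_buffered_stream(stream_type=StreamType.RESULT):
        print(output.json["data_row"]["id"])

    # Alternatively, drain a stream through a handler, mirroring what
    # ExportTask.result and ExportTask.errors now do internally.
    if export_task.has_errors():
        export_task.get_buffered_stream(stream_type=StreamType.ERRORS).start(
            stream_handler=lambda output: print(output.json))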