From 6d62c69f2aa1a82e7439e6de896fb01af2c223b7 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Tue, 25 Jun 2024 11:00:09 -0400 Subject: [PATCH 01/13] Refactor Dataset create_data_rows_sync to use upsert --- libs/labelbox/src/labelbox/schema/dataset.py | 49 ++++++++++++------- .../tests/integration/test_data_rows.py | 11 ++--- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index c2617cc0d..c4ff99ec5 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -15,7 +15,7 @@ from io import StringIO import requests -from labelbox.exceptions import InvalidQueryError, LabelboxError, ResourceNotFoundError, InvalidAttributeError +from labelbox.exceptions import InvalidQueryError, LabelboxError, ResourceNotFoundError, ResourceCreationError from labelbox.orm.comparison import Comparison from labelbox.orm.db_object import DbObject, Updateable, Deletable, experimental from labelbox.orm.model import Entity, Field, Relationship @@ -214,7 +214,10 @@ def convert_field_keys(items): res = self.client.execute(query_str, {**args, 'dataset': self.uid}) return DataRow(self.client, res['createDataRow']) - def create_data_rows_sync(self, items) -> None: + def create_data_rows_sync( + self, + items, + file_upload_thread_count=FILE_UPLOAD_THREAD_COUNT) -> None: """ Synchronously bulk upload data rows. Use this instead of `Dataset.create_data_rows` for smaller batches of data rows that need to be uploaded quickly. @@ -228,6 +231,7 @@ def create_data_rows_sync(self, items) -> None: None. If the function doesn't raise an exception then the import was successful. Raises: + ResourceCreationError: Errors in data row upload InvalidQueryError: If the `items` parameter does not conform to the specification in Dataset._create_descriptor_file or if the server did not accept the DataRow creation request (unknown reason). @@ -242,18 +246,25 @@ def create_data_rows_sync(self, items) -> None: f"Dataset.create_data_rows_sync() supports a max of {max_data_rows_supported} data rows." 
" For larger imports use the async function Dataset.create_data_rows()" ) - descriptor_url = DescriptorFileCreator(self.client).create_one( - items, max_attachments_per_data_row=max_attachments_per_data_row) - dataset_param = "datasetId" - url_param = "jsonUrl" - query_str = """mutation AppendRowsToDatasetSyncPyApi($%s: ID!, $%s: String!){ - appendRowsToDatasetSync(data:{datasetId: $%s, jsonFileUrl: $%s} - ){dataset{id}}} """ % (dataset_param, url_param, dataset_param, - url_param) - self.client.execute(query_str, { - dataset_param: self.uid, - url_param: descriptor_url - }) + if file_upload_thread_count < 1: + raise ValueError( + "file_upload_thread_count must be a positive integer") + + upload_items = self._separate_and_process_items(items) + specs = DataRowCreateItem.build(self.uid, upload_items) + task: DataUpsertTask = self._exec_upsert_data_rows( + specs, file_upload_thread_count) + task.wait_till_done() + + if task.has_errors(): + raise ResourceCreationError( + f"Data row upload errors: {task.errors}", cause=task.uid) + if task.status != "COMPLETE": + raise ResourceCreationError( + f"Data row upload did not complete, task status {task.status} task id {task.uid}" + ) + + return None def create_data_rows(self, items, @@ -287,14 +298,18 @@ def create_data_rows(self, raise ValueError( "file_upload_thread_count must be a positive integer") + # Usage example + upload_items = self._separate_and_process_items(items) + specs = DataRowCreateItem.build(self.uid, upload_items) + return self._exec_upsert_data_rows(specs, file_upload_thread_count) + + def _separate_and_process_items(self, items): string_items = [item for item in items if isinstance(item, str)] dict_items = [item for item in items if isinstance(item, dict)] dict_string_items = [] if len(string_items) > 0: dict_string_items = self._build_from_local_paths(string_items) - specs = DataRowCreateItem.build(self.uid, - dict_items + dict_string_items) - return self._exec_upsert_data_rows(specs, file_upload_thread_count) + return dict_items + dict_string_items def _build_from_local_paths( self, diff --git a/libs/labelbox/tests/integration/test_data_rows.py b/libs/labelbox/tests/integration/test_data_rows.py index c3c8dc9cb..95d0049be 100644 --- a/libs/labelbox/tests/integration/test_data_rows.py +++ b/libs/labelbox/tests/integration/test_data_rows.py @@ -10,10 +10,9 @@ from labelbox.schema.media_type import MediaType from labelbox import DataRow, AssetAttachment -from labelbox.exceptions import MalformedQueryException -from labelbox.schema.task import Task +from labelbox.exceptions import MalformedQueryException, ResourceCreationError +from labelbox.schema.task import Task, DataUpsertTask from labelbox.schema.data_row_metadata import DataRowMetadataField, DataRowMetadataKind -import labelbox.exceptions SPLIT_SCHEMA_ID = "cko8sbczn0002h2dkdaxb5kal" TEST_SPLIT_ID = "cko8scbz70005h2dkastwhgqt" @@ -1050,7 +1049,7 @@ def test_data_row_bulk_creation_sync_with_same_global_keys( dataset, sample_image): global_key_1 = str(uuid.uuid4()) - with pytest.raises(labelbox.exceptions.MalformedQueryException) as exc_info: + with pytest.raises(ResourceCreationError) as exc_info: dataset.create_data_rows_sync([{ DataRow.row_data: sample_image, DataRow.global_key: global_key_1 @@ -1061,8 +1060,8 @@ def test_data_row_bulk_creation_sync_with_same_global_keys( assert len(list(dataset.data_rows())) == 1 assert list(dataset.data_rows())[0].global_key == global_key_1 - assert "Some data rows were not imported. 
Check error output here" in str( - exc_info.value) + assert "Duplicate global key" in str(exc_info.value) + assert exc_info.value.args[1] # task id @pytest.fixture From de33f0948e003515683362cc55636c5b6b07d2b4 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Tue, 25 Jun 2024 14:00:22 -0400 Subject: [PATCH 02/13] Refactor Dataset create_data_row to use upsert --- libs/labelbox/src/labelbox/schema/dataset.py | 91 +++++--------------- 1 file changed, 22 insertions(+), 69 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index c4ff99ec5..c0f3e7089 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -141,78 +141,31 @@ def create_data_row(self, items=None, **kwargs) -> "DataRow": any of the field names given in `kwargs`. """ - invalid_argument_error = "Argument to create_data_row() must be either a dictionary, or kwargs containing `row_data` at minimum" - - def convert_field_keys(items): - if not isinstance(items, dict): - raise InvalidQueryError(invalid_argument_error) - return { - key.name if isinstance(key, Field) else key: value - for key, value in items.items() - } + file_upload_thread_count = 1 + args = items if items is not None else kwargs - if items is not None and len(kwargs) > 0: - raise InvalidQueryError(invalid_argument_error) + upload_items = self._separate_and_process_items([args]) + specs = DataRowCreateItem.build(self.uid, upload_items) + task: DataUpsertTask = self._exec_upsert_data_rows( + specs, file_upload_thread_count) - DataRow = Entity.DataRow - args = convert_field_keys(items) if items is not None else kwargs - - if DataRow.row_data.name not in args: - raise InvalidQueryError( - "DataRow.row_data missing when creating DataRow.") - - row_data = args[DataRow.row_data.name] - - if isinstance(row_data, str) and row_data.startswith("s3:/"): - raise InvalidQueryError( - "row_data: s3 assets must start with 'https'.") - - if not isinstance(row_data, str): - # If the row data is an object, upload as a string - args[DataRow.row_data.name] = json.dumps(row_data) - elif os.path.exists(row_data): - # If row data is a local file path, upload it to server. - args[DataRow.row_data.name] = self.client.upload_file(row_data) - - # Parse metadata fields, if they are provided - if DataRow.metadata_fields.name in args: - mdo = self.client.get_data_row_metadata_ontology() - args[DataRow.metadata_fields.name] = mdo.parse_upsert_metadata( - args[DataRow.metadata_fields.name]) - - if "embeddings" in args: - args["embeddings"] = [ - EmbeddingVector(**e).to_gql() for e in args["embeddings"] - ] + task.wait_till_done() - query_str = """mutation CreateDataRowPyApi( - $row_data: String!, - $metadata_fields: [DataRowCustomMetadataUpsertInput!], - $attachments: [DataRowAttachmentInput!], - $media_type : MediaType, - $external_id : String, - $global_key : String, - $dataset: ID!, - $embeddings: [DataRowEmbeddingVectorInput!] 
- ){ - createDataRow( - data: - { - rowData: $row_data - mediaType: $media_type - metadataFields: $metadata_fields - externalId: $external_id - globalKey: $global_key - attachments: $attachments - dataset: {connect: {id: $dataset}} - embeddings: $embeddings - } - ) - {%s} - } - """ % query.results_query_part(Entity.DataRow) - res = self.client.execute(query_str, {**args, 'dataset': self.uid}) - return DataRow(self.client, res['createDataRow']) + if task.has_errors(): + raise ResourceCreationError( + f"Data row upload errors: {task.errors}", cause=task.uid) + if task.status != "COMPLETE": + raise ResourceCreationError( + f"Data row upload did not complete, task status {task.status} task id {task.uid}" + ) + + res = task.result + if res is None or len(res) == 0: + raise ResourceCreationError( + f"Data row upload did not complete, task status {task.status} task id {task.uid}" + ) + + return self.client.get_data_row(res[0]['id']) def create_data_rows_sync( self, From 62d4fad8e747993d60db06d8022f436f667603c4 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Tue, 25 Jun 2024 20:55:01 -0400 Subject: [PATCH 03/13] Fix tests --- .../tests/integration/test_data_rows.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/libs/labelbox/tests/integration/test_data_rows.py b/libs/labelbox/tests/integration/test_data_rows.py index 95d0049be..ba386ae47 100644 --- a/libs/labelbox/tests/integration/test_data_rows.py +++ b/libs/labelbox/tests/integration/test_data_rows.py @@ -10,7 +10,7 @@ from labelbox.schema.media_type import MediaType from labelbox import DataRow, AssetAttachment -from labelbox.exceptions import MalformedQueryException, ResourceCreationError +from labelbox.exceptions import MalformedQueryException, ResourceCreationError, InvalidQueryError from labelbox.schema.task import Task, DataUpsertTask from labelbox.schema.data_row_metadata import DataRowMetadataField, DataRowMetadataKind @@ -128,11 +128,11 @@ def test_get_data_row(data_row, client): def test_create_invalid_aws_data_row(dataset, client): - with pytest.raises(labelbox.exceptions.InvalidQueryError) as exc: + with pytest.raises(InvalidQueryError) as exc: dataset.create_data_row(row_data="s3://labelbox-public-data/invalid") assert "s3" in exc.value.message - with pytest.raises(labelbox.exceptions.InvalidQueryError) as exc: + with pytest.raises(InvalidQueryError) as exc: dataset.create_data_rows([{ "row_data": "s3://labelbox-public-data/invalid" }]) @@ -359,11 +359,11 @@ def test_create_data_row_with_dict_unpacked(dataset, image_url): def test_create_data_row_with_invalid_input(dataset, image_url): - with pytest.raises(labelbox.exceptions.InvalidQueryError) as exc: + with pytest.raises(ResourceCreationError) as exc: dataset.create_data_row("asdf") dr = {"row_data": image_url} - with pytest.raises(labelbox.exceptions.InvalidQueryError) as exc: + with pytest.raises(ResourceCreationError) as exc: dataset.create_data_row(dr, row_data=image_url) @@ -421,7 +421,7 @@ def test_create_data_row_with_invalid_metadata(dataset, image_url): fields.append( DataRowMetadataField(schema_id=TEXT_SCHEMA_ID, value='some msg')) - with pytest.raises(labelbox.exceptions.MalformedQueryException): + with pytest.raises(ResourceCreationError): dataset.create_data_row(row_data=image_url, metadata_fields=fields) @@ -1103,7 +1103,7 @@ def test_invalid_media_type(dataset, conversational_content): ]]: # TODO: What error kind should this be? It looks like for global key we are # using malformed query. 
But for invalid contents in FileUploads we use InvalidQueryError - with pytest.raises(labelbox.exceptions.InvalidQueryError): + with pytest.raises(ResourceCreationError): dataset.create_data_rows_sync([{ **conversational_content, 'media_type': 'IMAGE' }]) @@ -1137,11 +1137,10 @@ def test_create_data_row_with_attachments(dataset): def test_create_data_row_with_media_type(dataset, image_url): - with pytest.raises(labelbox.exceptions.InvalidQueryError) as exc: + with pytest.raises(ResourceCreationError) as exc: dr = dataset.create_data_row( row_data={'invalid_object': 'invalid_value'}, media_type="IMAGE") - assert "Media type validation failed, expected: 'image/*', was: application/json" in str( - exc.value) + assert "Expected type image/*, detected: application/json" in str(exc.value) dataset.create_data_row(row_data=image_url, media_type="IMAGE") From ecfca90292e03294fc9069db3f5667bd94920cf7 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Tue, 25 Jun 2024 21:42:08 -0400 Subject: [PATCH 04/13] Fixing more tests --- .../tests/data/annotation_import/conftest.py | 13 ++++++++----- libs/labelbox/tests/integration/test_data_rows.py | 4 ---- libs/labelbox/tests/integration/test_dataset.py | 4 ++-- .../tests/integration/test_project_model_config.py | 1 - 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/libs/labelbox/tests/data/annotation_import/conftest.py b/libs/labelbox/tests/data/annotation_import/conftest.py index 55453ade3..b37976653 100644 --- a/libs/labelbox/tests/data/annotation_import/conftest.py +++ b/libs/labelbox/tests/data/annotation_import/conftest.py @@ -622,12 +622,15 @@ def configured_project(client, initial_dataset, ontology, rand_gen, image_url): data_row_ids = [] ontologies = ontology["tools"] + ontology["classifications"] + data_row_data = [] for ind in range(len(ontologies)): - data_row_ids.append( - dataset.create_data_row( - row_data=image_url, - global_key=f"gk_{ontologies[ind]['name']}_{rand_gen(str)}", - ).uid) + data_row_data.append({ + "row_data": image_url, + "global_key": f"gk_{ontologies[ind]['name']}_{rand_gen(str)}" + }) + task = dataset.create_data_rows(data_row_data) + task.wait_till_done() + data_row_ids = [row['id'] for row in task.result] project._wait_until_data_rows_are_processed(data_row_ids=data_row_ids, sleep_interval=3) diff --git a/libs/labelbox/tests/integration/test_data_rows.py b/libs/labelbox/tests/integration/test_data_rows.py index ba386ae47..dea4db482 100644 --- a/libs/labelbox/tests/integration/test_data_rows.py +++ b/libs/labelbox/tests/integration/test_data_rows.py @@ -362,10 +362,6 @@ def test_create_data_row_with_invalid_input(dataset, image_url): with pytest.raises(ResourceCreationError) as exc: dataset.create_data_row("asdf") - dr = {"row_data": image_url} - with pytest.raises(ResourceCreationError) as exc: - dataset.create_data_row(dr, row_data=image_url) - def test_create_data_row_with_metadata(mdo, dataset, image_url): client = dataset.client diff --git a/libs/labelbox/tests/integration/test_dataset.py b/libs/labelbox/tests/integration/test_dataset.py index 381c48fb2..51a43a09c 100644 --- a/libs/labelbox/tests/integration/test_dataset.py +++ b/libs/labelbox/tests/integration/test_dataset.py @@ -2,7 +2,7 @@ import requests from unittest.mock import MagicMock from labelbox import Dataset -from labelbox.exceptions import ResourceNotFoundError, InvalidQueryError +from labelbox.exceptions import ResourceNotFoundError, ResourceCreationError from labelbox.schema.internal.descriptor_file_creator import DescriptorFileCreator @@ 
-128,7 +128,7 @@ def test_create_pdf(dataset): }, media_type="PDF") - with pytest.raises(InvalidQueryError): + with pytest.raises(ResourceCreationError): # Wrong media type dataset.create_data_row(row_data={ "pdfUrl": diff --git a/libs/labelbox/tests/integration/test_project_model_config.py b/libs/labelbox/tests/integration/test_project_model_config.py index 220995e27..7b564b2af 100644 --- a/libs/labelbox/tests/integration/test_project_model_config.py +++ b/libs/labelbox/tests/integration/test_project_model_config.py @@ -1,4 +1,3 @@ -from _pytest import config import pytest from labelbox.exceptions import ResourceNotFoundError From 22b7e19dcc82bf4bcc21598ed7e86c88cf3c7522 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Tue, 25 Jun 2024 22:06:16 -0400 Subject: [PATCH 05/13] Trigger Build --- libs/labelbox/src/labelbox/schema/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index c0f3e7089..660fa9464 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -124,7 +124,7 @@ def data_rows( def create_data_row(self, items=None, **kwargs) -> "DataRow": """ Creates a single DataRow belonging to this dataset. - + Now users upsert >>> dataset.create_data_row(row_data="http://my_site.com/photos/img_01.jpg") Args: From e87caded25d9fef9b00e03ffc851382ee8eea369 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Wed, 26 Jun 2024 08:50:33 -0400 Subject: [PATCH 06/13] Fix more tests --- libs/labelbox/tests/integration/test_data_rows.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/libs/labelbox/tests/integration/test_data_rows.py b/libs/labelbox/tests/integration/test_data_rows.py index dea4db482..454d55b87 100644 --- a/libs/labelbox/tests/integration/test_data_rows.py +++ b/libs/labelbox/tests/integration/test_data_rows.py @@ -1111,8 +1111,6 @@ def test_create_tiled_layer(dataset, tile_content): **tile_content, 'media_type': 'TMS_GEO' }, tile_content, - # Old way to check for backwards compatibility - tile_content['row_data'] ] dataset.create_data_rows_sync(examples) data_rows = list(dataset.data_rows()) From 998fde3a817799b43a0c9c4b83f746684fa49657 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Wed, 26 Jun 2024 20:41:17 -0400 Subject: [PATCH 07/13] Update docstrings --- libs/labelbox/src/labelbox/schema/dataset.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index 660fa9464..2c957a32a 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -124,7 +124,6 @@ def data_rows( def create_data_row(self, items=None, **kwargs) -> "DataRow": """ Creates a single DataRow belonging to this dataset. - Now users upsert >>> dataset.create_data_row(row_data="http://my_site.com/photos/img_01.jpg") Args: @@ -139,7 +138,7 @@ def create_data_row(self, items=None, **kwargs) -> "DataRow": in `kwargs`. InvalidAttributeError: in case the DB object type does not contain any of the field names given in `kwargs`. - + ResourceCreationError: If data row creation failed on the server side. """ file_upload_thread_count = 1 args = items if items is not None else kwargs @@ -184,8 +183,7 @@ def create_data_rows_sync( None. If the function doesn't raise an exception then the import was successful. 
Raises: - ResourceCreationError: Errors in data row upload - InvalidQueryError: If the `items` parameter does not conform to + ResourceCreationError: If the `items` parameter does not conform to the specification in Dataset._create_descriptor_file or if the server did not accept the DataRow creation request (unknown reason). InvalidAttributeError: If there are fields in `items` not valid for From 0e18d06d997ddd27650751d2db15e389caa8b417 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Thu, 27 Jun 2024 11:59:47 -0400 Subject: [PATCH 08/13] Optimize some tests to create data rows in batch instead of 1 at a time --- .../tests/integration/test_global_keys.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/libs/labelbox/tests/integration/test_global_keys.py b/libs/labelbox/tests/integration/test_global_keys.py index 9349b85d4..3fd3d84d9 100644 --- a/libs/labelbox/tests/integration/test_global_keys.py +++ b/libs/labelbox/tests/integration/test_global_keys.py @@ -108,22 +108,30 @@ def test_long_global_key_validation(client, dataset, image_url): def test_global_key_with_whitespaces_validation(client, dataset, image_url): - dr_1 = dataset.create_data_row(row_data=image_url) - dr_2 = dataset.create_data_row(row_data=image_url) - dr_3 = dataset.create_data_row(row_data=image_url) + data_row_items = [{ + "row_data": image_url, + }, { + "row_data": image_url, + }, { + "row_data": image_url, + }] + task = dataset.create_data_rows(data_row_items) + task.wait_till_done() + assert task.status == "COMPLETE" + dr_1_uid, dr_2_uid, dr_3_uid = [t['id'] for t in task.result] gk_1 = ' global key' gk_2 = 'global key' gk_3 = 'global key ' assignment_inputs = [{ - "data_row_id": dr_1.uid, + "data_row_id": dr_1_uid, "global_key": gk_1 }, { - "data_row_id": dr_2.uid, + "data_row_id": dr_2_uid, "global_key": gk_2 }, { - "data_row_id": dr_3.uid, + "data_row_id": dr_3_uid, "global_key": gk_3 }] res = client.assign_global_keys_to_data_rows(assignment_inputs) @@ -134,7 +142,7 @@ def test_global_key_with_whitespaces_validation(client, dataset, image_url): assign_errors_ids = set([e['data_row_id'] for e in res['errors']]) assign_errors_gks = set([e['global_key'] for e in res['errors']]) assign_errors_msgs = set([e['error'] for e in res['errors']]) - assert assign_errors_ids == set([dr_1.uid, dr_2.uid, dr_3.uid]) + assert assign_errors_ids == set([dr_1_uid, dr_2_uid, dr_3_uid]) assert assign_errors_gks == set([gk_1, gk_2, gk_3]) assert assign_errors_msgs == set([ 'Invalid assignment. Either DataRow does not exist, or globalKey is invalid', From 34a5f92968f624617e27e01b041fde0bcb6e9621 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Thu, 27 Jun 2024 14:42:40 -0400 Subject: [PATCH 09/13] PR feedback: implement create_data_row using create_data_rows_sync --- libs/labelbox/src/labelbox/schema/dataset.py | 36 +++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index 2c957a32a..28ef1adeb 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -140,25 +140,18 @@ def create_data_row(self, items=None, **kwargs) -> "DataRow": any of the field names given in `kwargs`. ResourceCreationError: If data row creation failed on the server side. 
""" - file_upload_thread_count = 1 - args = items if items is not None else kwargs + invalid_argument_error = "Argument to create_data_row() must be either a dictionary, or kwargs containing `row_data` at minimum" - upload_items = self._separate_and_process_items([args]) - specs = DataRowCreateItem.build(self.uid, upload_items) - task: DataUpsertTask = self._exec_upsert_data_rows( - specs, file_upload_thread_count) + if items is not None and len(kwargs) > 0: + raise InvalidQueryError(invalid_argument_error) - task.wait_till_done() + args = items if items is not None else kwargs - if task.has_errors(): - raise ResourceCreationError( - f"Data row upload errors: {task.errors}", cause=task.uid) - if task.status != "COMPLETE": - raise ResourceCreationError( - f"Data row upload did not complete, task status {task.status} task id {task.uid}" - ) + file_upload_thread_count = 1 + completed_task = self._create_data_rows_sync( + [args], file_upload_thread_count=file_upload_thread_count) - res = task.result + res = completed_task.result if res is None or len(res) == 0: raise ResourceCreationError( f"Data row upload did not complete, task status {task.status} task id {task.uid}" @@ -190,8 +183,17 @@ def create_data_rows_sync( a DataRow. ValueError: When the upload parameters are invalid """ - max_data_rows_supported = 1000 max_attachments_per_data_row = 5 + self._create_data_rows_sync( + items, file_upload_thread_count=file_upload_thread_count) + + return None # Return None if no exception is raised + + def _create_data_rows_sync(self, + items, + file_upload_thread_count=FILE_UPLOAD_THREAD_COUNT + ) -> "DataUpsertTask": + max_data_rows_supported = 1000 if len(items) > max_data_rows_supported: raise ValueError( f"Dataset.create_data_rows_sync() supports a max of {max_data_rows_supported} data rows." @@ -215,7 +217,7 @@ def create_data_rows_sync( f"Data row upload did not complete, task status {task.status} task id {task.uid}" ) - return None + return task def create_data_rows(self, items, From 9d919410f54fd5a6f370943d0f983d96b2bbfc10 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Thu, 27 Jun 2024 17:33:53 -0400 Subject: [PATCH 10/13] Removed max_attachments_per_data_row, not needed any more --- libs/labelbox/src/labelbox/schema/dataset.py | 3 +- .../internal/descriptor_file_creator.py | 38 +++---------------- 2 files changed, 6 insertions(+), 35 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index 28ef1adeb..5cd237f75 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -154,7 +154,7 @@ def create_data_row(self, items=None, **kwargs) -> "DataRow": res = completed_task.result if res is None or len(res) == 0: raise ResourceCreationError( - f"Data row upload did not complete, task status {task.status} task id {task.uid}" + f"Data row upload did not complete, task status {completed_task.status} task id {completed_task.uid}" ) return self.client.get_data_row(res[0]['id']) @@ -183,7 +183,6 @@ def create_data_rows_sync( a DataRow. 
ValueError: When the upload parameters are invalid """ - max_attachments_per_data_row = 5 self._create_data_rows_sync( items, file_upload_thread_count=file_upload_thread_count) diff --git a/libs/labelbox/src/labelbox/schema/internal/descriptor_file_creator.py b/libs/labelbox/src/labelbox/schema/internal/descriptor_file_creator.py index ac59b5e14..07128fdd1 100644 --- a/libs/labelbox/src/labelbox/schema/internal/descriptor_file_creator.py +++ b/libs/labelbox/src/labelbox/schema/internal/descriptor_file_creator.py @@ -33,26 +33,10 @@ class DescriptorFileCreator: def __init__(self, client: "Client"): self.client = client - """" - This method is used to convert a list to json and upload it in a file to gcs. - It will create multiple files if the size of upload is greater than max_chunk_size_bytes in bytes, - It uploads the files to gcs in parallel, and return a list of urls - Args: - items: The list to upload - is_upsert (bool): Whether the upload is an upsert - max_attachments_per_data_row (int): The maximum number of attachments per data row - max_chunk_size_bytes (int): The maximum size of the file in bytes - """ - - def create(self, - items, - max_attachments_per_data_row=None, - max_chunk_size_bytes=None) -> List[str]: + def create(self, items, max_chunk_size_bytes=None) -> List[str]: is_upsert = True # This class will only support upsert use cases - items = self._prepare_items_for_upload(items, - max_attachments_per_data_row, - is_upsert=is_upsert) + items = self._prepare_items_for_upload(items, is_upsert=is_upsert) json_chunks = self._chunk_down_by_bytes(items, max_chunk_size_bytes) with ThreadPoolExecutor(FILE_UPLOAD_THREAD_COUNT) as executor: futures = [ @@ -62,19 +46,15 @@ def create(self, ] return [future.result() for future in as_completed(futures)] - def create_one(self, items, max_attachments_per_data_row=None) -> List[str]: - items = self._prepare_items_for_upload(items, - max_attachments_per_data_row) + def create_one(self, items) -> List[str]: + items = self._prepare_items_for_upload(items,) # Prepare and upload the descriptor file data = json.dumps(items) return self.client.upload_data(data, content_type="application/json", filename="json_import.json") - def _prepare_items_for_upload(self, - items, - max_attachments_per_data_row=None, - is_upsert=False): + def _prepare_items_for_upload(self, items, is_upsert=False): """ This function is used to prepare the input file. The user defined input is validated, processed, and json stringified. Finally the json data is uploaded to gcs and a uri is returned. This uri can be passed as a parameter to a mutation that uploads data rows @@ -102,8 +82,6 @@ def _prepare_items_for_upload(self, Args: items (iterable of (dict or str)): See above for details. - max_attachments_per_data_row (Optional[int]): Param used during attachment validation to determine - if the user has provided too many attachments. Returns: uri (string): A reference to the uploaded json data. @@ -137,12 +115,6 @@ def validate_attachments(item): attachments = item.get('attachments') if attachments: if isinstance(attachments, list): - if max_attachments_per_data_row and len( - attachments) > max_attachments_per_data_row: - raise ValueError( - f"Max attachments number of supported attachments per data row is {max_attachments_per_data_row}." - f" Found {len(attachments)}. Condense multiple attachments into one with the HTML attachment type if necessary." 
- ) for attachment in attachments: AssetAttachment.validate_attachment_json(attachment) else: From 3fa003ea9f3670a0a6146f7cdd9a5cff2c77f7ba Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Fri, 28 Jun 2024 09:42:12 -0400 Subject: [PATCH 11/13] Add deprecation --- libs/labelbox/src/labelbox/schema/dataset.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index 5cd237f75..0157cc60d 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -183,6 +183,10 @@ def create_data_rows_sync( a DataRow. ValueError: When the upload parameters are invalid """ + warnings.warn( + "This method is deprecated and will be " + "removed in a future release. Please use create_data_rows instead.") + self._create_data_rows_sync( items, file_upload_thread_count=file_upload_thread_count) From b5e247a2c54a6b5595d873f470ba4b9361012b26 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Fri, 28 Jun 2024 13:34:08 -0400 Subject: [PATCH 12/13] Update _create_data_rows_sync to user create_data_rows --- libs/labelbox/src/labelbox/schema/dataset.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/libs/labelbox/src/labelbox/schema/dataset.py b/libs/labelbox/src/labelbox/schema/dataset.py index 0157cc60d..3d026a623 100644 --- a/libs/labelbox/src/labelbox/schema/dataset.py +++ b/libs/labelbox/src/labelbox/schema/dataset.py @@ -206,10 +206,8 @@ def _create_data_rows_sync(self, raise ValueError( "file_upload_thread_count must be a positive integer") - upload_items = self._separate_and_process_items(items) - specs = DataRowCreateItem.build(self.uid, upload_items) - task: DataUpsertTask = self._exec_upsert_data_rows( - specs, file_upload_thread_count) + task: DataUpsertTask = self.create_data_rows(items, + file_upload_thread_count) task.wait_till_done() if task.has_errors(): From 0f74e70161b7f8c22afa97fb36449e11229c7612 Mon Sep 17 00:00:00 2001 From: Val Brodsky Date: Fri, 28 Jun 2024 15:06:15 -0400 Subject: [PATCH 13/13] Trigger Build