[PLT-43] Vb/create datarows chunking plt 43 #1627
Merged
Changes from all commits (9 commits):
65066b7  Refactor upsert code so that it can be reused for create
3d9e528  Add support for task errors
ce942d8  Support for uploading from files
cefbede  Fixing tests
986078a  Add hash function to task
d4fc695  Remove 150K data rows limitation
79d3d0c  Add docstrings
6ba61bb  PR feedback
50098fe  Remove result_all error_all
libs/labelbox/src/labelbox/schema/internal/data_row_uploader.py (new file, 287 additions)
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from typing import Iterable, List

from labelbox.exceptions import InvalidQueryError
from labelbox.exceptions import InvalidAttributeError
from labelbox.exceptions import MalformedQueryException
from labelbox.orm.model import Entity
from labelbox.orm.model import Field
from labelbox.schema.embedding import EmbeddingVector
from labelbox.pydantic_compat import BaseModel
from labelbox.schema.internal.datarow_upload_constants import (
    MAX_DATAROW_PER_API_OPERATION, FILE_UPLOAD_THREAD_COUNT)
from labelbox.schema.internal.data_row_upsert_item import DataRowUpsertItem


class UploadManifest(BaseModel):
    source: str
    item_count: int
    chunk_uris: List[str]


class DataRowUploader:

    @staticmethod
    def create_descriptor_file(client,
                               items,
                               max_attachments_per_data_row=None,
                               is_upsert=False):
        """
        This function is shared by `Dataset.create_data_rows`, `Dataset.create_data_rows_sync` and `Dataset.update_data_rows`.
        It is used to prepare the input file. The user-defined input is validated, processed, and JSON-stringified.
        Finally the JSON data is uploaded to GCS and a URI is returned. This URI can be passed as a parameter to a mutation that uploads data rows.

        Each element in `items` can be either a `str` or a `dict`. If
        it is a `str`, then it is interpreted as a local file path. The file
        is uploaded to Labelbox and a DataRow referencing it is created.

        If an item is a `dict`, then it must match one of the following two structures:
        1. For static imagery, video, and text it should map `DataRow` field names to values.
           At a minimum, an item passed as a `dict` must contain a `row_data` key and value.
           If the value for `row_data` is a local file path and the path exists,
           then the local file will be uploaded to Labelbox.

        2. For tiled imagery the dict must match the import structure specified in the link below:
           https://docs.labelbox.com/data-model/en/index-en#tiled-imagery-import

        >>> dataset.create_data_rows([
        >>>     {DataRow.row_data: "http://my_site.com/photos/img_01.jpg"},
        >>>     {DataRow.row_data: "/path/to/file1.jpg"},
        >>>     "path/to/file2.jpg",
        >>>     {DataRow.row_data: {"tileLayerUrl": "http://", ...}},
        >>>     {DataRow.row_data: {"type": ..., 'version': ..., 'messages': [...]}}
        >>> ])

        For an example showing how to upload tiled data rows see the following notebook:
        https://github.com/Labelbox/labelbox-python/blob/ms/develop/model_assisted_labeling/tiled_imagery_mal.ipynb

        Args:
            items (iterable of (dict or str)): See above for details.
            max_attachments_per_data_row (Optional[int]): Param used during attachment validation to determine
                if the user has provided too many attachments.

        Returns:
            uri (string): A reference to the uploaded json data.

        Raises:
            InvalidQueryError: If the `items` parameter does not conform to
                the specification above or if the server did not accept the
                DataRow creation request (unknown reason).
            InvalidAttributeError: If there are fields in `items` not valid for
                a DataRow.
            ValueError: When the upload parameters are invalid.
        """
        file_upload_thread_count = FILE_UPLOAD_THREAD_COUNT
        DataRow = Entity.DataRow
        AssetAttachment = Entity.AssetAttachment

        def upload_if_necessary(item):
            if is_upsert and 'row_data' not in item:
                # When upserting, row_data is not required
                return item
            row_data = item['row_data']
            if isinstance(row_data, str) and os.path.exists(row_data):
                item_url = client.upload_file(row_data)
                item['row_data'] = item_url
                if 'external_id' not in item:
                    # Default `external_id` to local file name
                    item['external_id'] = row_data
            return item

        def validate_attachments(item):
            attachments = item.get('attachments')
            if attachments:
                if isinstance(attachments, list):
                    if max_attachments_per_data_row and len(
                            attachments) > max_attachments_per_data_row:
                        raise ValueError(
                            f"Max number of supported attachments per data row is {max_attachments_per_data_row}."
                            f" Found {len(attachments)}. Condense multiple attachments into one with the HTML attachment type if necessary."
                        )
                    for attachment in attachments:
                        AssetAttachment.validate_attachment_json(attachment)
                else:
                    raise ValueError(
                        f"Attachments must be a list. Found {type(attachments)}"
                    )
            return attachments

        def validate_embeddings(item):
            embeddings = item.get("embeddings")
            if embeddings:
                item["embeddings"] = [
                    EmbeddingVector(**e).to_gql() for e in embeddings
                ]

        def validate_conversational_data(conversational_data: list) -> None:
            """
            Checks each conversational message for keys expected as per https://docs.labelbox.com/reference/text-conversational#sample-conversational-json

            Args:
                conversational_data (list): list of dictionaries.
            """

            def check_message_keys(message):
                accepted_message_keys = set([
                    "messageId", "timestampUsec", "content", "user", "align",
                    "canLabel"
                ])
                for key in message.keys():
                    if key not in accepted_message_keys:
                        raise KeyError(
                            f"Invalid {key} key found! Accepted keys in messages list are {accepted_message_keys}"
                        )

            if conversational_data and not isinstance(conversational_data,
                                                      list):
                raise ValueError(
                    f"conversationalData must be a list. Found {type(conversational_data)}"
                )

            [check_message_keys(message) for message in conversational_data]

        def parse_metadata_fields(item):
            metadata_fields = item.get('metadata_fields')
            if metadata_fields:
                mdo = client.get_data_row_metadata_ontology()
                item['metadata_fields'] = mdo.parse_upsert_metadata(
                    metadata_fields)

        def format_row(item):
            # Formats user input into a consistent dict structure
            if isinstance(item, dict):
                # Convert fields to strings
                item = {
                    key.name if isinstance(key, Field) else key: value
                    for key, value in item.items()
                }
            elif isinstance(item, str):
                # The main advantage of using a string over a dict is that the user is specifying
                # that the file should exist locally.
                # That info is lost after this section so we should check for it here.
                if not os.path.exists(item):
                    raise ValueError(f"Filepath {item} does not exist.")
                item = {"row_data": item, "external_id": item}
            return item

        def validate_keys(item):
            if not is_upsert and 'row_data' not in item:
                raise InvalidQueryError(
                    "`row_data` missing when creating DataRow.")

            if isinstance(item.get('row_data'),
                          str) and item.get('row_data').startswith("s3:/"):
                raise InvalidQueryError(
                    "row_data: s3 assets must start with 'https'.")
            allowed_extra_fields = {
                'attachments', 'media_type', 'dataset_id', 'embeddings'
            }
            invalid_keys = set(item) - {f.name for f in DataRow.fields()
                                       } - allowed_extra_fields
            if invalid_keys:
                raise InvalidAttributeError(DataRow, invalid_keys)
            return item

        def format_legacy_conversational_data(item):
            messages = item.pop("conversationalData")
            version = item.pop("version", 1)
            type = item.pop("type", "application/vnd.labelbox.conversational")
            if "externalId" in item:
                external_id = item.pop("externalId")
                item["external_id"] = external_id
            if "globalKey" in item:
                global_key = item.pop("globalKey")
                item["globalKey"] = global_key
            validate_conversational_data(messages)
            one_conversation = \
                {
                    "type": type,
                    "version": version,
                    "messages": messages
                }
            item["row_data"] = one_conversation
            return item

        def convert_item(data_row_item):
            if isinstance(data_row_item, DataRowUpsertItem):
                item = data_row_item.payload
            else:
                item = data_row_item

            if "tileLayerUrl" in item:
                validate_attachments(item)
                return item

            if "conversationalData" in item:
                format_legacy_conversational_data(item)

            # Convert all payload variations into the same dict format
            item = format_row(item)
            # Make sure required keys exist (and there are no extra keys)
            validate_keys(item)
            # Make sure attachments are valid
            validate_attachments(item)
            # Make sure embeddings are valid
            validate_embeddings(item)
            # Parse metadata fields if they exist
            parse_metadata_fields(item)
            # Upload any local file paths
            item = upload_if_necessary(item)

            if isinstance(data_row_item, DataRowUpsertItem):
                return {'id': data_row_item.id, 'payload': item}
            else:
                return item

        if not isinstance(items, Iterable):
            raise ValueError(
                f"Must pass an iterable to create_data_rows. Found {type(items)}"
            )

        if len(items) > MAX_DATAROW_PER_API_OPERATION:
            raise MalformedQueryException(
                f"Cannot create more than {MAX_DATAROW_PER_API_OPERATION} DataRows per function call."
            )

        with ThreadPoolExecutor(file_upload_thread_count) as executor:
            futures = [executor.submit(convert_item, item) for item in items]
            items = [future.result() for future in as_completed(futures)]
        # Prepare and upload the descriptor file
        data = json.dumps(items)
        return client.upload_data(data,
                                  content_type="application/json",
                                  filename="json_import.json")

    @staticmethod
    def upload_in_chunks(client, specs: List[DataRowUpsertItem],
                         file_upload_thread_count: int,
                         upsert_chunk_size: int) -> UploadManifest:
        empty_specs = list(filter(lambda spec: spec.is_empty(), specs))

        if empty_specs:
            ids = list(map(lambda spec: spec.id.get("value"), empty_specs))
            raise ValueError(
                f"The following items have an empty payload: {ids}")

        chunks = [
            specs[i:i + upsert_chunk_size]
            for i in range(0, len(specs), upsert_chunk_size)
        ]

        def _upload_chunk(chunk):
            return DataRowUploader.create_descriptor_file(client,
                                                          chunk,
                                                          is_upsert=True)

        with ThreadPoolExecutor(file_upload_thread_count) as executor:
            futures = [
                executor.submit(_upload_chunk, chunk) for chunk in chunks
            ]
            chunk_uris = [future.result() for future in as_completed(futures)]

        return UploadManifest(source="SDK",
                              item_count=len(specs),
                              chunk_uris=chunk_uris)
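
Taken together, the two static methods split an upsert into fixed-size chunks and upload one descriptor file per chunk. A minimal sketch of how a caller might drive this path, assuming an authenticated `labelbox.Client` and a placeholder dataset id (everything below except the imported names is hypothetical, not part of the diff):

from labelbox import Client
from labelbox.schema.internal.data_row_uploader import DataRowUploader
from labelbox.schema.internal.data_row_upsert_item import DataRowUpsertItem
from labelbox.schema.internal.datarow_upload_constants import (
    FILE_UPLOAD_THREAD_COUNT, UPSERT_CHUNK_SIZE)

client = Client(api_key="<your-api-key>")  # assumed: a valid API key

# Hypothetical payloads; `build` stamps the dataset id onto every spec.
items = [{"row_data": "https://example.com/photos/img_01.jpg"},
         {"row_data": "https://example.com/photos/img_02.jpg"}]
specs = DataRowUpsertItem.build("<dataset_id>", items)

# One descriptor file is uploaded per chunk; the manifest records the
# chunk URIs so a single mutation can reference all of them.
manifest = DataRowUploader.upload_in_chunks(
    client,
    specs,
    file_upload_thread_count=FILE_UPLOAD_THREAD_COUNT,
    upsert_chunk_size=UPSERT_CHUNK_SIZE)
print(manifest.item_count, manifest.chunk_uris)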
libs/labelbox/src/labelbox/schema/internal/data_row_upsert_item.py (new file, 50 additions)
from typing import List, Tuple, Optional

from labelbox.schema.identifiable import UniqueId, GlobalKey
from labelbox.pydantic_compat import BaseModel


class DataRowUpsertItem(BaseModel):
    """
    Base class for creating payloads for upsert operations.
    """
    id: dict
    payload: dict

    @classmethod
    def build(
        cls,
        dataset_id: str,
        items: List[dict],
        key_types: Optional[Tuple[type, ...]] = ()
    ) -> List["DataRowUpsertItem"]:
        upload_items = []

        for item in items:
            # Enforce current dataset's id for all specs
            item['dataset_id'] = dataset_id
            key = item.pop('key', None)
            if not key:
                key = {'type': 'AUTO', 'value': ''}
            elif isinstance(key, key_types):  # type: ignore
                key = {'type': key.id_type.value, 'value': key.key}
            else:
                if not key_types:
                    raise ValueError(
                        f"Cannot have a key for this item, got: {key}")
                raise ValueError(
                    f"Key must be an instance of {', '.join([t.__name__ for t in key_types])}, got: {type(item['key']).__name__}"
                )
            item = {
                k: v for k, v in item.items() if v is not None
            }  # remove None values
            upload_items.append(cls(payload=item, id=key))
        return upload_items

    def is_empty(self) -> bool:
        """
        The payload is considered empty if it is actually empty or if its
        only key is `dataset_id`.
        :return: bool
        """
        return (not self.payload or
                len(self.payload.keys()) == 1 and "dataset_id" in self.payload)
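
As a quick illustration of the key normalization in `build` (a standalone sketch; the dataset id and URLs are placeholders, and `UniqueId`/`GlobalKey` are the identifiable types imported at the top of the file):

from labelbox.schema.identifiable import GlobalKey, UniqueId
from labelbox.schema.internal.data_row_upsert_item import DataRowUpsertItem

specs = DataRowUpsertItem.build(
    dataset_id="<dataset_id>",
    items=[
        {"key": GlobalKey("img-01"), "row_data": "https://example.com/1.jpg"},
        {"row_data": "https://example.com/2.jpg"},  # no key -> AUTO
    ],
    key_types=(UniqueId, GlobalKey))

assert specs[0].id["value"] == "img-01"              # explicit key preserved
assert specs[1].id == {"type": "AUTO", "value": ""}  # server assigns the id
assert not specs[0].is_empty()                       # payload carries row_data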
libs/labelbox/src/labelbox/schema/internal/datarow_upload_constants.py (new file, 4 additions)
MAX_DATAROW_PER_API_OPERATION = 150_000
FILE_UPLOAD_THREAD_COUNT = 20
UPSERT_CHUNK_SIZE = 10_000
DOWNLOAD_RESULT_PAGE_SIZE = 5_000
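
For a sense of scale, the chunking arithmetic these constants imply: with `UPSERT_CHUNK_SIZE = 10_000`, an upload of n items produces ceil(n / 10_000) descriptor files, fanned out over up to `FILE_UPLOAD_THREAD_COUNT = 20` threads. A standalone sanity check, not part of the diff:

import math

UPSERT_CHUNK_SIZE = 10_000
for n in (1, 9_999, 10_000, 150_000):
    print(f"{n} items -> {math.ceil(n / UPSERT_CHUNK_SIZE)} chunk(s)")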