|
| 1 | +import pandas as pd |
| 2 | +from labelbox import Client as labelboxClient |
| 3 | +import labelbase |
| 4 | +from labelpandas import connector |
| 5 | +from tqdm import tqdm |
| 6 | +from concurrent.futures import ThreadPoolExecutor, as_completed |
| 7 | + |
def create_data_row_upload_dict(client:labelboxClient, table:pd.core.frame.DataFrame,
                                row_data_col:str, global_key_col:str, external_id_col:str,
                                metadata_index:dict, attachment_index:dict,
                                divider:str, verbose:bool):
    """ Multithreads over a Pandas DataFrame, calling create_data_rows() on each row to return an upload dictionary
    Args:
        client              :   Required (labelbox.client.Client) - Labelbox Client object
        table               :   Required (pandas.core.frame.DataFrame) - Pandas DataFrame
        row_data_col        :   Required (str) - Column containing asset URL or file path
        global_key_col      :   Required (str) - Column name containing the data row global key - defaults to row data
        external_id_col     :   Required (str) - Column name containing the data row external ID - defaults to global key
        metadata_index      :   Required (dict) - Dictionary where {key=column_name : value=metadata_type}
        attachment_index    :   Required (dict) - Dictionary where {key=column_name : value=attachment_type}
        divider             :   Required (str) - String delimiter for all name keys generated for parent/child schemas
        verbose             :   Required (bool) - If True, prints details about code execution; if False, prints minimal information
    Returns:
        Two values:
        - global_key_to_upload_dict - Dictionary where {key=global_key : value=data row dictionary in upload format}
        - errors - List of dictionaries containing conversion error information; see create_data_rows() for more information
    """
    table_length = connector.get_table_length_function(table=table)
    if verbose:
        print(f'Creating upload list - {table_length} rows in Pandas DataFrame')
    # Duplicate global keys collapse into one data row in the dict below; warn so the shrinkage isn't a surprise
    if table_length != connector.get_unique_values_function(table=table, column_name=global_key_col):
        print(f"Warning: Your global key column is not unique - upload will resume, only uploading 1 data row per unique global key")
    # Bug fix: the original referenced an undefined name `lb_client` here; the parameter is `client`
    metadata_schema_to_name_key = labelbase.metadata.get_metadata_schema_to_name_key(client=client, lb_mdo=False, divider=divider, invert=False)
    metadata_name_key_to_schema = labelbase.metadata.get_metadata_schema_to_name_key(client=client, lb_mdo=False, divider=divider, invert=True)
    global_key_to_upload_dict = {}
    errors = []
    with ThreadPoolExecutor(max_workers=8) as exc:
        futures = []
        if verbose:
            print(f'Submitting data rows...')
        # Wrap the iterators in tqdm progress bars only when verbose; the work submitted is identical either way
        row_iter = tqdm(table.iterrows()) if verbose else table.iterrows()
        for index, row in row_iter:
            futures.append(exc.submit(
                create_data_rows, client, row, metadata_name_key_to_schema, metadata_schema_to_name_key,
                row_data_col, global_key_col, external_id_col, metadata_index, attachment_index, divider
            ))
        if verbose:
            print(f'Processing data rows...')
        completed_iter = tqdm(as_completed(futures)) if verbose else as_completed(futures)
        for f in completed_iter:
            res = f.result()
            if res['error']:
                # Failed conversions are collected (with their error) rather than aborting the whole upload
                errors.append(res)
            else:
                global_key_to_upload_dict[str(res['data_row']["global_key"])] = res['data_row']
    if verbose:
        print(f'Generated upload list - {len(global_key_to_upload_dict)} data rows to upload')
    return global_key_to_upload_dict, errors
| 70 | + |
def create_data_rows(client:labelboxClient, row:pd.core.series.Series,
                     metadata_name_key_to_schema:dict, metadata_schema_to_name_key:dict,
                     row_data_col:str, global_key_col:str, external_id_col:str,
                     metadata_index:dict, attachment_index:dict,
                     divider:str):
    """ Function to-be-multithreaded to create data row dictionaries from a Pandas DataFrame
    Args:
        client                      :   Required (labelbox.client.Client) - Labelbox Client object
        row                         :   Required (pandas.core.series.Series) - Pandas Series object, corresponds to one row in a df.iterrows()
        metadata_name_key_to_schema :   Required (dict) - Dictionary where {key=metadata_field_name_key : value=metadata_schema_id}
        metadata_schema_to_name_key :   Required (dict) - Inverse of metadata_name_key_to_schema
        row_data_col                :   Required (str) - Column containing asset URL or file path
        global_key_col              :   Required (str) - Column name containing the data row global key
        external_id_col             :   Required (str) - Column name containing the data row external ID
        metadata_index              :   Required (dict) - Dictionary where {key=column_name : value=metadata_type}
        attachment_index            :   Required (dict) - Dictionary where {key=column_name : value=attachment_type}
        divider                     :   Required (str) - String delimiter for all name keys generated for parent/child schemas
    Returns:
        A dictionary with "error" and "data_row" keys:
        - "error" - If there's a value in the "error" key, the script will skip it on upload and return the error at the end
        - "data_row" - Dictionary with "global_key" "external_id" "row_data" and "metadata_fields" keys in the proper format to-be-uploaded
    """
    return_value = {"error" : None, "data_row" : {}}
    try:
        return_value["data_row"]["row_data"] = str(row[row_data_col])
        return_value["data_row"]["global_key"] = str(row[global_key_col])
        return_value["data_row"]["external_id"] = str(row[external_id_col])
        # Every data row is tagged with the integration-source metadata field so uploads are traceable to Pandas
        metadata_fields = [{"schema_id" : metadata_name_key_to_schema['lb_integration_source'], "value" : "Pandas"}]
        if metadata_index:
            for metadata_field_name in metadata_index.keys():
                input_metadata = labelbase.metadata.process_metadata_value(
                    client=client, metadata_value=row[metadata_field_name], metadata_type=metadata_index[metadata_field_name],
                    parent_name=metadata_field_name, metadata_name_key_to_schema=metadata_name_key_to_schema, divider=divider
                )
                # process_metadata_value() returns a falsy value for blank/unusable cells - skip those fields
                if input_metadata:
                    metadata_fields.append({"schema_id" : metadata_name_key_to_schema[metadata_field_name], "value" : input_metadata})
                else:
                    continue
        return_value["data_row"]["metadata_fields"] = metadata_fields
        if attachment_index:
            return_value['data_row']['attachments'] = []
            for column_name in attachment_index:
                return_value['data_row']['attachments'].append({"type" : attachment_index[column_name], "value" : row[column_name]})
    except Exception as e:
        return_value["error"] = e
        # Best-effort: preserve the global key so the error can be traced back to its row. Guarded because
        # reading row[global_key_col] may be exactly what failed above; a raise here would escape the handler
        # and surface as an unhandled exception in the worker thread instead of a collected error.
        try:
            return_value["data_row"]["global_key"] = str(row[global_key_col])
        except Exception:
            pass
    return return_value
0 commit comments