This repository was archived by the owner on Jul 29, 2024. It is now read-only.

Commit f5b1554

Update connector.py
1 parent 0a35b79 commit f5b1554

1 file changed

labelpandas/connector.py

Lines changed: 0 additions & 190 deletions
```diff
@@ -1,194 +1,4 @@
-from labelbase.metadata import get_metadata_schema_to_name_key, process_metadata_value
-from labelbase.ontology import get_ontology_schema_to_name_path
-from labelbox import labelboxClient
 import pandas
-from concurrent.futures import ThreadPoolExecutor, as_completed
-from tqdm.autonotebook import tqdm
-import math
-
-def create_batches(table=pandas.core.frame.DataFrame, global_key_col:str, project_id_col:str, global_key_to_data_row_id:dict):
-    """ From a Pandas DataFrame, creates a dictionary where {key=project_id : value=list_of_data_row_ids}
-    Args:
-        table                       :   Required (pandas.core.frame.DataFrame) - Pandas DataFrame
-        global_key_col              :   Required (str) - Column name containing the data row global key - defaults to row data
-        project_id_col              :   Required (str) - Column name containing the project ID to batch a given row to
-        global_key_to_data_row_id   :   Required (dict) - Dictionary where {key=global_key : value=data_row_id}
-    Returns:
-        Dictionary where {key=project_id : value=list_of_data_row_ids}
-    """
-    project_id_to_batch_dict = {}
-    errors = []
-    if not project_id_col:
-        errors = f"No project_id_col provided - please provide a column indicating what project to batch data rows to"
-    else:
-        try:
-            column_names = get_columns_function(table)
-            if project_id_col not in column_names:
-                raise ValueError(f"Provided value for project_id_col `{project_id_col}` not in provided table column names")
-            for index, row in table.iterrows():
-                project_id = row[project_id_col]
-                data_row_id = global_key_to_data_row_id[row[global_key_col]]
-                if project_id not in project_id_to_batch_dict.keys():
-                    project_id_to_batch_dict[project_id] = []
-                project_id_to_batch_dict[project_id].append(data_row_id)
-        except Exception as e:
-            errors = e
-    return project_id_to_batch_dict, errors
-
-def create_annotation_upload_dict(client:labelboxClient, table:pandas.core.frame.DataFrame, row_data_col:str, global_key_col:str,
-                                  project_id_col:str, annotation_index:dict, divider:str="///", verbose:bool=False):
-    if not annotation_index:
-        project_id_to_upload_dict = {}
-        errors = f"No annotation index provided - no annotations uploaded"
-    else:
-        try:
-            project_id_to_upload_dict = {project_id : [] for project_id in get_unique_values_function(table, project_id_col)}
-            for project_id in project_id_to_upload_dict:
-                project_id_to_upload_dict[project_id] = []
-                project_id_to_ontology_index[project_id] = get_ontology_schema_to_name_path(
-                    ontology=client.get_project(project_id).ontology(), divider=divider, invert=True
-                )
-            if verbose:
-                for index, row in tqdm(table.iterrows()):
-                    for column_name in annotation_index.keys():
-                        ndjsons = create_ndjsons(
-                            annotation_values=row[column_name],
-                            annotation_type=annotation_index[column_name],
-                            ontology_index=project_id_to_ontology_index[row[project_id_col]],
-                            divide=divider
-                        )
-                        for ndjson in ndjsons:
-                            project_id_to_upload_dict[row[project_id_col]].append(ndjson)
-            for index, row in table.iterrows():
-                for column_name in annotation_index.keys():
-                    ndjsons = create_ndjsons(
-                        annotation_values=row[column_name],
-                        annotation_type=annotation_index[column_name],
-                        ontology_index=project_id_to_ontology_index[row[project_id_col]],
-                        divide=divider
-                    )
-                    for ndjson in ndjsons:
-                        project_id_to_upload_dict[row[project_id_col]].append(ndjson)
-        except Exception as e:
-            errors = e
-    return project_id_to_upload_dict, errors
-
-def create_data_row_upload_dict(client:labelboxClient, table:pandas.core.frame.DataFrame, row_data_col:str,
-                                global_key_col:str="", external_id_col:str="", metadata_index:dict={}, attachment_index:dict=attachment_index
-                                local_files:bool=False, divider:str="///", verbose=False):
-    """ Multithreads over a Pandas DataFrame, calling create_data_rows() on each row to return an upload dictionary
-    Args:
-        table               :   Required (pandas.core.frame.DataFrame) - Pandas DataFrame
-        client              :   Required (labelbox.client.Client) - Labelbox Client object
-        row_data_col        :   Required (str) - Column containing asset URL or file path
-        global_key_col      :   Optional (str) - Column name containing the data row global key - defaults to row data
-        external_id_col     :   Optional (str) - Column name containing the data row external ID - defaults to global key
-        metadata_index      :   Optional (dict) - Dictionary where {key=column_name : value=metadata_type}
-                                metadata_type must be either "enum", "string", "datetime" or "number"
-        local_files         :   Optional (bool) - Determines how to handle row_data_col values
-                                If True, treats row_data_col values as file paths uploads the local files to Labelbox
-                                If False, treats row_data_col values as urls (assuming delegated access is set up)
-        divider             :   Optional (str) - String delimiter for all name keys generated for parent/child schemas
-        verbose             :   Optional (bool) - If True, prints details about code execution; if False, prints minimal information
-    Returns:
-        Two values:
-        - global_key_to_upload_dict - Dictionary where {key=global_key : value=data row dictionary in upload format}
-        - errors - List of dictionaries containing conversion error information; see connector.create_data_rows() for more information
-    """
-    global_key_col = global_key_col if global_key_col else row_data_col
-    external_id_col = external_id_col if external_id_col else global_key_col
-    if verbose:
-        print(f'Creating upload list - {get_table_length_function(table)} rows in Pandas DataFrame')
-    if get_table_length_function(table=table) != get_unique_values_function(table=table, column_name=global_key_col):
-        print(f"Warning: Your global key column is not unique - upload will resume, only uploading 1 data row for duplicate global keys")
-    metadata_schema_to_name_key = get_metadata_schema_to_name_key(client=lb_client, lb_mdo=False, divider=divider, invert=False)
-    metadata_name_key_to_schema = get_metadata_schema_to_name_key(client=lb_client, lb_mdo=False, divider=divider, invert=True)
-    with ThreadPoolExecutor(max_workers=8) as exc:
-        global_key_to_upload_dict = {}
-        errors = []
-        futures = []
-        if verbose:
-            print(f'Submitting data rows...')
-            for index, row in tqdm(table.iterrows()):
-                futures.append(exc.submit(
-                    create_data_rows, client, row, metadata_name_key_to_schema, metadata_schema_to_name_key,
-                    row_data_col, global_key_col, external_id_col, metadata_index, attachment_index, local_files, divider
-                ))
-        else:
-            for index, row in table.iterrows():
-                futures.append(exc.submit(
-                    create_data_rows, client, row, metadata_name_key_to_schema, metadata_schema_to_name_key,
-                    row_data_col, global_key_col, external_id_col, metadata_index, attachment_index, local_files, divider
-                ))
-        if verbose:
-            print(f'Processing data rows...')
-            for f in tqdm(as_completed(futures)):
-                res = f.result()
-                if res['error']:
-                    errors.append(res)
-                else:
-                    global_key_to_upload_dict[str(res['data_row']["global_key"])] = res['data_row']
-        else:
-            for f in as_completed(futures):
-                res = f.result()
-                if res['error']:
-                    errors.append(res)
-                else:
-                    global_key_to_upload_dict[str(res['data_row']["global_key"])] = res['data_row']
-    if verbose:
-        print(f'Generated upload list - {len(global_key_to_upload_dict)} data rows to upload')
-    return global_key_to_upload_dict, errors
-
-def create_data_rows(client:labelboxClient, row:pandas.core.series.Series,
-                     metadata_name_key_to_schema:dict, metadata_schema_to_name_key:dict, row_data_col:str,
-                     global_key_col:str, external_id_col:str, metadata_index:dict, attachment_index:dict, local_files:bool, divider:str):
-    """ Function to-be-multithreaded to create data row dictionaries from a Pandas DataFrame
-    Args:
-        client                      :   Required (labelbox.client.Client) - Labelbox Client object
-        row                         :   Required (pandas.core.series.Series) - Pandas Series object, corresponds to one row in a df.iterrow()
-        metadata_name_key_to_schema :   Required (dict) - Dictionary where {key=metadata_field_name_key : value=metadata_schema_id}
-        metadata_schema_to_name_key :   Required (dict) - Inverse of metadata_name_key_to_schema
-        row_data_col                :   Required (str) - Column containing asset URL or file path
-        global_key_col              :   Required (str) - Column name containing the data row global key
-        external_id_col             :   Required (str) - Column name containing the data row external ID
-        metadata_index              :   Required (dict) - Dictionary where {key=column_name : value=metadata_type}
-                                        metadata_type must be either "enum", "string", "datetime" or "number"
-        attachment_index            :   Required (dict) - Dictionary where {key=column_name : value=attachment_type}
-                                        attachment_type must be one of "IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"
-        local_files                 :   Required (bool) - Determines how to handle row_data_col values
-                                        If True, treats row_data_col values as file paths uploads the local files to Labelbox
-                                        If False, treats row_data_col values as urls (assuming delegated access is set up)
-        divider                     :   Required (str) - String delimiter for all name keys generated for parent/child schemas
-    Returns:
-        A dictionary with "error" and "data_row" keys:
-        - "error" - If there's value in the "error" key, the script will scip it on upload and return the error at the end
-        - "data_row" - Dictionary with "global_key" "external_id" "row_data" and "metadata_fields" keys in the proper format to-be-uploaded
-    """
-    return_value = {"error" : None, "data_row" : {}}
-    try:
-        return_value["data_row"]["row_data"] = client.upload_file(str(row[row_data_col])) if local_files else str(row[row_data_col])
-        return_value["data_row"]["global_key"] = str(row[global_key_col])
-        return_value["data_row"]["external_id"] = str(row[external_id_col])
-        metadata_fields = [{"schema_id" : metadata_name_key_to_schema['lb_integration_source'], "value" : "Pandas"}]
-        if metadata_index:
-            for metadata_field_name in metadata_index.keys():
-                input_metadata = process_metadata_value(
-                    client=client, metadata_value=row[metadata_field_name], metadata_type=metadata_index[metadata_field_name],
-                    parent_name=metadata_field_name, metadata_name_key_to_schema=metadata_name_key_to_schema, divider=divider
-                )
-                if input_metadata:
-                    metadata_fields.append({"schema_id" : metadata_name_key_to_schema[metadata_field_name], "value" : input_metadata})
-                else:
-                    continue
-        return_value["data_row"]["metadata_fields"] = metadata_fields
-        if attachment_index:
-            return_value['data_row']['attachments'] = []
-            for column_name in attachment_index:
-                return_value['data_row']['attachments'].append({"type" : attachment_index[column_name], "value" : row[column_name]})
-    except Exception as e:
-        return_value["error"] = e
-        return_value["data_row"]["global_key"] = str(row[global_key_col])
-    return return_value
 
 def get_columns_function(table:pandas.core.frame.DataFrame, extra_client=None):
     """Grabs all column names from a Pandas DataFrame
```

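The deleted `create_data_row_upload_dict` followed a common fan-out/fan-in pattern: submit one task per DataFrame row to a `ThreadPoolExecutor`, then collect the results keyed by global key while accumulating per-row errors. The sketch below shows that pattern in isolation; `process_row` is a simplified stand-in for the removed `create_data_rows`, and the sample columns are illustrative only.

```python
# Sketch of the fan-out/fan-in pattern used by the removed create_data_row_upload_dict:
# one worker call per DataFrame row, results keyed by global key, errors collected separately.
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd

def process_row(row):
    # Simplified stand-in for the removed create_data_rows: build one data row
    # payload per DataFrame row and report any per-row failure via the "error" key.
    try:
        data_row = {
            "row_data": str(row["row_data"]),
            "global_key": str(row["global_key"]),
            "external_id": str(row["global_key"]),
            "metadata_fields": [],
        }
        return {"error": None, "data_row": data_row}
    except Exception as e:
        return {"error": e, "data_row": {"global_key": str(row.get("global_key"))}}

# Illustrative sample table - column names and URLs are invented for this example.
table = pd.DataFrame({
    "row_data": ["https://example.com/a.jpg", "https://example.com/b.jpg"],
    "global_key": ["gk-1", "gk-2"],
})

global_key_to_upload_dict, errors = {}, []
with ThreadPoolExecutor(max_workers=8) as exc:
    # Fan out: one future per row.
    futures = [exc.submit(process_row, row) for _, row in table.iterrows()]
    # Fan in: key successful payloads by global key, keep failures for reporting.
    for f in as_completed(futures):
        res = f.result()
        if res["error"]:
            errors.append(res)
        else:
            global_key_to_upload_dict[res["data_row"]["global_key"]] = res["data_row"]

print(len(global_key_to_upload_dict), "data rows ready,", len(errors), "errors")
```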
0 commit comments