import labelbox
from labelbox import Client as labelboxClient
from labelbox.schema.data_row_metadata import DataRowMetadataKind
-from labelbase.metadata import sync_metadata_fields, get_metadata_schema_to_name_key
+from labelbase.downloader import export_and_flatten_labels
from google.cloud import bigquery
from google.oauth2 import service_account
+from uuid import uuid4
+import pandas as pd
+from datetime import datetime


+# BigQuery restricts the special characters allowed in column names, so divider characters are stored as unicode escapes
+DIVIDER_MAPPINGS = {'&': '\u0026', '%': '\u0025', '>': '\u003E', '#': '\u0023', '|': '\u007c'}
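+# e.g. the default divider "|||" is converted to '\u007c\u007c\u007c' before being used in column names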
+
class Client:
    """A LabelBigQuery Client, containing a Labelbox Client and BigQuery Client Object
    Args:
@@ -36,9 +42,18 @@ def __init__(
                 lb_app_url="https://app.labelbox.com"):

        self.lb_client = labelboxClient(lb_api_key, endpoint=lb_endpoint, enable_experimental=lb_enable_experimental, app_url=lb_app_url)
-        bq_creds = service_account.Credentials.from_service_account_file(google_key) if google_key else None
-        self.bq_client = bigquery.Client(project=google_project_name, credentials=bq_creds)
-
+        self.bq_creds = service_account.Credentials.from_service_account_file(google_key) if google_key else None
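+        # If no service-account key file is provided, the BigQuery client falls back to application default credentials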
+        self.bq_client = bigquery.Client(project=google_project_name, credentials=self.bq_creds)
+        self.google_project_name = google_project_name
+
+    def _validate_divider(self, divider):
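+        # Convert each divider character to its unicode escape so the divider is legal in BigQuery column names; unsupported characters raise a ValueError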
+        unicode_divider = ''
+        for char in divider:
+            if char not in DIVIDER_MAPPINGS:
+                raise ValueError(f"Restricted character(s) found in divider - {char}. The allowed characters are {[key for key in DIVIDER_MAPPINGS.keys()]}")
+            unicode_divider += DIVIDER_MAPPINGS[char]
+        return unicode_divider
+
    def _sync_metadata_fields(self, bq_table_id, metadata_index={}):
        """Ensures Labelbox's Metadata Ontology has all necessary metadata fields given a metadata_index
        Args:
@@ -174,8 +189,91 @@ def __check_global_keys(client, global_keys):
            else:
                upload_results.extend(task.result)
        return upload_results
+
+    def export_to_BigQuery(self, project, bq_dataset_id:str, bq_table_name:str, create_table:bool=False,
+                           include_metadata:bool=False, include_performance:bool=False, include_agreement:bool=False,
+                           include_label_details:bool=False, verbose:bool=False, mask_method:str="png", divider="|||",
+                           export_filters:dict=None):
+
+        divider = self._validate_divider(divider)
+        flattened_labels_dict = export_and_flatten_labels(
+            client=self.lb_client, project=project, include_metadata=include_metadata,
+            include_performance=include_performance, include_agreement=include_agreement,
+            include_label_details=include_label_details, mask_method=mask_method, verbose=verbose, divider=divider,
+            export_filters=export_filters
+        )
+        if len(flattened_labels_dict) == 0:
+            if verbose:
+                print("No labels were found in the project export")
+            return
+
+        # Make sure all values are strings, since every column in the BigQuery table is created with the STRING type
+        flattened_labels_dict = [{key: str(val) for key, val in label_dict.items()} for label_dict in flattened_labels_dict]
+
+        table = pd.DataFrame.from_dict(flattened_labels_dict)
+        label_ids = table['label_id'].to_numpy()
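+        # Build a quoted, comma-separated list of label IDs for the SQL IN clause used below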
+        labels_str = ""
+        for label_id in label_ids:
+            labels_str += "'" + label_id + "',"
+        labels_str = labels_str[:-1]
+        columns = table.columns.values.tolist()
+        table_schema = [bigquery.SchemaField(col, "STRING") for col in columns]
+        bq_table_name = bq_table_name.replace("-", "_")  # BigQuery table names shouldn't contain "-", since it causes errors when performing SQL updates
+
+        if create_table:
+            bq_table = self.bq_client.create_table(bigquery.Table(f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}", schema=table_schema))
+            if verbose:
+                print(f'Created BigQuery Table with ID {bq_table.table_id}')
+            labels_to_insert = flattened_labels_dict
+        else:
+            bq_table = self.bq_client.get_table(bigquery.Table(f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}"))
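+            # Fetch updated_at for the exported label IDs so we can tell which labels already exist in the table and which of those are stale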
+            query = """
+                SELECT updated_at, label_id
+                FROM {0}
+                WHERE label_id in ({1})
+            """
+            query = query.format(f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}", labels_str)
+            query_job = self.bq_client.query(query)
+            rows = list(query_job.result())
+            labels_to_update = []
+            labels_to_insert = []
+            for label in flattened_labels_dict:
+                label_in_table = False
+                for row in rows:
+                    if label['label_id'] == row[1]:
+                        label_in_table = True
+                        row_time = datetime.strptime(row[0], "%Y-%m-%dT%H:%M:%S.%f%z")
+                        label_time = datetime.strptime(label["updated_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
+                        if label_time > row_time:
+                            labels_to_update.append(label)
+                if not label_in_table:
+                    labels_to_insert.append(label)
+            if len(labels_to_update) > 0:
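+                # WRITE_TRUNCATE reloads the table from the full export, replacing the existing rows rather than updating them in place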
+                job_config = bigquery.LoadJobConfig(
+                    schema=table_schema,
+                    write_disposition="WRITE_TRUNCATE",
+                )
+                job = self.bq_client.load_table_from_json(
+                    flattened_labels_dict, f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}", job_config=job_config
+                )
+                errors = job.result().errors
+                if not errors and verbose:
+                    print(f'Successfully updated table. {len(labels_to_update)} rows were updated and {len(labels_to_insert)} new rows were inserted')
+                elif verbose:
+                    print(errors)
+                return errors
+        if verbose:
+            print(f"Inserting {len(labels_to_insert)} data rows into the table")
+        errors = self.bq_client.insert_rows_json(bq_table, labels_to_insert)
+        if not errors and verbose:
+            print(f'Insert job successful')
+        elif verbose:
+            print(f"There are errors present:\n{errors}")
+        return errors

-    def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, global_key_col=None, external_id_col=None, metadata_index={}, attachment_index={}, skip_duplicates=False):
+    def create_data_rows_from_table(
+            self, bq_table_id:str="", lb_dataset:labelbox.schema.dataset.Dataset=None, row_data_col:str="", global_key_col:str=None,
+            external_id_col:str=None, metadata_index:dict={}, attachment_index:dict={}, skip_duplicates:bool=False, divider:str="|||"):
        """Creates Labelbox data rows given a BigQuery table and a Labelbox Dataset
        Args:
            bq_table_id      : Required (str) - BigQuery Table ID structured in the following format: "google_project_name.dataset_name.table_name"
@@ -186,9 +284,12 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
            metadata_index   : Optional (dict) - Dictionary where {key=column_name : value=metadata_type} - metadata_type must be one of "enum", "string", "datetime" or "number"
            attachment_index : Optional (dict) - Dictionary where {key=column_name : value=attachment_type} - attachment_type must be one of "IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"
            skip_duplicates  : Optional (bool) - If True, will skip duplicate global_keys, otherwise will generate a unique global_key with a suffix "_1", "_2" and so on
+            divider          : Optional (str) - String delimiter for schema name keys and the suffix added to duplicate global keys
        Returns:
            List of errors from data row upload - if successful, is an empty list
        """
+
+        divider = self._validate_divider(divider)
        # Sync metadata index keys with metadata ontology
        check = self._sync_metadata_fields(bq_table_id, metadata_index)
        if not check:
@@ -240,9 +341,9 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
                query_lookup[mdf] = index_value
                index_value += 1
        if attachment_index:
+            attachment_whitelist = ["IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"]
            for attachment_field_name in attachment_index:
                atf = attachment_field_name.replace(" ", "_")
-                attachment_whitelist = ["IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"]
                if attachment_index[attachment_field_name] not in attachment_whitelist:
                    print(f'Error: Invalid value for attachment_index key {attachment_field_name} : {attachment_index[attachment_field_name]}\n must be one of {attachment_whitelist}')
                    return None
@@ -259,10 +360,15 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
        # Iterate over your query payload to construct a list of data row dictionaries in Labelbox format
        global_key_to_upload_dict = {}
        for row in query_job:
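+            # Use the row_data value as the global key when it is 200 characters or fewer; otherwise fall back to a random UUID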
+            if len(row[query_lookup[row_data_col]]) <= 200:
+                global_key = row[query_lookup[row_data_col]]
+            else:
+                print("Global key too long (>200 characters). Replacing with randomly generated global key.")
+                global_key = str(uuid4())
            data_row_upload_dict = {
                "row_data": row[query_lookup[row_data_col]],
                "metadata_fields": [{"schema_id": metadata_name_key_to_schema['lb_integration_source'], "value": "BigQuery"}],
-                "global_key": str(row[query_lookup[global_key_col]])
+                "global_key": str(global_key)
            }
            if external_id_col:
                data_row_upload_dict['external_id'] = row[query_lookup[external_id_col]]
@@ -283,9 +389,11 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
                data_row_upload_dict['attachments'] = [{"type": attachment_index[attachment_field_name], "value": row[query_lookup[attachment_field_name]]} for attachment_field_name in attachment_index]
            global_key_to_upload_dict[row[query_lookup[global_key_col]]] = data_row_upload_dict
        # Batch upload your list of data row dictionaries in Labelbox format
-        if type(lb_dataset) == str):
+        if type(lb_dataset) == str:
            lb_dataset = self.lb_client.get_dataset(lb_dataset)
        upload_results = self.__batch_create_data_rows(client=self.lb_client, dataset=lb_dataset, global_key_to_upload_dict=global_key_to_upload_dict)
+
+
        print(f'Success')
        return upload_results

@@ -340,6 +448,7 @@ def create_table_from_dataset(self, bq_dataset_id, bq_table_name, lb_dataset, me
                if metadata_field_name in field_to_value.keys():
                    row_dict[mdf] = field_to_value[metadata_field_name]
            rows_to_insert.append(row_dict)
+        print(len(rows_to_insert))
        errors = self.bq_client.insert_rows_json(bq_table, rows_to_insert)
        if not errors:
            print(f'Success\nCreated BigQuery Table with ID {bq_table.table_id}')
@@ -442,4 +551,4 @@ def upsert_labelbox_metadata(self, bq_table_id, global_key_col, global_keys_list
                field.value = metadata_name_key_to_schema[name_key] if name_key in metadata_name_key_to_schema.keys() else table_value
            upload_metadata.append(labelbox.schema.data_row_metadata.DataRowMetadata(data_row_id=drid, fields=new_metadata))
        results = lb_mdo.bulk_upsert(upload_metadata)
-        return results
+        return results