@@ -66,38 +66,25 @@ def create_data_rows_from_table(
66
66
)
67
67
68
68
if type (table ) == bool :
69
- return None
69
+ return None
70
70
71
- global_key_col = global_key_col if global_key_col else row_data_col
72
- external_id_col = external_id_col if external_id_col else global_key_col
73
-
74
- metadata_schema_to_name_key = self .base_client .get_metadata_schema_to_name_key (
75
- lb_mdo = False ,
76
- divider = divider ,
77
- invert = False
78
- )
79
- metadata_name_key_to_schema = self .base_client .get_metadata_schema_to_name_key (
80
- lb_mdo = False ,
81
- divider = divider ,
82
- invert = True
71
+ global_key_to_upload_dict = connector .create_upload_dict (
72
+ table = table ,
73
+ local_files = local_files ,
74
+ lb_client = self .lb_client ,
75
+ row = row ,
76
+ row_data_col = row_data_col ,
77
+ global_key_col = global_key_col ,
78
+ external_id_col = external_id_col ,
79
+ metadata_index = metadata_index ,
80
+ divider = divider
83
81
)
84
-
85
- global_key_to_upload_dict = {}
86
- futures = []
87
- with ThreadPoolExecutor () as exc :
88
- for index , row in table .iterrows ():
89
- futures .append (
90
- exc .submit (
91
- connector .create_data_rows , local_files , self .lb_client , row , row_data_col ,
92
- global_key_col , external_id_col , metadata_index , metadata_name_key_to_schema ,
93
- metadata_schema_to_name_key , divider
94
- )
95
- )
96
- for f in as_completed (futures ):
97
- res = f .result ()
98
- global_key_to_upload_dict [str (res ["global_key" ])] = res
99
-
100
- upload_results = self .base_client .batch_create_data_rows (lb_dataset , global_key_to_upload_dict , skip_duplicates , divider )
82
+
83
+ upload_results = self .base_client .batch_create_data_rows (
84
+ dataset = lb_dataset ,
85
+ global_key_to_upload_dict = global_key_to_upload_dict ,
86
+ skip_duplicates = skip_duplicates ,
87
+ divider = divider )
101
88
102
89
return upload_results
103
90
0 commit comments