Skip to content
This repository was archived by the owner on Jul 29, 2024. It is now read-only.

Commit 7226875

Browse files
Update connector.py
1 parent bf30c39 commit 7226875

File tree

1 file changed

+25
-1
lines changed

1 file changed

+25
-1
lines changed

labelpandas/connector.py

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,10 @@ def create_data_rows(
2525
Two items - the global_key, and a dictionary with "row_data", "global_key", "external_id" and "metadata_fields" keys
2626
"""
2727
row_data = lb_client.upload_file(str(row[row_data_col])) if local_files else str(row[row_data_col])
28-
data_row_dict = {"row_data" : row_data, "global_key" : str(row[global_key_col]), "external_id" : row[external_id_col], "metadata_fields" : [{"schema_id" : metadata_name_key_to_schema['lb_integration_source', "value" : "Pandas"]}]}
28+
data_row_dict = {
29+
"row_data" : row_data, "global_key" : str(row[global_key_col]), "external_id" : row[external_id_col],
30+
"metadata_fields" : [{"schema_id" : metadata_name_key_to_schema['lb_integration_source', "value" : "Pandas"]}]
31+
}
2932
if metadata_index:
3033
for metadata_field_name in metadata_index.keys():
3134
name_key = f"{metadata_field_name}{divider}{row[metadata_field_name]}"
@@ -42,3 +45,24 @@ def get_unique_values_function(table, column_name:str):
4245
def add_column_function(table, column_name: str):
    """Add (or overwrite) ``column_name`` on ``table`` with an empty-string value.

    The assignment is done in place via ``table[column_name] = ""``, so the
    caller's object is modified; the same object is returned for convenience.

    Args:
        table: Any object supporting item assignment by column name
            (presumably a pandas DataFrame, where the scalar is broadcast
            to every row -- confirm against callers).
        column_name: Name of the column to create or reset.

    Returns:
        The same ``table`` object that was passed in.
    """
    table[column_name] = ""
    return table
48+
49+
def create_upload_dict(table, local_files, lb_client, row, row_data_col, global_key_col, external_id_col, metadata_index, divider):
    """Build a ``global_key -> upload dictionary`` mapping for every row of ``table``.

    Each row yielded by ``table.iterrows()`` is passed to ``create_data_rows``
    on a thread pool, and the results are collected into a dictionary keyed
    by ``str(result["global_key"])``.

    Args:
        table: Object exposing ``iterrows()`` that yields ``(index, row)``
            pairs (presumably a pandas DataFrame).
        local_files: Forwarded unchanged to ``create_data_rows``.
        lb_client: Client object forwarded to ``create_data_rows``.
        row: Unused. Kept only so existing positional callers keep working;
            the rows processed are the ones read from ``table``.
        row_data_col: Column name holding the row data.
        global_key_col: Column name holding the global key; falls back to
            ``row_data_col`` when falsy.
        external_id_col: Column name holding the external ID; falls back to
            the (resolved) ``global_key_col`` when falsy.
        metadata_index: Forwarded unchanged to ``create_data_rows``.
        divider: Forwarded to the metadata lookups and ``create_data_rows``.

    Returns:
        dict mapping ``str(global_key)`` to the value returned by
        ``create_data_rows`` for that row. If two rows share a global key,
        whichever finishes last wins.
    """
    # Imported locally so this function does not depend on the module's
    # top-level import block, which is not guaranteed to provide these names.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    global_key_col = global_key_col if global_key_col else row_data_col
    external_id_col = external_id_col if external_id_col else global_key_col

    # NOTE(review): get_metadata_schema_to_name_key is not defined in the
    # visible part of this module -- confirm it is in scope here (it may be
    # meant to be called on a client object instead).
    metadata_schema_to_name_key = get_metadata_schema_to_name_key(lb_mdo=False, divider=divider, invert=False)
    metadata_name_key_to_schema = get_metadata_schema_to_name_key(lb_mdo=False, divider=divider, invert=True)

    global_key_to_upload_dict = {}
    with ThreadPoolExecutor() as exc:
        # Use the ``lb_client`` argument and this module's own
        # ``create_data_rows``: there is no ``self`` or ``connector`` name
        # inside this module-level function.
        futures = [
            exc.submit(
                create_data_rows, local_files, lb_client, table_row, row_data_col,
                global_key_col, external_id_col, metadata_index, metadata_name_key_to_schema,
                metadata_schema_to_name_key, divider
            )
            for _, table_row in table.iterrows()
        ]
        for f in as_completed(futures):
            res = f.result()
            global_key_to_upload_dict[str(res["global_key"])] = res
    return global_key_to_upload_dict

0 commit comments

Comments
 (0)