@@ -69,13 +69,111 @@ def create_data_row(self, **kwargs):
         row_data = kwargs[DataRow.row_data.name]
         if os.path.exists(row_data):
             kwargs[DataRow.row_data.name] = self.client.upload_file(row_data)
-
         kwargs[DataRow.dataset.name] = self
-
         return self.client._create(DataRow, kwargs)
 
+    def create_data_rows_sync(self, items):
+        """ Synchronously bulk upload data rows.
+
+        Use this instead of `Dataset.create_data_rows` for smaller batches of data rows that need to be uploaded quickly.
+        Cannot be used for uploads containing more than 1000 data rows.
+        Each data row is also limited to 5 attachments.
+
+        Args:
+            items (iterable of (dict or str)):
+                See the docstring for `Dataset._create_descriptor_file` for more information.
+        Returns:
+            None. If the function doesn't raise an exception, the import was successful.
+
+        Raises:
+            InvalidQueryError: If the `items` parameter does not conform to
+                the specification in `Dataset._create_descriptor_file` or if the server did not accept the
+                DataRow creation request (unknown reason).
+            InvalidAttributeError: If there are fields in `items` not valid for
+                a DataRow.
+            ValueError: When the upload parameters are invalid.
+        """
+        max_data_rows_supported = 1000
+        max_attachments_per_data_row = 5
+        if len(items) > max_data_rows_supported:
+            raise ValueError(
+                f"Dataset.create_data_rows_sync() supports a max of {max_data_rows_supported} data rows."
+                " For larger imports use the async function Dataset.create_data_rows()"
+            )
+        descriptor_url = self._create_descriptor_file(
+            items, max_attachments_per_data_row=max_attachments_per_data_row)
+        dataset_param = "datasetId"
+        url_param = "jsonUrl"
+        query_str = """mutation AppendRowsToDatasetSyncPyApi($%s: ID!, $%s: String!){
+            appendRowsToDatasetSync(data:{datasetId: $%s, jsonFileUrl: $%s}
+            ){dataset{id}}} """ % (dataset_param, url_param, dataset_param,
+                                   url_param)
+        self.client.execute(query_str, {
+            dataset_param: self.uid,
+            url_param: descriptor_url
+        })
+
     def create_data_rows(self, items):
-        """ Creates multiple DataRow objects based on the given `items`.
+        """ Asynchronously bulk upload data rows.
+
+        Use this instead of `Dataset.create_data_rows_sync` for batches that contain more than 1000 data rows.
+
+        Args:
+            items (iterable of (dict or str)): See the docstring for `Dataset._create_descriptor_file` for more information.
+
+        Returns:
+            Task representing the data import on the server side. The Task
+            can be used for inspecting task progress and waiting until it's done.
+
+        Raises:
+            InvalidQueryError: If the `items` parameter does not conform to
+                the specification above or if the server did not accept the
+                DataRow creation request (unknown reason).
+            ResourceNotFoundError: If unable to retrieve the Task for the
+                import process. This could imply that the import failed.
+            InvalidAttributeError: If there are fields in `items` not valid for
+                a DataRow.
+            ValueError: When the upload parameters are invalid.
+        """
+        descriptor_url = self._create_descriptor_file(items)
+        # Create data source
+        dataset_param = "datasetId"
+        url_param = "jsonUrl"
+        query_str = """mutation AppendRowsToDatasetPyApi($%s: ID!, $%s: String!){
+            appendRowsToDataset(data:{datasetId: $%s, jsonFileUrl: $%s}
+            ){ taskId accepted errorMessage } } """ % (dataset_param, url_param,
+                                                       dataset_param, url_param)
+
+        res = self.client.execute(query_str, {
+            dataset_param: self.uid,
+            url_param: descriptor_url
+        })
+        res = res["appendRowsToDataset"]
+        if not res["accepted"]:
+            msg = res['errorMessage']
+            raise InvalidQueryError(
+                f"Server did not accept DataRow creation request. {msg}")
+
+        # Fetch and return the task.
+        task_id = res["taskId"]
+        user = self.client.get_user()
+        task = list(user.created_tasks(where=Entity.Task.uid == task_id))
+        # Cache user in a private variable as the relationship can't be
+        # resolved due to server-side limitations (see Task.created_by
+        # for more info).
+        if len(task) != 1:
+            raise ResourceNotFoundError(Entity.Task, task_id)
+        task = task[0]
+        task._user = user
+        return task
+
+    def _create_descriptor_file(self, items, max_attachments_per_data_row=None):
+        """
+        This function is shared by both `Dataset.create_data_rows` and `Dataset.create_data_rows_sync`
+        to prepare the input file. The user-defined input is validated, processed, and JSON-stringified.
+        Finally the JSON data is uploaded to GCS and a URI is returned. This URI can be passed as the
+        `jsonFileUrl` to either of the append mutations above.
+
+
 
         Each element in `items` can be either a `str` or a `dict`. If
         it is a `str`, then it is interpreted as a local file path. The file
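
For orientation, a minimal usage sketch of the two upload paths introduced above. It assumes an already-connected `client`, and that `get_dataset` and the Task's `wait_till_done` helpers behave as elsewhere in this client library:

    dataset = client.get_dataset("<dataset-uid>")

    # Small batch (at most 1000 rows, 5 attachments each): blocks until the
    # import finishes and returns None on success.
    dataset.create_data_rows_sync([{"row_data": "path/to/local_file.jpg"}])

    # Larger batch: returns a Task handle for the server-side import.
    task = dataset.create_data_rows(
        [{"row_data": f"https://example.com/img_{i}.jpg"} for i in range(5000)])
    task.wait_till_done()  # assumed Task polling helper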
@@ -102,22 +200,23 @@ def create_data_rows(self, items):
 
         Args:
             items (iterable of (dict or str)): See above for details.
+            max_attachments_per_data_row (Optional[int]): Param used during attachment validation to determine
+                if the user has provided too many attachments.
 
         Returns:
-            Task representing the data import on the server side. The Task
-            can be used for inspecting task progress and waiting until it's done.
+            uri (string): A reference to the uploaded JSON data.
 
         Raises:
             InvalidQueryError: If the `items` parameter does not conform to
                 the specification above or if the server did not accept the
                 DataRow creation request (unknown reason).
-            ResourceNotFoundError: If unable to retrieve the Task for the
-                import process. This could imply that the import failed.
             InvalidAttributeError: If there are fields in `items` not valid for
                 a DataRow.
+            ValueError: When the upload parameters are invalid.
         """
         file_upload_thread_count = 20
         DataRow = Entity.DataRow
+        AssetAttachment = Entity.AssetAttachment
 
         def upload_if_necessary(item):
             row_data = item['row_data']
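
For reference, a sketch of the `items` shapes this descriptor path accepts, per the docstring above. The attachment key names are assumptions based on what `AssetAttachment.validate_attachment_json` checks, not confirmed by this diff:

    items = [
        # str: interpreted as a local file path and uploaded first
        "path/to/local_image.jpg",
        # dict: DataRow fields, optionally with attachments
        {
            "row_data": "https://example.com/image.jpg",
            "external_id": "img-123",
            "attachments": [{"type": "TEXT", "value": "a note"}],  # assumed schema
        },
    ]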
@@ -134,9 +233,14 @@ def validate_attachments(item):
             attachments = item.get('attachments')
             if attachments:
                 if isinstance(attachments, list):
+                    if max_attachments_per_data_row and len(
+                            attachments) > max_attachments_per_data_row:
+                        raise ValueError(
+                            f"Max number of supported attachments per data row is {max_attachments_per_data_row}."
+                            f" Found {len(attachments)}. Condense multiple attachments into one with the HTML attachment type if necessary."
+                        )
                     for attachment in attachments:
-                        Entity.AssetAttachment.validate_attachment_json(
-                            attachment)
+                        AssetAttachment.validate_attachment_json(attachment)
                 else:
                     raise ValueError(
                         f"Attachments must be a list. Found {type(attachments)}"
@@ -198,40 +302,9 @@ def convert_item(item):
         with ThreadPoolExecutor(file_upload_thread_count) as executor:
             futures = [executor.submit(convert_item, item) for item in items]
             items = [future.result() for future in as_completed(futures)]
-
         # Prepare and upload the descriptor file
         data = json.dumps(items)
-        descriptor_url = self.client.upload_data(data)
-        # Create data source
-        dataset_param = "datasetId"
-        url_param = "jsonUrl"
-        query_str = """mutation AppendRowsToDatasetPyApi($%s: ID!, $%s: String!){
-            appendRowsToDataset(data:{datasetId: $%s, jsonFileUrl: $%s}
-            ){ taskId accepted errorMessage } } """ % (dataset_param, url_param,
-                                                       dataset_param, url_param)
-
-        res = self.client.execute(query_str, {
-            dataset_param: self.uid,
-            url_param: descriptor_url
-        })
-        res = res["appendRowsToDataset"]
-        if not res["accepted"]:
-            msg = res['errorMessage']
-            raise InvalidQueryError(
-                f"Server did not accept DataRow creation request. {msg}")
-
-        # Fetch and return the task.
-        task_id = res["taskId"]
-        user = self.client.get_user()
-        task = list(user.created_tasks(where=Entity.Task.uid == task_id))
-        # Cache user in a private variable as the relationship can't be
-        # resolved due to server-side limitations (see Task.created_by
-        # for more info).
-        if len(task) != 1:
-            raise ResourceNotFoundError(Entity.Task, task_id)
-        task = task[0]
-        task._user = user
-        return task
+        return self.client.upload_data(data)
 
 
     def data_rows_for_external_id(self, external_id, limit=10):
         """ Convenience method for getting a single `DataRow` belonging to this