@@ -203,17 +203,19 @@ def create_table(self,
             TableInput=table_input)
 
     def add_partitions(self, database, table, partition_paths, file_format,
-                       extra_args):
+                       compression, extra_args):
         if not partition_paths:
            return None
         partitions = list()
         for partition in partition_paths:
             if file_format == "parquet":
                 partition_def = Glue.parquet_partition_definition(
-                    partition=partition)
+                    partition=partition, compression=compression)
             elif file_format == "csv":
                 partition_def = Glue.csv_partition_definition(
-                    partition=partition, extra_args=extra_args)
+                    partition=partition,
+                    compression=compression,
+                    extra_args=extra_args)
             else:
                 raise UnsupportedFileFormat(file_format)
             partitions.append(partition_def)
@@ -225,8 +227,12 @@ def add_partitions(self, database, table, partition_paths, file_format,
             DatabaseName=database,
             TableName=table,
             PartitionInputList=page)
-        if len(res["Errors"]) > 0:
-            raise ApiError(f"{res['Errors'][0]}")
+        for error in res["Errors"]:
+            if "ErrorDetail" in error:
+                if "ErrorCode" in error["ErrorDetail"]:
+                    if error["ErrorDetail"][
+                            "ErrorCode"] != "AlreadyExistsException":
+                        raise ApiError(f"{error}")
 
     def get_connection_details(self, name):
         return self._client_glue.get_connection(
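The nested checks in the hunk above change the `batch_create_partition` error handling from "fail on any error" to "ignore partitions that already exist". A minimal equivalent sketch, assuming the `{"Errors": [{"ErrorDetail": {"ErrorCode": ...}}]}` response shape that boto3's `batch_create_partition` returns; the flattened `get` chain is illustrative, not the committed code:

```python
# Illustrative rewrite of the committed logic: errors missing an
# ErrorDetail/ErrorCode are skipped, AlreadyExistsException is tolerated
# (so re-running add_partitions stays idempotent), anything else raises.
for error in res["Errors"]:
    code = error.get("ErrorDetail", {}).get("ErrorCode")
    if code is not None and code != "AlreadyExistsException":
        raise ApiError(f"{error}")
```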
@@ -355,7 +361,7 @@ def csv_table_definition(table, partition_cols_schema, schema, path,
             "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
             "OutputFormat":
             "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
-            "Compressed": True,
+            "Compressed": compressed,
             "NumberOfBuckets": -1,
             "SerdeInfo": {
                 "Parameters": param,
@@ -375,7 +381,8 @@ def csv_table_definition(table, partition_cols_schema, schema, path,
         }
 
     @staticmethod
-    def csv_partition_definition(partition, extra_args):
+    def csv_partition_definition(partition, compression, extra_args):
+        compressed = False if compression is None else True
         sep = extra_args["sep"] if "sep" in extra_args else ","
         serde = extra_args.get("serde")
         if serde == "OpenCSVSerDe":
@@ -394,6 +401,7 @@ def csv_partition_definition(partition, extra_args):
             "StorageDescriptor": {
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "Location": partition[0],
+                "Compressed": compressed,
                 "SerdeInfo": {
                     "Parameters": param,
                     "SerializationLibrary": serde_fullname,
@@ -454,11 +462,13 @@ def parquet_table_definition(table, partition_cols_schema, schema, path,
         }
 
     @staticmethod
-    def parquet_partition_definition(partition):
+    def parquet_partition_definition(partition, compression):
+        compressed = False if compression is None else True
         return {
             "StorageDescriptor": {
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "Location": partition[0],
+                "Compressed": compressed,
                 "SerdeInfo": {
                     "Parameters": {
                         "serialization.format": "1"