|  | 
| 61 | 61 | import com.google.common.base.Strings; | 
| 62 | 62 | import com.google.common.collect.Lists; | 
| 63 | 63 | import io.cdap.cdap.api.data.format.StructuredRecord; | 
|  | 64 | +import io.cdap.cdap.api.exception.ErrorCategory; | 
|  | 65 | +import io.cdap.cdap.api.exception.ErrorType; | 
|  | 66 | +import io.cdap.cdap.api.exception.ErrorUtils; | 
|  | 67 | +import io.cdap.cdap.etl.api.exception.ErrorPhase; | 
| 64 | 68 | import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings; | 
| 65 | 69 | import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes; | 
| 66 | 70 | import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; | 
|  | 
| 103 | 107 |  */ | 
| 104 | 108 | public class BigQueryOutputFormat extends ForwardingBigQueryFileOutputFormat<StructuredRecord, NullWritable> { | 
| 105 | 109 |   private static final Logger LOG = LoggerFactory.getLogger(BigQueryOutputFormat.class); | 
|  | 110 | +  private static final String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; | 
| 106 | 111 | 
 | 
| 107 | 112 |   @Override | 
| 108 | 113 |   public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) | 
| @@ -165,19 +170,31 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, | 
| 165 | 170 |     // Error if the output path already exists. | 
| 166 | 171 |     FileSystem outputFileSystem = outputPath.getFileSystem(conf); | 
| 167 | 172 |     if (outputFileSystem.exists(outputPath)) { | 
| 168 |  | -      throw new IOException("The output path '" + outputPath + "' already exists."); | 
|  | 173 | +      String errorMessage = String.format("The output path '%s' already exists.", outputPath); | 
|  | 174 | +      throw ErrorUtils.getProgramFailureException( | 
|  | 175 | +        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 176 | +        String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, | 
|  | 177 | +        new IOException(errorMessage)); | 
| 169 | 178 |     } | 
| 170 | 179 | 
 | 
| 171 | 180 |     // Error if compression is set as there's mixed support in BigQuery. | 
| 172 | 181 |     if (FileOutputFormat.getCompressOutput(job)) { | 
| 173 |  | -      throw new IOException("Compression isn't supported for this OutputFormat."); | 
|  | 182 | +      String errorMessage = "Compression isn't supported for this OutputFormat."; | 
|  | 183 | +      throw ErrorUtils.getProgramFailureException( | 
|  | 184 | +        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 185 | +        String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, | 
|  | 186 | +        new IOException(errorMessage)); | 
| 174 | 187 |     } | 
| 175 | 188 | 
 | 
| 176 | 189 |     // Error if unable to create a BigQuery helper. | 
| 177 | 190 |     try { | 
| 178 | 191 |       new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf); | 
| 179 | 192 |     } catch (GeneralSecurityException gse) { | 
| 180 |  | -      throw new IOException("Failed to create BigQuery client", gse); | 
|  | 193 | +      String errorMessage = "Failed to create BigQuery client"; | 
|  | 194 | +      throw ErrorUtils.getProgramFailureException( | 
|  | 195 | +        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 196 | +        String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, | 
|  | 197 | +        new IOException(errorMessage, gse)); | 
| 181 | 198 |     } | 
| 182 | 199 | 
 | 
| 183 | 200 |     // Let delegate process its checks. | 
| @@ -208,7 +225,11 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput | 
| 208 | 225 |         BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES); | 
| 209 | 226 |         this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration()); | 
| 210 | 227 |       } catch (GeneralSecurityException e) { | 
| 211 |  | -        throw new IOException("Failed to create Bigquery client.", e); | 
|  | 228 | +        String errorMessage = "Failed to create BigQuery client"; | 
|  | 229 | +        throw ErrorUtils.getProgramFailureException( | 
|  | 230 | +          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 231 | +          String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, | 
|  | 232 | +          new IOException(errorMessage, e)); | 
| 212 | 233 |       } | 
| 213 | 234 |     } | 
| 214 | 235 | 
 | 
| @@ -266,7 +287,11 @@ public void commitJob(JobContext jobContext) throws IOException { | 
| 266 | 287 |                       writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField, | 
| 267 | 288 |                       requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf); | 
| 268 | 289 |       } catch (Exception e) { | 
| 269 |  | -        throw new IOException("Failed to import GCS into BigQuery. ", e); | 
|  | 290 | +        String errorMessage = "Failed to import GCS into BigQuery."; | 
|  | 291 | +        throw ErrorUtils.getProgramFailureException( | 
|  | 292 | +          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 293 | +          String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, | 
|  | 294 | +          new IOException(errorMessage, e)); | 
| 270 | 295 |       } | 
| 271 | 296 | 
 | 
| 272 | 297 |       cleanup(jobContext); | 
| @@ -566,26 +591,33 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p | 
| 566 | 591 |             int numOfErrors; | 
| 567 | 592 |             String errorMessage; | 
| 568 | 593 |             if (errors == null || errors.isEmpty()) { | 
| 569 |  | -              errorMessage = pollJob.getStatus().getErrorResult().getMessage(); | 
|  | 594 | +              errorMessage = String.format("reason: %s, %s", pollJob.getStatus().getErrorResult().getReason(), | 
|  | 595 | +                pollJob.getStatus().getErrorResult().getMessage()); | 
| 570 | 596 |               numOfErrors = 1; | 
| 571 | 597 |             } else { | 
| 572 | 598 |               errorMessage = errors.get(errors.size() - 1).getMessage(); | 
| 573 | 599 |               numOfErrors = errors.size(); | 
| 574 | 600 |             } | 
| 575 | 601 |             // Only add first error message in the exception. For other errors user should look at BigQuery job logs. | 
| 576 |  | -            throw new IOException(String.format("Error occurred while importing data to BigQuery '%s'." + | 
| 577 |  | -                                                  " There are total %s error(s) for BigQuery job %s. Please look at " + | 
| 578 |  | -                                                  "BigQuery job logs for more information.", | 
| 579 |  | -                                                errorMessage, numOfErrors, jobReference.getJobId())); | 
|  | 602 | +            String errorMessageException = String.format("Error occurred while importing data to BigQuery: '%s'." + | 
|  | 603 | +                " There are a total of %s error(s) for BigQuery job %s. Please look at " + | 
|  | 604 | +                "BigQuery job logs for more information.", | 
|  | 605 | +              errorMessage, numOfErrors, jobReference.getJobId()); | 
|  | 606 | +            throw ErrorUtils.getProgramFailureException( | 
|  | 607 | +              new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException, | 
|  | 608 | +              String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true, | 
|  | 609 | +              new IOException(errorMessageException)); | 
|  | 610 | + | 
| 580 | 611 |           } | 
| 581 | 612 |         } else { | 
| 582 | 613 |           long millisToWait = pollBackOff.nextBackOffMillis(); | 
| 583 | 614 |           if (millisToWait == BackOff.STOP) { | 
| 584 |  | -            throw new IOException( | 
| 585 |  | -              String.format( | 
| 586 |  | -                "Job %s failed to complete after %s millis.", | 
| 587 |  | -                jobReference.getJobId(), | 
| 588 |  | -                elapsedTime)); | 
|  | 615 | +            String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId() | 
|  | 616 | +              , elapsedTime); | 
|  | 617 | +            throw ErrorUtils.getProgramFailureException( | 
|  | 618 | +              new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 619 | +              String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true, | 
|  | 620 | +              new IOException(errorMessage)); | 
| 589 | 621 |           } | 
| 590 | 622 |           // Pause execution for the configured duration before polling job status again. | 
| 591 | 623 |           Thread.sleep(millisToWait); | 
| @@ -621,8 +653,12 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I | 
| 621 | 653 |           TableSchema tableSchema = createTableSchemaFromFields(fieldsJson); | 
| 622 | 654 |           return Optional.of(tableSchema); | 
| 623 | 655 |         } catch (IOException e) { | 
| 624 |  | -          throw new IOException( | 
| 625 |  | -            "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e); | 
|  | 656 | +          String errorMessage = String.format("Unable to parse key '%s'.", | 
|  | 657 | +            BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey()); | 
|  | 658 | +          throw ErrorUtils.getProgramFailureException( | 
|  | 659 | +            new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 660 | +            String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, | 
|  | 661 | +            new IOException(errorMessage, e)); | 
| 626 | 662 |         } | 
| 627 | 663 |       } | 
| 628 | 664 |       return Optional.empty(); | 
|  | 
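For context, below is a minimal, illustrative sketch (not part of the PR) of the error-wrapping pattern these hunks apply at each former `throw new IOException(...)` site. The class and method names in the sketch are hypothetical; the `ErrorUtils`, `ErrorCategory`, `ErrorType`, and `ErrorPhase` calls mirror the signatures used in the diff above.

```java
// Illustrative sketch only -- the class and method names are hypothetical;
// the CDAP error-API calls mirror the ones introduced in the diff above.
import java.io.IOException;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.exception.ErrorPhase;

public final class BigQueryErrorHandlingSketch {

  // Same message template the diff stores in its errorMessageFormat constant.
  private static final String ERROR_MESSAGE_FORMAT =
      "Error occurred in the phase: '%s'. Error message: %s";

  private BigQueryErrorHandlingSketch() { }

  // Wraps a low-level failure the way the diff does: the short message is surfaced
  // to the user, the phase-qualified message goes to the platform, and the original
  // cause is preserved inside an IOException for debugging.
  static void throwProgramFailure(String errorMessage, ErrorPhase phase, Exception cause) {
    throw ErrorUtils.getProgramFailureException(
        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
        String.format(ERROR_MESSAGE_FORMAT, phase, errorMessage), ErrorType.SYSTEM, true,
        new IOException(errorMessage, cause));
  }
}
```

Note that the diff uses `ErrorType.SYSTEM` for local validation and client-setup failures, but `ErrorType.UNKNOWN` in the job-polling hunk, where a failed BigQuery load cannot be cleanly attributed to the system or the user.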