|  | 
| 61 | 61 | import com.google.common.base.Strings; | 
| 62 | 62 | import com.google.common.collect.Lists; | 
| 63 | 63 | import io.cdap.cdap.api.data.format.StructuredRecord; | 
|  | 64 | +import io.cdap.cdap.api.exception.ErrorCategory; | 
|  | 65 | +import io.cdap.cdap.api.exception.ErrorType; | 
|  | 66 | +import io.cdap.cdap.api.exception.ErrorUtils; | 
|  | 67 | +import io.cdap.cdap.etl.api.exception.ErrorPhase; | 
| 64 | 68 | import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings; | 
| 65 | 69 | import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes; | 
| 66 | 70 | import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; | 
|  | 
| 103 | 107 |  */ | 
| 104 | 108 | public class BigQueryOutputFormat extends ForwardingBigQueryFileOutputFormat<StructuredRecord, NullWritable> { | 
| 105 | 109 |   private static final Logger LOG = LoggerFactory.getLogger(BigQueryOutputFormat.class); | 
|  | 110 | +  private static final String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; | 
| 106 | 111 | 
 | 
| 107 | 112 |   @Override | 
| 108 | 113 |   public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) | 
| @@ -165,19 +170,30 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, | 
| 165 | 170 |     // Error if the output path already exists. | 
| 166 | 171 |     FileSystem outputFileSystem = outputPath.getFileSystem(conf); | 
| 167 | 172 |     if (outputFileSystem.exists(outputPath)) { | 
| 168 |  | -      throw new IOException("The output path '" + outputPath + "' already exists."); | 
|  | 173 | +      String errorMessage = String.format("The output path '%s' already exists.", outputPath); | 
|  | 174 | +      throw ErrorUtils.getProgramFailureException( | 
|  | 175 | +        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 176 | +        String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, | 
|  | 177 | +        new IOException(errorMessage)); | 
| 169 | 178 |     } | 
| 170 | 179 | 
 | 
| 171 | 180 |     // Error if compression is set as there's mixed support in BigQuery. | 
| 172 | 181 |     if (FileOutputFormat.getCompressOutput(job)) { | 
| 173 |  | -      throw new IOException("Compression isn't supported for this OutputFormat."); | 
|  | 182 | +      String errorMessage = "Compression isn't supported for this OutputFormat."; | 
|  | 183 | +      throw ErrorUtils.getProgramFailureException( | 
|  | 184 | +        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 185 | +        String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, | 
|  | 186 | +        new IOException(errorMessage)); | 
| 174 | 187 |     } | 
| 175 | 188 | 
 | 
| 176 | 189 |     // Error if unable to create a BigQuery helper. | 
| 177 | 190 |     try { | 
| 178 | 191 |       new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf); | 
| 179 | 192 |     } catch (GeneralSecurityException gse) { | 
| 180 |  | -      throw new IOException("Failed to create BigQuery client", gse); | 
|  | 193 | +      String errorMessage = "Failed to create BigQuery client"; | 
|  | 194 | +      throw ErrorUtils.getProgramFailureException( | 
|  | 195 | +        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, String.format(errorMessageFormat, | 
|  | 196 | +          ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, gse); | 
| 181 | 197 |     } | 
| 182 | 198 | 
 | 
| 183 | 199 |     // Let delegate process its checks. | 
| @@ -208,7 +224,10 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput | 
| 208 | 224 |         BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES); | 
| 209 | 225 |         this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration()); | 
| 210 | 226 |       } catch (GeneralSecurityException e) { | 
| 211 |  | -        throw new IOException("Failed to create Bigquery client.", e); | 
|  | 227 | +        String errorMessage = "Failed to create BigQuery client"; | 
|  | 228 | +        throw ErrorUtils.getProgramFailureException( | 
|  | 229 | +          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 230 | +          String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e); | 
| 212 | 231 |       } | 
| 213 | 232 |     } | 
| 214 | 233 | 
 | 
| @@ -266,7 +285,10 @@ public void commitJob(JobContext jobContext) throws IOException { | 
| 266 | 285 |                       writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField, | 
| 267 | 286 |                       requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf); | 
| 268 | 287 |       } catch (Exception e) { | 
| 269 |  | -        throw new IOException("Failed to import GCS into BigQuery. ", e); | 
|  | 288 | +        String errorMessage = "Failed to import GCS into BigQuery."; | 
|  | 289 | +        throw ErrorUtils.getProgramFailureException( | 
|  | 290 | +          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 291 | +          String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e); | 
| 270 | 292 |       } | 
| 271 | 293 | 
 | 
| 272 | 294 |       cleanup(jobContext); | 
| @@ -566,26 +588,34 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p | 
| 566 | 588 |             int numOfErrors; | 
| 567 | 589 |             String errorMessage; | 
| 568 | 590 |             if (errors == null || errors.isEmpty()) { | 
| 569 |  | -              errorMessage = pollJob.getStatus().getErrorResult().getMessage(); | 
|  | 591 | +              errorMessage = String.format("reason: %s, %s", pollJob.getStatus().getErrorResult().getReason(), | 
|  | 592 | +                pollJob.getStatus().getErrorResult().getMessage()); | 
| 570 | 593 |               numOfErrors = 1; | 
| 571 | 594 |             } else { | 
| 572 |  | -              errorMessage = errors.get(errors.size() - 1).getMessage(); | 
|  | 595 | +              errorMessage = String.format("reason: %s, %s", errors.get(errors.size() - 1).getReason(), | 
|  | 596 | +                errors.get(errors.size() - 1).getMessage()); | 
| 573 | 597 |               numOfErrors = errors.size(); | 
| 574 | 598 |             } | 
| 575 | 599 |             // Only add first error message in the exception. For other errors user should look at BigQuery job logs. | 
| 576 |  | -            throw new IOException(String.format("Error occurred while importing data to BigQuery '%s'." + | 
| 577 |  | -                                                  " There are total %s error(s) for BigQuery job %s. Please look at " + | 
| 578 |  | -                                                  "BigQuery job logs for more information.", | 
| 579 |  | -                                                errorMessage, numOfErrors, jobReference.getJobId())); | 
|  | 600 | +            String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." + | 
|  | 601 | +                " There are total %s error(s) for BigQuery job %s. Please look at " + | 
|  | 602 | +                "BigQuery job logs for more information.", | 
|  | 603 | +              errorMessage, numOfErrors, jobReference.getJobId()); | 
|  | 604 | +            throw ErrorUtils.getProgramFailureException( | 
|  | 605 | +              new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException, | 
|  | 606 | +              String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true, | 
|  | 607 | +              new IOException(errorMessageException)); | 
|  | 608 | + | 
| 580 | 609 |           } | 
| 581 | 610 |         } else { | 
| 582 | 611 |           long millisToWait = pollBackOff.nextBackOffMillis(); | 
| 583 | 612 |           if (millisToWait == BackOff.STOP) { | 
| 584 |  | -            throw new IOException( | 
| 585 |  | -              String.format( | 
| 586 |  | -                "Job %s failed to complete after %s millis.", | 
| 587 |  | -                jobReference.getJobId(), | 
| 588 |  | -                elapsedTime)); | 
|  | 613 | +            String errorMessage = String.format("Job %s failed to complete after %s millis.", | 
|  | 614 | +              jobReference.getJobId(), elapsedTime); | 
|  | 615 | +            throw ErrorUtils.getProgramFailureException( | 
|  | 616 | +              new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 617 | +              String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true, | 
|  | 618 | +              new IOException(errorMessage)); | 
| 589 | 619 |           } | 
| 590 | 620 |           // Pause execution for the configured duration before polling job status again. | 
| 591 | 621 |           Thread.sleep(millisToWait); | 
| @@ -621,8 +651,11 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I | 
| 621 | 651 |           TableSchema tableSchema = createTableSchemaFromFields(fieldsJson); | 
| 622 | 652 |           return Optional.of(tableSchema); | 
| 623 | 653 |         } catch (IOException e) { | 
| 624 |  | -          throw new IOException( | 
| 625 |  | -            "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e); | 
|  | 654 | +          String errorMessage = String.format("Unable to parse key '%s'.", | 
|  | 655 | +            BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey()); | 
|  | 656 | +          throw ErrorUtils.getProgramFailureException( | 
|  | 657 | +            new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, | 
|  | 658 | +            String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e); | 
| 626 | 659 |         } | 
| 627 | 660 |       } | 
| 628 | 661 |       return Optional.empty(); | 
|  | 
0 commit comments