[PLUGIN-1818] ErrorDetailsProvider - BigQuery Source/Sink plugin #1458
Force-pushed from 3c06ecb to 85d4858.
Force-pushed from 85d4858 to abf3b12.
Force-pushed from abf3b12 to 78192a6.
Force-pushed from 78192a6 to 66e9507.

     * @param e The IOException to get the error information from.
     * @return A ProgramFailureException with the given error information.
     */
    private ProgramFailureException getProgramFailureException(IOException e, ErrorContext errorContext) {
How is IOException a user error? Even the error messages are not at all actionable.
We need to convert only those exceptions to ProgramFailureException where we can get enough information from the exceptions.
In the underlying places where we throw IOException, we should throw ProgramFailureException instead, rather than assuming that an IOException will always be a user error.
IOExceptions are thrown from BigQueryOutputFormat in the code, e.g.
google-cloud/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
Line 168 in 601f62b
    throw new IOException("The output path '" + outputPath + "' already exists.");
> In underlying places, if we are throwing IOException we should change it to throw ProgramFailureException rather than assuming IOException will always be user error.
OK, got it, will make these changes.
Changed the underlying IOException to ProgramFailureException.
     * @param e The IllegalStateException to get the error information from.
     * @return A ProgramFailureException with the given error information.
     */
    private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
Similarly, how is IllegalStateException a user error?
An IllegalStateException usually represents a programming or configuration error in the code. The SYSTEM error type might be a better fit, since it suggests that something went wrong with the program's state or lifecycle, which is under the application's control, rather than an external client issue.
There are places in the code where incorrect user input can lead to an IllegalStateException; we should change those.
google-cloud/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java
Line 192 in 601f62b
    throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +
The above should be changed to IllegalArgumentException, right?
Changed the error type for IllegalStateException to SYSTEM.
> this above should be changed to IllegalArgumentException right ?
Right.
Updated!
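
The distinction agreed on in this thread can be sketched as follows. This is a minimal, hypothetical example: the class `RecordWriterSketch` and its methods are illustrative stand-ins and are not part of the plugin. It only shows the convention being discussed, assuming that bad input maps to `IllegalArgumentException` while a broken lifecycle maps to `IllegalStateException`.

```java
/** Hypothetical sketch, not plugin code: which exception type fits which failure. */
final class RecordWriterSketch {
  private boolean open = true;

  /** Validates and "writes" a field value; the actual write is omitted in this sketch. */
  void write(String fieldName, String value) {
    if (!open) {
      // The object itself is in the wrong state: an internal or lifecycle problem.
      throw new IllegalStateException("Writer is already closed.");
    }
    if (value == null || value.isEmpty()) {
      // The caller supplied bad input: an argument problem.
      throw new IllegalArgumentException(
        String.format("Value of field '%s' must not be empty.", fieldName));
    }
  }

  void close() {
    open = false;
  }
}
```

With this split, a handler that maps exceptions to error types can treat the two cases differently without inspecting messages.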
     */
    private ProgramFailureException getProgramFailureException(HttpResponseException e,
      ErrorContext errorContext) {
nit: indentation.
Fixed!
Force-pushed from 66e9507 to 5c897be.
Force-pushed from 60c479d to 9d501d3.

    errorMessage, numOfErrors, jobReference.getJobId());
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
      String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.SYSTEM, true,
I think the error type here should be UNKNOWN. How are we sure it is a system issue?
What criterion can be used to better distinguish between the SYSTEM and UNKNOWN error types?
Here the job failed on the BigQuery "system", and since the logs are pulled in from BigQuery, the reason will likely be known.
A job can fail on BigQuery for many reasons, including permission issues, right? So we need to decide based on the error code and error reason; we cannot directly say that it is a system error.
Updated to type UNKNOWN
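
To illustrate the point about deciding from the error code rather than hard-coding SYSTEM, here is a minimal sketch. Everything in it is an assumption made for illustration: `ErrorTypeSketch`, its local `ErrorType` enum, and the particular code-to-type mapping are hypothetical and do not reproduce what the plugin's `ErrorUtils` does.

```java
import java.net.HttpURLConnection;

/** Hypothetical sketch: derive an error type from an HTTP status code. */
final class ErrorTypeSketch {

  /** Local stand-in enum; the names mirror the types discussed in this thread. */
  enum ErrorType { USER, SYSTEM, UNKNOWN }

  /** Assumed mapping for illustration only: common 4xx to USER, 500/503 to SYSTEM, the rest UNKNOWN. */
  static ErrorType classify(int statusCode) {
    switch (statusCode) {
      case HttpURLConnection.HTTP_BAD_REQUEST:    // 400
      case HttpURLConnection.HTTP_UNAUTHORIZED:   // 401
      case HttpURLConnection.HTTP_FORBIDDEN:      // 403, e.g. missing permissions
      case HttpURLConnection.HTTP_NOT_FOUND:      // 404
        return ErrorType.USER;
      case HttpURLConnection.HTTP_INTERNAL_ERROR: // 500
      case HttpURLConnection.HTTP_UNAVAILABLE:    // 503
        return ErrorType.SYSTEM;
      default:
        // No basis for a decision, so do not claim SYSTEM or USER.
        return ErrorType.UNKNOWN;
    }
  }
}
```

When no status code or reason is available at all, as in the snippet under review, falling back to UNKNOWN matches the default branch here.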
    , elapsedTime);
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
      String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
I think the error type here should be UNKNOWN. How are we sure it is a system issue?
Yes, UNKNOWN is more appropriate here. Updated!
Multiple exceptions are thrown from BigQuerySinkUtils. Why are they not covered in this PR?
Force-pushed from 9d501d3 to 1e8c486.
Force-pushed from 170fc3b to 9e79da9.

    throw new IOException(errorMessage.get(), e);
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), "Bucket already exists", errorMessage.get(),
      ErrorType.UNKNOWN, true, e);
We can decide on the error type here based on the status code; see
google-cloud/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java
Line 69 in 601f62b
    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
Only 409 means the bucket already exists; any other status code means the request to create the bucket failed.
Updated!
    try {
      GCPUtils.createBucket(storage, bucket, location, cmekKeyName);
    } catch (StorageException e) {
      // A 409 conflict means the bucket already exists. This most likely means multiple stages in the
      // same pipeline are trying to create the same bucket. Ignore it and move on, since all that
      // matters is that the bucket exists. Any other status code means the request failed.
      if (e.getCode() != 409) {
        ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
        String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
        throw ErrorUtils.getProgramFailureException(
          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
          pair.getErrorType(), true, e);
      }
    }

    throw new IOException(errorMessage.get(), e);
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), "Dataset already exists", errorMessage.get(),
      ErrorType.UNKNOWN, true, e);
Similar comment here:
google-cloud/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java
Line 69 in 601f62b
    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
Only 409 means the dataset already exists; any other status code means the request to create the dataset failed.
Updated!
    try {
      bigQuery.create(builder.build());
    } catch (BigQueryException e) {
      // A 409 conflict means the dataset already exists (https://cloud.google.com/bigquery/troubleshooting-errors).
      // This most likely means multiple stages in the same pipeline are trying to create the same dataset.
      // Ignore it and move on, since all that matters is that the dataset exists.
      // Any other status code means the request failed.
      if (e.getCode() != 409) {
        ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
        String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
        throw ErrorUtils.getProgramFailureException(
          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
          pair.getErrorType(), true, e);
      }
    }

     */
    private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
      String errorMessage = e.getMessage();
      String subCategory = "IllegalState";
This doesn't sound like an appropriate category. Categories should not contain lower-level exception names, but rather high-level, generic error types such as access issues.
Unless we can point to the exact subcategory, we should omit it rather than add something generic.
Fair, removed. Also removed String subCategory = "IllegalArgument".
    collector.getOrThrowException();
    }

nit: remove the extra empty line.
Removed!
Are we sure that
Force-pushed from 9e79da9 to efeef28.

    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
      String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
      new IOException(errorMessage, e));
There is no need to wrap the actual exception in an IOException here now.
Similar comment in all places.
Removed!
I have made the changes in a new commit to help with the review; I will squash the commits before merging.
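
The effect of dropping the extra IOException layer can be sketched as below. This is a hypothetical illustration: `CauseChainSketch` and `PluginFailure` are made-up stand-ins for the real exception types, used only to compare the two cause chains.

```java
import java.io.IOException;

/** Hypothetical sketch: wrapping the cause in an IOException versus passing it directly. */
final class CauseChainSketch {

  /** Stand-in for the failure exception that is finally thrown. */
  static final class PluginFailure extends RuntimeException {
    PluginFailure(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Old shape: the real cause sits one level deeper, behind an intermediate IOException. */
  static PluginFailure wrapped(String message, Exception e) {
    return new PluginFailure(message, new IOException(message, e));
  }

  /** New shape: the real cause is the direct cause of the failure. */
  static PluginFailure direct(String message, Exception e) {
    return new PluginFailure(message, e);
  }
}
```

In the direct form, `getCause()` returns the original exception itself, so the stack trace carries one layer less and the message is not repeated.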
The two places that mention
    for (String field : remainingBQFields) {
      if (bqFields.get(field).getMode() != Field.Mode.NULLABLE) {
        collector.addFailure(String.format("Required Column '%s' is not present in the schema.", field),
                             String.format("Add '%s' to the schema.", field));
      }
    }
Is there somewhere I missed it?
sg.
    String errorMessage = e.getMessage();
    String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
    return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
      String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, true, e);
The dependency flag should probably not be true in this case, since the exception is thrown from our own code.
Got it, updated!
    String errorMessage = e.getMessage();
    String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
    return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
      String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
Similarly, the dependency flag should not be true here unless the error is thrown from a dependent service's client.
Got it, updated!
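
The rule for the dependency flag stated in the two comments above can be sketched like this. The example is hypothetical: `DependencyFlagSketch`, `Failure`, and the two factory methods are illustrative names, not the plugin's API. It assumes the flag should be true only when the failure originates in a dependent service's client.

```java
/** Hypothetical sketch: set the dependency flag according to where the exception originated. */
final class DependencyFlagSketch {

  /** Minimal stand-in for the failure details carried by the real exception. */
  static final class Failure {
    final String message;
    final boolean dependency;
    final Throwable cause;

    Failure(String message, boolean dependency, Throwable cause) {
      this.message = message;
      this.dependency = dependency;
      this.cause = cause;
    }
  }

  /** Exception raised by the plugin's own code, e.g. input validation: not a dependency failure. */
  static Failure fromPluginCode(Exception e) {
    return new Failure(e.getMessage(), false, e);
  }

  /** Exception raised by a dependent service's client, e.g. a storage or query call. */
  static Failure fromServiceClient(Exception e) {
    return new Failure(e.getMessage(), true, e);
  }
}
```

Keeping the decision in two named entry points makes it harder to pass `true` by habit at call sites that only rethrow the plugin's own exceptions.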
A couple of minor comments. Otherwise, the PR looks good.
Please check that all the e2e tests pass.
Force-pushed from 550f350 to 29c21ca.

This test seems to be failing on the dev branch too. I tried to run it locally, and it seems fine.
Please check with @AnkitCLI from Cloud Sufi team.
Force-pushed from e4e302f to 1ea8bba.

This PR will fix BQ E2E error, merge this PR after the fixes.
Force-pushed from a2a7d51 to 77a2f18.
Force-pushed from 77a2f18 to 51f8794.

    

ErrorDetailsProvider - BigQuery Source/Sink plugin
Jira: PLUGIN-1818
Description
Adds an error details provider for the BigQuery source and sink plugins.
CDAP Error logs
Code change
- BigQueryErrorDetailsProvider.java
- AbstractBigQuerySink.java
- BigQueryOutputFormat.java
- BigQueryRecordToJson.java
- BigQuerySinkUtils.java
- BigQueryAvroToStructuredTransformer.java
- BigQuerySource.java
- BigQuerySourceUtils.java
- PartitionedBigQueryInputFormat.java
- GCPErrorDetailsProvider.java
Unit Tests
- BigQueryRecordToJsonTest.java