Skip to content

Conversation

@psainics
Copy link
Contributor

@psainics psainics commented Nov 6, 2024

ErrorDetailsProvider - BigQuery Source/Sink plugin

Jira : PLUGIN-1818

Description

Adding error details provider on bigquery.

CDAP Error logs
  • Test Case (Input invalid Json)
2024-11-13 11:19:00,929 - ERROR [Executor task launch worker for task 0.0 in stage 0.0 (TID 0):o.a.s.u.Utils@98] - Aborting task
io.cdap.cdap.api.exception.WrappedStageException: Stage 'BigQuery' encountered : io.cdap.cdap.api.exception.ProgramFailureException: Error occurred in the phase: 'Writing'. Error message: Expected value of Field 'raw' to be a valid JSON object or array.
  at io.cdap.cdap.etl.common.ErrorDetails.handleException(ErrorDetails.java:77)
  at io.cdap.cdap.etl.spark.io.StageTrackingRecordWriter.write(StageTrackingRecordWriter.java:57)
  at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:368)
  at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:138)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
  at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
  at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: io.cdap.cdap.api.exception.ProgramFailureException: Error occurred in the phase: 'Writing'. Error message: Expected value of Field 'raw' to be a valid JSON object or array.
  at io.cdap.cdap.api.exception.ProgramFailureException$Builder.build(ProgramFailureException.java:186)
  at io.cdap.cdap.api.exception.ErrorUtils.getProgramFailureException(ErrorUtils.java:161)
  at io.cdap.plugin.gcp.common.GCPErrorDetailsProvider.getProgramFailureException(GCPErrorDetailsProvider.java:133)
  at io.cdap.plugin.gcp.common.GCPErrorDetailsProvider.getExceptionDetails(GCPErrorDetailsProvider.java:60)
  at io.cdap.cdap.etl.common.ErrorDetails.handleException(ErrorDetails.java:75)
  ... 14 common frames omitted
Caused by: java.lang.IllegalStateException: Expected value of Field 'raw' to be a valid JSON object or array.
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson.writeSimpleTypes(BigQueryRecordToJson.java:192)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson.write(BigQueryRecordToJson.java:96)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson.write(BigQueryRecordToJson.java:71)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryJsonConverter.transform(BigQueryJsonConverter.java:51)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryJsonConverter.transform(BigQueryJsonConverter.java:32)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordWriter.write(BigQueryRecordWriter.java:62)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordWriter.write(BigQueryRecordWriter.java:33)
  at io.cdap.cdap.etl.spark.io.TrackingRecordWriter.write(TrackingRecordWriter.java:41)
  at io.cdap.cdap.etl.spark.io.StageTrackingRecordWriter.write(StageTrackingRecordWriter.java:55)
  ... 13 common frames omitted
  Suppressed: java.io.IOException: Incomplete document
  	at com.google.gson.internal.bind.JsonTreeWriter.close(JsonTreeWriter.java:196)
  	at io.cdap.plugin.gcp.bigquery.sink.BigQueryJsonConverter.transform(BigQueryJsonConverter.java:56)
  	... 18 common frames omitted

Code change

  • Added BigQueryErrorDetailsProvider.java
  • Modified AbstractBigQuerySink.java
  • Modified BigQueryOutputFormat.java
  • Modified BigQueryRecordToJson.java
  • Modified BigQuerySinkUtils.java
  • Modified BigQueryAvroToStructuredTransformer.java
  • Modified BigQuerySource.java
  • Modified BigQuerySourceUtils.java
  • Modified PartitionedBigQueryInputFormat.java
  • Modified GCPErrorDetailsProvider.java

Unit Tests

  • Modified BigQueryRecordToJsonTest.java

@psainics psainics added the build Trigger unit test build label Nov 6, 2024
@psainics psainics changed the title TEST Error details provider - BigQuerySource Wrap known errors Nov 8, 2024
@psainics psainics changed the title Error details provider - BigQuerySource Wrap known errors [PLUGIN-21080] Error details provider - BigQuerySource Wrap known errors Nov 13, 2024
@psainics psainics changed the title [PLUGIN-21080] Error details provider - BigQuerySource Wrap known errors [PLUGIN-21080] ErrorDetailsProvider - BigQuery Source/Sink plugin Nov 13, 2024
@psainics psainics changed the title [PLUGIN-21080] ErrorDetailsProvider - BigQuery Source/Sink plugin [PLUGIN-1818] ErrorDetailsProvider - BigQuery Source/Sink plugin Nov 13, 2024
@psainics psainics self-assigned this Nov 13, 2024
@psainics psainics marked this pull request as ready for review November 13, 2024 06:57
* @param e The IOException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IOException e, ErrorContext errorContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is IOException is a user error? Even the error messages are not at all actionable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to convert only those exceptions to ProgramFailureException where we can get enough information from the exceptions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In underlying places, if we are throwing IOException we should change it to throw ProgramFailureException rather than assuming IOException will always be user error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are IOException being thrown from BigQueryOutputFormat in code eg.

throw new IOException("The output path '" + outputPath + "' already exists.");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In underlying places, if we are throwing IOException we should change it to throw ProgramFailureException rather than assuming IOException will always be user error.

Ok got it, wll do these changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed underlying IOException to ProgramFailureException.

* @param e The IllegalStateException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, how is IllegalStateException a user error?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An IllegalStateException usually represents a programming or configuration error in the code. Error Type System might be better fit as it suggests something went wrong with the program's state or lifecycle that's under the application's control rather than being an external client issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In code there are places where we can reach IllegalStateException by incorrect user inputs, either we should change those.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this above should be changed to IllegalArgumentException right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed IllegalStateException to SYSTEM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this above should be changed to IllegalArgumentException right ?

right.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated !

*/
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
ErrorContext errorContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed !

@psainics psainics force-pushed the fem/big-query branch 3 times, most recently from 60c479d to 9d501d3 Compare November 13, 2024 10:33
errorMessage, numOfErrors, jobReference.getJobId());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.SYSTEM, true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think error type here should be UNKNOWN, how are we sure it is a system issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What metric can be used to better distinguish between system / unknown error type ?

Here the job failed on BigQuery "System" since the logs are pulled in from the BigQuery, so the reason will likely be known.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Job can fail on BigQuery due to many reasons like permission issues as well right? So, we need to decide based on error codes & error reason cannot directly say that it is a system error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to type UNKNOWN

, elapsedTime);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think error type here should be UNKNOWN, how are we sure it is a system issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, UNKNOW is more appropriate here , updated !

Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are multiple exceptions being thrown BigQuerySinkUtils why is that not covered in this PR?

@psainics psainics force-pushed the fem/big-query branch 2 times, most recently from 170fc3b to 9e79da9 Compare November 14, 2024 09:27
throw new IOException(errorMessage.get(), e);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), "Bucket already exists", errorMessage.get(),
ErrorType.UNKNOWN, true, e);
Copy link
Member

@itsankit-google itsankit-google Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can decide on error type here based on status code: see

ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);

Only 409 means bucket already exists for others it means the request failed to create bucket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated !

try {
  GCPUtils.createBucket(storage, bucket, location, cmekKeyName);
} catch (StorageException e) {
  if (e.getCode() != 409) {
    // A conflict means the bucket already exists
    // This most likely means multiple stages in the same pipeline are trying to create the same dataset.
    // Ignore this and move on, since all that matters is that the dataset exists.
    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
    String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
      pair.getErrorType(), true, e);
  }
}

throw new IOException(errorMessage.get(), e);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), "Dataset already exists", errorMessage.get(),
ErrorType.UNKNOWN, true, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment here:

ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);

Only 409 means dataset already exists for others it means the request failed to create dataset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated !

try {
  bigQuery.create(builder.build());
} catch (BigQueryException e) {
  if (e.getCode() != 409) {
    // A conflict means the dataset already exists (https://cloud.google.com/bigquery/troubleshooting-errors)
    // This most likely means multiple stages in the same pipeline are trying to create the same dataset.
    // Ignore this and move on, since all that matters is that the dataset exists.
    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
    String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
      pair.getErrorType(), true, e);
  }
}

*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String subCategory = "IllegalState";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't sound like an appropriate category. Categories do not contain lower level exception name rather high level generic exception types like access issues

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless we can point out the exact subCategory, we can omit adding it rather than adding generic stuff in subcategories.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, removed, also removed the String subCategory = "IllegalArgument"

collector.getOrThrowException();
}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit remove extra empty line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed !

@itsankit-google
Copy link
Member

There are multiple exceptions being thrown BigQuerySinkUtils why is that not covered in this PR?

Wrapped the Io Exception, IllegalArgumentException will be handled by the error details provider !

Are we sure that IllegalArgumentException is thrown from methods called from OutputFormat methods? since ErrorDetailsProvider is currently used there only.

throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage, e));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to wrap actual exception in Ioexception here now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment in all places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed!
I have made changes in a new commit to help with the review, i will squash those before merging.

@psainics
Copy link
Contributor Author

There are multiple exceptions being thrown BigQuerySinkUtils why is that not covered in this PR?

Wrapped the Io Exception, IllegalArgumentException will be handled by the error details provider !

Are we sure that IllegalArgumentException is thrown from methods called from OutputFormat methods? since ErrorDetailsProvider is currently used there only.

The two places that mention IllegalArgumentException is thrown in description on the condition when out schema has more field than BQ Table, but the case is already handled by adding the error in a collector

    for (String field : remainingBQFields) {
      if (bqFields.get(field).getMode() != Field.Mode.NULLABLE) {
        collector.addFailure(String.format("Required Column '%s' is not present in the schema.", field),
                             String.format("Add '%s' to the schema.", field));
      }

is there somewhere i missed it ?

@itsankit-google
Copy link
Member

There are multiple exceptions being thrown BigQuerySinkUtils why is that not covered in this PR?

Wrapped the Io Exception, IllegalArgumentException will be handled by the error details provider !

Are we sure that IllegalArgumentException is thrown from methods called from OutputFormat methods? since ErrorDetailsProvider is currently used there only.

The two places that mention IllegalArgumentException is thrown in description on the condition when out schema has more field than BQ Table, but the case is already handled by adding the error in a collector

    for (String field : remainingBQFields) {
      if (bqFields.get(field).getMode() != Field.Mode.NULLABLE) {
        collector.addFailure(String.format("Required Column '%s' is not present in the schema.", field),
                             String.format("Add '%s' to the schema.", field));
      }

is there somewhere i missed it ?

sg.

String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, true, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dependency might not be true in this case since the exception is thrown from our code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, updated !

String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
Copy link
Member

@itsankit-google itsankit-google Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly here dependency should not be true unless error is thrown from dependent services client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, updated !

Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couple of minor comments. Otherwise , PR looks good.

Please check once all the e2e tests pass.

@psainics
Copy link
Contributor Author

couple of minor comments. Otherwise , PR looks good.

Please check once all the e2e tests pass.

image

This test seems to be failing on dev branch too, i tried to run this locally, but seems fine.
I even testing table creation manually.
It looks like the table name is being set to null, what are the env changes running locally / on K8 that may be causing this issue ?

@itsankit-google
Copy link
Member

image

This test seems to be failing on dev branch too, i tried to run this locally, but seems fine. I even testing table creation manually. It looks like the table name is being set to null, what are the env changes running locally / on K8 that may be causing this issue ?

Please check with @AnkitCLI from Cloud Sufi team.

@psainics psainics force-pushed the fem/big-query branch 3 times, most recently from e4e302f to 1ea8bba Compare November 19, 2024 15:34
@psainics
Copy link
Contributor Author

This PR will fix BQ E2E error, merge this PR after the fixes.

@psainics psainics merged commit dfc6703 into data-integrations:develop Nov 30, 2024
16 checks passed
@psainics psainics deleted the fem/big-query branch November 30, 2024 08:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

build Trigger unit test build error-management

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants