Skip to content

Commit 77a2f18

Browse files
committed
Add BigQuery Error Details Provider
1 parent b49a189 commit 77a2f18

File tree

12 files changed

+235
-76
lines changed

12 files changed

+235
-76
lines changed

src/e2e-test/resources/errorMessage.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ errorMessageInvalidBucketName=Invalid bucket name in path
2828
errorMessageInvalidFormat=Input has multi-level structure that cannot be represented appropriately as csv. \
2929
Consider using json, avro or parquet to write data.
3030
errorMessageMultipleFileWithFirstRowAsHeaderDisabled=Spark program 'phase-1' failed with error: Found a row with 6 fields when the schema only contains 4 fields. Check that the schema contains the right number of fields.. Please check the system logs for more details.
31-
errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Year of Jupyter
32-
errorMessageMultipleFileWithoutClearDefaultSchema=Found a row with 4 fields when the schema only contains 2 fields
31+
errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Spark program 'phase-1' failed with error: For input string:
32+
errorMessageMultipleFileWithoutClearDefaultSchema=Spark program 'phase-1' failed with error: Found a row with 4 fields when the schema only contains 2 fields.
3333
errorMessageInvalidSourcePath=Invalid bucket name in path 'abc@'. Bucket name should
3434
errorMessageInvalidDestPath=Invalid bucket name in path 'abc@'. Bucket name should
3535
errorMessageInvalidEncryptionKey=CryptoKeyName.parse: formattedString not in valid format: Parameter "abc@" must be
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.bigquery.common;
18+
19+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
20+
21+
/**
22+
* A custom ErrorDetailsProvider for BigQuery plugins.
23+
*/
24+
public class BigQueryErrorDetailsProvider extends GCPErrorDetailsProvider {
25+
26+
@Override
27+
protected String getExternalDocumentationLink() {
28+
return "https://cloud.google.com/bigquery/docs/error-messages";
29+
}
30+
}

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import io.cdap.cdap.etl.api.FailureCollector;
3535
import io.cdap.cdap.etl.api.batch.BatchSink;
3636
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
37+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3738
import io.cdap.plugin.common.Asset;
39+
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
3840
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
3941
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
4042
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
@@ -116,6 +118,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
116118
storage, bucket, bucketName,
117119
config.getLocation(), cmekKeyName);
118120
}
121+
// set error details provider
122+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
119123
prepareRunInternal(context, bigQuery, bucketName);
120124
}
121125

@@ -124,9 +128,9 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
124128
String gcsPath;
125129
String bucket = getConfig().getBucket();
126130
if (bucket == null) {
127-
gcsPath = String.format("gs://%s", runUUID.toString());
131+
gcsPath = String.format("gs://%s", runUUID);
128132
} else {
129-
gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
133+
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
130134
}
131135
try {
132136
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
@@ -327,9 +331,8 @@ private void validateRecordDepth(@Nullable Schema schema, FailureCollector colle
327331
*
328332
* @return Hadoop configuration
329333
*/
330-
protected Configuration getOutputConfiguration() throws IOException {
331-
Configuration configuration = new Configuration(baseConfiguration);
332-
return configuration;
334+
protected Configuration getOutputConfiguration() {
335+
return new Configuration(baseConfiguration);
333336
}
334337

335338
/**

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@
6161
import com.google.common.base.Strings;
6262
import com.google.common.collect.Lists;
6363
import io.cdap.cdap.api.data.format.StructuredRecord;
64+
import io.cdap.cdap.api.exception.ErrorCategory;
65+
import io.cdap.cdap.api.exception.ErrorType;
66+
import io.cdap.cdap.api.exception.ErrorUtils;
67+
import io.cdap.cdap.etl.api.exception.ErrorPhase;
6468
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings;
6569
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
6670
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
@@ -103,6 +107,7 @@
103107
*/
104108
public class BigQueryOutputFormat extends ForwardingBigQueryFileOutputFormat<StructuredRecord, NullWritable> {
105109
private static final Logger LOG = LoggerFactory.getLogger(BigQueryOutputFormat.class);
110+
private static final String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
106111

107112
@Override
108113
public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
@@ -165,19 +170,30 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
165170
// Error if the output path already exists.
166171
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
167172
if (outputFileSystem.exists(outputPath)) {
168-
throw new IOException("The output path '" + outputPath + "' already exists.");
173+
String errorMessage = String.format("The output path '%s' already exists.", outputPath);
174+
throw ErrorUtils.getProgramFailureException(
175+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
176+
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
177+
new IOException(errorMessage));
169178
}
170179

171180
// Error if compression is set as there's mixed support in BigQuery.
172181
if (FileOutputFormat.getCompressOutput(job)) {
173-
throw new IOException("Compression isn't supported for this OutputFormat.");
182+
String errorMessage = "Compression isn't supported for this OutputFormat.";
183+
throw ErrorUtils.getProgramFailureException(
184+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
185+
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
186+
new IOException(errorMessage));
174187
}
175188

176189
// Error if unable to create a BigQuery helper.
177190
try {
178191
new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
179192
} catch (GeneralSecurityException gse) {
180-
throw new IOException("Failed to create BigQuery client", gse);
193+
String errorMessage = "Failed to create BigQuery client";
194+
throw ErrorUtils.getProgramFailureException(
195+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, String.format(errorMessageFormat,
196+
ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, gse);
181197
}
182198

183199
// Let delegate process its checks.
@@ -208,7 +224,10 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
208224
BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
209225
this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
210226
} catch (GeneralSecurityException e) {
211-
throw new IOException("Failed to create Bigquery client.", e);
227+
String errorMessage = "Failed to create BigQuery client";
228+
throw ErrorUtils.getProgramFailureException(
229+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
230+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
212231
}
213232
}
214233

@@ -266,7 +285,10 @@ public void commitJob(JobContext jobContext) throws IOException {
266285
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
267286
requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
268287
} catch (Exception e) {
269-
throw new IOException("Failed to import GCS into BigQuery. ", e);
288+
String errorMessage = "Failed to import GCS into BigQuery.";
289+
throw ErrorUtils.getProgramFailureException(
290+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
291+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
270292
}
271293

272294
cleanup(jobContext);
@@ -566,26 +588,34 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
566588
int numOfErrors;
567589
String errorMessage;
568590
if (errors == null || errors.isEmpty()) {
569-
errorMessage = pollJob.getStatus().getErrorResult().getMessage();
591+
errorMessage = String.format("reason: %s, %s", pollJob.getStatus().getErrorResult().getReason(),
592+
pollJob.getStatus().getErrorResult().getMessage());
570593
numOfErrors = 1;
571594
} else {
572-
errorMessage = errors.get(errors.size() - 1).getMessage();
595+
errorMessage = String.format("reason: %s, %s", errors.get(errors.size() - 1).getReason(),
596+
errors.get(errors.size() - 1).getMessage());
573597
numOfErrors = errors.size();
574598
}
575599
// Only add first error message in the exception. For other errors user should look at BigQuery job logs.
576-
throw new IOException(String.format("Error occurred while importing data to BigQuery '%s'." +
577-
" There are total %s error(s) for BigQuery job %s. Please look at " +
578-
"BigQuery job logs for more information.",
579-
errorMessage, numOfErrors, jobReference.getJobId()));
600+
String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." +
601+
" There are total %s error(s) for BigQuery job %s. Please look at " +
602+
"BigQuery job logs for more information.",
603+
errorMessage, numOfErrors, jobReference.getJobId());
604+
throw ErrorUtils.getProgramFailureException(
605+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
606+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true,
607+
new IOException(errorMessageException));
608+
580609
}
581610
} else {
582611
long millisToWait = pollBackOff.nextBackOffMillis();
583612
if (millisToWait == BackOff.STOP) {
584-
throw new IOException(
585-
String.format(
586-
"Job %s failed to complete after %s millis.",
587-
jobReference.getJobId(),
588-
elapsedTime));
613+
String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId()
614+
, elapsedTime);
615+
throw ErrorUtils.getProgramFailureException(
616+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
617+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true,
618+
new IOException(errorMessage));
589619
}
590620
// Pause execution for the configured duration before polling job status again.
591621
Thread.sleep(millisToWait);
@@ -621,8 +651,11 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
621651
TableSchema tableSchema = createTableSchemaFromFields(fieldsJson);
622652
return Optional.of(tableSchema);
623653
} catch (IOException e) {
624-
throw new IOException(
625-
"Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e);
654+
String errorMessage = String.format("Unable to parse key '%s'.",
655+
BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
656+
throw ErrorUtils.getProgramFailureException(
657+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
658+
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
626659
}
627660
}
628661
return Optional.empty();

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isA
189189
} else if (jsonString.startsWith("[") && jsonString.endsWith("]")) {
190190
writeJsonArrayToWriter(gson.fromJson(jsonString, JsonArray.class), writer);
191191
} else {
192-
throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +
192+
throw new IllegalArgumentException(String.format("Expected value of Field '%s' to be a valid JSON " +
193193
"object or array.", name));
194194
}
195195
break;

0 commit comments

Comments
 (0)