Skip to content

Commit 78192a6

Browse files
committed
Add BigQuery Error Details Provider
1 parent 601f62b commit 78192a6

File tree

7 files changed

+166
-29
lines changed

7 files changed

+166
-29
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.bigquery.common;
18+
19+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
20+
21+
/**
22+
* A custom ErrorDetailsProvider for BigQuery plugins.
23+
*/
24+
public class BigQueryErrorDetailsProvider extends GCPErrorDetailsProvider {
25+
26+
@Override
27+
protected String getExternalDocumentationLink() {
28+
return "https://cloud.google.com/bigquery/docs/error-messages";
29+
}
30+
}

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import io.cdap.cdap.etl.api.FailureCollector;
3535
import io.cdap.cdap.etl.api.batch.BatchSink;
3636
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
37+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3738
import io.cdap.plugin.common.Asset;
39+
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
3840
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
3941
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
4042
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
@@ -116,6 +118,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
116118
storage, bucket, bucketName,
117119
config.getLocation(), cmekKeyName);
118120
}
121+
// set error details provider
122+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
119123
prepareRunInternal(context, bigQuery, bucketName);
120124
}
121125

@@ -124,9 +128,9 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
124128
String gcsPath;
125129
String bucket = getConfig().getBucket();
126130
if (bucket == null) {
127-
gcsPath = String.format("gs://%s", runUUID.toString());
131+
gcsPath = String.format("gs://%s", runUUID);
128132
} else {
129-
gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
133+
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
130134
}
131135
try {
132136
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
@@ -327,9 +331,8 @@ private void validateRecordDepth(@Nullable Schema schema, FailureCollector colle
327331
*
328332
* @return Hadoop configuration
329333
*/
330-
protected Configuration getOutputConfiguration() throws IOException {
331-
Configuration configuration = new Configuration(baseConfiguration);
332-
return configuration;
334+
protected Configuration getOutputConfiguration() {
335+
return new Configuration(baseConfiguration);
333336
}
334337

335338
/**

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryAvroToStructuredTransformer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import io.cdap.cdap.api.data.format.StructuredRecord;
2121
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
2222
import io.cdap.cdap.api.data.schema.Schema;
23+
import io.cdap.cdap.api.exception.ErrorCategory;
24+
import io.cdap.cdap.api.exception.ErrorType;
25+
import io.cdap.cdap.api.exception.ErrorUtils;
2326
import io.cdap.plugin.common.RecordConverter;
2427
import org.apache.avro.generic.GenericRecord;
2528

@@ -90,11 +93,11 @@ protected Object convertField(Object field, Schema fieldSchema) throws IOExcepti
9093
try {
9194
LocalDateTime.parse(field.toString());
9295
} catch (DateTimeParseException exception) {
93-
throw new UnexpectedFormatException(
94-
String.format("Datetime field with value '%s' is not in ISO-8601 format.",
95-
fieldSchema.getDisplayName(),
96-
field.toString()),
97-
exception);
96+
String errorMessage = String.format("Datetime field %s with value '%s' is not in ISO-8601 format.",
97+
fieldSchema.getDisplayName(), field);
98+
throw ErrorUtils.getProgramFailureException(
99+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, "DataError"),
100+
errorMessage, exception.getMessage(), ErrorType.UNKNOWN, true, exception);
98101
}
99102
//If properly formatted return the string
100103
return field.toString();
@@ -110,7 +113,9 @@ protected Object convertField(Object field, Schema fieldSchema) throws IOExcepti
110113
}
111114
}
112115
} catch (ArithmeticException e) {
113-
throw new IOException("Field type %s has value that is too large." + fieldType);
116+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN,
117+
"DataError"),
118+
"Field type %s has value that is too large.", e.getMessage(), ErrorType.UNKNOWN, true, e);
114119
}
115120

116121
// Complex types like maps and unions are not supported in BigQuery plugins.

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@
4848
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
4949
import io.cdap.cdap.etl.api.connector.Connector;
5050
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
51+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
5152
import io.cdap.cdap.etl.api.validation.ValidationFailure;
5253
import io.cdap.plugin.common.Asset;
5354
import io.cdap.plugin.common.LineageRecorder;
55+
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
5456
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
5557
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
5658
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
@@ -135,7 +137,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {
135137

136138
// Create BigQuery client
137139
String serviceAccount = config.getServiceAccount();
138-
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
140+
Credentials credentials = null;
141+
try {
142+
credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
143+
} catch (Exception e) {
144+
String errorReason = "Unable to load service account credentials.";
145+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
146+
.withStacktrace(e.getStackTrace());
147+
collector.getOrThrowException();
148+
}
149+
150+
139151
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
140152
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
141153
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
@@ -144,19 +156,30 @@ public void prepareRun(BatchSourceContext context) throws Exception {
144156
bucketPath = UUID.randomUUID().toString();
145157
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
146158
collector.getOrThrowException();
147-
configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
148-
config.getServiceAccountType());
159+
try {
160+
configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
161+
config.getServiceAccountType());
162+
} catch (Exception e) {
163+
String errorReason = "Failed to create BigQuery configuration.";
164+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
165+
.withStacktrace(e.getStackTrace());
166+
collector.getOrThrowException();
167+
}
149168

150169
String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), null,
151170
dataset, config.getBucket());
152171

153172
// Configure GCS Bucket to use
154-
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
155-
storage,
156-
bucketName,
157-
dataset,
158-
bucketPath,
159-
cmekKeyName);
173+
String bucket = null;
174+
try {
175+
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
176+
cmekKeyName);
177+
} catch (Exception e) {
178+
String errorReason = "Failed to create bucket.";
179+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
180+
.withStacktrace(e.getStackTrace());
181+
collector.getOrThrowException();
182+
}
160183

161184
// Configure Service account credentials
162185
BigQuerySourceUtils.configureServiceAccount(configuration, config.getConnection());
@@ -166,10 +189,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {
166189

167190
// Configure BigQuery input format.
168191
String temporaryGcsPath = BigQuerySourceUtils.getTemporaryGcsPath(bucket, bucketPath, bucketPath);
169-
BigQuerySourceUtils.configureBigQueryInput(configuration,
170-
DatasetId.of(config.getDatasetProject(), config.getDataset()),
171-
config.getTable(),
172-
temporaryGcsPath);
192+
try {
193+
BigQuerySourceUtils.configureBigQueryInput(configuration,
194+
DatasetId.of(config.getDatasetProject(), config.getDataset()),
195+
config.getTable(),
196+
temporaryGcsPath);
197+
} catch (Exception e) {
198+
String errorReason = "Failed to configure BigQuery input.";
199+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
200+
.withStacktrace(e.getStackTrace());
201+
collector.getOrThrowException();
202+
}
173203

174204
// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
175205
// We call emitLineage before since it creates the dataset with schema.
@@ -178,6 +208,10 @@ public void prepareRun(BatchSourceContext context) throws Exception {
178208
.setFqn(BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()))
179209
.setLocation(dataset.getLocation())
180210
.build();
211+
212+
// set error details provider
213+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
214+
181215
emitLineage(context, configuredSchema, sourceTableType, config.getTable(), asset);
182216
setInputFormat(context, configuredSchema);
183217
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import com.google.cloud.kms.v1.CryptoKeyName;
2626
import com.google.cloud.storage.Storage;
2727
import com.google.cloud.storage.StorageException;
28+
import io.cdap.cdap.api.exception.ErrorCategory;
29+
import io.cdap.cdap.api.exception.ErrorType;
30+
import io.cdap.cdap.api.exception.ErrorUtils;
2831
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
2932
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
3033
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -93,10 +96,12 @@ public static String getOrCreateBucket(Configuration configuration,
9396
// Ignore this and move on, since all that matters is that the bucket exists.
9497
return bucket;
9598
}
96-
throw new IOException(String.format("Unable to create Cloud Storage bucket '%s' in the same " +
99+
String errorMessage = String.format("Unable to create Cloud Storage bucket '%s' in the same " +
97100
"location ('%s') as BigQuery dataset '%s'. " + "Please use a bucket " +
98101
"that is in the same location as the dataset.",
99-
bucket, dataset.getLocation(), dataset.getDatasetId().getDataset()), e);
102+
bucket, dataset.getLocation(), dataset.getDatasetId().getDataset());
103+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
104+
errorMessage, e.getMessage(), ErrorType.USER, true, e);
100105
}
101106
}
102107

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import com.google.common.annotations.VisibleForTesting;
3838
import com.google.common.base.Preconditions;
3939
import com.google.common.base.Strings;
40+
import io.cdap.cdap.api.exception.ErrorCategory;
41+
import io.cdap.cdap.api.exception.ErrorType;
42+
import io.cdap.cdap.api.exception.ErrorUtils;
4043
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
4144
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
4245
import io.cdap.plugin.gcp.common.GCPUtils;
@@ -110,7 +113,8 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
110113
try {
111114
bigQueryHelper = getBigQueryHelper(configuration);
112115
} catch (GeneralSecurityException gse) {
113-
throw new IOException("Failed to create BigQuery client", gse);
116+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
117+
"Failed to create BigQuery client", gse.getMessage(), ErrorType.UNKNOWN, true, gse);
114118
}
115119

116120
List<HadoopConfigurationProperty<?>> hadoopConfigurationProperties = new ArrayList<>(

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import com.google.common.base.Throwables;
2323
import io.cdap.cdap.api.exception.ErrorCategory;
2424
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
25+
import io.cdap.cdap.api.exception.ErrorType;
2526
import io.cdap.cdap.api.exception.ErrorUtils;
2627
import io.cdap.cdap.api.exception.ProgramFailureException;
2728
import io.cdap.cdap.etl.api.exception.ErrorContext;
2829
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
2930

31+
import java.io.IOException;
3032
import java.util.List;
3133

3234
/**
@@ -41,7 +43,6 @@ public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
4143
* @param e The Throwable to get the error information from.
4244
* @return A ProgramFailureException with the given error information, otherwise null.
4345
*/
44-
@Override
4546
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
4647
List<Throwable> causalChain = Throwables.getCausalChain(e);
4748
for (Throwable t : causalChain) {
@@ -52,6 +53,15 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
5253
if (t instanceof HttpResponseException) {
5354
return getProgramFailureException((HttpResponseException) t, errorContext);
5455
}
56+
if (t instanceof IllegalArgumentException) {
57+
return getProgramFailureException((IllegalArgumentException) t, errorContext);
58+
}
59+
if (t instanceof IllegalStateException) {
60+
return getProgramFailureException((IllegalStateException) t, errorContext);
61+
}
62+
if (t instanceof IOException) {
63+
return getProgramFailureException((IOException) t, errorContext);
64+
}
5565
}
5666
return null;
5767
}
@@ -64,7 +74,7 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
6474
* @return A ProgramFailureException with the given error information.
6575
*/
6676
private ProgramFailureException getProgramFailureException(HttpResponseException e,
67-
ErrorContext errorContext) {
77+
ErrorContext errorContext) {
6878
Integer statusCode = e.getStatusCode();
6979
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
7080
String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
@@ -93,6 +103,52 @@ private ProgramFailureException getProgramFailureException(HttpResponseException
93103
pair.getErrorType(), true, e);
94104
}
95105

106+
107+
/**
108+
* Get a ProgramFailureException with the given error
109+
* information from {@link IllegalArgumentException}.
110+
*
111+
* @param e The IllegalArgumentException to get the error information from.
112+
* @return A ProgramFailureException with the given error information.
113+
*/
114+
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
115+
String errorMessage = e.getMessage();
116+
String subCategory = "IllegalArgument";
117+
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
118+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN, subCategory), errorMessage,
119+
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
120+
}
121+
122+
/**
123+
* Get a ProgramFailureException with the given error
124+
* information from {@link IllegalStateException}.
125+
*
126+
* @param e The IllegalStateException to get the error information from.
127+
* @return A ProgramFailureException with the given error information.
128+
*/
129+
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
130+
String errorMessage = e.getMessage();
131+
String subCategory = "IllegalState";
132+
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
133+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN, subCategory), errorMessage,
134+
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
135+
}
136+
137+
/**
138+
* Get a ProgramFailureException with the given error
139+
* information from {@link IOException}.
140+
*
141+
* @param e The IOException to get the error information from.
142+
* @return A ProgramFailureException with the given error information.
143+
*/
144+
private ProgramFailureException getProgramFailureException(IOException e, ErrorContext errorContext) {
145+
String errorMessage = e.getMessage();
146+
String subCategory = "IOException";
147+
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
148+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN, subCategory), errorMessage,
149+
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
150+
}
151+
96152
/**
97153
* Get the external documentation link for the client errors if available.
98154
*

0 commit comments

Comments
 (0)