4 changes: 2 additions & 2 deletions src/e2e-test/resources/errorMessage.properties
@@ -28,8 +28,8 @@ errorMessageInvalidBucketName=Invalid bucket name in path
errorMessageInvalidFormat=Input has multi-level structure that cannot be represented appropriately as csv. \
Consider using json, avro or parquet to write data.
errorMessageMultipleFileWithFirstRowAsHeaderDisabled=Spark program 'phase-1' failed with error: Found a row with 6 fields when the schema only contains 4 fields. Check that the schema contains the right number of fields.. Please check the system logs for more details.
-errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Year of Jupyter
-errorMessageMultipleFileWithoutClearDefaultSchema=Found a row with 4 fields when the schema only contains 2 fields
+errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Spark program 'phase-1' failed with error:
+errorMessageMultipleFileWithoutClearDefaultSchema=Spark program 'phase-1' failed with error: Found a row with 4 fields when the schema only contains 2 fields.
errorMessageInvalidSourcePath=Invalid bucket name in path 'abc@'. Bucket name should
errorMessageInvalidDestPath=Invalid bucket name in path 'abc@'. Bucket name should
errorMessageInvalidEncryptionKey=CryptoKeyName.parse: formattedString not in valid format: Parameter "abc@" must be
30 changes: 30 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryErrorDetailsProvider.java
@@ -0,0 +1,30 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.bigquery.common;

import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;

/**
* A custom ErrorDetailsProvider for BigQuery plugins.
*/
public class BigQueryErrorDetailsProvider extends GCPErrorDetailsProvider {

@Override
protected String getExternalDocumentationLink() {
return "https://cloud.google.com/bigquery/docs/error-messages";
}
}
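
The provider contract is deliberately small: a subclass of GCPErrorDetailsProvider only overrides the link to the service's error documentation and inherits everything else. As a minimal sketch of how another GCP plugin could hook into the same mechanism (the Spanner class name, package, and URL below are illustrative assumptions, not part of this change):

package io.cdap.plugin.gcp.spanner.common; // hypothetical package

import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;

/**
 * Illustrative only: the same provider pattern for a hypothetical Spanner plugin.
 */
public class SpannerErrorDetailsProvider extends GCPErrorDetailsProvider {

  @Override
  protected String getExternalDocumentationLink() {
    // Assumed documentation URL, for the sake of the example.
    return "https://cloud.google.com/spanner/docs/error-codes";
  }
}

A provider only takes effect once a plugin registers it through an ErrorDetailsProviderSpec, which is exactly what prepareRun does in the next file.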
@@ -34,7 +34,9 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.plugin.common.Asset;
+import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
@@ -116,6 +118,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
storage, bucket, bucketName,
config.getLocation(), cmekKeyName);
}
+// set error details provider
+context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
prepareRunInternal(context, bigQuery, bucketName);
}

@@ -124,9 +128,9 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
String gcsPath;
String bucket = getConfig().getBucket();
if (bucket == null) {
gcsPath = String.format("gs://%s", runUUID.toString());
gcsPath = String.format("gs://%s", runUUID);
} else {
gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
}
try {
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
@@ -327,9 +331,8 @@ private void validateRecordDepth(@Nullable Schema schema, FailureCollector colle
*
* @return Hadoop configuration
*/
-protected Configuration getOutputConfiguration() throws IOException {
-Configuration configuration = new Configuration(baseConfiguration);
-return configuration;
+protected Configuration getOutputConfiguration() {
+return new Configuration(baseConfiguration);
}

/**
@@ -61,6 +61,10 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings;
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
@@ -103,6 +107,7 @@
*/
public class BigQueryOutputFormat extends ForwardingBigQueryFileOutputFormat<StructuredRecord, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryOutputFormat.class);
+private static final String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";

@Override
public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
@@ -165,19 +170,30 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
// Error if the output path already exists.
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
if (outputFileSystem.exists(outputPath)) {
throw new IOException("The output path '" + outputPath + "' already exists.");
String errorMessage = String.format("The output path '%s' already exists.", outputPath);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
}

// Error if compression is set as there's mixed support in BigQuery.
if (FileOutputFormat.getCompressOutput(job)) {
throw new IOException("Compression isn't supported for this OutputFormat.");
String errorMessage = "Compression isn't supported for this OutputFormat.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
}

// Error if unable to create a BigQuery helper.
try {
new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
} catch (GeneralSecurityException gse) {
throw new IOException("Failed to create BigQuery client", gse);
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, String.format(errorMessageFormat,
ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, gse);
}

// Let delegate process its checks.
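
Every rewritten throw site in this class follows the same recipe: build a readable message, then wrap it with ErrorUtils.getProgramFailureException, passing a PLUGIN error category, the phase-qualified message built from errorMessageFormat, an ErrorType, and the original cause. A minimal sketch of that recurring pattern as a helper (the name failureIn is ours, not part of the change, and it assumes getProgramFailureException returns CDAP's ProgramFailureException; the change itself inlines the pattern at each site):

// Illustrative helper only; the diff above repeats this shape inline.
private static ProgramFailureException failureIn(ErrorPhase phase, String errorMessage,
    ErrorType errorType, Throwable cause) {
  return ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
      String.format(errorMessageFormat, phase, errorMessage), errorType, true, cause);
}

With such a helper, the first block above would reduce to: throw failureIn(ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage, ErrorType.SYSTEM, new IOException(errorMessage));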
@@ -208,7 +224,10 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
} catch (GeneralSecurityException e) {
throw new IOException("Failed to create Bigquery client.", e);
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
}
}

@@ -266,7 +285,10 @@ public void commitJob(JobContext jobContext) throws IOException {
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
} catch (Exception e) {
throw new IOException("Failed to import GCS into BigQuery. ", e);
String errorMessage = "Failed to import GCS into BigQuery.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
}

cleanup(jobContext);
@@ -566,26 +588,34 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
int numOfErrors;
String errorMessage;
if (errors == null || errors.isEmpty()) {
-errorMessage = pollJob.getStatus().getErrorResult().getMessage();
+errorMessage = String.format("reason: %s, %s", pollJob.getStatus().getErrorResult().getReason(),
+pollJob.getStatus().getErrorResult().getMessage());
numOfErrors = 1;
} else {
-errorMessage = errors.get(errors.size() - 1).getMessage();
+errorMessage = String.format("reason: %s, %s", errors.get(errors.size() - 1).getReason(),
+errors.get(errors.size() - 1).getMessage());
numOfErrors = errors.size();
}
// Only add first error message in the exception. For other errors user should look at BigQuery job logs.
throw new IOException(String.format("Error occurred while importing data to BigQuery '%s'." +
" There are total %s error(s) for BigQuery job %s. Please look at " +
"BigQuery job logs for more information.",
errorMessage, numOfErrors, jobReference.getJobId()));
String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." +
" There are total %s error(s) for BigQuery job %s. Please look at " +
"BigQuery job logs for more information.",
errorMessage, numOfErrors, jobReference.getJobId());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true,
new IOException(errorMessageException));

}
} else {
long millisToWait = pollBackOff.nextBackOffMillis();
if (millisToWait == BackOff.STOP) {
-throw new IOException(
-String.format(
-"Job %s failed to complete after %s millis.",
-jobReference.getJobId(),
-elapsedTime));
+String errorMessage = String.format("Job %s failed to complete after %s millis.",
+jobReference.getJobId(), elapsedTime);
+throw ErrorUtils.getProgramFailureException(
+new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
+String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true,
+new IOException(errorMessage));
}
// Pause execution for the configured duration before polling job status again.
Thread.sleep(millisToWait);
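
For context, this loop uses the standard google-http-client backoff idiom: ask the BackOff for the next wait, give up when it returns BackOff.STOP, otherwise sleep and poll again. A condensed, self-contained sketch of that shape (the done-check is parameterized here as an assumption; the real code polls the BigQuery job status):

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import java.io.IOException;
import java.util.function.BooleanSupplier;

final class PollLoopSketch {
  // Illustration only: waits for done() with exponential backoff, as the
  // surrounding waitForJobCompletion loop does for the BigQuery job.
  static void pollUntilDone(BooleanSupplier done) throws IOException, InterruptedException {
    BackOff pollBackOff = new ExponentialBackOff.Builder().build();
    while (!done.getAsBoolean()) {
      long millisToWait = pollBackOff.nextBackOffMillis();
      if (millisToWait == BackOff.STOP) {
        throw new IOException("Gave up waiting for the job to complete.");
      }
      Thread.sleep(millisToWait);
    }
  }
}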
@@ -621,8 +651,11 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
TableSchema tableSchema = createTableSchemaFromFields(fieldsJson);
return Optional.of(tableSchema);
} catch (IOException e) {
-throw new IOException(
-"Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e);
+String errorMessage = String.format("Unable to parse key '%s'.",
+BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
+throw ErrorUtils.getProgramFailureException(
+new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
+String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
}
}
return Optional.empty();
@@ -189,7 +189,7 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isA
} else if (jsonString.startsWith("[") && jsonString.endsWith("]")) {
writeJsonArrayToWriter(gson.fromJson(jsonString, JsonArray.class), writer);
} else {
-throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +
+throw new IllegalArgumentException(String.format("Expected value of Field '%s' to be a valid JSON " +
"object or array.", name));
}
break;