Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@
<version>1.1.0</version>
<configuration>
<cdapArtifacts>
<parent>system:cdap-data-pipeline[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-pipeline[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
</cdapArtifacts>
</configuration>
<executions>
Expand Down
157 changes: 157 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.common;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import java.io.IOException;

/**
* Utility class to handle exceptions.
*/
public class ExceptionUtils {

/**
 * An action that yields no result and is permitted to fail with an {@link IOException}.
 *
 * <p>Declared as a functional interface so callers can pass a lambda or a method reference.
 */
@FunctionalInterface
public interface IOOperation {

  /**
   * Performs the action.
   *
   * @throws IOException if the action fails with an I/O error
   */
  void execute() throws IOException;
}

/**
 * A computation that returns a value and is permitted to fail with an {@link IOException}.
 *
 * <p>Declared as a functional interface so callers can pass a lambda or a method reference.
 *
 * @param <T> the type of the value returned by {@link #execute()}
 */
@FunctionalInterface
public interface IOFunction<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Consider using Callable / Runnable instead of Function / Operation. This terms are more used in Java

/**
 * Runs the computation and returns its result.
 *
 * @return the computed value
 * @throws IOException if the computation fails with an I/O error
 */
T execute() throws IOException;
}

/**
 * An action that yields no result and is permitted to fail with an {@link IOException}
 * or an {@link InterruptedException}.
 *
 * <p>Declared as a functional interface so callers can pass a lambda or a method reference.
 */
@FunctionalInterface
public interface IOInterruptibleOperation {

  /**
   * Performs the action.
   *
   * @throws IOException if the action fails with an I/O error
   * @throws InterruptedException if the action is interrupted
   */
  void execute() throws IOException, InterruptedException;
}

/**
 * A computation that returns a value and is permitted to fail with an {@link IOException}
 * or an {@link InterruptedException}.
 *
 * <p>Declared as a functional interface so callers can pass a lambda or a method reference.
 *
 * @param <T> the type of the value returned by {@link #execute()}
 */
@FunctionalInterface
public interface IOInterruptibleFunction<T> {

  /**
   * Runs the computation and returns its result.
   *
   * @return the computed value
   * @throws IOException if the computation fails with an I/O error
   * @throws InterruptedException if the computation is interrupted
   */
  T execute() throws IOException, InterruptedException;
}

/**
 * Runs {@code operation} and converts HTTP-response-based I/O failures into a
 * {@link ProgramFailureException}.
 *
 * <p>A caught {@link IOException} is handed to {@code getProgramFailureException}. If that
 * returns a non-null exception it is thrown instead; otherwise the original
 * {@link IOException} is rethrown unchanged.
 *
 * @param operation the action to run
 * @throws IOException if the action fails and no {@link ProgramFailureException} could be derived
 */
public static void invokeWithProgramFailureHandling(IOOperation operation) throws IOException {
  try {
    operation.execute();
  } catch (IOException ioe) {
    ProgramFailureException translated = getProgramFailureException(ioe);
    if (translated == null) {
      // Nothing to translate: surface the original failure untouched.
      throw ioe;
    }
    throw translated;
  }
}

/**
 * Runs {@code function}, returns its value, and converts HTTP-response-based I/O failures
 * into a {@link ProgramFailureException}. Intended for value-returning calls such as
 * {@code OutputCommitter#needsTaskCommit}.
 *
 * <p>A caught {@link IOException} is handed to {@code getProgramFailureException}. If that
 * returns a non-null exception it is thrown instead; otherwise the original
 * {@link IOException} is rethrown unchanged.
 *
 * @param function the computation to run
 * @param <T> the type of the value produced by {@code function}
 * @return whatever {@code function} returns
 * @throws IOException if the computation fails and no {@link ProgramFailureException} could
 *     be derived
 */
public static <T> T invokeWithProgramFailureHandling(IOFunction<T> function) throws IOException {
  try {
    return function.execute();
  } catch (IOException ioe) {
    ProgramFailureException translated = getProgramFailureException(ioe);
    if (translated == null) {
      // Nothing to translate: surface the original failure untouched.
      throw ioe;
    }
    throw translated;
  }
}

/**
 * Runs the given operation, which may throw {@link IOException} or
 * {@link InterruptedException}.
 *
 * <p>A caught {@link IOException} is handed to {@code getProgramFailureException}. If that
 * returns a non-null {@link ProgramFailureException} it is thrown instead; otherwise the
 * original {@link IOException} is rethrown. {@link InterruptedException} is not caught here
 * and propagates to the caller as-is.
 *
 * @param operation the action to run
 * @throws IOException if the action fails and no {@link ProgramFailureException} could be derived
 * @throws InterruptedException if the action is interrupted
 */
public static void invokeWithProgramFailureAndInterruptionHandling(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may be able to go with less boilerplate by using generic exceptions: https://www.mscharhag.com/java/java-exceptions-and-generic-types

IOInterruptibleOperation operation) throws IOException, InterruptedException {
try {
operation.execute();
} catch (IOException e) {
// Only IOException is translated; a null result means there is nothing to translate.
ProgramFailureException exception = getProgramFailureException(e);
if (exception != null) {
throw exception;
}
// Fall back to the original failure when no ProgramFailureException was derived.
throw e;
}
}

/**
 * Runs {@code function}, which may throw {@link IOException} or {@link InterruptedException},
 * and returns its value.
 *
 * <p>A caught {@link IOException} is handed to {@code getProgramFailureException}. If that
 * returns a non-null {@link ProgramFailureException} it is thrown instead; otherwise the
 * original {@link IOException} is rethrown. {@link InterruptedException} is not caught here
 * and propagates to the caller as-is.
 *
 * @param function the computation to run
 * @param <T> the type of the value produced by {@code function}
 * @return whatever {@code function} returns
 * @throws IOException if the computation fails and no {@link ProgramFailureException} could
 *     be derived
 * @throws InterruptedException if the computation is interrupted
 */
public static <T> T invokeWithProgramFailureAndInterruptionHandling(
    IOInterruptibleFunction<T> function) throws IOException, InterruptedException {
  try {
    return function.execute();
  } catch (IOException ioe) {
    ProgramFailureException translated = getProgramFailureException(ioe);
    if (translated == null) {
      // Nothing to translate: surface the original failure untouched.
      throw ioe;
    }
    throw translated;
  }
}

/**
 * Builds a {@link ProgramFailureException} from an {@link HttpResponseException}.
 *
 * <p>The error reason is formatted as
 * {@code "<status code> <status message> <corrective action>"}, where the corrective action
 * and the error type come from {@link ErrorUtils#getActionErrorByStatusCode}. The error
 * message defaults to {@code e.getMessage()}. For a {@link GoogleJsonResponseException} whose
 * details carry a non-null message, that message is used instead.
 *
 * @param e The HttpResponseException to get the error information from.
 * @return A ProgramFailureException with the given error information.
 */
private static ProgramFailureException getProgramFailureException(HttpResponseException e) {
  // Read the status once as a primitive and reuse it below.
  int statusCode = e.getStatusCode();
  ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
  String errorReason = String.format("%s %s %s", statusCode, e.getStatusMessage(),
      pair.getCorrectiveAction());

  String errorMessage = e.getMessage();
  if (e instanceof GoogleJsonResponseException) {
    GoogleJsonResponseException exception = (GoogleJsonResponseException) e;
    // Prefer the structured message, but only when one is actually present. Details with a
    // null message must not replace the exception message with null.
    if (exception.getDetails() != null && exception.getDetails().getMessage() != null) {
      errorMessage = exception.getDetails().getMessage();
    }
  }

  return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
      errorReason, errorMessage, pair.getErrorType(), true, e);
}

/**
 * Derives a {@link ProgramFailureException} from an {@link IOException} when an
 * {@link HttpResponseException} is available.
 *
 * <p>Only two places are inspected: the exception itself and its direct cause. Deeper
 * causes are not examined.
 *
 * @param e The IOException to get the error information from.
 * @return A ProgramFailureException built from the HTTP response error, or {@code null} when
 *     neither {@code e} nor its direct cause is an {@link HttpResponseException}.
 */
private static ProgramFailureException getProgramFailureException(IOException e) {
  if (e instanceof HttpResponseException) {
    return getProgramFailureException((HttpResponseException) e);
  }
  Throwable cause = e.getCause();
  if (cause instanceof HttpResponseException) {
    return getProgramFailureException((HttpResponseException) cause);
  }
  return null;
}
}
7 changes: 6 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.gcp.common;

import com.google.api.client.http.HttpResponseException;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ExternalAccountCredentials;
Expand All @@ -35,6 +36,10 @@
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.gson.reflect.TypeToken;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.plugin.gcp.gcs.GCSPath;
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -79,7 +84,7 @@ public class GCPUtils {
"https://www.googleapis.com/auth/bigquery");
public static final String FQN_RESERVED_CHARACTERS_PATTERN = ".*[.:` \t\n].*";
public static final int MILLISECONDS_MULTIPLIER = 1000;

public static final String WRAPPED_OUTPUTFORMAT_CLASSNAME = "wrapped.outputformat.classname";
/**
* Load a service account from the local file system.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
}
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
} catch (IOException e) {
throw new RuntimeException(e);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
}
}

@Override
public void refresh() throws IOException {
getCredentials().refresh();
try {
getCredentials().refresh();
} catch (IOException e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to refresh service account access token.", e.getMessage(),
ErrorType.UNKNOWN, true, e);
}
}

private GoogleCredentials getCredentials() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
taskAttemptContext.getConfiguration(), tableName));

//Wrap output committer into the GCS Output Committer.
GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
ForwardingOutputCommitter gcsOutputCommitter =
new ForwardingOutputCommitter(
new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext)));

gcsOutputCommitter.setupJob(taskAttemptContext);
gcsOutputCommitter.setupTask(taskAttemptContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
package io.cdap.plugin.gcp.gcs.sink;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
Expand Down Expand Up @@ -46,11 +51,13 @@ public DelegatingGCSOutputFormat() {
* Get required configuration properties for this Output Format
*/
public static Map<String, String> configure(String delegateClassName,
String wrappedClassName,
String filterField,
String outputBaseDir,
String outputSuffix) {
Map<String, String> config = new HashMap<>();
config.put(DELEGATE_CLASS, delegateClassName);
config.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, wrappedClassName);
config.put(PARTITION_FIELD, filterField);
config.put(OUTPUT_PATH_BASE_DIR, outputBaseDir);
config.put(OUTPUT_PATH_SUFFIX, outputSuffix);
Expand All @@ -62,7 +69,8 @@ public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptC
Configuration hConf = context.getConfiguration();
String partitionField = hConf.get(PARTITION_FIELD);

return new DelegatingGCSRecordWriter(context, partitionField, getOutputCommitter(context));
return new ForwardingRecordWriter(new DelegatingGCSRecordWriter(context, partitionField,
getOutputCommitter(context), this));
}

@Override
Expand All @@ -71,8 +79,7 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
}

@Override
public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext context) {
return new DelegatingGCSOutputCommitter(context);
public ForwardingOutputCommitter getOutputCommitter(TaskAttemptContext context) {
return new ForwardingOutputCommitter(new DelegatingGCSOutputCommitter(context));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package io.cdap.plugin.gcp.gcs.sink;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
Expand All @@ -36,7 +40,9 @@ public static OutputFormat<NullWritable, StructuredRecord> getDelegateFormat(Con
(Class<OutputFormat<NullWritable, StructuredRecord>>) hConf.getClassByName(delegateClassName);
return delegateClass.newInstance();
} catch (Exception e) {
throw new IOException("Unable to instantiate output format for class " + delegateClassName, e);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
e.getMessage(), ErrorType.SYSTEM, false, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, Struct
private final TaskAttemptContext context;
private final String partitionField;
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
private final DelegatingGCSOutputCommitter delegatingGCSOutputCommitter;
private final ForwardingOutputCommitter delegatingGCSOutputCommitter;
private final DelegatingGCSOutputFormat outputFormat;

DelegatingGCSRecordWriter(TaskAttemptContext context,
String partitionField,
DelegatingGCSOutputCommitter delegatingGCSOutputCommitter) {
DelegatingGCSRecordWriter(TaskAttemptContext context, String partitionField,
ForwardingOutputCommitter delegatingGCSOutputCommitter, DelegatingGCSOutputFormat outputFormat) {
this.context = context;
this.partitionField = partitionField;
this.delegateMap = new HashMap<>();
this.delegatingGCSOutputCommitter = delegatingGCSOutputCommitter;
this.outputFormat = outputFormat;
}

@Override
Expand All @@ -55,6 +56,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException,
if (delegateMap.containsKey(tableName)) {
delegate = delegateMap.get(tableName);
} else {

//Get output format from configuration.
OutputFormat<NullWritable, StructuredRecord> format =
DelegatingGCSOutputUtils.getDelegateFormat(context.getConfiguration());
Expand All @@ -63,7 +65,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException,
delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, tableName);

//Add record writer to delegate map.
delegate = format.getRecordWriter(context);
delegate = new ForwardingRecordWriter(format.getRecordWriter(context));
delegateMap.put(tableName, delegate);
}

Expand Down
Loading
Loading