Skip to content

Commit ace4e09

Browse files
code refactoring
1 parent edccf49 commit ace4e09

File tree

10 files changed

+198
-304
lines changed

10 files changed

+198
-304
lines changed

src/main/java/com/sforce/soap/partner/PartnerConnectionRetryWrapper.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@
1616

1717
package com.sforce.soap.partner;
1818

19+
import com.sforce.soap.partner.fault.ApiFault;
1920
import com.sforce.soap.partner.sobject.SObject;
2021
import com.sforce.ws.ConnectionException;
2122
import dev.failsafe.Failsafe;
2223
import dev.failsafe.FailsafeException;
2324
import dev.failsafe.RetryPolicy;
2425
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
25-
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
2626

27+
import java.net.SocketTimeoutException;
2728
import java.util.Arrays;
2829
import java.util.Calendar;
30+
import java.util.Collections;
31+
import java.util.HashSet;
32+
import java.util.Set;
2933
import java.util.concurrent.Callable;
3034
import java.util.stream.Collectors;
3135

@@ -36,6 +40,18 @@ public class PartnerConnectionRetryWrapper extends PartnerConnection {
3640

3741
private final PartnerConnection delegate;
3842
private final RetryPolicy<Object> retryPolicy;
43+
private static final Set<String> RETRYABLE_CONNECTION_ERRORS = new HashSet<>(Arrays.asList(
44+
"connection refused",
45+
"connection timed out",
46+
"failed to send request",
47+
"socket timeout",
48+
"read timed out"
49+
));
50+
51+
private static final Set<String> RETRYABLE_API_FAULT_ERRORS = new HashSet<>(Collections.singletonList(
52+
"SERVER_UNAVAILABLE"
53+
));
54+
3955

4056
public PartnerConnectionRetryWrapper(PartnerConnection delegate, RetryPolicy<Object> retryPolicy) throws
4157
ConnectionException {
@@ -46,7 +62,7 @@ public PartnerConnectionRetryWrapper(PartnerConnection delegate, RetryPolicy<Obj
4662

4763
@Override
4864
public DescribeTab[] describeAllTabs() throws ConnectionException {
49-
return executeWithRetry(() -> delegate.describeAllTabs(), "The describeAllTabs method returned a null result.");
65+
return executeWithRetry(delegate::describeAllTabs, "The describeAllTabs method returned a null result.");
5066
}
5167

5268
@Override
@@ -77,17 +93,17 @@ public ProcessResult[] process(ProcessRequest[] actions) throws ConnectionExcept
7793

7894
@Override
7995
public DescribeGlobalResult describeGlobal() throws ConnectionException {
80-
return executeWithRetry(() -> delegate.describeGlobal(), "The describeGlobal() method returned a null result.");
96+
return executeWithRetry(delegate::describeGlobal, "The describeGlobal() method returned a null result.");
8197
}
8298

8399
@Override
84100
public GetUserInfoResult getUserInfo() throws ConnectionException {
85-
return executeWithRetry(() -> delegate.getUserInfo(), "The UserInfo or OrganizationId is null.");
101+
return executeWithRetry(delegate::getUserInfo, "The UserInfo or OrganizationId is null.");
86102
}
87103

88104
@Override
89105
public DescribeGlobalTheme describeGlobalTheme() throws ConnectionException {
90-
return executeWithRetry(() -> delegate.describeGlobalTheme(),
106+
return executeWithRetry(delegate::describeGlobalTheme,
91107
"The describeGlobalTheme method returned a null result.");
92108
}
93109

@@ -225,7 +241,7 @@ public GetDeletedResult getDeleted(String sObjectType, Calendar startDate, Calen
225241

226242
@Override
227243
public DescribeTabSetResult[] describeTabs() throws ConnectionException {
228-
return executeWithRetry(() -> delegate.describeTabs(),
244+
return executeWithRetry(delegate::describeTabs,
229245
"The describeTabs method returned a null result.");
230246
}
231247

@@ -310,7 +326,7 @@ public DescribeSObjectResult[] describeSObjects(String[] sObjectType) throws Con
310326

311327
@Override
312328
public KnowledgeSettings describeKnowledgeSettings() throws ConnectionException {
313-
return executeWithRetry(() -> delegate.describeKnowledgeSettings(),
329+
return executeWithRetry(delegate::describeKnowledgeSettings,
314330
"The describeKnowledgeSettings method returned a null result.");
315331
}
316332

@@ -380,7 +396,7 @@ public DescribeSObjectResult describeSObject(String sObjectType) throws Connecti
380396

381397
@Override
382398
public GetServerTimestampResult getServerTimestamp() throws ConnectionException {
383-
return executeWithRetry(() -> delegate.getServerTimestamp(),
399+
return executeWithRetry(delegate::getServerTimestamp,
384400
"The getServerTimestamp method returned a null result.");
385401
}
386402

@@ -405,7 +421,7 @@ public ResetPasswordResult resetPassword(String userId) throws ConnectionExcepti
405421

406422
@Override
407423
public DescribeSoftphoneLayoutResult describeSoftphoneLayout() throws ConnectionException {
408-
return executeWithRetry(() -> delegate.describeSoftphoneLayout(),
424+
return executeWithRetry(delegate::describeSoftphoneLayout,
409425
"The describeSoftphoneLayout method returned a null result.");
410426
}
411427

@@ -441,7 +457,7 @@ public DescribeListViewResult[] describeListViews(String[] sObjectType) throws C
441457

442458
@Override
443459
public DescribeDataCategoryMappingResult[] describeDataCategoryMappings() throws ConnectionException {
444-
return executeWithRetry(() -> delegate.describeDataCategoryMappings(),
460+
return executeWithRetry(delegate::describeDataCategoryMappings,
445461
"The describeDataCategoryMappings method returned a null result.");
446462
}
447463

@@ -483,17 +499,34 @@ private <T> T executeWithRetry(Callable<T> operation, String errorContext) throw
483499
}
484500
return result;
485501
} catch (ConnectionException e) {
486-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
502+
if (isRetryableConnectionError(e)) {
487503
throw new SalesforceQueryExecutionException(e);
488504
}
489505
throw e;
490506
}
491507
});
492508
} catch (FailsafeException ex) {
493-
if (ex.getCause() != null && ex.getCause() instanceof ConnectionException) {
509+
if (ex.getCause() instanceof ConnectionException) {
494510
throw (ConnectionException) ex.getCause();
495511
}
496512
throw ex;
497513
}
498514
}
515+
516+
public boolean isRetryableConnectionError(ConnectionException e) {
517+
if (e.getCause() instanceof SocketTimeoutException) {
518+
return true;
519+
}
520+
if (e.getCause() instanceof ApiFault) {
521+
ApiFault apifault = (ApiFault) e.getCause();
522+
return RETRYABLE_API_FAULT_ERRORS.contains(apifault.getExceptionCode().toString());
523+
}
524+
String error = e.getMessage();
525+
if (error == null) {
526+
return false;
527+
}
528+
error = error.toLowerCase();
529+
return RETRYABLE_CONNECTION_ERRORS.stream()
530+
.anyMatch(error::contains);
531+
}
499532
}

src/main/java/io/cdap/plugin/salesforce/SObjectsDescribeResult.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,8 @@ public static SObjectsDescribeResult of(PartnerConnection connection, Collection
6161
// split the given sObjects into smaller partitions to ensure we don't exceed the limitation
6262
for (List<String> partition : Lists.partition(new ArrayList<>(sObjects), DESCRIBE_SOBJECTS_LIMIT)) {
6363
DescribeSObjectResult[] describeSObjectResults = connection.describeSObjects(partition.toArray(new String[0]));
64-
Stream.of(describeSObjectResults)
65-
.forEach(result -> addSObjectDescribe(result.getName(), result.getFields(), objectToFieldMap));
66-
64+
Stream.of(describeSObjectResults)
65+
.forEach(result -> addSObjectDescribe(result.getName(), result.getFields(), objectToFieldMap));
6766
}
6867
return new SObjectsDescribeResult(objectToFieldMap);
6968
}

src/main/java/io/cdap/plugin/salesforce/plugin/connector/SalesforceConnector.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import java.util.List;
6363
import java.util.Map;
6464

65-
6665
/**
6766
* Salesforce Connector Plugin
6867
*/
@@ -211,17 +210,11 @@ private List<StructuredRecord> listObjectDetails(String object, int limit) throw
211210
SoapRecordToMapTransformer soapRecordToMapTransformer = new SoapRecordToMapTransformer();
212211
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
213212
QueryResult queryResult = partnerConnection.query(query);
214-
215213
SObject[] sObjects = queryResult.getRecords();
216-
217214
Schema schema = SalesforceSchemaUtil.getSchema(credentials, sObjectDescriptor);
218-
219215
MapToRecordTransformer transformer = new MapToRecordTransformer();
220216
for (int i = 0; i < sObjects.length; i++) {
221-
record = transformer.transform(
222-
schema,
223-
soapRecordToMapTransformer.transformToMap(sObjects[i], sObjectDescriptor)
224-
);
217+
record = transformer.transform(schema, soapRecordToMapTransformer.transformToMap(sObjects[i], sObjectDescriptor));
225218
samples.add(record);
226219
}
227220
return samples;

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java

Lines changed: 7 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,17 @@
1717

1818
import com.google.common.annotations.VisibleForTesting;
1919
import com.sforce.async.AsyncApiException;
20-
import com.sforce.async.AsyncExceptionCode;
2120
import com.sforce.async.BatchInfo;
2221
import com.sforce.async.BatchStateEnum;
2322
import com.sforce.async.BulkConnection;
24-
import dev.failsafe.Failsafe;
25-
import dev.failsafe.FailsafeException;
26-
import dev.failsafe.RetryPolicy;
27-
import dev.failsafe.TimeoutExceededException;
2823
import io.cdap.cdap.api.data.schema.Schema;
2924
import io.cdap.plugin.salesforce.BulkAPIBatchException;
3025
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
3126
import io.cdap.plugin.salesforce.SalesforceConstants;
3227
import io.cdap.plugin.salesforce.authenticator.Authenticator;
3328
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
3429
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
35-
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
3630
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
37-
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
3831
import org.apache.commons.csv.CSVFormat;
3932
import org.apache.commons.csv.CSVParser;
4033
import org.apache.commons.csv.CSVRecord;
@@ -69,7 +62,6 @@ public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String,
6962
private String[] resultIds;
7063
private int resultIdIndex;
7164
private BulkConnectionRetryWrapper bulkConnectionRetryWrapper;
72-
private RetryPolicy<Object> retryPolicy;
7365

7466
public SalesforceBulkRecordReader(Schema schema) {
7567
this(schema, null, null, null, null);
@@ -112,20 +104,16 @@ public SalesforceBulkRecordReader initialize(InputSplit inputSplit, Authenticato
112104
jobId = salesforceSplit.getJobId();
113105
batchId = salesforceSplit.getBatchId();
114106
LOG.debug("Executing Salesforce Batch Id: '{}' for Job Id: '{}'", batchId, jobId);
115-
retryPolicy = SalesforceSplitUtil.getRetryPolicy(credentials.getInitialRetryDuration(),
116-
credentials.getMaxRetryDuration(),
117-
credentials.getMaxRetryCount(),
118-
credentials.isRetryOnBackendError());
119107
try {
120108
bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
121109
bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection, credentials.isRetryOnBackendError(),
122110
credentials.getInitialRetryDuration(),
123111
credentials.getMaxRetryDuration(),
124112
credentials.getMaxRetryCount());
125-
resultIds = waitForBatchResults(bulkConnection);
113+
resultIds = waitForBatchResults(bulkConnectionRetryWrapper);
126114
LOG.debug("Batch {} returned {} results", batchId, resultIds.length);
127115
setupParser();
128-
} catch (AsyncApiException | SalesforceQueryExecutionException e) {
116+
} catch (AsyncApiException e) {
129117
throw new RuntimeException(
130118
String.format("Failed to wait for the result of a batch: %s", e.getMessage()),
131119
e);
@@ -193,7 +181,7 @@ public void close() throws IOException {
193181
}
194182

195183
@VisibleForTesting
196-
void setupParser() throws IOException, AsyncApiException, InterruptedException {
184+
void setupParser() throws IOException, AsyncApiException {
197185
if (resultIdIndex >= resultIds.length) {
198186
throw new IllegalArgumentException(String.format("Invalid resultIdIndex %d, should be less than %d",
199187
resultIdIndex, resultIds.length));
@@ -212,32 +200,11 @@ void setupParser() throws IOException, AsyncApiException, InterruptedException {
212200
}
213201
parserIterator = csvParser.iterator();
214202
resultIdIndex++;
215-
} catch (TimeoutExceededException e) {
216-
throw new AsyncApiException("Exhausted retries trying to get query result stream", AsyncExceptionCode.Timeout);
217-
} catch (FailsafeException e) {
218-
if (e.getCause() instanceof InterruptedException) {
219-
throw (InterruptedException) e.getCause();
220-
}
221-
if (e.getCause() instanceof AsyncApiException) {
222-
throw (AsyncApiException) e.getCause();
223-
}
203+
} catch (AsyncApiException e) {
224204
throw e;
225205
}
226206
}
227207

228-
public InputStream getQueryResultStream(BulkConnection bulkConnection)
229-
throws SalesforceQueryExecutionException, AsyncApiException {
230-
try {
231-
return bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
232-
} catch (AsyncApiException exception) {
233-
LOG.warn("The bulk query job {} failed.", jobId);
234-
if (BulkConnectionRetryWrapper.RETRY_ON_REASON.contains(exception.getExceptionCode())) {
235-
throw new SalesforceQueryExecutionException(exception);
236-
}
237-
throw exception;
238-
}
239-
}
240-
241208
/**
242209
* Wait until a batch with given batchId succeeds, or throw an exception
243210
*
@@ -246,8 +213,8 @@ public InputStream getQueryResultStream(BulkConnection bulkConnection)
246213
* @throws AsyncApiException if there is an issue creating the job
247214
* @throws InterruptedException sleep interrupted
248215
*/
249-
private String[] waitForBatchResults(BulkConnection bulkConnection)
250-
throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
216+
private String[] waitForBatchResults(BulkConnectionRetryWrapper bulkConnection)
217+
throws AsyncApiException, InterruptedException {
251218
BatchInfo info = null;
252219
for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
253220
try {
@@ -261,20 +228,7 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
261228
}
262229

263230
if (info.getState() == BatchStateEnum.Completed) {
264-
try {
265-
return Failsafe.with(retryPolicy)
266-
.get(() -> getQueryResultList(bulkConnection));
267-
} catch (TimeoutExceededException e) {
268-
throw new AsyncApiException("Exhausted retries trying to get query result list", AsyncExceptionCode.Timeout);
269-
} catch (FailsafeException e) {
270-
if (e.getCause() instanceof InterruptedException) {
271-
throw (InterruptedException) e.getCause();
272-
}
273-
if (e.getCause() instanceof AsyncApiException) {
274-
throw (AsyncApiException) e.getCause();
275-
}
276-
throw e;
277-
}
231+
return bulkConnection.getQueryResultList(jobId, batchId);
278232
} else if (info.getState() == BatchStateEnum.Failed) {
279233
throw new BulkAPIBatchException("Batch failed", info);
280234
} else {
@@ -284,17 +238,4 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
284238
}
285239
throw new BulkAPIBatchException("Timeout waiting for batch results", info);
286240
}
287-
288-
private String[] getQueryResultList(BulkConnection bulkConnection)
289-
throws SalesforceQueryExecutionException, AsyncApiException {
290-
try {
291-
return bulkConnection.getQueryResultList(jobId, batchId).getResult();
292-
} catch (AsyncApiException exception) {
293-
LOG.warn("The bulk query job {} failed.", jobId);
294-
if (BulkConnectionRetryWrapper.RETRY_ON_REASON.contains(exception.getExceptionCode())) {
295-
throw new SalesforceQueryExecutionException(exception);
296-
}
297-
throw exception;
298-
}
299-
}
300241
}

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSoapRecordReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public SalesforceSoapRecordReader(Schema schema, String query, SoapRecordToMapTr
6464
* @param inputSplit specifies batch details
6565
* @param taskAttemptContext task context
6666
*/
67-
6867
@Override
6968
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
7069
Configuration conf = taskAttemptContext.getConfiguration();
@@ -147,7 +146,7 @@ private void queryMore() throws IOException {
147146
try {
148147
sObjects = null;
149148
queryResult = partnerConnection.queryMore(queryResult.getQueryLocator());
150-
} catch (Exception e) {
149+
} catch (ConnectionException e) {
151150
String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
152151
throw new IOException(String.format("Cannot create Salesforce SOAP connection for query locator: '%s' :%s",
153152
queryResult.getQueryLocator(), errorMessage), e);

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceWideRecordReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private SObject[] fetchPartition(PartnerConnection partnerConnection, String fie
193193
String[] sObjectIds) {
194194
try {
195195
return partnerConnection.retrieve(fields, sObjectName, sObjectIds);
196-
} catch (Exception e) {
196+
} catch (ConnectionException e) {
197197
LOG.trace("Fetched SObject name: '{}', fields: '{}', Ids: '{}'", sObjectName, fields,
198198
String.join(",", sObjectIds));
199199
String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public ConnectorConfig getConfig() {
9393
return bulkConnection.getConfig();
9494
}
9595

96+
public String[] getQueryResultList(String jobId, String batchId)
97+
throws AsyncApiException {
98+
return executeWithRetry(() -> bulkConnection.getQueryResultList(jobId, batchId).getResult(),
99+
String.format("The bulk query job %s failed.", jobId));
100+
}
101+
96102
private <T> T executeWithRetry(Callable<T> operation, String errorContext) throws AsyncApiException {
97103
try {
98104
return Failsafe.with(retryPolicy).get(() -> {
@@ -110,7 +116,7 @@ private <T> T executeWithRetry(Callable<T> operation, String errorContext) throw
110116
}
111117
});
112118
} catch (FailsafeException ex) {
113-
if (ex.getCause() != null && ex.getCause() instanceof AsyncApiException) {
119+
if (ex.getCause() instanceof AsyncApiException) {
114120
throw (AsyncApiException) ex.getCause();
115121
}
116122
throw ex;

0 commit comments

Comments
 (0)