Skip to content

Commit a43738a

Browse files
Added generic method for retry logic
1 parent 8784878 commit a43738a

File tree

6 files changed

+108
-183
lines changed

6 files changed

+108
-183
lines changed

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException {
228228
);
229229

230230
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
231-
GetUserInfoResult userInfo = PartnerConnectionRetryWrapper.getUserInfoWithRetry(partnerConnection, this);
231+
GetUserInfoResult userInfo = PartnerConnectionRetryWrapper.getUserInfoWithRetry(partnerConnection);
232232
return userInfo.getOrganizationId();
233233
}
234234

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public void validateSObjects(FailureCollector collector, @Nullable OAuthInfo oAu
186186
this.getConnection().getProxyUrl());
187187
PartnerConnection partnerConnection =
188188
SalesforceConnectionUtil.getPartnerConnection(credentials);
189-
describeGlobalResult = PartnerConnectionRetryWrapper.describeGlobalWithRetry(partnerConnection, this);
189+
describeGlobalResult = PartnerConnectionRetryWrapper.describeGlobalWithRetry(partnerConnection);
190190
} catch (ConnectionException e) {
191191
String message = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
192192
throw new IllegalArgumentException(String.format("Unable to connect to Salesforce due to error: %s", message), e);
@@ -229,7 +229,7 @@ private List<String> getSObjects() {
229229
try {
230230
PartnerConnection partnerConnection =
231231
SalesforceConnectionUtil.getPartnerConnection(getConnection().getAuthenticatorCredentials());
232-
describeGlobalResult = PartnerConnectionRetryWrapper.describeGlobalWithRetry(partnerConnection, this);
232+
describeGlobalResult = PartnerConnectionRetryWrapper.describeGlobalWithRetry(partnerConnection);
233233
} catch (ConnectionException e) {
234234
String message = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
235235
throw new IllegalArgumentException(String.format("Unable to connect to Salesforce due to error: %s", message), e);

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSoapRecordReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,10 @@ public SalesforceSoapRecordReader(Schema schema, String query, SoapRecordToMapTr
6666
* @param taskAttemptContext task context
6767
*/
6868

69-
private Configuration configuration;
7069
@Override
7170
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
72-
this.configuration = taskAttemptContext.getConfiguration();
73-
AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(configuration);
71+
Configuration conf = taskAttemptContext.getConfiguration();
72+
AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
7473
initialize(credentials);
7574
}
7675

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/PartnerConnectionRetryWrapper.java

Lines changed: 102 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,64 @@
2323
import com.sforce.soap.partner.sobject.SObject;
2424
import com.sforce.ws.ConnectionException;
2525
import dev.failsafe.Failsafe;
26-
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBaseSourceConfig;
26+
import dev.failsafe.FailsafeException;
27+
import dev.failsafe.RetryPolicy;
2728

2829
import java.util.List;
30+
import java.util.Objects;
31+
import java.util.concurrent.Callable;
32+
import java.util.function.Predicate;
33+
2934

3035

3136
/**
3237
* A utility class that wraps Salesforce PartnerConnection calls with retry logic using Failsafe.
3338
*/
3439
public class PartnerConnectionRetryWrapper {
3540

41+
/**
42+
* Executes a Salesforce PartnerConnection operation with retry logic using the provided {@link RetryPolicy}.
43+
* <p>
44+
* This method wraps the execution in {@link Failsafe} to handle transient {@link ConnectionException}s
45+
* and retries the operation if the exception is considered retryable.
46+
* It also performs a null or custom result validation check, throwing an {@link IllegalArgumentException}
47+
* if the result is invalid.
48+
*
49+
* @param retryPolicy the retry policy to use for retries
50+
* @param operation the Salesforce PartnerConnection operation to execute
51+
* @param errorContext a descriptive message used when the result is null or fails the validation check
52+
* @param nullCheck an optional predicate to validate the result; can be {@code null}
53+
* @param <T> the type of the result returned by the operation
54+
* @return the result of the successful operation
55+
* @throws ConnectionException if the operation fails with a non-retryable {@link ConnectionException}
56+
* @throws FailsafeException if the operation fails with a non-ConnectionException or all retries are exhausted
57+
* @throws IllegalArgumentException if the result is null or fails the provided validation check
58+
*/
59+
private static <T> T executeWithRetry(RetryPolicy<Object> retryPolicy, Callable<T> operation,
60+
String errorContext, Predicate<T> nullCheck) throws ConnectionException {
61+
try {
62+
return Failsafe.with(retryPolicy).get(() -> {
63+
try {
64+
T result = operation.call();
65+
if (nullCheck != null && (result == null || !nullCheck.test(result))) {
66+
throw new IllegalArgumentException(errorContext);
67+
}
68+
return result;
69+
} catch (ConnectionException e) {
70+
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
71+
throw new SalesforceQueryExecutionException(e);
72+
}
73+
throw e;
74+
}
75+
});
76+
} catch (FailsafeException ex) {
77+
if (ex.getCause() instanceof ConnectionException) {
78+
throw (ConnectionException) ex.getCause();
79+
}
80+
throw ex;
81+
}
82+
}
83+
3684
/**
3785
* Executes a SOQL query using PartnerConnection with retry logic.
3886
*
@@ -42,20 +90,12 @@ public class PartnerConnectionRetryWrapper {
4290
* @throws ConnectionException if the query fails and is not retryable
4391
*/
4492
public static QueryResult queryWithRetry(PartnerConnection connection, String query) throws ConnectionException {
45-
return Failsafe.with(SalesforceSplitUtil.getDefaultRetryPolicy()).get(() -> {
46-
try {
47-
QueryResult result = connection.query(query);
48-
if (result == null) {
49-
throw new IllegalArgumentException("The query returned a null result: " + query);
50-
}
51-
return result;
52-
} catch (ConnectionException e) {
53-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
54-
throw new SalesforceQueryExecutionException(e);
55-
}
56-
throw e;
57-
}
58-
});
93+
return executeWithRetry(
94+
SalesforceSplitUtil.getDefaultRetryPolicy(),
95+
() -> connection.query(query),
96+
"The query returned a null result: " + query,
97+
Objects::nonNull
98+
);
5999
}
60100

61101
/**
@@ -69,21 +109,12 @@ public static QueryResult queryWithRetry(PartnerConnection connection, String qu
69109
*/
70110
public static QueryResult queryMoreWithRetry(PartnerConnection connection, String locator,
71111
String originalQueryForContext) throws ConnectionException {
72-
return Failsafe.with(SalesforceSplitUtil.getDefaultRetryPolicy()).get(() -> {
73-
try {
74-
QueryResult result = connection.queryMore(locator);
75-
if (result == null) {
76-
throw new IllegalArgumentException("The QueryMore returned a null result for locator used in query: "
77-
+ originalQueryForContext);
78-
}
79-
return result;
80-
} catch (ConnectionException e) {
81-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
82-
throw new SalesforceQueryExecutionException(e);
83-
}
84-
throw e;
85-
}
86-
});
112+
return executeWithRetry(
113+
SalesforceSplitUtil.getDefaultRetryPolicy(),
114+
() -> connection.queryMore(locator),
115+
"The QueryMore returned a null result for locator used in query: " + originalQueryForContext,
116+
Objects::nonNull
117+
);
87118
}
88119

89120
/**
@@ -97,20 +128,12 @@ public static QueryResult queryMoreWithRetry(PartnerConnection connection, Strin
97128
public static DescribeSObjectResult[] describeSObjectsWithRetry(PartnerConnection connection,
98129
List<String> sObjectNames) throws
99130
ConnectionException {
100-
return Failsafe.with(SalesforceSplitUtil.getDefaultRetryPolicy()).get(() -> {
101-
try {
102-
DescribeSObjectResult[] result = connection.describeSObjects(sObjectNames.toArray(new String[0]));
103-
if (result == null) {
104-
throw new IllegalArgumentException("The describeSObjects returned a null result: " + sObjectNames);
105-
}
106-
return result;
107-
} catch (ConnectionException e) {
108-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
109-
throw new SalesforceQueryExecutionException(e);
110-
}
111-
throw e;
112-
}
113-
});
131+
return executeWithRetry(
132+
SalesforceSplitUtil.getDefaultRetryPolicy(),
133+
() -> connection.describeSObjects(sObjectNames.toArray(new String[0])),
134+
"The describeSObjects returned a null result: " + sObjectNames,
135+
Objects::nonNull
136+
);
114137
}
115138

116139
/**
@@ -123,20 +146,12 @@ public static DescribeSObjectResult[] describeSObjectsWithRetry(PartnerConnectio
123146
*/
124147
public static DescribeSObjectResult describeSObjectWithRetry(PartnerConnection connection, String sObjectName)
125148
throws ConnectionException {
126-
return Failsafe.with(SalesforceSplitUtil.getDefaultRetryPolicy()).get(() -> {
127-
try {
128-
DescribeSObjectResult result = connection.describeSObject(sObjectName);
129-
if (result == null) {
130-
throw new IllegalArgumentException("The describeSObject returned a null result: " + sObjectName);
131-
}
132-
return result;
133-
} catch (ConnectionException e) {
134-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
135-
throw new SalesforceQueryExecutionException(e);
136-
}
137-
throw e;
138-
}
139-
});
149+
return executeWithRetry(
150+
SalesforceSplitUtil.getDefaultRetryPolicy(),
151+
() -> connection.describeSObject(sObjectName),
152+
"The describeSObject returned a null result: " + sObjectName,
153+
Objects::nonNull
154+
);
140155
}
141156

142157
/**
@@ -148,76 +163,12 @@ public static DescribeSObjectResult describeSObjectWithRetry(PartnerConnection c
148163
*/
149164
public static DescribeGlobalResult describeGlobalWithRetry(PartnerConnection connection)
150165
throws ConnectionException {
151-
return Failsafe.with(SalesforceSplitUtil.getDefaultRetryPolicy()).get(() -> {
152-
try {
153-
DescribeGlobalResult result = connection.describeGlobal();
154-
if (result == null) {
155-
throw new IllegalArgumentException("The describeGlobal() returned a null result");
156-
}
157-
return result;
158-
} catch (ConnectionException e) {
159-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
160-
throw new SalesforceQueryExecutionException(e);
161-
}
162-
throw e;
163-
}
164-
});
165-
}
166-
167-
/**
168-
* Describes all available Salesforce objects using describeGlobal with a custom retry policy from config.
169-
*
170-
* @param connection the Salesforce PartnerConnection
171-
* @param config the base source config containing retry parameters
172-
* @return DescribeGlobalResult containing metadata for all objects
173-
* @throws ConnectionException if the describeGlobal call fails and is not retryable
174-
*/
175-
public static DescribeGlobalResult describeGlobalWithRetry(PartnerConnection connection,
176-
SalesforceBaseSourceConfig config)
177-
throws ConnectionException {
178-
return Failsafe.with(SalesforceSplitUtil.getRetryPolicy(config.getInitialRetryDuration(),
179-
config.getMaxRetryDuration(), config.getMaxRetryCount())).get(() -> {
180-
try {
181-
DescribeGlobalResult result = connection.describeGlobal();
182-
if (result == null) {
183-
throw new IllegalArgumentException("The describeGlobal() returned a null result");
184-
}
185-
return result;
186-
} catch (ConnectionException e) {
187-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
188-
throw new SalesforceQueryExecutionException(e);
189-
}
190-
throw e;
191-
}
192-
});
193-
}
194-
195-
/**
196-
* Retrieves user info from the Salesforce connection with retry logic using config.
197-
*
198-
* @param connection the Salesforce PartnerConnection
199-
* @param config the base source config containing retry parameters
200-
* @return GetUserInfoResult containing user and organization info
201-
* @throws ConnectionException if the getUserInfo call fails and is not retryable
202-
*/
203-
public static GetUserInfoResult getUserInfoWithRetry(PartnerConnection connection,
204-
SalesforceBaseSourceConfig config)
205-
throws ConnectionException {
206-
return Failsafe.with(SalesforceSplitUtil.getRetryPolicy(config.getInitialRetryDuration(),
207-
config.getMaxRetryDuration(), config.getMaxRetryCount())).get(() -> {
208-
try {
209-
GetUserInfoResult userInfo = connection.getUserInfo();
210-
if (userInfo == null || userInfo.getOrganizationId() == null) {
211-
throw new IllegalArgumentException("UserInfo or OrganizationId is null");
212-
}
213-
return userInfo;
214-
} catch (ConnectionException e) {
215-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
216-
throw new SalesforceQueryExecutionException(e);
217-
}
218-
throw e;
219-
}
220-
});
166+
return executeWithRetry(
167+
SalesforceSplitUtil.getDefaultRetryPolicy(),
168+
connection::describeGlobal,
169+
"The describeGlobal() returned a null result",
170+
Objects::nonNull
171+
);
221172
}
222173

223174
/**
@@ -234,20 +185,29 @@ public static SObject[] retrieveWithRetry(PartnerConnection connection,
234185
String fields,
235186
String sObjectName,
236187
String[] sObjectIds) throws ConnectionException {
237-
return Failsafe.with(SalesforceSplitUtil.getDefaultRetryPolicy()).get(() -> {
238-
try {
239-
SObject[] result = connection.retrieve(fields, sObjectName, sObjectIds);
240-
if (result == null) {
241-
throw new IllegalArgumentException("The Retrieve returned a null result: " + sObjectName);
242-
}
243-
return result;
244-
} catch (ConnectionException e) {
245-
if (SalesforceSplitUtil.isRetryableConnectionError(e)) {
246-
throw new SalesforceQueryExecutionException(e);
247-
}
248-
throw e;
249-
}
250-
});
188+
return executeWithRetry(
189+
SalesforceSplitUtil.getDefaultRetryPolicy(),
190+
() -> connection.retrieve(fields, sObjectName, sObjectIds),
191+
"The Retrieve returned a null result: " + sObjectName,
192+
Objects::nonNull
193+
);
194+
}
195+
196+
/**
197+
* Retrieves user info from the Salesforce connection with retry logic.
198+
*
199+
* @param connection the Salesforce PartnerConnection
200+
* @return GetUserInfoResult containing user and organization info
201+
* @throws ConnectionException if the getUserInfo call fails and is not retryable
202+
*/
203+
public static GetUserInfoResult getUserInfoWithRetry(PartnerConnection connection)
204+
throws ConnectionException {
205+
return executeWithRetry(
206+
SalesforceSplitUtil.getDefaultRetryPolicy(),
207+
() -> connection.getUserInfo(),
208+
"UserInfo or OrganizationId is null",
209+
result -> result != null && result.getOrganizationId() != null
210+
);
251211
}
252212

253213
}

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -259,13 +259,6 @@ private static OperationEnum getOperationEnum(String operation) {
259259
}
260260
}
261261

262-
public static RetryPolicy<Object> getRetryPolicy(SalesforceSourceConfig config) {
263-
return getRetryPolicy(
264-
config.getInitialRetryDuration(),
265-
config.getMaxRetryDuration(),
266-
config.getMaxRetryCount());
267-
}
268-
269262
public static RetryPolicy<Object> getRetryPolicy(Long initialRetryDuration, Long maxRetryDuration,
270263
Integer maxRetryCount) {
271264
// Exponential backoff with initial retry of 5 seconds and max retry of 80 seconds.

0 commit comments

Comments
 (0)