Skip to content

Commit e02f96c

Browse files
committed
Refactored timeoutExecutor to be a member variable. 🫩
1 parent 7cc99a3 commit e02f96c

File tree

2 files changed

+27
-36
lines changed

2 files changed

+27
-36
lines changed

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@
165165
import java.util.Properties;
166166
import java.util.Set;
167167
import java.util.concurrent.Executors;
168-
import java.util.concurrent.Future;
169168
import java.util.concurrent.ScheduledExecutorService;
170169
import java.util.concurrent.TimeUnit;
171170
import java.util.stream.Collectors;
@@ -248,6 +247,7 @@ public class VenicePushJob implements AutoCloseable {
248247
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
249248
private PushJobHeartbeatSender pushJobHeartbeatSender = null;
250249
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;
250+
private final ScheduledExecutorService timeoutExecutor;
251251

252252
/**
253253
* @param jobId id of the job
@@ -256,6 +256,7 @@ public class VenicePushJob implements AutoCloseable {
256256
public VenicePushJob(String jobId, Properties vanillaProps) {
257257
this.jobId = jobId;
258258
this.props = getVenicePropsFromVanillaProps(Objects.requireNonNull(vanillaProps, "VPJ props cannot be null"));
259+
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
259260
LOGGER.info("Constructing {}: {}", VenicePushJob.class.getSimpleName(), props.toString(true));
260261
this.sslProperties = Lazy.of(() -> {
261262
try {
@@ -659,36 +660,23 @@ DataWriterComputeJob getDataWriterComputeJob() {
659660
* @throws VeniceException
660661
*/
661662
public void run() {
662-
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
663-
pushJobSetting.enableSSL,
664-
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
665-
this.sslProperties);
666-
initControllerClient(pushJobSetting.storeName, sslFactory);
667-
668-
ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
669-
long bootstrapToOnlineTimeoutInHours =
670-
getStoreResponse(pushJobSetting.storeName).getStore().getBootstrapToOnlineTimeoutInHours();
671-
Future<?> timeoutFuture = timeoutExecutor.schedule(() -> {
672-
LOGGER.error("Timeout reached. Stopping the job.");
673-
cancel();
674-
}, bootstrapToOnlineTimeoutInHours, TimeUnit.HOURS);
675-
676-
try {
677-
runPushJob();
678-
} finally {
679-
timeoutFuture.cancel(true);
680-
timeoutExecutor.shutdown();
681-
}
682-
}
683-
684-
private void runPushJob() {
685663
try {
664+
initControllerClient(pushJobSetting.storeName);
686665
pushJobSetting.clusterName = controllerClient.getClusterName();
687666
LOGGER.info(
688667
"The store {} is discovered in Venice cluster {}",
689668
pushJobSetting.storeName,
690669
pushJobSetting.clusterName);
691670

671+
long bootstrapToOnlineTimeoutInHours =
672+
getStoreResponse(pushJobSetting.storeName).getStore().getBootstrapToOnlineTimeoutInHours();
673+
timeoutExecutor.schedule(() -> {
674+
cancel();
675+
throw new VeniceException(
676+
"Failing push-job for store " + pushJobSetting.storeName + " which is still running after "
677+
+ bootstrapToOnlineTimeoutInHours + " hours.");
678+
}, bootstrapToOnlineTimeoutInHours, TimeUnit.HOURS);
679+
692680
if (pushJobSetting.isSourceKafka) {
693681
initKIFRepushDetails();
694682
}
@@ -1214,9 +1202,12 @@ protected InputDataInfoProvider getInputDataInfoProvider() {
12141202
* 2. A mock controller client is provided
12151203
*
12161204
* @param storeName
1217-
* @param sslFactory
12181205
*/
1219-
private void initControllerClient(String storeName, Optional<SSLFactory> sslFactory) {
1206+
private void initControllerClient(String storeName) {
1207+
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
1208+
pushJobSetting.enableSSL,
1209+
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
1210+
this.sslProperties);
12201211
final String controllerD2ZkHost;
12211212
if (pushJobSetting.multiRegion) {
12221213
// In multi region mode, push jobs will communicate with parent controller
@@ -2565,8 +2556,6 @@ private String pushJobPropertiesToString(
25652556
/**
25662557
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions or due to
25672558
* the job exceeding bootstrapToOnlineTimeoutInHours.
2568-
*
2569-
* @throws Exception
25702559
*/
25712560
public void cancel() {
25722561
killJob(pushJobSetting, controllerClient);
@@ -2678,6 +2667,7 @@ private static Path getLatestPath(Path path, FileSystem fs) throws IOException {
26782667

26792668
@Override
26802669
public void close() {
2670+
timeoutExecutor.shutdownNow();
26812671
closeVeniceWriter();
26822672
Utils.closeQuietlyWithErrorLogged(dataWriterComputeJob);
26832673
Utils.closeQuietlyWithErrorLogged(controllerClient);

clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,15 +278,21 @@ public void testPushJobSettingWithLivenessHeartbeat() {
278278
}
279279
}
280280

281+
@DataProvider(name = "DataWriterJobClasses")
282+
public Object[][] getDataWriterJobClasses() {
283+
return new Object[][] { { DataWriterMRJob.class }, { DataWriterSparkJob.class } };
284+
}
285+
281286
/**
282287
* Test that VenicePushJob.cancel() is called after bootstrapToOnlineTimeoutInHours is reached.
283288
* UNKNOWN status is returned for pollStatusUntilComplete() to stall the job until cancel() can be called.
284289
*/
285-
@Test
286-
public void testPushJobTimeout() throws Exception {
290+
@Test(dataProvider = "DataWriterJobClasses")
291+
public void testPushJobTimeout(Class<? extends DataWriterComputeJob> dataWriterJobClass) throws Exception {
287292
Properties props = getVpjRequiredProperties();
288293
props.put(KEY_FIELD_PROP, "id");
289294
props.put(VALUE_FIELD_PROP, "name");
295+
props.put(DATA_WRITER_COMPUTE_JOB_CLASS, dataWriterJobClass.getName());
290296
ControllerClient client = getClient();
291297
JobStatusQueryResponse response = mock(JobStatusQueryResponse.class);
292298
doReturn("UNKNOWN").when(response).getStatus();
@@ -312,16 +318,11 @@ public void testPushJobTimeout() throws Exception {
312318
}
313319
}
314320

315-
@DataProvider(name = "DataWriterJobClasses")
316-
public Object[][] getDataWriterJobClasses() {
317-
return new Object[][] { { DataWriterMRJob.class }, { DataWriterSparkJob.class } };
318-
}
319-
320321
/**
321322
* Ensures that the data writer job is killed if the job times out. Uses an Answer to stall the data writer job
322323
* while it's running in order for it to get killed properly.
323324
*/
324-
@Test(timeOut = 5 * Time.MS_PER_SECOND, dataProvider = "DataWriterJobClasses")
325+
@Test(dataProvider = "DataWriterJobClasses")
325326
public void testDataWriterComputeJobTimeout(Class<? extends DataWriterComputeJob> dataWriterJobClass)
326327
throws Exception {
327328
Properties props = getVpjRequiredProperties();

0 commit comments

Comments
 (0)