Skip to content

Commit c42ca87

Browse files
authored
[vpj] Push Job Timeout (#1786)
There should be a timeout on the entire push job as a whole. If exceeded, the push job should be cancelled. 😭
1 parent 5f6b97f commit c42ca87

File tree

5 files changed

+146
-69
lines changed

5 files changed

+146
-69
lines changed

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import com.linkedin.venice.exceptions.ErrorType;
9999
import com.linkedin.venice.exceptions.VeniceException;
100100
import com.linkedin.venice.exceptions.VeniceResourceAccessException;
101+
import com.linkedin.venice.exceptions.VeniceTimeoutException;
101102
import com.linkedin.venice.hadoop.exceptions.VeniceInvalidInputException;
102103
import com.linkedin.venice.hadoop.input.kafka.KafkaInputDictTrainer;
103104
import com.linkedin.venice.hadoop.mapreduce.datawriter.jobs.DataWriterMRJob;
@@ -164,6 +165,8 @@
164165
import java.util.Optional;
165166
import java.util.Properties;
166167
import java.util.Set;
168+
import java.util.concurrent.Executors;
169+
import java.util.concurrent.ScheduledExecutorService;
167170
import java.util.concurrent.TimeUnit;
168171
import java.util.stream.Collectors;
169172
import org.apache.avro.Schema;
@@ -245,6 +248,7 @@ public class VenicePushJob implements AutoCloseable {
245248
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
246249
private PushJobHeartbeatSender pushJobHeartbeatSender = null;
247250
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;
251+
private final ScheduledExecutorService timeoutExecutor;
248252

249253
/**
250254
* @param jobId id of the job
@@ -253,6 +257,7 @@ public class VenicePushJob implements AutoCloseable {
253257
public VenicePushJob(String jobId, Properties vanillaProps) {
254258
this.jobId = jobId;
255259
this.props = getVenicePropsFromVanillaProps(Objects.requireNonNull(vanillaProps, "VPJ props cannot be null"));
260+
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
256261
LOGGER.info("Constructing {}: {}", VenicePushJob.class.getSimpleName(), props.toString(true));
257262
this.sslProperties = Lazy.of(() -> {
258263
try {
@@ -657,11 +662,7 @@ DataWriterComputeJob getDataWriterComputeJob() {
657662
*/
658663
public void run() {
659664
try {
660-
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
661-
pushJobSetting.enableSSL,
662-
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
663-
this.sslProperties);
664-
initControllerClient(pushJobSetting.storeName, sslFactory);
665+
initControllerClient(pushJobSetting.storeName);
665666
pushJobSetting.clusterName = controllerClient.getClusterName();
666667
LOGGER.info(
667668
"The store {} is discovered in Venice cluster {}",
@@ -672,6 +673,7 @@ public void run() {
672673
initKIFRepushDetails();
673674
}
674675

676+
setupJobTimeoutMonitor();
675677
initPushJobDetails();
676678
logGreeting();
677679
sendPushJobDetailsToController();
@@ -887,6 +889,20 @@ public void run() {
887889
}
888890
}
889891

892+
/**
893+
* Timeout on the entire push job that kills the job if it runs longer than the store's configured bootstrap timeout.
894+
*/
895+
private void setupJobTimeoutMonitor() {
896+
long bootstrapToOnlineTimeoutInHours =
897+
getStoreResponse(pushJobSetting.storeName).getStore().getBootstrapToOnlineTimeoutInHours();
898+
timeoutExecutor.schedule(() -> {
899+
cancel();
900+
throw new VeniceTimeoutException(
901+
"Failing push-job for store " + pushJobSetting.storeName + " which is still running after "
902+
+ bootstrapToOnlineTimeoutInHours + " hours.");
903+
}, bootstrapToOnlineTimeoutInHours, TimeUnit.HOURS);
904+
}
905+
890906
private void buildHDFSSchemaDir() throws IOException {
891907
// Build the full path for HDFSRmdSchemaSource:
892908
// RMD schemas: <job_temp_dir>/rmd_schemas
@@ -1199,9 +1215,12 @@ protected InputDataInfoProvider getInputDataInfoProvider() {
11991215
* 2. A mock controller client is provided
12001216
*
12011217
* @param storeName
1202-
* @param sslFactory
12031218
*/
1204-
private void initControllerClient(String storeName, Optional<SSLFactory> sslFactory) {
1219+
private void initControllerClient(String storeName) {
1220+
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
1221+
pushJobSetting.enableSSL,
1222+
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
1223+
this.sslProperties);
12051224
final String controllerD2ZkHost;
12061225
if (pushJobSetting.multiRegion) {
12071226
// In multi region mode, push jobs will communicate with parent controller
@@ -2312,7 +2331,6 @@ void pollStatusUntilComplete(
23122331
* no more than {@link DEFAULT_JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS}.
23132332
*/
23142333
long unknownStateStartTimeMs = 0;
2315-
long pollStartTimeMs = System.currentTimeMillis();
23162334

23172335
String topicToMonitor = getTopicToMonitor(pushJobSetting);
23182336

@@ -2382,14 +2400,6 @@ void pollStatusUntilComplete(
23822400
}
23832401
return;
23842402
}
2385-
long bootstrapToOnlineTimeoutInHours =
2386-
VenicePushJob.this.pushJobSetting.storeResponse.getStore().getBootstrapToOnlineTimeoutInHours();
2387-
long durationMs = LatencyUtils.getElapsedTimeFromMsToMs(pollStartTimeMs);
2388-
if (durationMs > TimeUnit.HOURS.toMillis(bootstrapToOnlineTimeoutInHours)) {
2389-
throw new VeniceException(
2390-
"Failing push-job for store " + VenicePushJob.this.pushJobSetting.storeResponse.getName()
2391-
+ " which is still running after " + TimeUnit.MILLISECONDS.toHours(durationMs) + " hours.");
2392-
}
23932403
if (!overallStatus.equals(ExecutionStatus.UNKNOWN)) {
23942404
unknownStateStartTimeMs = 0;
23952405
} else if (unknownStateStartTimeMs == 0) {
@@ -2557,9 +2567,8 @@ private String pushJobPropertiesToString(
25572567
}
25582568

25592569
/**
2560-
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions.
2561-
*
2562-
* @throws Exception
2570+
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions or due to
2571+
* the job exceeding bootstrapToOnlineTimeoutInHours.
25632572
*/
25642573
public void cancel() {
25652574
killJob(pushJobSetting, controllerClient);
@@ -2572,7 +2581,7 @@ public void cancel() {
25722581
sendPushJobDetailsToController();
25732582
}
25742583

2575-
private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
2584+
void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
25762585
// Attempting to kill job. There's a race condition, but meh. Better kill when you know it's running
25772586
killDataWriterJob();
25782587
if (!pushJobSetting.isIncrementalPush) {
@@ -2597,7 +2606,7 @@ private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerC
25972606
}
25982607
}
25992608

2600-
private void killDataWriterJob() {
2609+
void killDataWriterJob() {
26012610
if (dataWriterComputeJob == null) {
26022611
LOGGER.warn("No op to kill a null data writer job");
26032612
return;
@@ -2671,6 +2680,7 @@ private static Path getLatestPath(Path path, FileSystem fs) throws IOException {
26712680

26722681
@Override
26732682
public void close() {
2683+
timeoutExecutor.shutdownNow();
26742684
closeVeniceWriter();
26752685
Utils.closeQuietlyWithErrorLogged(dataWriterComputeJob);
26762686
Utils.closeQuietlyWithErrorLogged(controllerClient);
@@ -2684,7 +2694,6 @@ public void close() {
26842694
}
26852695

26862696
public static void main(String[] args) {
2687-
26882697
if (args.length != 1) {
26892698
Utils.exit("USAGE: java -jar venice-push-job-all.jar <VPJ_config_file_path>");
26902699
}

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ public DataWriterTaskTracker getTaskTracker() {
328328
}
329329

330330
@Override
331-
protected void runComputeJob() {
331+
public void runComputeJob() {
332332
LOGGER.info("Triggering MR job for data writer");
333333
try {
334334
runningJob = JobUtils.runJobWithConfig(jobConf, jobClientWrapper);
@@ -346,6 +346,7 @@ public PushJobSetting getPushJobSetting() {
346346
public void kill() {
347347
if (runningJob == null) {
348348
LOGGER.warn("No op to kill a null running job");
349+
super.kill();
349350
return;
350351
}
351352
try {

clients/venice-push-job/src/main/java/com/linkedin/venice/jobs/DataWriterComputeJob.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static com.linkedin.venice.ConfigKeys.PASS_THROUGH_CONFIG_PREFIXES_LIST_KEY;
44

5+
import com.google.common.annotations.VisibleForTesting;
56
import com.linkedin.venice.ConfigKeys;
67
import com.linkedin.venice.exceptions.VeniceException;
78
import com.linkedin.venice.hadoop.PushJobSetting;
@@ -88,7 +89,8 @@ public static void populateWithPassThroughConfigs(VeniceProperties props, Config
8889

8990
public abstract DataWriterTaskTracker getTaskTracker();
9091

91-
protected void validateJob() {
92+
@VisibleForTesting
93+
public void validateJob() {
9294
DataWriterTaskTracker dataWriterTaskTracker = getTaskTracker();
9395
if (dataWriterTaskTracker == null) {
9496
throw new VeniceException("DataWriterTaskTracker is not set. Unable to validate the job status.");
@@ -160,7 +162,8 @@ protected void validateJob() {
160162
}
161163
}
162164

163-
protected abstract void runComputeJob();
165+
@VisibleForTesting
166+
public abstract void runComputeJob();
164167

165168
@Override
166169
public void configure(VeniceProperties properties) {

clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ public PushJobSetting getPushJobSetting() {
323323
}
324324

325325
@Override
326-
protected void runComputeJob() {
326+
public void runComputeJob() {
327327
// Load data from input path
328328
Dataset<Row> dataFrame = getInputDataFrame();
329329
validateDataFrame(dataFrame);

0 commit comments

Comments
 (0)