Skip to content

[vpj] Push Job Timeout #1786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8ebedc2
Removed old `bootstrapToOnlineTimeoutInHours` usage since it's …
KaiSernLim May 6, 2025
dddea75
First attempt at adding a timeout to cancel the job. 🌗
KaiSernLim May 6, 2025
82a2c2d
Implement timeout handling for VenicePushJob and add unit test for ti…
KaiSernLim May 6, 2025
cf391b1
Two unit tests for verification. 🚏
KaiSernLim May 9, 2025
387ff49
Fixed the unit test to actually test something meaningful. 🎤
KaiSernLim May 9, 2025
12902db
Fixed NPE from `storeResponse` not being populated. 🦆
KaiSernLim May 9, 2025
3d1f33d
Fixed Spotbugs warning in test. 🥱
KaiSernLim May 9, 2025
7cc99a3
Fixed `controllerClient` NPE. 🐻‍❄️
KaiSernLim May 10, 2025
e02f96c
Refactored `timeoutExecutor` to be a member variable. 🫩
KaiSernLim May 12, 2025
210c28c
Minor review changes by Nisarg. 🪡
KaiSernLim May 20, 2025
eac4a66
Try setting `bootstrapToOnlineTimeoutInHours` directly, instead of re…
KaiSernLim May 21, 2025
5540f7f
Revert "Try setting `bootstrapToOnlineTimeoutInHours` directly, inste…
KaiSernLim May 21, 2025
2f9f9fe
Try changing the condition in the test. 😪
KaiSernLim May 21, 2025
07d8ab9
Only check `KILLED` status in unit test. 🤖
KaiSernLim May 21, 2025
eca6ef3
I can't tell which branches the unit test is going into in CI. 😭
KaiSernLim May 22, 2025
74da931
Is the latch even counting down? 😭
KaiSernLim May 22, 2025
e96fd4a
Is the latch even counting down? 😭
KaiSernLim May 22, 2025
e305f09
Try reordering the unit tests? 😭
KaiSernLim May 22, 2025
6da0bcd
It was the `configure()` taking a long time, wasn't it? 🤪
KaiSernLim May 22, 2025
545f841
Added the finishing touches.
KaiSernLim May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceResourceAccessException;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.hadoop.exceptions.VeniceInvalidInputException;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputDictTrainer;
import com.linkedin.venice.hadoop.mapreduce.datawriter.jobs.DataWriterMRJob;
Expand Down Expand Up @@ -164,6 +165,8 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -245,6 +248,7 @@ public class VenicePushJob implements AutoCloseable {
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private PushJobHeartbeatSender pushJobHeartbeatSender = null;
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;
private final ScheduledExecutorService timeoutExecutor;

/**
* @param jobId id of the job
Expand All @@ -253,6 +257,7 @@ public class VenicePushJob implements AutoCloseable {
public VenicePushJob(String jobId, Properties vanillaProps) {
this.jobId = jobId;
this.props = getVenicePropsFromVanillaProps(Objects.requireNonNull(vanillaProps, "VPJ props cannot be null"));
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
LOGGER.info("Constructing {}: {}", VenicePushJob.class.getSimpleName(), props.toString(true));
this.sslProperties = Lazy.of(() -> {
try {
Expand Down Expand Up @@ -657,11 +662,7 @@ DataWriterComputeJob getDataWriterComputeJob() {
*/
public void run() {
try {
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
pushJobSetting.enableSSL,
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
this.sslProperties);
initControllerClient(pushJobSetting.storeName, sslFactory);
initControllerClient(pushJobSetting.storeName);
pushJobSetting.clusterName = controllerClient.getClusterName();
LOGGER.info(
"The store {} is discovered in Venice cluster {}",
Expand All @@ -672,6 +673,7 @@ public void run() {
initKIFRepushDetails();
}

setupJobTimeoutMonitor();
initPushJobDetails();
logGreeting();
sendPushJobDetailsToController();
Expand Down Expand Up @@ -881,6 +883,20 @@ public void run() {
}
}

/**
 * Schedules a one-shot timeout on the entire push job: if the job is still running once the store's configured
 * {@code bootstrapToOnlineTimeoutInHours} has elapsed, the job is cancelled via {@link #cancel()}.
 *
 * <p>The timeout is read from the store response at the time this method is called. The task runs on
 * {@code timeoutExecutor}, not on the thread that called this method.
 */
private void setupJobTimeoutMonitor() {
  long bootstrapToOnlineTimeoutInHours =
      getStoreResponse(pushJobSetting.storeName).getStore().getBootstrapToOnlineTimeoutInHours();
  timeoutExecutor.schedule(() -> {
    String timeoutMessage = "Failing push-job for store " + pushJobSetting.storeName
        + " which is still running after " + bootstrapToOnlineTimeoutInHours + " hours.";
    // The future returned by schedule() is discarded, so the exception thrown below is captured by that future and
    // never reported. Log the timeout explicitly, and do it before cancel() so the reason is recorded even if the
    // cancellation itself fails.
    LOGGER.error(timeoutMessage);
    cancel();
    // Still thrown so that the scheduled future completes exceptionally, as before.
    throw new VeniceTimeoutException(timeoutMessage);
  }, bootstrapToOnlineTimeoutInHours, TimeUnit.HOURS);
}

private void buildHDFSSchemaDir() throws IOException {
// Build the full path for HDFSRmdSchemaSource:
// RMD schemas: <job_temp_dir>/rmd_schemas
Expand Down Expand Up @@ -1193,9 +1209,12 @@ protected InputDataInfoProvider getInputDataInfoProvider() {
* 2. A mock controller client is provided
*
* @param storeName
* @param sslFactory
*/
private void initControllerClient(String storeName, Optional<SSLFactory> sslFactory) {
private void initControllerClient(String storeName) {
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
pushJobSetting.enableSSL,
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
this.sslProperties);
final String controllerD2ZkHost;
if (pushJobSetting.multiRegion) {
// In multi region mode, push jobs will communicate with parent controller
Expand Down Expand Up @@ -2306,7 +2325,6 @@ void pollStatusUntilComplete(
* no more than {@link DEFAULT_JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS}.
*/
long unknownStateStartTimeMs = 0;
long pollStartTimeMs = System.currentTimeMillis();

String topicToMonitor = getTopicToMonitor(pushJobSetting);

Expand Down Expand Up @@ -2376,14 +2394,6 @@ void pollStatusUntilComplete(
}
return;
}
long bootstrapToOnlineTimeoutInHours =
VenicePushJob.this.pushJobSetting.storeResponse.getStore().getBootstrapToOnlineTimeoutInHours();
long durationMs = LatencyUtils.getElapsedTimeFromMsToMs(pollStartTimeMs);
if (durationMs > TimeUnit.HOURS.toMillis(bootstrapToOnlineTimeoutInHours)) {
throw new VeniceException(
"Failing push-job for store " + VenicePushJob.this.pushJobSetting.storeResponse.getName()
+ " which is still running after " + TimeUnit.MILLISECONDS.toHours(durationMs) + " hours.");
}
if (!overallStatus.equals(ExecutionStatus.UNKNOWN)) {
unknownStateStartTimeMs = 0;
} else if (unknownStateStartTimeMs == 0) {
Expand Down Expand Up @@ -2551,9 +2561,8 @@ private String pushJobPropertiesToString(
}

/**
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions.
*
* @throws Exception
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions or due to
* the job exceeding bootstrapToOnlineTimeoutInHours.
*/
public void cancel() {
killJob(pushJobSetting, controllerClient);
Expand All @@ -2566,7 +2575,7 @@ public void cancel() {
sendPushJobDetailsToController();
}

private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
// Attempting to kill job. There's a race condition, but meh. Better kill when you know it's running
killDataWriterJob();
if (!pushJobSetting.isIncrementalPush) {
Expand All @@ -2591,7 +2600,7 @@ private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerC
}
}

private void killDataWriterJob() {
void killDataWriterJob() {
if (dataWriterComputeJob == null) {
LOGGER.warn("No op to kill a null data writer job");
return;
Expand Down Expand Up @@ -2665,6 +2674,7 @@ private static Path getLatestPath(Path path, FileSystem fs) throws IOException {

@Override
public void close() {
timeoutExecutor.shutdownNow();
closeVeniceWriter();
Utils.closeQuietlyWithErrorLogged(dataWriterComputeJob);
Utils.closeQuietlyWithErrorLogged(controllerClient);
Expand All @@ -2678,7 +2688,6 @@ public void close() {
}

public static void main(String[] args) {

if (args.length != 1) {
Utils.exit("USAGE: java -jar venice-push-job-all.jar <VPJ_config_file_path>");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public DataWriterTaskTracker getTaskTracker() {
}

@Override
protected void runComputeJob() {
public void runComputeJob() {
LOGGER.info("Triggering MR job for data writer");
try {
runningJob = JobUtils.runJobWithConfig(jobConf, jobClientWrapper);
Expand All @@ -346,6 +346,7 @@ public PushJobSetting getPushJobSetting() {
public void kill() {
if (runningJob == null) {
LOGGER.warn("No op to kill a null running job");
super.kill();
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.venice.ConfigKeys.PASS_THROUGH_CONFIG_PREFIXES_LIST_KEY;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.PushJobSetting;
Expand Down Expand Up @@ -88,7 +89,8 @@ public static void populateWithPassThroughConfigs(VeniceProperties props, Config

public abstract DataWriterTaskTracker getTaskTracker();

protected void validateJob() {
@VisibleForTesting
public void validateJob() {
DataWriterTaskTracker dataWriterTaskTracker = getTaskTracker();
if (dataWriterTaskTracker == null) {
throw new VeniceException("DataWriterTaskTracker is not set. Unable to validate the job status.");
Expand Down Expand Up @@ -160,7 +162,8 @@ protected void validateJob() {
}
}

protected abstract void runComputeJob();
@VisibleForTesting
public abstract void runComputeJob();

@Override
public void configure(VeniceProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public PushJobSetting getPushJobSetting() {
}

@Override
protected void runComputeJob() {
public void runComputeJob() {
// Load data from input path
Dataset<Row> dataFrame = getInputDataFrame();
validateDataFrame(dataFrame);
Expand Down
Loading
Loading