Skip to content

[vpj] Push Job Timeout #1786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8ebedc2
Removed old `bootstrapToOnlineOnlineTimeoutInHours` usage since it's …
KaiSernLim May 6, 2025
dddea75
First attempt at adding a timeout to cancel the job. 🌗
KaiSernLim May 6, 2025
82a2c2d
Implement timeout handling for VenicePushJob and add unit test for ti…
KaiSernLim May 6, 2025
cf391b1
Two unit tests for verification. 🚏
KaiSernLim May 9, 2025
387ff49
Fixed the unit test to actually test something meaningful. 🎤
KaiSernLim May 9, 2025
12902db
Fixed NPE from `storeResponse` not being populated. 🦆
KaiSernLim May 9, 2025
3d1f33d
Fixed Spotbugs warning in test. 🥱
KaiSernLim May 9, 2025
7cc99a3
Fixed `controllerClient` NPE. 🐻‍❄️
KaiSernLim May 10, 2025
e02f96c
Refactored `timeoutExecutor` to be a member variable. 🫩
KaiSernLim May 12, 2025
210c28c
Minor review changes by Nisarg. 🪡
KaiSernLim May 20, 2025
eac4a66
Try setting `bootstrapToOnlineTimeoutInHours` directly, instead of re…
KaiSernLim May 21, 2025
5540f7f
Revert "Try setting `bootstrapToOnlineTimeoutInHours` directly, inste…
KaiSernLim May 21, 2025
2f9f9fe
Try changing the condition in the test. 😪
KaiSernLim May 21, 2025
07d8ab9
Only check `KILLED` status in unit test. 🤖
KaiSernLim May 21, 2025
eca6ef3
I can't tell which branches the unit test is going into in CI. 😭
KaiSernLim May 22, 2025
74da931
Is the latch even counting down? 😭
KaiSernLim May 22, 2025
e96fd4a
Is the latch even counting down? 😭
KaiSernLim May 22, 2025
e305f09
Try reordering the unit tests? 😭
KaiSernLim May 22, 2025
6da0bcd
It was the `configure()` taking a long time, wasn't it? 🤪
KaiSernLim May 22, 2025
545f841
Added the finishing touches.
KaiSernLim May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -245,6 +247,7 @@ public class VenicePushJob implements AutoCloseable {
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private PushJobHeartbeatSender pushJobHeartbeatSender = null;
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;
private final ScheduledExecutorService timeoutExecutor;

/**
* @param jobId id of the job
Expand All @@ -253,6 +256,7 @@ public class VenicePushJob implements AutoCloseable {
public VenicePushJob(String jobId, Properties vanillaProps) {
this.jobId = jobId;
this.props = getVenicePropsFromVanillaProps(Objects.requireNonNull(vanillaProps, "VPJ props cannot be null"));
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
LOGGER.info("Constructing {}: {}", VenicePushJob.class.getSimpleName(), props.toString(true));
this.sslProperties = Lazy.of(() -> {
try {
Expand Down Expand Up @@ -657,17 +661,22 @@ DataWriterComputeJob getDataWriterComputeJob() {
*/
public void run() {
try {
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
pushJobSetting.enableSSL,
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
this.sslProperties);
initControllerClient(pushJobSetting.storeName, sslFactory);
initControllerClient(pushJobSetting.storeName);
pushJobSetting.clusterName = controllerClient.getClusterName();
LOGGER.info(
"The store {} is discovered in Venice cluster {}",
pushJobSetting.storeName,
pushJobSetting.clusterName);

long bootstrapToOnlineTimeoutInHours =
getStoreResponse(pushJobSetting.storeName).getStore().getBootstrapToOnlineTimeoutInHours();
timeoutExecutor.schedule(() -> {
cancel();
throw new VeniceException(
"Failing push-job for store " + pushJobSetting.storeName + " which is still running after "
+ bootstrapToOnlineTimeoutInHours + " hours.");
}, bootstrapToOnlineTimeoutInHours, TimeUnit.HOURS);

if (pushJobSetting.isSourceKafka) {
initKIFRepushDetails();
}
Expand Down Expand Up @@ -1193,9 +1202,12 @@ protected InputDataInfoProvider getInputDataInfoProvider() {
* 2. A mock controller client is provided
*
* @param storeName
* @param sslFactory
*/
private void initControllerClient(String storeName, Optional<SSLFactory> sslFactory) {
private void initControllerClient(String storeName) {
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
pushJobSetting.enableSSL,
props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME),
this.sslProperties);
final String controllerD2ZkHost;
if (pushJobSetting.multiRegion) {
// In multi region mode, push jobs will communicate with parent controller
Expand Down Expand Up @@ -2306,7 +2318,6 @@ void pollStatusUntilComplete(
* no more than {@link DEFAULT_JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS}.
*/
long unknownStateStartTimeMs = 0;
long pollStartTimeMs = System.currentTimeMillis();

String topicToMonitor = getTopicToMonitor(pushJobSetting);

Expand Down Expand Up @@ -2376,14 +2387,6 @@ void pollStatusUntilComplete(
}
return;
}
long bootstrapToOnlineTimeoutInHours =
VenicePushJob.this.pushJobSetting.storeResponse.getStore().getBootstrapToOnlineTimeoutInHours();
long durationMs = LatencyUtils.getElapsedTimeFromMsToMs(pollStartTimeMs);
if (durationMs > TimeUnit.HOURS.toMillis(bootstrapToOnlineTimeoutInHours)) {
throw new VeniceException(
"Failing push-job for store " + VenicePushJob.this.pushJobSetting.storeResponse.getName()
+ " which is still running after " + TimeUnit.MILLISECONDS.toHours(durationMs) + " hours.");
}
if (!overallStatus.equals(ExecutionStatus.UNKNOWN)) {
unknownStateStartTimeMs = 0;
} else if (unknownStateStartTimeMs == 0) {
Expand Down Expand Up @@ -2551,9 +2554,8 @@ private String pushJobPropertiesToString(
}

/**
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions.
*
* @throws Exception
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions or due to
* the job exceeding bootstrapToOnlineTimeoutInHours.
*/
public void cancel() {
killJob(pushJobSetting, controllerClient);
Expand All @@ -2566,7 +2568,7 @@ public void cancel() {
sendPushJobDetailsToController();
}

private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
// Attempting to kill job. There's a race condition, but meh. Better kill when you know it's running
killDataWriterJob();
if (!pushJobSetting.isIncrementalPush) {
Expand All @@ -2591,7 +2593,7 @@ private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerC
}
}

private void killDataWriterJob() {
void killDataWriterJob() {
if (dataWriterComputeJob == null) {
LOGGER.warn("No op to kill a null data writer job");
return;
Expand Down Expand Up @@ -2665,6 +2667,7 @@ private static Path getLatestPath(Path path, FileSystem fs) throws IOException {

@Override
public void close() {
timeoutExecutor.shutdownNow();
closeVeniceWriter();
Utils.closeQuietlyWithErrorLogged(dataWriterComputeJob);
Utils.closeQuietlyWithErrorLogged(controllerClient);
Expand All @@ -2678,7 +2681,6 @@ public void close() {
}

public static void main(String[] args) {

if (args.length != 1) {
Utils.exit("USAGE: java -jar venice-push-job-all.jar <VPJ_config_file_path>");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public DataWriterTaskTracker getTaskTracker() {
}

@Override
protected void runComputeJob() {
public void runComputeJob() {
LOGGER.info("Triggering MR job for data writer");
try {
runningJob = JobUtils.runJobWithConfig(jobConf, jobClientWrapper);
Expand All @@ -346,6 +346,7 @@ public PushJobSetting getPushJobSetting() {
public void kill() {
if (runningJob == null) {
LOGGER.warn("No op to kill a null running job");
super.kill();
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.venice.ConfigKeys.PASS_THROUGH_CONFIG_PREFIXES_LIST_KEY;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.PushJobSetting;
Expand Down Expand Up @@ -88,7 +89,8 @@ public static void populateWithPassThroughConfigs(VeniceProperties props, Config

public abstract DataWriterTaskTracker getTaskTracker();

protected void validateJob() {
@VisibleForTesting
public void validateJob() {
DataWriterTaskTracker dataWriterTaskTracker = getTaskTracker();
if (dataWriterTaskTracker == null) {
throw new VeniceException("DataWriterTaskTracker is not set. Unable to validate the job status.");
Expand Down Expand Up @@ -160,7 +162,8 @@ protected void validateJob() {
}
}

protected abstract void runComputeJob();
@VisibleForTesting
public abstract void runComputeJob();

@Override
public void configure(VeniceProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public PushJobSetting getPushJobSetting() {
}

@Override
protected void runComputeJob() {
public void runComputeJob() {
// Load data from input path
Dataset<Row> dataFrame = getInputDataFrame();
validateDataFrame(dataFrame);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V1_UPDATE_SCHEMA;
import static com.linkedin.venice.vpj.VenicePushJobConstants.CONTROLLER_REQUEST_RETRY_ATTEMPTS;
import static com.linkedin.venice.vpj.VenicePushJobConstants.D2_ZK_HOSTS_PREFIX;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DATA_WRITER_COMPUTE_JOB_CLASS;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFER_VERSION_SWAP;
Expand Down Expand Up @@ -38,6 +39,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -71,7 +73,9 @@
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.exceptions.VeniceValidationException;
import com.linkedin.venice.hadoop.mapreduce.datawriter.jobs.DataWriterMRJob;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.jobs.DataWriterComputeJob;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.MaterializedViewParameters;
Expand All @@ -83,6 +87,7 @@
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob;
import com.linkedin.venice.status.PushJobDetailsStatus;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.utils.DataProviderUtils;
Expand All @@ -96,12 +101,15 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -270,60 +278,110 @@
}
}

@Test
public void testPushJobPollStatus() {
Properties vpjProps = new Properties();
vpjProps.setProperty(HEARTBEAT_ENABLED_CONFIG.getConfigName(), "true");
ControllerClient client = mock(ControllerClient.class);
@DataProvider(name = "DataWriterJobClasses")
public Object[][] getDataWriterJobClasses() {
return new Object[][] { { DataWriterMRJob.class }, { DataWriterSparkJob.class } };
}

/**
* Test that VenicePushJob.cancel() is called after bootstrapToOnlineTimeoutInHours is reached.
* UNKNOWN status is returned for pollStatusUntilComplete() to stall the job until cancel() can be called.
*/
@Test(dataProvider = "DataWriterJobClasses")
public void testPushJobTimeout(Class<? extends DataWriterComputeJob> dataWriterJobClass) throws Exception {
Properties props = getVpjRequiredProperties();
props.put(KEY_FIELD_PROP, "id");
props.put(VALUE_FIELD_PROP, "name");
props.put(DATA_WRITER_COMPUTE_JOB_CLASS, dataWriterJobClass.getName());
ControllerClient client = getClient();
JobStatusQueryResponse response = mock(JobStatusQueryResponse.class);
doReturn("UNKNOWN").when(response).getStatus();
doReturn(response).when(client).queryOverallJobStatus(anyString(), eq(Optional.empty()), eq(null), anyBoolean());
try (VenicePushJob pushJob = getSpyVenicePushJob(vpjProps, client)) {
doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), any(), anyBoolean());
doReturn(response).when(client).killOfflinePushJob(anyString());

try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
pushJobSetting.jobStatusInUnknownStateTimeoutMs = 10;
Assert.assertTrue(pushJobSetting.livenessHeartbeatEnabled);
pushJobSetting.version = 1;
pushJobSetting.topic = "abc";
pushJobSetting.storeResponse = new StoreResponse();
pushJobSetting.storeResponse.setName("abc");
pushJobSetting.jobStatusInUnknownStateTimeoutMs = 100; // give some time for the timeout to run on the executor
StoreInfo storeInfo = new StoreInfo();
storeInfo.setBootstrapToOnlineTimeoutInHours(0);
pushJobSetting.storeResponse = new StoreResponse();
pushJobSetting.storeResponse.setStore(storeInfo);
VeniceException exception = Assert.expectThrows(
VeniceException.class,
() -> pushJob.pollStatusUntilComplete(null, client, pushJobSetting, null, false, false));
Assert
.assertEquals(exception.getMessage(), "Failing push-job for store abc which is still running after 0 hours.");
skipVPJValidation(pushJob);
try {
pushJob.run();
fail("Test should fail because pollStatusUntilComplete() never saw COMPLETE status, but doesn't.");
} catch (VeniceException e) {
Assert.assertTrue(e.getMessage().contains("push job is still in unknown state."));
}
verify(pushJob, atLeast(1)).cancel();
verify(pushJob, atLeast(1)).killDataWriterJob();
}
}

@Test
public void testPushJobUnknownPollStatusDoesWaiting() {
Properties vpjProps = new Properties();
vpjProps.setProperty(HEARTBEAT_ENABLED_CONFIG.getConfigName(), "true");
ControllerClient client = mock(ControllerClient.class);
JobStatusQueryResponse unknownResponse = mock(JobStatusQueryResponse.class);
doReturn("UNKNOWN").when(unknownResponse).getStatus();
JobStatusQueryResponse completedResponse = mock(JobStatusQueryResponse.class);
doReturn("COMPLETED").when(completedResponse).getStatus();
doReturn(unknownResponse).doReturn(unknownResponse)
.doReturn(completedResponse)
.when(client)
.queryOverallJobStatus(anyString(), eq(Optional.empty()), eq(null), anyBoolean());
try (VenicePushJob pushJob = getSpyVenicePushJob(vpjProps, client)) {
/**
* Ensures that the data writer job is killed if the job times out. Uses an Answer to stall the data writer job
* while it's running in order for it to get killed properly.
*/
@Test(dataProvider = "DataWriterJobClasses")
public void testDataWriterComputeJobTimeout(Class<? extends DataWriterComputeJob> dataWriterJobClass)
throws Exception {
Properties props = getVpjRequiredProperties();
props.put(KEY_FIELD_PROP, "id");
props.put(VALUE_FIELD_PROP, "name");
props.put(DATA_WRITER_COMPUTE_JOB_CLASS, dataWriterJobClass.getName());
ControllerClient client = getClient();
JobStatusQueryResponse response = mock(JobStatusQueryResponse.class);
doReturn("SUCCESS").when(response).getStatus();
doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), any(), anyBoolean());
doReturn(response).when(client).killOfflinePushJob(anyString());

try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) {
StoreInfo storeInfo = new StoreInfo();
storeInfo.setBootstrapToOnlineTimeoutInHours(0);
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
pushJobSetting.jobStatusInUnknownStateTimeoutMs = 100_000_000;
Assert.assertTrue(pushJobSetting.livenessHeartbeatEnabled);
pushJobSetting.version = 1;
pushJobSetting.topic = "abc";
pushJobSetting.storeResponse = new StoreResponse();
pushJobSetting.storeResponse.setName("abc");
StoreInfo storeInfo = new StoreInfo();
storeInfo.setBootstrapToOnlineTimeoutInHours(10);
pushJobSetting.storeResponse.setStore(storeInfo);
pushJob.pollStatusUntilComplete(null, client, pushJobSetting, null, false, false);
} catch (Exception e) {
fail("The test should be completed successfully without any timeout exception");
CountDownLatch runningJobLatch = new CountDownLatch(1);
CountDownLatch killedJobLatch = new CountDownLatch(1);
skipVPJValidation(pushJob);

/*
* 1. Data writer job starts and status is set to RUNNING.
* 2. Timeout thread kills the data writer job and status is set to KILLED.
* The latch is used to stall the validateJob() method until the data writer job is killed.
*/
Answer<Void> stallDataWriterJob = invocation -> {
// At this point, the data writer job status is already set to RUNNING.
runningJobLatch.countDown(); // frees VenicePushJob.killJob()
if (!killedJobLatch.await(5, TimeUnit.SECONDS)) { // waits for this data writer job to be killed
fail("Timed out waiting for the data writer job to be killed.");
}
throw new VeniceException("No data found at source path");
};

Answer<Void> killDataWriterJob = invocation -> {
if (!runningJobLatch.await(5, TimeUnit.SECONDS)) { // waits for job status to be set to RUNNING
fail("Timed out waiting for the data writer job status to be set to RUNNING");
}
pushJob.killDataWriterJob(); // sets job status to KILLED
killedJobLatch.countDown(); // frees DataWriterComputeJob.runComputeJob()
return null;
};

try {
doCallRealMethod().when(pushJob).runJobAndUpdateStatus();
DataWriterComputeJob dataWriterJob = spy(pushJob.getDataWriterComputeJob());
pushJob.setDataWriterComputeJob(dataWriterJob);
doNothing().when(dataWriterJob).validateJob();
doAnswer(stallDataWriterJob).when(dataWriterJob).runComputeJob();
doAnswer(killDataWriterJob).when(pushJob).killJob(any(), any());
pushJob.run(); // data writer job will run in this main test thread
} catch (VeniceException e) {
// Expected, because the data writer job is not configured to run successfully in this unit test environment
}
verify(pushJob, atLeast(1)).cancel();
verify(pushJob, atLeast(1)).killDataWriterJob();
assertEquals(pushJob.getDataWriterComputeJob().getStatus(), DataWriterComputeJob.Status.KILLED);

Check failure on line 384 in clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

VenicePushJobTest.testDataWriterComputeJobTimeout[1](class com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob)

java.lang.AssertionError: expected [KILLED] but found [FAILED]

Check failure on line 384 in clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

VenicePushJobTest.testDataWriterComputeJobTimeout[1](class com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob)

java.lang.AssertionError: expected [KILLED] but found [FAILED]

Check failure on line 384 in clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

VenicePushJobTest.testDataWriterComputeJobTimeout[1](class com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob)

java.lang.AssertionError: expected [KILLED] but found [FAILED]

Check failure on line 384 in clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

VenicePushJobTest.testDataWriterComputeJobTimeout[1](class com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob)

java.lang.AssertionError: expected [KILLED] but found [FAILED]
}
}

Expand Down
Loading