164
164
import java .util .Properties ;
165
165
import java .util .Set ;
166
166
import java .util .concurrent .Executors ;
167
- import java .util .concurrent .Future ;
168
167
import java .util .concurrent .ScheduledExecutorService ;
169
168
import java .util .concurrent .TimeUnit ;
170
169
import java .util .stream .Collectors ;
@@ -247,6 +246,7 @@ public class VenicePushJob implements AutoCloseable {
247
246
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory ;
248
247
private PushJobHeartbeatSender pushJobHeartbeatSender = null ;
249
248
private boolean pushJobStatusUploadDisabledHasBeenLogged = false ;
249
+ private final ScheduledExecutorService timeoutExecutor ;
250
250
251
251
/**
252
252
* @param jobId id of the job
@@ -255,6 +255,7 @@ public class VenicePushJob implements AutoCloseable {
255
255
public VenicePushJob (String jobId , Properties vanillaProps ) {
256
256
this .jobId = jobId ;
257
257
this .props = getVenicePropsFromVanillaProps (Objects .requireNonNull (vanillaProps , "VPJ props cannot be null" ));
258
+ this .timeoutExecutor = Executors .newSingleThreadScheduledExecutor ();
258
259
LOGGER .info ("Constructing {}: {}" , VenicePushJob .class .getSimpleName (), props .toString (true ));
259
260
this .sslProperties = Lazy .of (() -> {
260
261
try {
@@ -655,36 +656,23 @@ DataWriterComputeJob getDataWriterComputeJob() {
655
656
* @throws VeniceException
656
657
*/
657
658
public void run () {
658
- Optional <SSLFactory > sslFactory = VPJSSLUtils .createSSLFactory (
659
- pushJobSetting .enableSSL ,
660
- props .getString (SSL_FACTORY_CLASS_NAME , DEFAULT_SSL_FACTORY_CLASS_NAME ),
661
- this .sslProperties );
662
- initControllerClient (pushJobSetting .storeName , sslFactory );
663
-
664
- ScheduledExecutorService timeoutExecutor = Executors .newSingleThreadScheduledExecutor ();
665
- long bootstrapToOnlineTimeoutInHours =
666
- getStoreResponse (pushJobSetting .storeName ).getStore ().getBootstrapToOnlineTimeoutInHours ();
667
- Future <?> timeoutFuture = timeoutExecutor .schedule (() -> {
668
- LOGGER .error ("Timeout reached. Stopping the job." );
669
- cancel ();
670
- }, bootstrapToOnlineTimeoutInHours , TimeUnit .HOURS );
671
-
672
- try {
673
- runPushJob ();
674
- } finally {
675
- timeoutFuture .cancel (true );
676
- timeoutExecutor .shutdown ();
677
- }
678
- }
679
-
680
- private void runPushJob () {
681
659
try {
660
+ initControllerClient (pushJobSetting .storeName );
682
661
pushJobSetting .clusterName = controllerClient .getClusterName ();
683
662
LOGGER .info (
684
663
"The store {} is discovered in Venice cluster {}" ,
685
664
pushJobSetting .storeName ,
686
665
pushJobSetting .clusterName );
687
666
667
+ long bootstrapToOnlineTimeoutInHours =
668
+ getStoreResponse (pushJobSetting .storeName ).getStore ().getBootstrapToOnlineTimeoutInHours ();
669
+ timeoutExecutor .schedule (() -> {
670
+ cancel ();
671
+ throw new VeniceException (
672
+ "Failing push-job for store " + pushJobSetting .storeName + " which is still running after "
673
+ + bootstrapToOnlineTimeoutInHours + " hours." );
674
+ }, bootstrapToOnlineTimeoutInHours , TimeUnit .HOURS );
675
+
688
676
if (pushJobSetting .isSourceKafka ) {
689
677
initKIFRepushDetails ();
690
678
}
@@ -1207,9 +1195,12 @@ protected InputDataInfoProvider getInputDataInfoProvider() {
1207
1195
* 2. A mock controller client is provided
1208
1196
*
1209
1197
* @param storeName
1210
- * @param sslFactory
1211
1198
*/
1212
- private void initControllerClient (String storeName , Optional <SSLFactory > sslFactory ) {
1199
+ private void initControllerClient (String storeName ) {
1200
+ Optional <SSLFactory > sslFactory = VPJSSLUtils .createSSLFactory (
1201
+ pushJobSetting .enableSSL ,
1202
+ props .getString (SSL_FACTORY_CLASS_NAME , DEFAULT_SSL_FACTORY_CLASS_NAME ),
1203
+ this .sslProperties );
1213
1204
final String controllerD2ZkHost ;
1214
1205
if (pushJobSetting .multiRegion ) {
1215
1206
// In multi region mode, push jobs will communicate with parent controller
@@ -2520,8 +2511,6 @@ private String pushJobPropertiesToString(
2520
2511
/**
2521
2512
* A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions or due to
2522
2513
* the job exceeding bootstrapToOnlineTimeoutInHours.
2523
- *
2524
- * @throws Exception
2525
2514
*/
2526
2515
public void cancel () {
2527
2516
killJob (pushJobSetting , controllerClient );
@@ -2633,6 +2622,7 @@ private static Path getLatestPath(Path path, FileSystem fs) throws IOException {
2633
2622
2634
2623
@ Override
2635
2624
public void close () {
2625
+ timeoutExecutor .shutdownNow ();
2636
2626
closeVeniceWriter ();
2637
2627
Utils .closeQuietlyWithErrorLogged (dataWriterComputeJob );
2638
2628
Utils .closeQuietlyWithErrorLogged (controllerClient );
0 commit comments