6
6
package org .apache .spark .sql
7
7
8
8
import java .net .ConnectException
9
- import java .time .Instant
10
- import java .util .Map
11
9
import java .util .concurrent .{ScheduledExecutorService , ScheduledFuture }
12
10
13
11
import scala .concurrent .{ExecutionContext , ExecutionContextExecutor , Future , TimeoutException }
14
12
import scala .concurrent .duration .{Duration , MINUTES , _ }
15
13
import scala .util .{Failure , Success , Try }
16
14
import scala .util .control .NonFatal
17
15
16
+ import org .json4s .native .Serialization
18
17
import org .opensearch .action .get .GetResponse
19
18
import org .opensearch .common .Strings
20
19
import org .opensearch .flint .app .{FlintCommand , FlintInstance }
20
+ import org .opensearch .flint .app .FlintInstance .formats
21
21
import org .opensearch .flint .core .storage .{FlintReader , OpenSearchUpdater }
22
22
23
23
import org .apache .spark .SparkConf
@@ -47,6 +47,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
47
47
private val DEFAULT_QUERY_EXECUTION_TIMEOUT = Duration (30 , MINUTES )
48
48
private val DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS = 10 * 60 * 1000
49
49
val INITIAL_DELAY_MILLIS = 3000L
50
+ val EARLY_TERMIANTION_CHECK_FREQUENCY = 60000L
50
51
51
52
def update (flintCommand : FlintCommand , updater : OpenSearchUpdater ): Unit = {
52
53
updater.update(flintCommand.statementId, FlintCommand .serialize(flintCommand))
@@ -292,10 +293,11 @@ object FlintREPL extends Logging with FlintJobExecutor {
292
293
var lastActivityTime = currentTimeProvider.currentEpochMillis()
293
294
var verificationResult : VerificationResult = NotVerified
294
295
var canPickUpNextStatement = true
296
+ var lastCanPickCheckTime = 0L
295
297
while (currentTimeProvider
296
298
.currentEpochMillis() - lastActivityTime <= commandContext.inactivityLimitMillis && canPickUpNextStatement) {
297
299
logInfo(
298
- s """ read from ${commandContext.sessionIndex}, sessionId: $commandContext.sessionId """ )
300
+ s """ read from ${commandContext.sessionIndex}, sessionId: ${ commandContext.sessionId} """ )
299
301
val flintReader : FlintReader =
300
302
createQueryReader(
301
303
commandContext.osClient,
@@ -309,18 +311,21 @@ object FlintREPL extends Logging with FlintJobExecutor {
309
311
verificationResult,
310
312
flintReader,
311
313
futureMappingCheck,
312
- executionContext)
313
- val result : (Long , VerificationResult , Boolean ) =
314
+ executionContext,
315
+ lastCanPickCheckTime)
316
+ val result : (Long , VerificationResult , Boolean , Long ) =
314
317
processCommands(commandContext, commandState)
315
318
316
319
val (
317
320
updatedLastActivityTime,
318
321
updatedVerificationResult,
319
- updatedCanPickUpNextStatement) = result
322
+ updatedCanPickUpNextStatement,
323
+ updatedLastCanPickCheckTime) = result
320
324
321
325
lastActivityTime = updatedLastActivityTime
322
326
verificationResult = updatedVerificationResult
323
327
canPickUpNextStatement = updatedCanPickUpNextStatement
328
+ lastCanPickCheckTime = updatedLastCanPickCheckTime
324
329
} finally {
325
330
flintReader.close()
326
331
}
@@ -481,18 +486,27 @@ object FlintREPL extends Logging with FlintJobExecutor {
481
486
482
487
private def processCommands (
483
488
context : CommandContext ,
484
- state : CommandState ): (Long , VerificationResult , Boolean ) = {
489
+ state : CommandState ): (Long , VerificationResult , Boolean , Long ) = {
485
490
import context ._
486
491
import state ._
487
492
488
493
var lastActivityTime = recordedLastActivityTime
489
494
var verificationResult = recordedVerificationResult
490
495
var canProceed = true
491
496
var canPickNextStatementResult = true // Add this line to keep track of canPickNextStatement
497
+ var lastCanPickCheckTime = recordedLastCanPickCheckTime
492
498
493
499
while (canProceed) {
494
- if (! canPickNextStatement(sessionId, jobId, osClient, sessionIndex)) {
495
- canPickNextStatementResult = false
500
+ val currentTime = currentTimeProvider.currentEpochMillis()
501
+
502
+ // Only call canPickNextStatement if EARLY_TERMIANTION_CHECK_FREQUENCY milliseconds have passed
503
+ if (currentTime - lastCanPickCheckTime > EARLY_TERMIANTION_CHECK_FREQUENCY ) {
504
+ canPickNextStatementResult =
505
+ canPickNextStatement(sessionId, jobId, osClient, sessionIndex)
506
+ lastCanPickCheckTime = currentTime
507
+ }
508
+
509
+ if (! canPickNextStatementResult) {
496
510
canProceed = false
497
511
} else if (! flintReader.hasNext) {
498
512
canProceed = false
@@ -524,7 +538,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
524
538
}
525
539
526
540
// return tuple indicating if still active and mapping verification result
527
- (lastActivityTime, verificationResult, canPickNextStatementResult)
541
+ (lastActivityTime, verificationResult, canPickNextStatementResult, lastCanPickCheckTime )
528
542
}
529
543
530
544
/**
@@ -888,20 +902,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
888
902
return // Exit the run method if the thread is interrupted
889
903
}
890
904
891
- val getResponse = osClient.getDoc(sessionIndex, sessionId)
892
- if (getResponse.isExists()) {
893
- val source = getResponse.getSourceAsMap
894
- val flintInstance = FlintInstance .deserializeFromMap(source)
895
- flintInstance.state = " running"
896
- flintSessionUpdater.updateIf(
897
- sessionId,
898
- FlintInstance .serializeWithoutJobId(
899
- flintInstance,
900
- currentTimeProvider.currentEpochMillis()),
901
- getResponse.getSeqNo,
902
- getResponse.getPrimaryTerm)
903
- }
904
- // do nothing if the session doc does not exist
905
+ flintSessionUpdater.upsert(
906
+ sessionId,
907
+ Serialization .write(
908
+ Map (
909
+ " lastUpdateTime" -> currentTimeProvider.currentEpochMillis(),
910
+ " state" -> " running" )))
905
911
} catch {
906
912
case ie : InterruptedException =>
907
913
// Preserve the interrupt status
0 commit comments