@@ -16,6 +16,7 @@ import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
 
 /**
@@ -34,43 +35,32 @@ class FlintSparkIndexMonitor(
     dataSourceName: String)
     extends Logging {
 
+  /** Task execution initial delay in seconds */
+  private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds()
+
+  /** Task execution interval in seconds */
+  private val INTERVAL_SECONDS = FlintSparkConf().monitorIntervalSeconds()
+
+  /** Max error count allowed */
+  private val MAX_ERROR_COUNT = FlintSparkConf().monitorMaxErrorCount()
+
   /**
    * Start monitoring task on the given Flint index.
    *
    * @param indexName
    *   Flint index name
    */
   def startMonitor(indexName: String): Unit = {
-    val task = FlintSparkIndexMonitor.executor.scheduleWithFixedDelay(
-      () => {
-        logInfo(s"Scheduler trigger index monitor task for $indexName")
-        try {
-          if (isStreamingJobActive(indexName)) {
-            logInfo("Streaming job is still active")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(latest => latest.state == REFRESHING)
-              .finalLog(latest => latest) // timestamp will update automatically
-              .commit(_ => {})
-          } else {
-            logError("Streaming job is not active. Cancelling monitor task")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(_ => true)
-              .finalLog(latest => latest.copy(state = FAILED))
-              .commit(_ => {})
+    logInfo(s"""Starting index monitor for $indexName with configuration:
+               | - Initial delay: $INITIAL_DELAY_SECONDS seconds
+               | - Interval: $INTERVAL_SECONDS seconds
+               | - Max error count: $MAX_ERROR_COUNT
+               |""".stripMargin)
 
-            stopMonitor(indexName)
-            logInfo("Index monitor task is cancelled")
-          }
-        } catch {
-          case e: Throwable =>
-            logError("Failed to update index log entry", e)
-            MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC)
-        }
-      },
-      15, // Delay to ensure final logging is complete first, otherwise version conflicts
-      60, // TODO: make interval configurable
+    val task = FlintSparkIndexMonitor.executor.scheduleWithFixedDelay(
+      new FlintSparkIndexMonitorTask(indexName),
+      INITIAL_DELAY_SECONDS, // Delay to ensure final logging is complete first, otherwise version conflicts
+      INTERVAL_SECONDS,
       TimeUnit.SECONDS)
 
     FlintSparkIndexMonitor.indexMonitorTracker.put(indexName, task)
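For context on the scheduling call above: `scheduleWithFixedDelay` measures the interval from the end of one run to the start of the next, so a slow heartbeat can never overlap itself. A minimal standalone sketch of the same call shape, using a plain JDK executor rather than the Flint one (the literal 15/60 values stand in for the configurable INITIAL_DELAY_SECONDS/INTERVAL_SECONDS):

import java.util.concurrent.{Executors, TimeUnit}

object FixedDelayDemo extends App {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  // Fixed *delay* semantics: the 60-second countdown starts only after
  // each run completes, so runs never pile up even if one is slow.
  executor.scheduleWithFixedDelay(
    () => println(s"heartbeat at ${System.currentTimeMillis()}"),
    15, // initial delay, as INITIAL_DELAY_SECONDS above
    60, // delay between runs, as INTERVAL_SECONDS above
    TimeUnit.SECONDS)
}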
@@ -92,8 +82,68 @@ class FlintSparkIndexMonitor(
     }
   }
 
+  /**
+   * Index monitor task that encapsulates the execution logic with the number of consecutive
+   * errors tracked.
+   *
+   * @param indexName
+   *   Flint index name
+   */
+  private class FlintSparkIndexMonitorTask(indexName: String) extends Runnable {
+
+    /** The number of consecutive errors */
+    private var errorCnt = 0
+
+    override def run(): Unit = {
+      logInfo(s"Scheduler trigger index monitor task for $indexName")
+      try {
+        if (isStreamingJobActive(indexName)) {
+          logInfo("Streaming job is still active")
+          flintClient
+            .startTransaction(indexName, dataSourceName)
+            .initialLog(latest => latest.state == REFRESHING)
+            .finalLog(latest => latest) // timestamp will update automatically
+            .commit(_ => {})
+        } else {
+          logError("Streaming job is not active. Cancelling monitor task")
+          flintClient
+            .startTransaction(indexName, dataSourceName)
+            .initialLog(_ => true)
+            .finalLog(latest => latest.copy(state = FAILED))
+            .commit(_ => {})
+
+          stopMonitor(indexName)
+          logInfo("Index monitor task is cancelled")
+        }
+        errorCnt = 0 // Reset counter if no error
+      } catch {
+        case e: Throwable =>
+          errorCnt += 1
+          logError(s"Failed to update index log entry, consecutive errors: $errorCnt", e)
+          MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC)
+
+          // Stop streaming job and its monitor if max error limit reached
+          if (errorCnt >= MAX_ERROR_COUNT) {
+            logInfo(s"Terminating streaming job and index monitor for $indexName")
+            stopStreamingJob(indexName)
+            stopMonitor(indexName)
+            logInfo("Streaming job and index monitor terminated")
+          }
+      }
+    }
+  }
+
   private def isStreamingJobActive(indexName: String): Boolean =
     spark.streams.active.exists(_.name == indexName)
+
+  private def stopStreamingJob(indexName: String): Unit = {
+    val job = spark.streams.active.find(_.name == indexName)
+    if (job.isDefined) {
+      job.get.stop()
+    } else {
+      logWarning("Refreshing job not found")
+    }
+  }
 }
 
 object FlintSparkIndexMonitor extends Logging {
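The core of the new task is the consecutive-error counter: it resets to zero on every successful heartbeat and escalates only after MAX_ERROR_COUNT failures in a row, so transient OpenSearch hiccups do not terminate the streaming job. A minimal standalone sketch of that reset-on-success / escalate-after-N pattern (ConsecutiveErrorTask, heartbeat, and onGiveUp are hypothetical names, not the Flint API):

// Sketch of the error-tracking pattern used by FlintSparkIndexMonitorTask.
class ConsecutiveErrorTask(heartbeat: () => Unit, maxErrors: Int, onGiveUp: () => Unit)
    extends Runnable {

  private var errorCnt = 0 // consecutive failures so far

  override def run(): Unit = {
    try {
      heartbeat()
      errorCnt = 0 // any success clears the streak
    } catch {
      case _: Throwable =>
        errorCnt += 1
        if (errorCnt >= maxErrors) {
          onGiveUp() // e.g. stop the streaming job and cancel this task
        }
    }
  }
}

Counting only consecutive failures, rather than failures overall, is the design choice that distinguishes a flaky-but-healthy heartbeat from a persistently broken one.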