Skip to content

Commit 6dc5726

Browse files
Support recovery for index with external scheduler (#717) (#776)
* Support recovery for index with external scheduler * Improve default option update logic * Resolve comments * Add index metrics * Remove debugging log and refactor updateSchedulerMode * refactor metrics with aop * Add more IT --------- (cherry picked from commit a345373) Signed-off-by: Louis Chu <clingzhi@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 288f88a commit 6dc5726

28 files changed

+959
-286
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ lazy val flintCommons = (project in file("flint-commons"))
117117
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
118118
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
119119
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
120-
"org.projectlombok" % "lombok" % "1.18.30",
120+
"org.projectlombok" % "lombok" % "1.18.30" % "provided",
121121
),
122122
libraryDependencies ++= deps(sparkVersion),
123123
publish / skip := true,

flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.codahale.metrics.Timer;
1212
import org.apache.spark.SparkEnv;
1313
import org.apache.spark.metrics.source.FlintMetricSource;
14+
import org.apache.spark.metrics.source.FlintIndexMetricSource;
1415
import org.apache.spark.metrics.source.Source;
1516
import scala.collection.Seq;
1617

@@ -33,10 +34,20 @@ private MetricsUtil() {
3334
* If the counter does not exist, it is created before being incremented.
3435
*
3536
* @param metricName The name of the metric for which the counter is incremented.
36-
* This name is used to retrieve or create the counter.
3737
*/
3838
public static void incrementCounter(String metricName) {
39-
Counter counter = getOrCreateCounter(metricName);
39+
incrementCounter(metricName, false);
40+
}
41+
42+
/**
43+
* Increments the Counter metric associated with the given metric name.
44+
* If the counter does not exist, it is created before being incremented.
45+
*
46+
* @param metricName The name of the metric for which the counter is incremented.
47+
* @param isIndexMetric Whether this metric is an index-specific metric.
48+
*/
49+
public static void incrementCounter(String metricName, boolean isIndexMetric) {
50+
Counter counter = getOrCreateCounter(metricName, isIndexMetric);
4051
if (counter != null) {
4152
counter.inc();
4253
}
@@ -48,29 +59,48 @@ public static void incrementCounter(String metricName) {
4859
* @param metricName The name of the metric counter to be decremented.
4960
*/
5061
public static void decrementCounter(String metricName) {
51-
Counter counter = getOrCreateCounter(metricName);
62+
decrementCounter(metricName, false);
63+
}
64+
65+
/**
66+
* Decrements the value of the specified metric counter by one, if the counter exists and its current count is greater than zero.
67+
*
68+
* @param metricName The name of the metric counter to be decremented.
69+
* @param isIndexMetric Whether this metric is an index-specific metric.
70+
*/
71+
public static void decrementCounter(String metricName, boolean isIndexMetric) {
72+
Counter counter = getOrCreateCounter(metricName, isIndexMetric);
5273
if (counter != null && counter.getCount() > 0) {
5374
counter.dec();
5475
}
5576
}
5677

5778
/**
5879
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
59-
* This context can be used to measure the duration of a particular operation or event.
6080
*
6181
* @param metricName The name of the metric timer to retrieve the context for.
6282
* @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
6383
*/
6484
public static Timer.Context getTimerContext(String metricName) {
65-
Timer timer = getOrCreateTimer(metricName);
85+
return getTimerContext(metricName, false);
86+
}
87+
88+
/**
89+
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
90+
*
91+
* @param metricName The name of the metric timer to retrieve the context for.
92+
* @param isIndexMetric Whether this metric is an index-specific metric.
93+
* @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
94+
*/
95+
public static Timer.Context getTimerContext(String metricName, boolean isIndexMetric) {
96+
Timer timer = getOrCreateTimer(metricName, isIndexMetric);
6697
return timer != null ? timer.time() : null;
6798
}
6899

69100
/**
70-
* Stops the timer associated with the given {@link Timer.Context}, effectively recording the elapsed time since the timer was started
71-
* and returning the duration. If the context is {@code null}, this method does nothing and returns {@code null}.
101+
* Stops the timer associated with the given {@link Timer.Context}.
72102
*
73-
* @param context The {@link Timer.Context} to stop. May be {@code null}, in which case this method has no effect and returns {@code null}.
103+
* @param context The {@link Timer.Context} to stop. May be {@code null}.
74104
* @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}.
75105
*/
76106
public static Long stopTimer(Timer.Context context) {
@@ -79,53 +109,61 @@ public static Long stopTimer(Timer.Context context) {
79109

80110
/**
81111
* Registers a gauge metric with the provided name and value.
82-
* The gauge will reflect the current value of the AtomicInteger provided.
83112
*
84113
* @param metricName The name of the gauge metric to register.
85-
* @param value The AtomicInteger whose current value should be reflected by the gauge.
114+
* @param value The AtomicInteger whose current value should be reflected by the gauge.
86115
*/
87116
public static void registerGauge(String metricName, final AtomicInteger value) {
88-
MetricRegistry metricRegistry = getMetricRegistry();
117+
registerGauge(metricName, value, false);
118+
}
119+
120+
/**
121+
* Registers a gauge metric with the provided name and value.
122+
*
123+
* @param metricName The name of the gauge metric to register.
124+
* @param value The AtomicInteger whose current value should be reflected by the gauge.
125+
* @param isIndexMetric Whether this metric is an index-specific metric.
126+
*/
127+
public static void registerGauge(String metricName, final AtomicInteger value, boolean isIndexMetric) {
128+
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
89129
if (metricRegistry == null) {
90130
LOG.warning("MetricRegistry not available, cannot register gauge: " + metricName);
91131
return;
92132
}
93133
metricRegistry.register(metricName, (Gauge<Integer>) value::get);
94134
}
95135

96-
// Retrieves or creates a new counter for the given metric name
97-
private static Counter getOrCreateCounter(String metricName) {
98-
MetricRegistry metricRegistry = getMetricRegistry();
136+
private static Counter getOrCreateCounter(String metricName, boolean isIndexMetric) {
137+
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
99138
return metricRegistry != null ? metricRegistry.counter(metricName) : null;
100139
}
101140

102-
// Retrieves or creates a new Timer for the given metric name
103-
private static Timer getOrCreateTimer(String metricName) {
104-
MetricRegistry metricRegistry = getMetricRegistry();
141+
private static Timer getOrCreateTimer(String metricName, boolean isIndexMetric) {
142+
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
105143
return metricRegistry != null ? metricRegistry.timer(metricName) : null;
106144
}
107145

108-
// Retrieves the MetricRegistry from the current Spark environment.
109-
private static MetricRegistry getMetricRegistry() {
146+
private static MetricRegistry getMetricRegistry(boolean isIndexMetric) {
110147
SparkEnv sparkEnv = SparkEnv.get();
111148
if (sparkEnv == null) {
112149
LOG.warning("Spark environment not available, cannot access MetricRegistry.");
113150
return null;
114151
}
115152

116-
FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
117-
return flintMetricSource.metricRegistry();
153+
Source metricSource = isIndexMetric ?
154+
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME(), FlintIndexMetricSource::new) :
155+
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_METRIC_SOURCE_NAME(), FlintMetricSource::new);
156+
return metricSource.metricRegistry();
118157
}
119158

120-
// Gets or initializes the FlintMetricSource
121-
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
122-
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());
159+
private static Source getOrInitMetricSource(SparkEnv sparkEnv, String sourceName, java.util.function.Supplier<Source> sourceSupplier) {
160+
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(sourceName);
123161

124162
if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
125-
FlintMetricSource metricSource = new FlintMetricSource();
163+
Source metricSource = sourceSupplier.get();
126164
sparkEnv.metricsSystem().registerSource(metricSource);
127165
return metricSource;
128166
}
129-
return (FlintMetricSource) metricSourceSeq.head();
167+
return metricSourceSeq.head();
130168
}
131169
}

flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,25 @@ package org.apache.spark.metrics.source
77

88
import com.codahale.metrics.MetricRegistry
99

10-
class FlintMetricSource() extends Source {
10+
/**
11+
* Metric source for general Flint metrics.
12+
*/
13+
class FlintMetricSource extends Source {
1114

1215
// Implementing the Source trait
1316
override val sourceName: String = FlintMetricSource.FLINT_METRIC_SOURCE_NAME
1417
override val metricRegistry: MetricRegistry = new MetricRegistry
1518
}
1619

20+
/**
21+
* Metric source for Flint index-specific metrics.
22+
*/
23+
class FlintIndexMetricSource extends Source {
24+
override val sourceName: String = FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME
25+
override val metricRegistry: MetricRegistry = new MetricRegistry
26+
}
27+
1728
object FlintMetricSource {
1829
val FLINT_METRIC_SOURCE_NAME = "Flint" // Default source name
30+
val FLINT_INDEX_METRIC_SOURCE_NAME = "FlintIndex" // Index specific source name
1931
}

flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,11 @@ public class FlintOptions implements Serializable {
105105

106106
public static final String DEFAULT_SUPPORT_SHARD = "true";
107107

108+
private static final String UNKNOWN = "UNKNOWN";
109+
108110
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode";
109111
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0";
112+
public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";
110113

111114
public FlintOptions(Map<String, String> options) {
112115
this.options = options;
@@ -185,9 +188,9 @@ public String getDataSourceName() {
185188
* @return the AWS accountId
186189
*/
187190
public String getAWSAccountId() {
188-
String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", "");
191+
String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
189192
String[] parts = clusterName.split(":");
190-
return parts.length == 2 ? parts[0] : "";
193+
return parts.length == 2 ? parts[0] : UNKNOWN;
191194
}
192195

193196
public String getSystemIndexName() {

0 commit comments

Comments
 (0)