Skip to content

Commit 23afd3b

Browse files
authored
Merge branch 'main' into CVE-48734
Signed-off-by: Brian Flores <iflorbri@amazon.com>
2 parents 40526eb + 5bbf565 commit 23afd3b

File tree

13 files changed

+267
-83
lines changed

13 files changed

+267
-83
lines changed

common/src/main/java/org/opensearch/ml/common/CommonValue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class CommonValue {
6565
public static final String ML_MEMORY_MESSAGE_INDEX_MAPPING_PATH = "index-mappings/ml_memory_message.json";
6666
public static final String ML_MCP_SESSION_MANAGEMENT_INDEX_MAPPING_PATH = "index-mappings/ml_mcp_session_management.json";
6767
public static final String ML_MCP_TOOLS_INDEX_MAPPING_PATH = "index-mappings/ml_mcp_tools.json";
68+
public static final String ML_JOBS_INDEX_MAPPING_PATH = "index-mappings/ml_jobs.json";
6869

6970
// Calculate Versions independently of OpenSearch core version
7071
public static final Version VERSION_2_11_0 = Version.fromString("2.11.0");

common/src/main/java/org/opensearch/ml/common/MLIndex.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX_MAPPING_PATH;
1616
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
1717
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX_MAPPING_PATH;
18+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX;
19+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX_MAPPING_PATH;
1820
import static org.opensearch.ml.common.CommonValue.ML_MCP_SESSION_MANAGEMENT_INDEX_MAPPING_PATH;
1921
import static org.opensearch.ml.common.CommonValue.ML_MCP_TOOLS_INDEX_MAPPING_PATH;
2022
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_MESSAGE_INDEX;
@@ -44,7 +46,8 @@ public enum MLIndex {
4446
MEMORY_META(ML_MEMORY_META_INDEX, false, ML_MEMORY_META_INDEX_MAPPING_PATH),
4547
MEMORY_MESSAGE(ML_MEMORY_MESSAGE_INDEX, false, ML_MEMORY_MESSAGE_INDEX_MAPPING_PATH),
4648
MCP_SESSION_MANAGEMENT(MCP_SESSION_MANAGEMENT_INDEX, false, ML_MCP_SESSION_MANAGEMENT_INDEX_MAPPING_PATH),
47-
MCP_TOOLS(MCP_TOOLS_INDEX, false, ML_MCP_TOOLS_INDEX_MAPPING_PATH);
49+
MCP_TOOLS(MCP_TOOLS_INDEX, false, ML_MCP_TOOLS_INDEX_MAPPING_PATH),
50+
JOBS(ML_JOBS_INDEX, false, ML_JOBS_INDEX_MAPPING_PATH);
4851

4952
private final String indexName;
5053
// whether we use an alias for the index

common/src/main/java/org/opensearch/ml/common/settings/MLCommonsSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,9 +345,9 @@ private MLCommonsSettings() {}
345345

346346
// Feature flag for enabling telemetry metric collection via metrics framework
347347
public static final Setting<Boolean> ML_COMMONS_METRIC_COLLECTION_ENABLED = Setting
348-
.boolSetting("plugins.ml_commons.metrics_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
348+
.boolSetting("plugins.ml_commons.metrics_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Final);
349349

350350
// Feature flag for enabling telemetry static metric collection job -- MLStatsJobProcessor
351351
public static final Setting<Boolean> ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED = Setting
352-
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
352+
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Final);
353353
}

common/src/main/java/org/opensearch/ml/common/settings/MLFeatureEnabledSetting.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,6 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
8888
clusterService
8989
.getClusterSettings()
9090
.addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED, it -> isRagSearchPipelineEnabled = it);
91-
clusterService
92-
.getClusterSettings()
93-
.addSettingsUpdateConsumer(ML_COMMONS_METRIC_COLLECTION_ENABLED, it -> isMetricCollectionEnabled = it);
94-
clusterService
95-
.getClusterSettings()
96-
.addSettingsUpdateConsumer(ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED, it -> isStaticMetricCollectionEnabled = it);
9791
}
9892

9993
/**
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{
2+
"_meta": {
3+
"schema_version": 1
4+
},
5+
"properties": {
6+
"name": {
7+
"type": "keyword"
8+
},
9+
"enabled": {
10+
"type": "boolean"
11+
},
12+
"enabled_time": {
13+
"type": "date",
14+
"format": "strict_date_time||epoch_millis"
15+
},
16+
"last_update_time": {
17+
"type": "date",
18+
"format": "strict_date_time||epoch_millis"
19+
},
20+
"schedule": {
21+
"properties": {
22+
"interval": {
23+
"properties": {
24+
"start_time": {
25+
"type": "date",
26+
"format": "strict_date_time||epoch_millis"
27+
},
28+
"period": {
29+
"type": "integer"
30+
},
31+
"unit": {
32+
"type": "keyword"
33+
}
34+
}
35+
}
36+
}
37+
},
38+
"lock_duration_seconds": {
39+
"type": "long"
40+
},
41+
"jitter": {
42+
"type": "double"
43+
},
44+
"type": {
45+
"type": "keyword"
46+
}
47+
}
48+
}

ml-algorithms/src/main/java/org/opensearch/ml/engine/indices/MLIndicesHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ public void initMLMcpToolsIndex(ActionListener<Boolean> listener) {
9292
initMLIndexIfAbsent(MLIndex.MCP_TOOLS, listener);
9393
}
9494

95+
public void initMLJobsIndex(ActionListener<Boolean> listener) {
96+
initMLIndexIfAbsent(MLIndex.JOBS, listener);
97+
}
98+
9599
public void initMLAgentIndex(ActionListener<Boolean> listener) {
96100
initMLIndexIfAbsent(MLIndex.AGENT, listener);
97101
}

ml-algorithms/src/test/java/org/opensearch/ml/engine/indices/MLIndicesHandlerTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.mockito.Mockito.when;
1212
import static org.opensearch.ml.common.CommonValue.META;
1313
import static org.opensearch.ml.common.CommonValue.ML_AGENT_INDEX;
14+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX;
1415
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_MESSAGE_INDEX;
1516
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_META_INDEX;
1617
import static org.opensearch.ml.common.CommonValue.SCHEMA_VERSION_FIELD;
@@ -216,4 +217,36 @@ public void initMLConnectorIndex_ResourceAlreadyExistsException_RaceCondition()
216217
verify(listener).onResponse(argumentCaptor.capture());
217218
assertEquals(true, argumentCaptor.getValue());
218219
}
220+
221+
@Test
222+
public void initMLJobsIndex() {
223+
ActionListener<Boolean> listener = mock(ActionListener.class);
224+
doAnswer(invocation -> {
225+
ActionListener<AcknowledgedResponse> actionListener = invocation.getArgument(1);
226+
actionListener.onResponse(new AcknowledgedResponse(true));
227+
return null;
228+
}).when(indicesAdminClient).putMapping(any(), any());
229+
ArgumentCaptor<Boolean> argumentCaptor = ArgumentCaptor.forClass(Boolean.class);
230+
indicesHandler.initMLJobsIndex(listener);
231+
232+
verify(listener).onResponse(argumentCaptor.capture());
233+
assertEquals(true, argumentCaptor.getValue());
234+
}
235+
236+
@Test
237+
public void initMLJobsIndexNoIndex() {
238+
ActionListener<Boolean> listener = mock(ActionListener.class);
239+
when(metadata.hasIndex(anyString())).thenReturn(false);
240+
doAnswer(invocation -> {
241+
ActionListener<CreateIndexResponse> actionListener = invocation.getArgument(1);
242+
actionListener.onResponse(new CreateIndexResponse(true, true, ML_JOBS_INDEX));
243+
return null;
244+
}).when(indicesAdminClient).create(any(), any());
245+
ArgumentCaptor<Boolean> argumentCaptor = ArgumentCaptor.forClass(Boolean.class);
246+
indicesHandler.initMLJobsIndex(listener);
247+
248+
verify(indicesAdminClient).create(isA(CreateIndexRequest.class), any());
249+
verify(listener).onResponse(argumentCaptor.capture());
250+
assertEquals(true, argumentCaptor.getValue());
251+
}
219252
}

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
package org.opensearch.ml.cluster;
77

8+
import static org.opensearch.ml.common.CommonValue.TASK_POLLING_JOB_INDEX;
89
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MONITORING_REQUEST_COUNT;
910

1011
import java.util.List;
1112
import java.util.Set;
1213
import java.util.stream.Collectors;
1314

15+
import org.opensearch.Version;
1416
import org.opensearch.cluster.ClusterChangedEvent;
1517
import org.opensearch.cluster.ClusterState;
1618
import org.opensearch.cluster.ClusterStateListener;
@@ -19,9 +21,11 @@
1921
import org.opensearch.cluster.service.ClusterService;
2022
import org.opensearch.common.settings.Settings;
2123
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
24+
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
2225
import org.opensearch.ml.model.MLModelCacheHelper;
2326
import org.opensearch.ml.model.MLModelManager;
2427
import org.opensearch.ml.task.MLTaskManager;
28+
import org.opensearch.transport.client.Client;
2529

2630
import lombok.extern.log4j.Log4j2;
2731

@@ -32,22 +36,27 @@ public class MLCommonsClusterEventListener implements ClusterStateListener {
3236
private final MLModelManager mlModelManager;
3337
private final MLTaskManager mlTaskManager;
3438
private final MLModelCacheHelper modelCacheHelper;
35-
3639
private final MLModelAutoReDeployer mlModelAutoReDeployer;
40+
private final Client client;
41+
private final MLFeatureEnabledSetting mlFeatureEnabledSetting;
3742

3843
public MLCommonsClusterEventListener(
3944
ClusterService clusterService,
4045
MLModelManager mlModelManager,
4146
MLTaskManager mlTaskManager,
4247
MLModelCacheHelper modelCacheHelper,
43-
MLModelAutoReDeployer mlModelAutoReDeployer
48+
MLModelAutoReDeployer mlModelAutoReDeployer,
49+
Client client,
50+
MLFeatureEnabledSetting mlFeatureEnabledSetting
4451
) {
4552
this.clusterService = clusterService;
4653
this.clusterService.addListener(this);
4754
this.mlModelManager = mlModelManager;
4855
this.mlTaskManager = mlTaskManager;
4956
this.modelCacheHelper = modelCacheHelper;
5057
this.mlModelAutoReDeployer = mlModelAutoReDeployer;
58+
this.client = client;
59+
this.mlFeatureEnabledSetting = mlFeatureEnabledSetting;
5160
}
5261

5362
@Override
@@ -69,5 +78,28 @@ public void clusterChanged(ClusterChangedEvent event) {
6978
List<String> addedNodesIds = delta.addedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toList());
7079
mlModelAutoReDeployer.buildAutoReloadArrangement(addedNodesIds, state.getNodes().getClusterManagerNodeId());
7180
}
81+
82+
/*
83+
* In version 3.1, a new index `.plugins-ml-jobs` replaces the old `.ml_commons_task_polling_job` index for the job scheduler.
84+
* Version 3.1 also introduces a stats collector job that should run at startup if the relevant settings are enabled.
85+
* When upgrading from 3.0 to 3.1, we need to ensure the new `.plugins-ml-jobs` index is created if either:
86+
* - The stats collector job is enabled, or
87+
* - The batch polling task job was already running.
88+
* To avoid issues during blue/green or rolling upgrades, we wait for a data node running 3.1 or later before creating the new jobs index and starting the jobs.
89+
* The following logic implements this behavior.
90+
*/
91+
for (DiscoveryNode node : state.nodes()) {
92+
if (node.isDataNode() && Version.V_3_1_0.onOrAfter(node.getVersion())) {
93+
if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()) {
94+
mlTaskManager.startStatsCollectorJob();
95+
}
96+
97+
if (clusterService.state().getMetadata().hasIndex(TASK_POLLING_JOB_INDEX)) {
98+
mlTaskManager.startTaskPollingJob();
99+
}
100+
101+
break;
102+
}
103+
}
72104
}
73105
}

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,18 @@
88
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS;
99
import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL;
1010

11-
import java.io.IOException;
12-
import java.time.Instant;
13-
import java.time.temporal.ChronoUnit;
1411
import java.util.List;
1512

16-
import org.opensearch.action.index.IndexRequest;
17-
import org.opensearch.action.support.WriteRequest;
1813
import org.opensearch.cluster.LocalNodeClusterManagerListener;
1914
import org.opensearch.cluster.service.ClusterService;
2015
import org.opensearch.common.lifecycle.LifecycleListener;
2116
import org.opensearch.common.settings.Settings;
2217
import org.opensearch.common.unit.TimeValue;
23-
import org.opensearch.common.xcontent.json.JsonXContent;
2418
import org.opensearch.core.action.ActionListener;
25-
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
2619
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
27-
import org.opensearch.ml.common.CommonValue;
2820
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
2921
import org.opensearch.ml.engine.encryptor.Encryptor;
3022
import org.opensearch.ml.engine.indices.MLIndicesHandler;
31-
import org.opensearch.ml.jobs.MLJobParameter;
32-
import org.opensearch.ml.jobs.MLJobType;
3323
import org.opensearch.remote.metadata.client.SdkClient;
3424
import org.opensearch.threadpool.Scheduler;
3525
import org.opensearch.threadpool.ThreadPool;
@@ -105,40 +95,6 @@ public void onClusterManager() {
10595
TimeValue.timeValueSeconds(jobInterval),
10696
GENERAL_THREAD_POOL
10797
);
108-
startStatsCollectorJob();
109-
}
110-
111-
private void startStatsCollectorJob() {
112-
try {
113-
int intervalInMinutes = 5;
114-
Long lockDurationSeconds = 20L;
115-
116-
MLJobParameter jobParameter = new MLJobParameter(
117-
MLJobType.STATS_COLLECTOR.name(),
118-
new IntervalSchedule(Instant.now(), intervalInMinutes, ChronoUnit.MINUTES),
119-
lockDurationSeconds,
120-
null,
121-
MLJobType.STATS_COLLECTOR
122-
);
123-
124-
IndexRequest indexRequest = new IndexRequest()
125-
.index(CommonValue.ML_JOBS_INDEX)
126-
.id(MLJobType.STATS_COLLECTOR.name())
127-
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
128-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
129-
130-
client
131-
.index(
132-
indexRequest,
133-
ActionListener
134-
.wrap(
135-
r -> log.info("Indexed ml stats collection job successfully"),
136-
e -> log.error("Failed to index stats collection job", e)
137-
)
138-
);
139-
} catch (IOException e) {
140-
log.error("Failed to index stats collection job", e);
141-
}
14298
}
14399

144100
private void startSyncModelRoutingCron() {

plugin/src/main/java/org/opensearch/ml/jobs/processors/MLJobProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.opensearch.jobscheduler.spi.JobExecutionContext;
1515
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
1616
import org.opensearch.jobscheduler.spi.utils.LockService;
17-
import org.opensearch.ml.common.exception.MLException;
1817
import org.opensearch.threadpool.ThreadPool;
1918
import org.opensearch.transport.client.Client;
2019

@@ -36,7 +35,8 @@ public MLJobProcessor(ClusterService clusterService, Client client, ThreadPool t
3635

3736
public void process(ScheduledJobParameter scheduledJobParameter, JobExecutionContext jobExecutionContext, boolean isProcessorEnabled) {
3837
if (!isProcessorEnabled) {
39-
throw new MLException(scheduledJobParameter.getName() + " not enabled.");
38+
log.warn("{} not enabled.", scheduledJobParameter.getName());
39+
return;
4040
}
4141

4242
process(scheduledJobParameter, jobExecutionContext);

0 commit comments

Comments
 (0)