Skip to content

Commit fdfd1fa

Browse files
committed
remove ml config index creation from cron job
Signed-off-by: Jing Zhang <jngz@amazon.com>
1 parent dad243f commit fdfd1fa

File tree

3 files changed

+2
-112
lines changed

3 files changed

+2
-112
lines changed

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ private void startSyncModelRoutingCron() {
102102
log.info("Starting ML sync up job...");
103103
syncModelRoutingCron = threadPool
104104
.scheduleWithFixedDelay(
105-
new MLSyncUpCron(client, sdkClient, clusterService, nodeHelper, mlIndicesHandler, encryptor, mlFeatureEnabledSetting),
105+
new MLSyncUpCron(client, sdkClient, clusterService, nodeHelper, mlIndicesHandler, mlFeatureEnabledSetting),
106106
TimeValue.timeValueSeconds(jobInterval),
107107
GENERAL_THREAD_POOL
108108
);

plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55

66
package org.opensearch.ml.cluster;
77

8-
import static org.opensearch.ml.common.CommonValue.CREATE_TIME_FIELD;
9-
import static org.opensearch.ml.common.CommonValue.MASTER_KEY;
10-
import static org.opensearch.ml.common.CommonValue.ML_CONFIG_INDEX;
118
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
129
import static org.opensearch.ml.utils.RestActionUtils.getAllNodes;
1310

@@ -23,14 +20,10 @@
2320
import java.util.stream.Collectors;
2421

2522
import org.opensearch.OpenSearchStatusException;
26-
import org.opensearch.action.DocWriteRequest;
27-
import org.opensearch.action.get.GetRequest;
28-
import org.opensearch.action.index.IndexRequest;
2923
import org.opensearch.action.search.SearchResponse;
3024
import org.opensearch.action.support.WriteRequest;
3125
import org.opensearch.cluster.node.DiscoveryNode;
3226
import org.opensearch.cluster.service.ClusterService;
33-
import org.opensearch.common.util.concurrent.ThreadContext;
3427
import org.opensearch.core.action.ActionListener;
3528
import org.opensearch.index.query.BoolQueryBuilder;
3629
import org.opensearch.index.query.TermsQueryBuilder;
@@ -46,7 +39,6 @@
4639
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesResponse;
4740
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsAction;
4841
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsRequest;
49-
import org.opensearch.ml.engine.encryptor.Encryptor;
5042
import org.opensearch.ml.engine.indices.MLIndicesHandler;
5143
import org.opensearch.remote.metadata.client.BulkDataObjectRequest;
5244
import org.opensearch.remote.metadata.client.SdkClient;
@@ -58,7 +50,6 @@
5850
import org.opensearch.transport.client.Client;
5951

6052
import com.google.common.annotations.VisibleForTesting;
61-
import com.google.common.collect.ImmutableMap;
6253

6354
import lombok.extern.log4j.Log4j2;
6455

@@ -71,7 +62,6 @@ public class MLSyncUpCron implements Runnable {
7162
private ClusterService clusterService;
7263
private DiscoveryNodeHelper nodeHelper;
7364
private MLIndicesHandler mlIndicesHandler;
74-
private Encryptor encryptor;
7565
private volatile Boolean mlConfigInited;
7666
private final MLFeatureEnabledSetting mlFeatureEnabledSetting;
7767
@VisibleForTesting
@@ -83,7 +73,6 @@ public MLSyncUpCron(
8373
ClusterService clusterService,
8474
DiscoveryNodeHelper nodeHelper,
8575
MLIndicesHandler mlIndicesHandler,
86-
Encryptor encryptor,
8776
MLFeatureEnabledSetting mlFeatureEnabledSetting
8877
) {
8978
this.client = client;
@@ -93,13 +82,11 @@ public MLSyncUpCron(
9382
this.mlIndicesHandler = mlIndicesHandler;
9483
this.updateModelStateSemaphore = new Semaphore(1);
9584
this.mlConfigInited = false;
96-
this.encryptor = encryptor;
9785
this.mlFeatureEnabledSetting = mlFeatureEnabledSetting;
9886
}
9987

10088
@Override
10189
public void run() {
102-
initMLConfig();
10390
if (!clusterService.state().metadata().indices().containsKey(ML_MODEL_INDEX)) {
10491
// no need to run sync up job if no model index
10592
log.info("Skipping sync up job - ML model index not found");
@@ -241,45 +228,6 @@ private void undeployExpiredModels(
241228
}, e -> { log.error("Failed to undeploy models {}", expiredModels, e); }));
242229
}
243230

244-
@VisibleForTesting
245-
void initMLConfig() {
246-
if (mlConfigInited || mlFeatureEnabledSetting.isMultiTenancyEnabled()) {
247-
return;
248-
}
249-
mlIndicesHandler.initMLConfigIndex(ActionListener.wrap(r -> {
250-
if (!r) {
251-
log.error("Failed to initialize or update ML Config index");
252-
return;
253-
}
254-
GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
255-
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
256-
client.get(getRequest, ActionListener.wrap(getResponse -> {
257-
if (!getResponse.isExists()) {
258-
IndexRequest indexRequest = new IndexRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
259-
final String masterKey = encryptor.generateMasterKey();
260-
indexRequest.source(ImmutableMap.of(MASTER_KEY, masterKey, CREATE_TIME_FIELD, Instant.now().toEpochMilli()));
261-
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
262-
indexRequest.opType(DocWriteRequest.OpType.CREATE);
263-
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
264-
log.info("ML configuration initialized successfully");
265-
// as this method is not being used for multi-tenancy use case, we are setting
266-
// tenant id null by default
267-
encryptor.setMasterKey(null, masterKey);
268-
mlConfigInited = true;
269-
}, e -> { log.debug("Failed to save ML encryption master key", e); }));
270-
} else {
271-
final String masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY);
272-
// as this method is not being used for multi-tenancy use case, we are setting
273-
// tenant id null by default
274-
encryptor.setMasterKey(null, masterKey);
275-
mlConfigInited = true;
276-
log.info("ML configuration already initialized, no action needed");
277-
}
278-
}, e -> { log.debug("Failed to get ML encryption master key", e); }));
279-
}
280-
}, e -> { log.debug("Failed to init ML config index", e); }));
281-
}
282-
283231
@VisibleForTesting
284232
void refreshModelState(Map<String, Set<String>> modelWorkerNodes, Map<String, Set<String>> deployingModels) {
285233
if (!updateModelStateSemaphore.tryAcquire()) {

plugin/src/test/java/org/opensearch/ml/cluster/MLSyncUpCronTests.java

Lines changed: 1 addition & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,10 @@
1010
import static org.mockito.ArgumentMatchers.eq;
1111
import static org.mockito.Mockito.doAnswer;
1212
import static org.mockito.Mockito.doThrow;
13-
import static org.mockito.Mockito.mock;
1413
import static org.mockito.Mockito.never;
15-
import static org.mockito.Mockito.spy;
1614
import static org.mockito.Mockito.times;
1715
import static org.mockito.Mockito.verify;
1816
import static org.mockito.Mockito.when;
19-
import static org.opensearch.ml.common.CommonValue.CREATE_TIME_FIELD;
20-
import static org.opensearch.ml.common.CommonValue.MASTER_KEY;
2117
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
2218
import static org.opensearch.ml.utils.TestHelper.ML_ROLE;
2319
import static org.opensearch.ml.utils.TestHelper.setupTestClusterState;
@@ -34,16 +30,13 @@
3430
import java.util.concurrent.atomic.AtomicInteger;
3531

3632
import org.apache.lucene.search.TotalHits;
37-
import org.junit.Assert;
3833
import org.junit.Before;
3934
import org.mockito.ArgumentCaptor;
4035
import org.mockito.Mock;
4136
import org.mockito.Mockito;
4237
import org.mockito.MockitoAnnotations;
4338
import org.opensearch.Version;
4439
import org.opensearch.action.bulk.BulkRequest;
45-
import org.opensearch.action.get.GetResponse;
46-
import org.opensearch.action.index.IndexResponse;
4740
import org.opensearch.action.search.SearchResponse;
4841
import org.opensearch.action.search.SearchResponseSections;
4942
import org.opensearch.action.search.ShardSearchFailure;
@@ -71,8 +64,6 @@
7164
import org.opensearch.ml.common.transport.sync.MLSyncUpAction;
7265
import org.opensearch.ml.common.transport.sync.MLSyncUpNodeResponse;
7366
import org.opensearch.ml.common.transport.sync.MLSyncUpNodesResponse;
74-
import org.opensearch.ml.engine.encryptor.Encryptor;
75-
import org.opensearch.ml.engine.encryptor.EncryptorImpl;
7667
import org.opensearch.ml.engine.indices.MLIndicesHandler;
7768
import org.opensearch.ml.utils.TestHelper;
7869
import org.opensearch.remote.metadata.client.SdkClient;
@@ -87,7 +78,6 @@
8778
import org.opensearch.threadpool.ThreadPool;
8879
import org.opensearch.transport.client.Client;
8980

90-
import com.google.common.collect.ImmutableMap;
9181
import com.google.common.collect.ImmutableSet;
9282

9383
public class MLSyncUpCronTests extends OpenSearchTestCase {
@@ -112,7 +102,6 @@ public class MLSyncUpCronTests extends OpenSearchTestCase {
112102
private final String mlNode2Id = "mlNode2";
113103

114104
private ClusterState testState;
115-
private Encryptor encryptor;
116105

117106
@Mock
118107
ThreadPool threadPool;
@@ -124,64 +113,17 @@ public void setup() throws IOException {
124113
MockitoAnnotations.openMocks(this);
125114
mlNode1 = new DiscoveryNode(mlNode1Id, buildNewFakeTransportAddress(), emptyMap(), ImmutableSet.of(ML_ROLE), Version.CURRENT);
126115
mlNode2 = new DiscoveryNode(mlNode2Id, buildNewFakeTransportAddress(), emptyMap(), ImmutableSet.of(ML_ROLE), Version.CURRENT);
127-
encryptor = spy(new EncryptorImpl(null, "m+dWmfmnNRiNlOdej/QelEkvMTyH//frS2TBeS2BP4w="));
128116

129117
testState = setupTestClusterState("node");
130118
when(clusterService.state()).thenReturn(testState);
131119

132-
doAnswer(invocation -> {
133-
ActionListener<Boolean> actionListener = invocation.getArgument(0);
134-
actionListener.onResponse(true);
135-
return null;
136-
}).when(mlIndicesHandler).initMLConfigIndex(any());
137-
138120
Settings settings = Settings.builder().build();
139121
sdkClient = Mockito.spy(SdkClientFactory.createSdkClient(client, NamedXContentRegistry.EMPTY, Collections.emptyMap()));
140122
threadContext = new ThreadContext(settings);
141123
threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, USER_STRING);
142124
when(client.threadPool()).thenReturn(threadPool);
143125
when(threadPool.getThreadContext()).thenReturn(threadContext);
144-
syncUpCron = new MLSyncUpCron(client, sdkClient, clusterService, nodeHelper, mlIndicesHandler, encryptor, mlFeatureEnabledSetting);
145-
}
146-
147-
public void testInitMlConfig_MasterKeyNotExist() {
148-
doAnswer(invocation -> {
149-
ActionListener<GetResponse> listener = invocation.getArgument(1);
150-
GetResponse response = mock(GetResponse.class);
151-
when(response.isExists()).thenReturn(false);
152-
listener.onResponse(response);
153-
return null;
154-
}).when(client).get(any(), any());
155-
156-
doAnswer(invocation -> {
157-
ActionListener<IndexResponse> listener = invocation.getArgument(1);
158-
IndexResponse indexResponse = mock(IndexResponse.class);
159-
listener.onResponse(indexResponse);
160-
return null;
161-
}).when(client).index(any(), any());
162-
163-
syncUpCron.initMLConfig();
164-
Assert.assertNotNull(encryptor.encrypt("test", null));
165-
syncUpCron.initMLConfig();
166-
verify(encryptor, times(1)).setMasterKey(any(), any());
167-
}
168-
169-
public void testInitMlConfig_MasterKeyExists() {
170-
doAnswer(invocation -> {
171-
ActionListener<GetResponse> listener = invocation.getArgument(1);
172-
GetResponse response = mock(GetResponse.class);
173-
when(response.isExists()).thenReturn(true);
174-
String masterKey = encryptor.generateMasterKey();
175-
when(response.getSourceAsMap())
176-
.thenReturn(ImmutableMap.of(MASTER_KEY, masterKey, CREATE_TIME_FIELD, Instant.now().toEpochMilli()));
177-
listener.onResponse(response);
178-
return null;
179-
}).when(client).get(any(), any());
180-
181-
syncUpCron.initMLConfig();
182-
Assert.assertNotNull(encryptor.encrypt("test", null));
183-
syncUpCron.initMLConfig();
184-
verify(encryptor, times(1)).setMasterKey(any(), any());
126+
syncUpCron = new MLSyncUpCron(client, sdkClient, clusterService, nodeHelper, mlIndicesHandler, mlFeatureEnabledSetting);
185127
}
186128

187129
public void testRun_NoMLModelIndex() {

0 commit comments

Comments
 (0)