Skip to content

Commit 7f80607

Browse files
opensearch-trigger-bot[bot]rbhavnadhrubo-os
authored
add more logging to deploy/undeploy flows for better debugging (#3825) (#3827)
* add more logging to deploy/undeploy flows for better debugging Signed-off-by: Bhavana Goud Ramaram <rbhavna@amazon.com> (cherry picked from commit 793c62e) Co-authored-by: Bhavana Goud Ramaram <rbhavna@amazon.com> Co-authored-by: Dhrubo Saha <dhrubo@amazon.com>
1 parent 5b8dfbc commit 7f80607

File tree

5 files changed

+52
-2
lines changed

5 files changed

+52
-2
lines changed

plugin/src/main/java/org/opensearch/ml/action/deploy/TransportDeployModelAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLDepl
139139
MLDeployModelRequest deployModelRequest = MLDeployModelRequest.fromActionRequest(request);
140140
String modelId = deployModelRequest.getModelId();
141141
String tenantId = deployModelRequest.getTenantId();
142+
log.debug("Received deploy request for modelId: {}", modelId);
142143
if (!TenantAwareHelper.validateTenantId(mlFeatureEnabledSetting, tenantId, listener)) {
143144
return;
144145
}
@@ -215,6 +216,7 @@ private void deployModel(
215216
) {
216217
String[] targetNodeIds = deployModelRequest.getModelNodeIds();
217218
boolean deployToAllNodes = targetNodeIds == null || targetNodeIds.length == 0;
219+
log.info("Starting model deployment for model: {}", modelId);
218220
if (!allowCustomDeploymentPlan && !deployToAllNodes) {
219221
throw new IllegalArgumentException("Don't allow custom deployment plan");
220222
}
@@ -283,6 +285,7 @@ private void deployModel(
283285
.build();
284286
mlTaskManager.createMLTask(mlTask, ActionListener.wrap(response -> {
285287
String taskId = response.getId();
288+
log.debug("ML deploy task {} created for modelId: {}", taskId, modelId);
286289
mlTask.setTaskId(taskId);
287290
if (algorithm == FunctionName.REMOTE) {
288291
mlTaskManager.add(mlTask, eligibleNodeIds);
@@ -396,6 +399,7 @@ private ActionListener<MLDeployModelNodesResponse> deployModelNodesResponseListe
396399
if (mlTaskManager.contains(taskId)) {
397400
mlTaskManager.updateMLTask(taskId, tenantId, Map.of(STATE_FIELD, MLTaskState.RUNNING), TASK_SEMAPHORE_TIMEOUT, false);
398401
}
402+
log.debug("Model deployment successful for model: {}", modelId);
399403
listener.onResponse(new MLDeployModelResponse(taskId, MLTaskType.DEPLOY_MODEL, MLTaskState.COMPLETED.name()));
400404
}, e -> {
401405
log.error("Failed to deploy model {}", modelId, e);
@@ -423,6 +427,7 @@ void updateModelDeployStatusAndTriggerOnNodesAction(
423427
List<DiscoveryNode> eligibleNodes,
424428
boolean deployToAllNodes
425429
) {
430+
log.debug("Triggering deploy on nodes for modelId: {}", modelId);
426431
MLDeployModelInput deployModelInput = new MLDeployModelInput(
427432
modelId,
428433
taskId,
@@ -438,6 +443,7 @@ void updateModelDeployStatusAndTriggerOnNodesAction(
438443
deployModelInput
439444
);
440445
ActionListener<MLDeployModelNodesResponse> actionListener = ActionListener.wrap(r -> {
446+
log.debug("Successfully triggered model deployment on nodes for model: {}", modelId);
441447
if (mlTaskManager.contains(taskId)) {
442448
mlTaskManager
443449
.updateMLTask(taskId, mlModel.getTenantId(), Map.of(STATE_FIELD, MLTaskState.RUNNING), TASK_SEMAPHORE_TIMEOUT, false);
@@ -462,6 +468,7 @@ void updateModelDeployStatusAndTriggerOnNodesAction(
462468
});
463469

464470
List<String> workerNodes = eligibleNodes.stream().map(DiscoveryNode::getId).toList();
471+
log.debug("Updating model state to DEPLOYING for modelId: {}", modelId);
465472
mlModelManager
466473
.updateModel(
467474
modelId,

plugin/src/main/java/org/opensearch/ml/action/deploy/TransportDeployModelOnNodeAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ private MLDeployModelNodeResponse createDeployModelNodeResponse(MLDeployModelNod
140140

141141
String localNodeId = clusterService.localNode().getId();
142142

143+
log.debug("Starting model deployment for model: {} on node {}", modelId, localNodeId);
144+
143145
ActionListener<MLForwardResponse> taskDoneListener = ActionListener
144146
.wrap(res -> { log.info("deploy model task done {}", taskId); }, ex -> {
145147
logException("Deploy model task failed: " + taskId, ex, log);
@@ -155,6 +157,12 @@ private MLDeployModelNodeResponse createDeployModelNodeResponse(MLDeployModelNod
155157
deployToAllNodes,
156158
mlTask,
157159
ActionListener.wrap(r -> {
160+
log
161+
.debug(
162+
"Model deployed successfully on local node: {}. Sending DONE message to coordinating node: {}",
163+
localNodeId,
164+
coordinatingNodeId
165+
);
158166
MLForwardInput mlForwardInput = MLForwardInput
159167
.builder()
160168
.requestType(MLForwardRequestType.DEPLOY_MODEL_DONE)
@@ -175,6 +183,13 @@ private MLDeployModelNodeResponse createDeployModelNodeResponse(MLDeployModelNod
175183
);
176184
}
177185
}, e -> {
186+
log
187+
.warn(
188+
"Model deployment failed on local node: {}. Sending FAILED message to coordinating node {} with error: {}",
189+
localNodeId,
190+
coordinatingNodeId,
191+
e.getMessage()
192+
);
178193
MLForwardInput mlForwardInput = MLForwardInput
179194
.builder()
180195
.requestType(MLForwardRequestType.DEPLOY_MODEL_DONE)

plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelAction.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public TransportUndeployModelAction(
100100

101101
@Override
102102
protected void doExecute(Task task, MLUndeployModelNodesRequest request, ActionListener<MLUndeployModelNodesResponse> listener) {
103+
log.info("Executing undeploy for models: {}", Arrays.toString(request.getModelIds()));
103104
ActionListener<MLUndeployModelNodesResponse> wrappedListener = ActionListener.wrap(undeployModelNodesResponse -> {
104105
processUndeployModelResponseAndUpdate(request.getTenantId(), undeployModelNodesResponse, listener);
105106
}, listener::onFailure);
@@ -112,6 +113,7 @@ void processUndeployModelResponseAndUpdate(
112113
ActionListener<MLUndeployModelNodesResponse> listener
113114
) {
114115
List<MLUndeployModelNodeResponse> responses = undeployModelNodesResponse.getNodes();
116+
log.debug("Processing undeploy model responses from nodes");
115117
if (responses == null || responses.isEmpty()) {
116118
listener.onResponse(undeployModelNodesResponse);
117119
return;
@@ -135,9 +137,10 @@ void processUndeployModelResponseAndUpdate(
135137

136138
Map<String, String> modelUndeployStatus = r.getModelUndeployStatus();
137139
for (Map.Entry<String, String> entry : modelUndeployStatus.entrySet()) {
140+
String modelId = entry.getKey();
138141
String status = entry.getValue();
142+
log.debug("Model status of model {} on node {}: {}", modelId, r.getNode().getId(), status);
139143
if (UNDEPLOYED.equals(status)) {
140-
String modelId = entry.getKey();
141144
if (!actualRemovedNodesMap.containsKey(modelId)) {
142145
actualRemovedNodesMap.put(modelId, new ArrayList<>());
143146
}
@@ -154,6 +157,7 @@ void processUndeployModelResponseAndUpdate(
154157
MLSyncUpNodesRequest syncUpRequest = new MLSyncUpNodesRequest(nodeFilter.getAllNodes(), syncUpInput);
155158
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
156159
if (!actualRemovedNodesMap.isEmpty()) {
160+
log.debug("Models undeployed from nodes: {}", actualRemovedNodesMap);
157161
BulkDataObjectRequest bulkRequest = BulkDataObjectRequest.builder().globalIndex(ML_MODEL_INDEX).build();
158162
Map<String, Boolean> deployToAllNodes = new HashMap<>();
159163
for (String modelId : actualRemovedNodesMap.keySet()) {
@@ -166,8 +170,10 @@ void processUndeployModelResponseAndUpdate(
166170
* we need to update both planning worker nodes (count) and current worker nodes (count)
167171
* and deployToAllNodes value in model index.
168172
*/
173+
log.debug("Updating metadata for model {}: removedNodes={}", modelId, removedNodes);
169174
Map<String, Object> updateDocument = new HashMap<>();
170-
if (modelWorkNodesBeforeRemoval.get(modelId).length == removedNodeCount) { // undeploy all nodes.
175+
if (modelWorkNodesBeforeRemoval.get(modelId).length == removedNodeCount) {
176+
log.debug("All nodes removed for model {}. Marking as undeployed.", modelId);// undeploy all nodes.
171177
updateDocument.put(MLModel.PLANNING_WORKER_NODES_FIELD, ImmutableList.of());
172178
updateDocument.put(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD, 0);
173179
updateDocument.put(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD, 0);
@@ -180,6 +186,12 @@ void processUndeployModelResponseAndUpdate(
180186
.stream(modelWorkNodesBeforeRemoval.get(modelId))
181187
.filter(x -> !removedNodes.contains(x))
182188
.collect(Collectors.toList());
189+
log
190+
.debug(
191+
"Partially undeployed for model {} with remaining planning worker nodes: {}",
192+
modelId,
193+
newPlanningWorkerNodes
194+
);
183195
updateDocument.put(MLModel.PLANNING_WORKER_NODES_FIELD, newPlanningWorkerNodes);
184196
updateDocument.put(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD, newPlanningWorkerNodes.size());
185197
updateDocument.put(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD, newPlanningWorkerNodes.size());
@@ -195,6 +207,7 @@ void processUndeployModelResponseAndUpdate(
195207
bulkRequest.add(updateRequest).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
196208
}
197209
syncUpInput.setDeployToAllNodes(deployToAllNodes);
210+
log.debug("Sending bulk metadata update request for undeploy");
198211
ActionListener<BulkResponse> actionListener = ActionListener.wrap(r -> {
199212
log
200213
.debug(
@@ -203,6 +216,7 @@ void processUndeployModelResponseAndUpdate(
203216
);
204217
}, e -> { log.error("Failed to update model state as undeployed", e); });
205218
ActionListener<BulkResponse> wrappedListener = ActionListener.runAfter(actionListener, () -> {
219+
log.debug("Triggering sync-up after bulk update for undeploy");
206220
syncUpUndeployedModels(syncUpRequest);
207221
listener.onResponse(undeployModelNodesResponse);
208222
});
@@ -288,11 +302,14 @@ private MLUndeployModelNodeResponse createUndeployModelNodeResponse(MLUndeployMo
288302

289303
boolean specifiedModelIds = modelIds != null && modelIds.length > 0;
290304
String[] removedModelIds = specifiedModelIds ? modelIds : mlModelManager.getAllModelIds();
305+
306+
log.debug("Models to undeploy: {}", Arrays.toString(removedModelIds));
291307
if (removedModelIds != null) {
292308
for (String modelId : removedModelIds) {
293309
FunctionName functionName = mlModelManager.getModelFunctionName(modelId);
294310
String[] workerNodes = mlModelManager.getWorkerNodes(modelId, functionName);
295311
modelWorkerNodesMap.put(modelId, workerNodes);
312+
log.debug("Retrieved worker nodes for model {}: {}", modelId, Arrays.toString(workerNodes));
296313
}
297314
}
298315

plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,14 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLUnde
129129
String[] modelIds = undeployModelsRequest.getModelIds();
130130
String tenantId = undeployModelsRequest.getTenantId();
131131
String[] targetNodeIds = undeployModelsRequest.getNodeIds();
132+
log.info("Executing undeploy model action for modelIds: {}", Arrays.toString(modelIds));
132133

133134
if (!TenantAwareHelper.validateTenantId(mlFeatureEnabledSetting, tenantId, listener)) {
134135
return;
135136
}
136137

137138
if (modelIds == null) {
139+
log.error("No modelIds provided in undeploy.");
138140
listener.onFailure(new IllegalArgumentException("Must set specific model ids to undeploy"));
139141
return;
140142
}
@@ -186,10 +188,12 @@ private void undeployModels(
186188
String tenantId,
187189
ActionListener<MLUndeployModelsResponse> listener
188190
) {
191+
log.debug("Initiating undeploy on nodes: {}, for modelIds: {}", Arrays.toString(targetNodeIds), Arrays.toString(modelIds));
189192
MLUndeployModelNodesRequest mlUndeployModelNodesRequest = new MLUndeployModelNodesRequest(targetNodeIds, modelIds);
190193
mlUndeployModelNodesRequest.setTenantId(tenantId);
191194

192195
client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(response -> {
196+
log.info("Undeploy response received from nodes");
193197
/*
194198
* The method TransportUndeployModelsAction.processUndeployModelResponseAndUpdate(...) performs
195199
* undeploy action of models by removing the models from the nodes cache and updating the index when it's able to find it.
@@ -213,9 +217,11 @@ private void undeployModels(
213217
return modelCacheMissForModelIds;
214218
});
215219
if (response.getNodes().isEmpty() || modelNotFoundInNodesCache) {
220+
log.warn("No node found running the model(s): {}", Arrays.toString(modelIds));
216221
bulkSetModelIndexToUndeploy(modelIds, listener, response);
217222
return;
218223
}
224+
log.info("Successfully undeployed model(s) from nodes: {}", Arrays.toString(modelIds));
219225
listener.onResponse(new MLUndeployModelsResponse(response));
220226
}, listener::onFailure));
221227
}

plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void run() {
102102
initMLConfig();
103103
if (!clusterService.state().metadata().indices().containsKey(ML_MODEL_INDEX)) {
104104
// no need to run sync up job if no model index
105+
log.info("Skipping sync up job - ML model index not found");
105106
return;
106107
}
107108
log.debug("ML sync job starts");
@@ -111,6 +112,7 @@ public void run() {
111112

112113
// gather running model/tasks on nodes
113114
client.execute(MLSyncUpAction.INSTANCE, gatherInfoRequest, ActionListener.wrap(r -> {
115+
log.debug("Received sync up responses from nodes");
114116
List<MLSyncUpNodeResponse> responses = r.getNodes();
115117
if (r.failures() != null && !r.failures().isEmpty()) {
116118
log
@@ -130,6 +132,7 @@ public void run() {
130132
Map<String, Set<String>> expiredModelToNodes = new HashMap<>();
131133
for (MLSyncUpNodeResponse response : responses) {
132134
String nodeId = response.getNode().getId();
135+
log.debug("Processing sync response from node: {}", nodeId);
133136
String[] expiredModelIds = response.getExpiredModelIds();
134137
if (expiredModelIds != null && expiredModelIds.length > 0) {
135138
Arrays
@@ -215,6 +218,7 @@ private void undeployExpiredModels(
215218
Map<String, Set<String>> deployingModels
216219
) {
217220
String[] targetNodeIds = getAllNodes(clusterService);
221+
log.debug("Sending requests to undeploy expired models: {}", expiredModels);
218222
MLUndeployModelsRequest mlUndeployModelsRequest = new MLUndeployModelsRequest(
219223
expiredModels.toArray(new String[expiredModels.size()]),
220224
targetNodeIds,
@@ -279,6 +283,7 @@ void initMLConfig() {
279283
@VisibleForTesting
280284
void refreshModelState(Map<String, Set<String>> modelWorkerNodes, Map<String, Set<String>> deployingModels) {
281285
if (!updateModelStateSemaphore.tryAcquire()) {
286+
log.debug("Model state refresh already in progress. Skipping this cycle.");
282287
return;
283288
}
284289
try {

0 commit comments

Comments
 (0)