Skip to content

Commit 6ec5073

Browse files
Directly return Response objects from metadata client responses (#3768) (#3770)
* Update search calls to directly use search response without parsing Signed-off-by: Daniel Widdis <widdis@gmail.com> * Update get calls to directly use get response without parsing Signed-off-by: Daniel Widdis <widdis@gmail.com> * Update delete calls to directly use delete response without parsing Signed-off-by: Daniel Widdis <widdis@gmail.com> * Update index calls to directly use index response without parsing Signed-off-by: Daniel Widdis <widdis@gmail.com> * Update update calls to directly use update response without parsing Signed-off-by: Daniel Widdis <widdis@gmail.com> * Update bulk call to directly use bulk response without parsing Signed-off-by: Daniel Widdis <widdis@gmail.com> --------- Signed-off-by: Daniel Widdis <widdis@gmail.com> (cherry picked from commit 8ba07fe) Co-authored-by: Daniel Widdis <widdis@gmail.com>
1 parent 8617295 commit 6ec5073

24 files changed

+61
-154
lines changed

plugin/src/main/java/org/opensearch/ml/action/agents/DeleteAgentTransportAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
109109
}
110110
} else {
111111
try {
112-
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
112+
GetResponse gr = r.getResponse();
113113
assert gr != null;
114114
if (gr.isExists()) {
115115
try (
@@ -186,7 +186,7 @@ private void handleDeleteResponse(
186186
actionListener.onFailure(cause);
187187
} else {
188188
try {
189-
DeleteResponse deleteResponse = DeleteResponse.fromXContent(response.parser());
189+
DeleteResponse deleteResponse = response.deleteResponse();
190190
log.info("Agent deletion result: {}, agent id: {}", deleteResponse.getResult(), response.id());
191191
actionListener.onResponse(deleteResponse);
192192
} catch (Exception e) {

plugin/src/main/java/org/opensearch/ml/action/agents/GetAgentTransportAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLAgen
109109
}
110110
} else {
111111
try {
112-
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
112+
GetResponse gr = r.getResponse();
113113
if (gr != null && gr.isExists()) {
114114
try (
115115
XContentParser parser = jsonXContent

plugin/src/main/java/org/opensearch/ml/action/agents/TransportRegisterAgentAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ private void registerAgentToIndex(MLAgent mlAgent, String tenantId, ActionListen
150150
listener.onFailure(cause);
151151
} else {
152152
try {
153-
IndexResponse indexResponse = IndexResponse.fromXContent(r.parser());
153+
IndexResponse indexResponse = r.indexResponse();
154154
log.info("Agent creation result: {}, Agent id: {}", indexResponse.getResult(), indexResponse.getId());
155155
MLRegisterAgentResponse response = new MLRegisterAgentResponse(r.id());
156156
listener.onResponse(response);

plugin/src/main/java/org/opensearch/ml/action/agents/TransportSearchAgentAction.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@
77

88
import static org.opensearch.ml.action.handler.MLSearchHandler.wrapRestActionListener;
99

10-
import org.opensearch.OpenSearchStatusException;
1110
import org.opensearch.action.search.SearchRequest;
1211
import org.opensearch.action.search.SearchResponse;
1312
import org.opensearch.action.support.ActionFilters;
1413
import org.opensearch.action.support.HandledTransportAction;
1514
import org.opensearch.common.inject.Inject;
1615
import org.opensearch.common.util.concurrent.ThreadContext;
1716
import org.opensearch.core.action.ActionListener;
18-
import org.opensearch.core.rest.RestStatus;
1917
import org.opensearch.index.query.BoolQueryBuilder;
2018
import org.opensearch.index.query.QueryBuilders;
2119
import org.opensearch.ml.common.CommonValue;
@@ -96,25 +94,7 @@ private void search(SearchRequest request, String tenantId, ActionListener<Searc
9694
.tenantId(tenantId)
9795
.build();
9896

99-
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((r, throwable) -> {
100-
if (throwable != null) {
101-
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable, OpenSearchStatusException.class);
102-
log.error("Failed to search agent", cause);
103-
wrappedListener.onFailure(cause);
104-
} else {
105-
try {
106-
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
107-
log.info("Agent search complete: {}", searchResponse.getHits().getTotalHits());
108-
wrappedListener.onResponse(searchResponse);
109-
} catch (Exception e) {
110-
log.error("Failed to parse model search response", e);
111-
wrappedListener
112-
.onFailure(
113-
new OpenSearchStatusException("Failed to parse model search response", RestStatus.INTERNAL_SERVER_ERROR)
114-
);
115-
}
116-
}
117-
});
97+
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete(SdkClientUtils.wrapSearchCompletion(wrappedListener));
11898
} catch (Exception e) {
11999
log.error("failed to search the agent index", e);
120100
actionListener.onFailure(e);

plugin/src/main/java/org/opensearch/ml/action/connector/DeleteConnectorTransportAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX;
99
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
1010

11-
import java.io.IOException;
1211
import java.util.ArrayList;
1312
import java.util.Arrays;
1413
import java.util.List;
@@ -154,7 +153,8 @@ private void handleSearchResponse(
154153
}
155154

156155
try {
157-
SearchResponse response = SearchResponse.fromXContent(searchResponse.parser());
156+
SearchResponse response = searchResponse.searchResponse();
157+
// Parsing failure would produce NPE on next line
158158
SearchHit[] searchHits = response.getHits().getHits();
159159

160160
if (searchHits.length == 0) {
@@ -220,10 +220,10 @@ private void handleDeleteResponse(
220220
actionListener.onFailure(cause);
221221
} else {
222222
try {
223-
DeleteResponse deleteResponse = DeleteResponse.fromXContent(response.parser());
223+
DeleteResponse deleteResponse = response.deleteResponse();
224224
log.info("Connector deletion result: {}, connector id: {}", deleteResponse.getResult(), response.id());
225225
actionListener.onResponse(deleteResponse);
226-
} catch (IOException e) {
226+
} catch (Exception e) {
227227
actionListener.onFailure(e);
228228
}
229229
}

plugin/src/main/java/org/opensearch/ml/action/connector/TransportCreateConnectorAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX;
1010
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_TRUSTED_CONNECTOR_ENDPOINTS_REGEX;
1111

12-
import java.io.IOException;
1312
import java.time.Instant;
1413
import java.util.HashSet;
1514
import java.util.List;
@@ -179,15 +178,15 @@ private void indexConnector(Connector connector, ActionListener<MLCreateConnecto
179178
listener.onFailure(cause);
180179
} else {
181180
try {
182-
IndexResponse indexResponse = IndexResponse.fromXContent(r.parser());
181+
IndexResponse indexResponse = r.indexResponse();
183182
log
184183
.info(
185184
"Connector creation result: {}, connector id: {}",
186185
indexResponse.getResult(),
187186
indexResponse.getId()
188187
);
189188
listener.onResponse(new MLCreateConnectorResponse(indexResponse.getId()));
190-
} catch (IOException e) {
189+
} catch (Exception e) {
191190
listener.onFailure(e);
192191
}
193192
}

plugin/src/main/java/org/opensearch/ml/action/connector/UpdateConnectorTransportAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
1010
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_TRUSTED_CONNECTOR_ENDPOINTS_REGEX;
1111

12-
import java.io.IOException;
1312
import java.time.Instant;
1413
import java.util.ArrayList;
1514
import java.util.Arrays;
@@ -177,7 +176,8 @@ private void updateUndeployedConnector(
177176
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((sr, st) -> {
178177
if (sr != null) {
179178
try {
180-
SearchResponse searchResponse = SearchResponse.fromXContent(sr.parser());
179+
SearchResponse searchResponse = sr.searchResponse();
180+
// Parsing failure would cause NPE on next line
181181
SearchHit[] searchHits = searchResponse.getHits().getHits();
182182
if (searchHits.length == 0) {
183183
sdkClient.updateDataObjectAsync(updateDataObjectRequest).whenComplete((r, throwable) -> {
@@ -227,9 +227,8 @@ private void handleUpdateDataObjectCompletionStage(
227227
updateListener.onFailure(cause);
228228
} else {
229229
try {
230-
UpdateResponse updateResponse = r.parser() == null ? null : UpdateResponse.fromXContent(r.parser());
231-
updateListener.onResponse(updateResponse);
232-
} catch (IOException e) {
230+
updateListener.onResponse(r.updateResponse());
231+
} catch (Exception e) {
233232
updateListener.onFailure(e);
234233
}
235234
}

plugin/src/main/java/org/opensearch/ml/action/handler/MLSearchHandler.java

Lines changed: 9 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,9 @@ public void search(SdkClient sdkClient, SearchRequest request, String tenantId,
140140
.searchSourceBuilder(request.source())
141141
.tenantId(tenantId)
142142
.build();
143-
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((r, throwable) -> {
144-
if (throwable == null) {
145-
try {
146-
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
147-
log.info("Model search complete: {}", searchResponse.getHits().getTotalHits());
148-
doubleWrapperListener.onResponse(searchResponse);
149-
} catch (Exception e) {
150-
doubleWrapperListener.onFailure(e);
151-
}
152-
} else {
153-
Exception e = SdkClientUtils.unwrapAndConvertToException(throwable, OpenSearchStatusException.class);
154-
doubleWrapperListener.onFailure(e);
155-
}
156-
});
143+
sdkClient
144+
.searchDataObjectAsync(searchDataObjectRequest)
145+
.whenComplete(SdkClientUtils.wrapSearchCompletion(doubleWrapperListener));
157146
} else {
158147
SearchSourceBuilder sourceBuilder = modelAccessControlHelper.createSearchSourceBuilder(user);
159148
SearchRequest modelGroupSearchRequest = new SearchRequest();
@@ -182,20 +171,9 @@ public void search(SdkClient sdkClient, SearchRequest request, String tenantId,
182171
.searchSourceBuilder(request.source())
183172
.tenantId(tenantId)
184173
.build();
185-
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((sr, throwable) -> {
186-
if (throwable == null) {
187-
try {
188-
SearchResponse searchResponse = SearchResponse.fromXContent(sr.parser());
189-
log.info("Model search complete: {}", searchResponse.getHits().getTotalHits());
190-
doubleWrapperListener.onResponse(searchResponse);
191-
} catch (Exception e) {
192-
doubleWrapperListener.onFailure(e);
193-
}
194-
} else {
195-
Exception e = SdkClientUtils.unwrapAndConvertToException(throwable, OpenSearchStatusException.class);
196-
doubleWrapperListener.onFailure(e);
197-
}
198-
});
174+
sdkClient
175+
.searchDataObjectAsync(searchDataObjectRequest)
176+
.whenComplete(SdkClientUtils.wrapSearchCompletion(doubleWrapperListener));
199177
}, e -> {
200178
log.error("Fail to search model groups!", e);
201179
wrappedListener.onFailure(e);
@@ -206,20 +184,9 @@ public void search(SdkClient sdkClient, SearchRequest request, String tenantId,
206184
.searchSourceBuilder(modelGroupSearchRequest.source())
207185
.tenantId(tenantId)
208186
.build();
209-
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((r, throwable) -> {
210-
if (throwable == null) {
211-
try {
212-
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
213-
log.info("Model search complete: {}", searchResponse.getHits().getTotalHits());
214-
modelGroupSearchActionListener.onResponse(searchResponse);
215-
} catch (Exception e) {
216-
modelGroupSearchActionListener.onFailure(e);
217-
}
218-
} else {
219-
Exception e = SdkClientUtils.unwrapAndConvertToException(throwable, OpenSearchStatusException.class);
220-
modelGroupSearchActionListener.onFailure(e);
221-
}
222-
});
187+
sdkClient
188+
.searchDataObjectAsync(searchDataObjectRequest)
189+
.whenComplete(SdkClientUtils.wrapSearchCompletion(modelGroupSearchActionListener));
223190
}
224191
} catch (Exception e) {
225192
log.error(e.getMessage(), e);

plugin/src/main/java/org/opensearch/ml/action/model_group/DeleteModelGroupTransportAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
1010
import static org.opensearch.ml.utils.RestActionUtils.PARAMETER_MODEL_GROUP_ID;
1111

12-
import java.io.IOException;
13-
1412
import org.opensearch.ExceptionsHelper;
1513
import org.opensearch.OpenSearchStatusException;
1614
import org.opensearch.action.ActionRequest;
@@ -156,7 +154,8 @@ private void handleModelSearchResponse(
156154
}
157155

158156
try {
159-
SearchResponse response = SearchResponse.fromXContent(searchResponse.parser());
157+
SearchResponse response = searchResponse.searchResponse();
158+
// Parsing failure would cause NPE on next line
160159
if (response.getHits().getHits().length == 0) {
161160
DeleteRequest deleteRequest = new DeleteRequest(ML_MODEL_GROUP_INDEX, modelGroupId);
162161
deleteModelGroup(deleteRequest, tenantId, listener);
@@ -210,10 +209,10 @@ private void handleDeleteResponse(
210209
actionListener.onFailure(cause);
211210
} else {
212211
try {
213-
DeleteResponse deleteResponse = DeleteResponse.fromXContent(response.parser());
212+
DeleteResponse deleteResponse = response.deleteResponse();
214213
log.debug("Completed Delete Model Group Request, model group id:{} deleted", response.id());
215214
actionListener.onResponse(deleteResponse);
216-
} catch (IOException e) {
215+
} catch (Exception e) {
217216
actionListener.onFailure(e);
218217
}
219218
}

plugin/src/main/java/org/opensearch/ml/action/model_group/GetModelGroupTransportAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private void processResponse(
146146
ActionListener<MLModelGroupGetResponse> wrappedListener
147147
) {
148148
try {
149-
GetResponse gr = getDataObjectResponse.parser() == null ? null : GetResponse.fromXContent(getDataObjectResponse.parser());
149+
GetResponse gr = getDataObjectResponse.getResponse();
150150
if (gr != null && gr.isExists()) {
151151
try (
152152
XContentParser parser = jsonXContent

0 commit comments

Comments
 (0)