Skip to content

Commit 22f258b

Browse files
Run the Get Connector request in a stashed threadcontext (#3492) (#3497)
* Run the Get Connector request in a stashed threadcontext Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add restore context Signed-off-by: Craig Perkins <cwperx@amazon.com> --------- Signed-off-by: Craig Perkins <cwperx@amazon.com> (cherry picked from commit 09bf858) Co-authored-by: Craig Perkins <cwperx@amazon.com>
1 parent 72d370c commit 22f258b

File tree

1 file changed

+34
-30
lines changed

1 file changed

+34
-30
lines changed

plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2074,40 +2074,44 @@ public void getConnector(String connectorId, String tenantId, ActionListener<Con
20742074
.tenantId(tenantId)
20752075
.build();
20762076

2077-
sdkClient.getDataObjectAsync(getDataObjectRequest).whenComplete((r, throwable) -> {
2078-
log.debug("Completed Get Connector Request, id:{}", connectorId);
2079-
if (throwable != null) {
2080-
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
2081-
if (ExceptionsHelper.unwrap(cause, IndexNotFoundException.class) != null) {
2082-
log.error("Failed to get connector index", cause);
2083-
listener.onFailure(new OpenSearchStatusException("Failed to find connector", RestStatus.NOT_FOUND));
2077+
try (ThreadContext.StoredContext ctx = client.threadPool().getThreadContext().stashContext()) {
2078+
sdkClient.getDataObjectAsync(getDataObjectRequest).whenComplete((r, throwable) -> {
2079+
log.debug("Completed Get Connector Request, id:{}", connectorId);
2080+
ctx.restore();
2081+
if (throwable != null) {
2082+
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
2083+
if (ExceptionsHelper.unwrap(cause, IndexNotFoundException.class) != null) {
2084+
log.error("Failed to get connector index", cause);
2085+
listener.onFailure(new OpenSearchStatusException("Failed to find connector", RestStatus.NOT_FOUND));
2086+
} else {
2087+
log.error("Failed to get ML connector {}", connectorId, cause);
2088+
listener.onFailure(cause);
2089+
}
20842090
} else {
2085-
log.error("Failed to get ML connector {}", connectorId, cause);
2086-
listener.onFailure(cause);
2087-
}
2088-
} else {
2089-
try {
2090-
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
2091-
if (gr != null && gr.isExists()) {
2092-
try (
2093-
XContentParser parser = MLNodeUtils
2094-
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, gr.getSourceAsBytesRef())
2095-
) {
2096-
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
2097-
Connector connector = Connector.createConnector(parser);
2098-
listener.onResponse(connector);
2099-
} catch (Exception e) {
2100-
log.error("Failed to parse connector:{}", connectorId);
2101-
listener.onFailure(e);
2091+
try {
2092+
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
2093+
if (gr != null && gr.isExists()) {
2094+
try (
2095+
XContentParser parser = MLNodeUtils
2096+
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, gr.getSourceAsBytesRef())
2097+
) {
2098+
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
2099+
Connector connector = Connector.createConnector(parser);
2100+
listener.onResponse(connector);
2101+
} catch (Exception e) {
2102+
log.error("Failed to parse connector:{}", connectorId);
2103+
listener.onFailure(e);
2104+
}
2105+
} else {
2106+
listener
2107+
.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
21022108
}
2103-
} else {
2104-
listener.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
2109+
} catch (Exception e) {
2110+
listener.onFailure(e);
21052111
}
2106-
} catch (Exception e) {
2107-
listener.onFailure(e);
21082112
}
2109-
}
2110-
});
2113+
});
2114+
}
21112115
}
21122116

21132117
/**

0 commit comments

Comments
 (0)