Skip to content

Commit a8bf518

Browse files
add sdk implementation to the connector search (#3704) (#3706)
Signed-off-by: Dhrubo Saha <dhrubo@amazon.com> (cherry picked from commit 12b0000) Co-authored-by: Dhrubo Saha <dhrubo@amazon.com>
1 parent d440726 commit a8bf518

File tree

1 file changed

+29
-16
lines changed

1 file changed

+29
-16
lines changed

plugin/src/main/java/org/opensearch/ml/action/connector/SearchConnectorTransportAction.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package org.opensearch.ml.action.connector;
77

8-
import static org.opensearch.ml.common.CommonValue.TENANT_ID_FIELD;
98
import static org.opensearch.ml.utils.RestActionUtils.wrapListenerToHandleSearchIndexNotFound;
109

1110
import java.util.ArrayList;
@@ -15,6 +14,7 @@
1514
import java.util.stream.Collectors;
1615

1716
import org.opensearch.ExceptionsHelper;
17+
import org.opensearch.OpenSearchStatusException;
1818
import org.opensearch.action.search.SearchRequest;
1919
import org.opensearch.action.search.SearchResponse;
2020
import org.opensearch.action.search.ShardSearchFailure;
@@ -25,9 +25,8 @@
2525
import org.opensearch.common.util.concurrent.ThreadContext;
2626
import org.opensearch.commons.authuser.User;
2727
import org.opensearch.core.action.ActionListener;
28+
import org.opensearch.core.rest.RestStatus;
2829
import org.opensearch.index.IndexNotFoundException;
29-
import org.opensearch.index.query.BoolQueryBuilder;
30-
import org.opensearch.index.query.QueryBuilders;
3130
import org.opensearch.ml.common.CommonValue;
3231
import org.opensearch.ml.common.connector.HttpConnector;
3332
import org.opensearch.ml.common.transport.connector.MLConnectorSearchAction;
@@ -37,6 +36,8 @@
3736
import org.opensearch.ml.utils.RestActionUtils;
3837
import org.opensearch.ml.utils.TenantAwareHelper;
3938
import org.opensearch.remote.metadata.client.SdkClient;
39+
import org.opensearch.remote.metadata.client.SearchDataObjectRequest;
40+
import org.opensearch.remote.metadata.common.SdkClientUtils;
4041
import org.opensearch.search.builder.SearchSourceBuilder;
4142
import org.opensearch.search.fetch.subphase.FetchSourceContext;
4243
import org.opensearch.search.internal.InternalSearchResponse;
@@ -108,22 +109,34 @@ private void search(SearchRequest request, String tenantId, ActionListener<Searc
108109
final ActionListener<SearchResponse> doubleWrappedListener = ActionListener
109110
.wrap(wrappedListener::onResponse, e -> wrapListenerToHandleSearchIndexNotFound(e, wrappedListener));
110111

111-
if (tenantId != null) {
112-
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
113-
if (request.source().query() != null) {
114-
queryBuilder.must(request.source().query());
115-
}
116-
queryBuilder.filter(QueryBuilders.termQuery(TENANT_ID_FIELD, tenantId)); // Replace with your tenant_id field
117-
request.source().query(queryBuilder);
118-
}
119-
120-
if (connectorAccessControlHelper.skipConnectorAccessControl(user)) {
121-
client.search(request, doubleWrappedListener);
122-
} else {
112+
if (!connectorAccessControlHelper.skipConnectorAccessControl(user)) {
123113
SearchSourceBuilder sourceBuilder = connectorAccessControlHelper.addUserBackendRolesFilter(user, request.source());
124114
request.source(sourceBuilder);
125-
client.search(request, doubleWrappedListener);
126115
}
116+
117+
SearchDataObjectRequest searchDataObjectRequest = SearchDataObjectRequest
118+
.builder()
119+
.indices(request.indices())
120+
.searchSourceBuilder(request.source())
121+
.tenantId(tenantId)
122+
.build();
123+
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((r, throwable) -> {
124+
if (throwable != null) {
125+
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
126+
log.error("Failed to search connector", cause);
127+
doubleWrappedListener.onFailure(cause);
128+
} else {
129+
try {
130+
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
131+
log.info("Connector search complete: {}", searchResponse.getHits().getTotalHits());
132+
doubleWrappedListener.onResponse(searchResponse);
133+
} catch (Exception e) {
134+
log.error("Failed to parse search response", e);
135+
doubleWrappedListener
136+
.onFailure(new OpenSearchStatusException("Failed to parse search response", RestStatus.INTERNAL_SERVER_ERROR));
137+
}
138+
}
139+
});
127140
} catch (Exception e) {
128141
log.error(e.getMessage(), e);
129142
actionListener.onFailure(e);

0 commit comments

Comments
 (0)