Skip to content

Commit 12b0000

Browse files
authored
add sdk implementation to the connector search (#3704)
Signed-off-by: Dhrubo Saha <dhrubo@amazon.com>
1 parent 82f7eb7 commit 12b0000

File tree

1 file changed

+29
-16
lines changed

1 file changed

+29
-16
lines changed

plugin/src/main/java/org/opensearch/ml/action/connector/SearchConnectorTransportAction.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package org.opensearch.ml.action.connector;
77

8-
import static org.opensearch.ml.common.CommonValue.TENANT_ID_FIELD;
98
import static org.opensearch.ml.utils.RestActionUtils.wrapListenerToHandleSearchIndexNotFound;
109

1110
import java.util.ArrayList;
@@ -15,6 +14,7 @@
1514
import java.util.stream.Collectors;
1615

1716
import org.opensearch.ExceptionsHelper;
17+
import org.opensearch.OpenSearchStatusException;
1818
import org.opensearch.action.search.SearchRequest;
1919
import org.opensearch.action.search.SearchResponse;
2020
import org.opensearch.action.search.ShardSearchFailure;
@@ -24,9 +24,8 @@
2424
import org.opensearch.common.util.concurrent.ThreadContext;
2525
import org.opensearch.commons.authuser.User;
2626
import org.opensearch.core.action.ActionListener;
27+
import org.opensearch.core.rest.RestStatus;
2728
import org.opensearch.index.IndexNotFoundException;
28-
import org.opensearch.index.query.BoolQueryBuilder;
29-
import org.opensearch.index.query.QueryBuilders;
3029
import org.opensearch.ml.common.CommonValue;
3130
import org.opensearch.ml.common.connector.HttpConnector;
3231
import org.opensearch.ml.common.transport.connector.MLConnectorSearchAction;
@@ -36,6 +35,8 @@
3635
import org.opensearch.ml.utils.RestActionUtils;
3736
import org.opensearch.ml.utils.TenantAwareHelper;
3837
import org.opensearch.remote.metadata.client.SdkClient;
38+
import org.opensearch.remote.metadata.client.SearchDataObjectRequest;
39+
import org.opensearch.remote.metadata.common.SdkClientUtils;
3940
import org.opensearch.search.builder.SearchSourceBuilder;
4041
import org.opensearch.search.fetch.subphase.FetchSourceContext;
4142
import org.opensearch.search.internal.InternalSearchResponse;
@@ -108,22 +109,34 @@ private void search(SearchRequest request, String tenantId, ActionListener<Searc
108109
final ActionListener<SearchResponse> doubleWrappedListener = ActionListener
109110
.wrap(wrappedListener::onResponse, e -> wrapListenerToHandleSearchIndexNotFound(e, wrappedListener));
110111

111-
if (tenantId != null) {
112-
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
113-
if (request.source().query() != null) {
114-
queryBuilder.must(request.source().query());
115-
}
116-
queryBuilder.filter(QueryBuilders.termQuery(TENANT_ID_FIELD, tenantId)); // Replace with your tenant_id field
117-
request.source().query(queryBuilder);
118-
}
119-
120-
if (connectorAccessControlHelper.skipConnectorAccessControl(user)) {
121-
client.search(request, doubleWrappedListener);
122-
} else {
112+
if (!connectorAccessControlHelper.skipConnectorAccessControl(user)) {
123113
SearchSourceBuilder sourceBuilder = connectorAccessControlHelper.addUserBackendRolesFilter(user, request.source());
124114
request.source(sourceBuilder);
125-
client.search(request, doubleWrappedListener);
126115
}
116+
117+
SearchDataObjectRequest searchDataObjectRequest = SearchDataObjectRequest
118+
.builder()
119+
.indices(request.indices())
120+
.searchSourceBuilder(request.source())
121+
.tenantId(tenantId)
122+
.build();
123+
sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((r, throwable) -> {
124+
if (throwable != null) {
125+
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
126+
log.error("Failed to search connector", cause);
127+
doubleWrappedListener.onFailure(cause);
128+
} else {
129+
try {
130+
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
131+
log.info("Connector search complete: {}", searchResponse.getHits().getTotalHits());
132+
doubleWrappedListener.onResponse(searchResponse);
133+
} catch (Exception e) {
134+
log.error("Failed to parse search response", e);
135+
doubleWrappedListener
136+
.onFailure(new OpenSearchStatusException("Failed to parse search response", RestStatus.INTERNAL_SERVER_ERROR));
137+
}
138+
}
139+
});
127140
} catch (Exception e) {
128141
log.error(e.getMessage(), e);
129142
actionListener.onFailure(e);

0 commit comments

Comments
 (0)