Skip to content

Commit 5b8dfbc

Browse files
Fix python client not able to connect to MCP server issue (#3822) (#3828)
Signed-off-by: zane-neo <zaniu@amazon.com> Co-authored-by: Dhrubo Saha <dhrubo@amazon.com> (cherry picked from commit 0cba0ed) Co-authored-by: zane-neo <zaniu@amazon.com>
1 parent 25e3502 commit 5b8dfbc

File tree

2 files changed

+46
-46
lines changed

2 files changed

+46
-46
lines changed

plugin/src/main/java/org/opensearch/ml/action/mcpserver/OpenSearchMcpServerTransportProvider.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public Mono<Void> closeGracefully() {
108108
* Handles new SSE connection requests from clients. Creates a new session for each
109109
* connection and sets up the SSE event stream.
110110
*/
111-
public Mono<HttpChunk> handleSseConnection(StreamingRestChannel channel, boolean ssePrefix, String nodeId, NodeClient client) {
111+
public Mono<HttpChunk> handleSseConnection(StreamingRestChannel channel, boolean appendToBaseUrl, String nodeId, NodeClient client) {
112112
return Mono.create(sink -> {
113113
OpenSearchMcpSessionTransport sessionTransport = new OpenSearchMcpSessionTransport(channel);
114114
McpServerSession session = sessionFactory.create(sessionTransport);
@@ -121,10 +121,10 @@ public Mono<HttpChunk> handleSseConnection(StreamingRestChannel channel, boolean
121121
// Send initial endpoint event
122122
log.debug("Sending initial endpoint event to session: {}", sessionId);
123123
String result;
124-
if (ssePrefix) {
125-
result = String.format(Locale.ROOT, "/sse/message?sessionId=%s", sessionId);
124+
if (appendToBaseUrl) {
125+
result = String.format(Locale.ROOT, "/_plugins/_ml/mcp/sse/message?sessionId=%s", sessionId);
126126
} else {
127-
result = String.format(Locale.ROOT, "/message?sessionId=%s", sessionId);
127+
result = String.format(Locale.ROOT, "/sse/message?sessionId=%s", sessionId);
128128
}
129129
McpAsyncServerHolder.CHANNELS.put(sessionId, channel);
130130
sink.success(createHttpChunk(ENDPOINT_EVENT_TYPE, result));

plugin/src/main/java/org/opensearch/ml/rest/mcpserver/RestMcpConnectionMessageStreamingAction.java

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
8383
}
8484
String path = request.path();
8585
final String sessionId = request.param("sessionId");
86-
final String ssePrefixStr = request.param("sse_prefix");
87-
boolean ssePrefix = Optional.ofNullable(ssePrefixStr).map(x -> Boolean.parseBoolean(ssePrefixStr)).orElse(true);
88-
final StreamingRestChannelConsumer consumer = (channel) -> prepareRequestInternal(path, ssePrefix, sessionId, channel, client);
86+
final String sAppendToBaseUrl = request.param("append_to_base_url");
87+
boolean appendToBaseUrl = Optional.ofNullable(sAppendToBaseUrl).map(x -> Boolean.parseBoolean(sAppendToBaseUrl)).orElse(false);
88+
final StreamingRestChannelConsumer consumer = (channel) -> prepareRequestInternal(
89+
path,
90+
appendToBaseUrl,
91+
sessionId,
92+
channel,
93+
client
94+
);
8995
return channel -> {
9096
if (channel instanceof StreamingRestChannel) {
9197
consumer.accept((StreamingRestChannel) channel);
@@ -101,25 +107,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
101107
@VisibleForTesting
102108
protected void prepareRequestInternal(
103109
final String path,
104-
final boolean ssePrefix,
110+
final boolean appendToBaseUrl,
105111
final String sessionId,
106112
final StreamingRestChannel channel,
107113
final NodeClient client
108114
) {
109-
channel
110-
.prepareResponse(
111-
RestStatus.OK,
112-
Map
113-
.of(
114-
"Content-Type",
115-
List.of("text/event-stream"),
116-
"Cache-Control",
117-
List.of("no-cache"),
118-
"Connection",
119-
List.of("keep-alive")
120-
)
121-
);
122115
if (path.equals(SSE_ENDPOINT)) {
116+
channel
117+
.prepareResponse(
118+
RestStatus.OK,
119+
Map
120+
.of(
121+
"Content-Type",
122+
List.of("text/event-stream"),
123+
"Cache-Control",
124+
List.of("no-cache"),
125+
"Connection",
126+
List.of("keep-alive")
127+
)
128+
);
123129
// The connection request doesn't have request body, but we're still reading the request body content,
124130
// The reason is that the response producer is created when http channel is been read, so here
125131
// we subscribe to channel to trigger the channel read, but ignoring the content.
@@ -129,7 +135,7 @@ protected void prepareRequestInternal(
129135
.map(HttpChunk::content)
130136
.flatMap(
131137
x -> McpAsyncServerHolder.mcpServerTransportProvider
132-
.handleSseConnection(channel, ssePrefix, clusterService.localNode().getId(), client)
138+
.handleSseConnection(channel, appendToBaseUrl, clusterService.localNode().getId(), client)
133139
)
134140
.flatMap(y -> Mono.fromRunnable(() -> {
135141
log.debug("starting to send sse connection chunk result");
@@ -145,6 +151,7 @@ protected void prepareRequestInternal(
145151
}))
146152
.subscribe();
147153
} else if (path.equals(MESSAGE_ENDPOINT)) {
154+
channel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of("text/plain")));
148155
if (sessionId == null) {
149156
try {
150157
channel
@@ -183,13 +190,8 @@ protected void prepareRequestInternal(
183190
McpAsyncServerHolder.mcpServerTransportProvider
184191
.handleMessage(sessionId, requestBody)
185192
.doOnSuccess(y -> {
186-
if (requestBody.contains("notifications/initialized")) {
187-
log
188-
.debug(
189-
"Starting to send OK response for notifications/initialized request in local node"
190-
);
191-
channel.sendChunk(createInitializedNotificationRes());
192-
}
193+
log.debug("Starting to send rest response to client in local node");
194+
channel.sendChunk(createRestResponse());
193195
})
194196
.onErrorResume(e -> Mono.fromRunnable(() -> {
195197
try {
@@ -207,27 +209,25 @@ protected void prepareRequestInternal(
207209
} else {
208210
ActionListener<AcknowledgedResponse> actionListener = ActionListener.wrap(y -> {
209211
if (y.isAcknowledged()) {
210-
log
211-
.debug(
212-
"MCP request has been dispatched to corresponding node and handled successfully!"
213-
);
214-
if (requestBody.contains("notifications/initialized")) {
215-
log
216-
.debug(
217-
"Starting to send OK response for notifications/initialized request in coordinator node"
218-
);
219-
channel.sendChunk(createInitializedNotificationRes());
220-
}
212+
log.debug("Starting to send rest response to client as peer node returns successfully");
213+
channel.sendChunk(createRestResponse());
221214
}
222-
},
223-
e -> {
215+
}, e -> {
216+
log
217+
.error(
218+
"MCP request has been dispatched to corresponding node but peer node failed to handle it",
219+
e
220+
);
221+
try {
222+
channel.sendResponse(new BytesRestResponse(channel, e));
223+
} catch (IOException ex) {
224224
log
225225
.error(
226-
"MCP request has been dispatched to corresponding node but peer node failed to handle it",
227-
e
226+
"Failed to send exception response to client during message handling in remote node due to IOException, nodeId: {}",
227+
nodeId
228228
);
229229
}
230-
);
230+
});
231231
client
232232
.execute(
233233
MLMcpMessageAction.INSTANCE,
@@ -279,7 +279,7 @@ protected void prepareRequestInternal(
279279
}
280280
}
281281

282-
private HttpChunk createInitializedNotificationRes() {
282+
private HttpChunk createRestResponse() {
283283
return new HttpChunk() {
284284
@Override
285285
public boolean isLast() {

0 commit comments

Comments
 (0)