1
+ package io .modelcontextprotocol .client ;
2
+
3
+ import java .time .Duration ;
4
+ import java .util .ArrayList ;
5
+ import java .util .Collections ;
6
+ import java .util .List ;
7
+ import java .util .concurrent .atomic .AtomicReference ;
8
+ import java .util .function .Function ;
9
+
10
+ import org .slf4j .Logger ;
11
+ import org .slf4j .LoggerFactory ;
12
+
13
+ import io .modelcontextprotocol .spec .McpClientSession ;
14
+ import io .modelcontextprotocol .spec .McpError ;
15
+ import io .modelcontextprotocol .spec .McpSchema ;
16
+ import io .modelcontextprotocol .spec .McpTransportSessionNotFoundException ;
17
+ import io .modelcontextprotocol .util .Assert ;
18
+ import reactor .core .publisher .Mono ;
19
+ import reactor .core .publisher .Sinks ;
20
+ import reactor .util .context .ContextView ;
21
+
22
+ /**
23
+ * <b>Handles the protocol initialization phase between client and server</b>
24
+ *
25
+ * <p>
26
+ * The initialization phase MUST be the first interaction between client and server.
27
+ * During this phase, the client and server perform the following operations:
28
+ * <ul>
29
+ * <li>Establish protocol version compatibility</li>
30
+ * <li>Exchange and negotiate capabilities</li>
31
+ * <li>Share implementation details</li>
32
+ * </ul>
33
+ *
34
+ * <b>Client Initialization Process</b>
35
+ * <p>
36
+ * The client MUST initiate this phase by sending an initialize request containing:
37
+ * <ul>
38
+ * <li>Protocol version supported</li>
39
+ * <li>Client capabilities</li>
40
+ * <li>Client implementation information</li>
41
+ * </ul>
42
+ *
43
+ * <p>
44
+ * After successful initialization, the client MUST send an initialized notification to
45
+ * indicate it is ready to begin normal operations.
46
+ *
47
+ * <b>Server Response</b>
48
+ * <p>
49
+ * The server MUST respond with its own capabilities and information.
50
+ *
51
+ * <b>Protocol Version Negotiation</b>
52
+ * <p>
53
+ * In the initialize request, the client MUST send a protocol version it supports. This
54
+ * SHOULD be the latest version supported by the client.
55
+ *
56
+ * <p>
57
+ * If the server supports the requested protocol version, it MUST respond with the same
58
+ * version. Otherwise, the server MUST respond with another protocol version it supports.
59
+ * This SHOULD be the latest version supported by the server.
60
+ *
61
+ * <p>
62
+ * If the client does not support the version in the server's response, it SHOULD
63
+ * disconnect.
64
+ *
65
+ * <b>Request Restrictions</b>
66
+ * <p>
67
+ * <strong>Important:</strong> The following restrictions apply during initialization:
68
+ * <ul>
69
+ * <li>The client SHOULD NOT send requests other than pings before the server has
70
+ * responded to the initialize request</li>
71
+ * <li>The server SHOULD NOT send requests other than pings and logging before receiving
72
+ * the initialized notification</li>
73
+ * </ul>
74
+ */
75
+ public class LifecyleInitializer {
76
+
77
+ private static final Logger logger = LoggerFactory .getLogger (LifecyleInitializer .class );
78
+
79
+ /**
80
+ * The MCP session supplier that manages bidirectional JSON-RPC communication between
81
+ * clients and servers.
82
+ */
83
+ private final Function <ContextView , McpClientSession > sessionSupplier ;
84
+
85
+ private final McpSchema .ClientCapabilities clientCapabilities ;
86
+
87
+ private final McpSchema .Implementation clientInfo ;
88
+
89
+ private List <String > protocolVersions ;
90
+
91
+ private final AtomicReference <DefaultInitialization > initializationRef = new AtomicReference <>();
92
+
93
+ /**
94
+ * The max timeout to await for the client-server connection to be initialized.
95
+ */
96
+ private final Duration initializationTimeout ;
97
+
98
+ public LifecyleInitializer (McpSchema .ClientCapabilities clientCapabilities , McpSchema .Implementation clientInfo ,
99
+ List <String > protocolVersions , Duration initializationTimeout ,
100
+ Function <ContextView , McpClientSession > sessionSupplier ) {
101
+
102
+ Assert .notNull (sessionSupplier , "Session supplier must not be null" );
103
+ Assert .notNull (clientCapabilities , "Client capabilities must not be null" );
104
+ Assert .notNull (clientInfo , "Client info must not be null" );
105
+ Assert .notEmpty (protocolVersions , "Protocol versions must not be empty" );
106
+ Assert .notNull (initializationTimeout , "Initialization timeout must not be null" );
107
+
108
+ this .sessionSupplier = sessionSupplier ;
109
+ this .clientCapabilities = clientCapabilities ;
110
+ this .clientInfo = clientInfo ;
111
+ this .protocolVersions = Collections .unmodifiableList (new ArrayList <>(protocolVersions ));
112
+ this .initializationTimeout = initializationTimeout ;
113
+ }
114
+
115
+ /**
116
+ * This method is package-private and used for test only. Should not be called by user
117
+ * code.
118
+ * @param protocolVersions the Client supported protocol versions.
119
+ */
120
+ void setProtocolVersions (List <String > protocolVersions ) {
121
+ this .protocolVersions = protocolVersions ;
122
+ }
123
+
124
+ /**
125
+ * Represents the initialization state of the MCP client.
126
+ */
127
+ interface Initialization {
128
+
129
+ /**
130
+ * Returns the MCP client session that is used to communicate with the server.
131
+ * This session is established during the initialization process and is used for
132
+ * sending requests and notifications.
133
+ * @return The MCP client session
134
+ */
135
+ McpClientSession mcpSession ();
136
+
137
+ /**
138
+ * Returns the result of the MCP initialization process. This result contains
139
+ * information about the protocol version, capabilities, server info, and
140
+ * instructions provided by the server during the initialization phase.
141
+ * @return The result of the MCP initialization process
142
+ */
143
+ McpSchema .InitializeResult initializeResult ();
144
+
145
+ }
146
+
147
+ /**
148
+ * Default implementation of the {@link Initialization} interface that manages the MCP
149
+ * client initialization process.
150
+ */
151
+ private static class DefaultInitialization implements Initialization {
152
+
153
+ /**
154
+ * A sink that emits the result of the MCP initialization process. It allows
155
+ * subscribers to wait for the initialization to complete.
156
+ */
157
+ private final Sinks .One <McpSchema .InitializeResult > initSink ;
158
+
159
+ /**
160
+ * Holds the result of the MCP initialization process. It is used to cache the
161
+ * result for future requests.
162
+ */
163
+ private final AtomicReference <McpSchema .InitializeResult > result ;
164
+
165
+ /**
166
+ * Holds the MCP client session that is used to communicate with the server. It is
167
+ * set during the initialization process and used for sending requests and
168
+ * notifications.
169
+ */
170
+ private final AtomicReference <McpClientSession > mcpClientSession ;
171
+
172
+ private DefaultInitialization () {
173
+ this .initSink = Sinks .one ();
174
+ this .result = new AtomicReference <>();
175
+ this .mcpClientSession = new AtomicReference <>();
176
+ }
177
+
178
+ // ---------------------------------------------------
179
+ // Public access for mcpSession and initializeResult because they are
180
+ // used in by the McpAsyncClient.
181
+ // ----------------------------------------------------
182
+ public McpClientSession mcpSession () {
183
+ return this .mcpClientSession .get ();
184
+ }
185
+
186
+ public McpSchema .InitializeResult initializeResult () {
187
+ return this .result .get ();
188
+ }
189
+
190
+ // ---------------------------------------------------
191
+ // Private accessors used internally by the LifecycleInitializer to set the MCP
192
+ // client session and complete the initialization process.
193
+ // ---------------------------------------------------
194
+ private void setMcpClientSession (McpClientSession mcpClientSession ) {
195
+ this .mcpClientSession .set (mcpClientSession );
196
+ }
197
+
198
+ /**
199
+ * Returns a Mono that completes when the MCP client initialization is complete.
200
+ * This allows subscribers to wait for the initialization to finish before
201
+ * proceeding with further operations.
202
+ * @return A Mono that emits the result of the MCP initialization process
203
+ */
204
+ private Mono <McpSchema .InitializeResult > await () {
205
+ return this .initSink .asMono ();
206
+ }
207
+
208
+ /**
209
+ * Completes the initialization process with the given result. It caches the
210
+ * result and emits it to all subscribers waiting for the initialization to
211
+ * complete.
212
+ * @param initializeResult The result of the MCP initialization process
213
+ */
214
+ private void complete (McpSchema .InitializeResult initializeResult ) {
215
+ // first ensure the result is cached
216
+ this .result .set (initializeResult );
217
+ // inform all the subscribers waiting for the initialization
218
+ this .initSink .emitValue (initializeResult , Sinks .EmitFailureHandler .FAIL_FAST );
219
+ }
220
+
221
+ private void error (Throwable t ) {
222
+ this .initSink .emitError (t , Sinks .EmitFailureHandler .FAIL_FAST );
223
+ }
224
+
225
+ private void close () {
226
+ this .mcpSession ().close ();
227
+ }
228
+
229
+ private Mono <Void > closeGracefully () {
230
+ return this .mcpSession ().closeGracefully ();
231
+ }
232
+
233
+ }
234
+
235
+ public boolean isInitialized () {
236
+ return currentInitializationResult () != null ;
237
+ }
238
+
239
+ public McpSchema .InitializeResult currentInitializationResult () {
240
+ DefaultInitialization current = this .initializationRef .get ();
241
+ McpSchema .InitializeResult initializeResult = current != null ? current .result .get () : null ;
242
+ return initializeResult ;
243
+ }
244
+
245
+ /**
246
+ * Hook to handle exceptions that occur during the MCP transport session.
247
+ * <p>
248
+ * If the exception is a {@link McpTransportSessionNotFoundException}, it indicates
249
+ * that the session was not found, and we should re-initialize the client.
250
+ * </p>
251
+ * @param t The exception to handle
252
+ */
253
+ public void handleException (Throwable t ) {
254
+ logger .warn ("Handling exception" , t );
255
+ if (t instanceof McpTransportSessionNotFoundException ) {
256
+ DefaultInitialization previous = this .initializationRef .getAndSet (null );
257
+ if (previous != null ) {
258
+ previous .close ();
259
+ }
260
+ // Providing an empty operation since we are only interested in triggering
261
+ // the implicit initialization step.
262
+ withIntitialization ("re-initializing" , result -> Mono .empty ()).subscribe ();
263
+ }
264
+ }
265
+
266
+ /**
267
+ * Utility method to ensure the initialization is established before executing an
268
+ * operation.
269
+ * @param <T> The type of the result Mono
270
+ * @param actionName The action to perform when the client is initialized
271
+ * @param operation The operation to execute when the client is initialized
272
+ * @return A Mono that completes with the result of the operation
273
+ */
274
+ public <T > Mono <T > withIntitialization (String actionName , Function <Initialization , Mono <T >> operation ) {
275
+ return Mono .deferContextual (ctx -> {
276
+ DefaultInitialization newInit = new DefaultInitialization ();
277
+ DefaultInitialization previous = this .initializationRef .compareAndExchange (null , newInit );
278
+
279
+ boolean needsToInitialize = previous == null ;
280
+ logger .debug (needsToInitialize ? "Initialization process started" : "Joining previous initialization" );
281
+
282
+ Mono <McpSchema .InitializeResult > initializationJob = needsToInitialize ? doInitialize (newInit , ctx )
283
+ : previous .await ();
284
+
285
+ return initializationJob .map (initializeResult -> this .initializationRef .get ())
286
+ .timeout (this .initializationTimeout )
287
+ .onErrorResume (ex -> {
288
+ logger .warn ("Failed to initialize" , ex );
289
+ return Mono .error (new McpError ("Client failed to initialize " + actionName ));
290
+ })
291
+ .flatMap (operation );
292
+ });
293
+ }
294
+
295
+ private Mono <McpSchema .InitializeResult > doInitialize (DefaultInitialization initialization , ContextView ctx ) {
296
+ initialization .setMcpClientSession (this .sessionSupplier .apply (ctx ));
297
+
298
+ McpClientSession mcpClientSession = initialization .mcpSession ();
299
+
300
+ String latestVersion = this .protocolVersions .get (this .protocolVersions .size () - 1 );
301
+
302
+ McpSchema .InitializeRequest initializeRequest = new McpSchema .InitializeRequest (latestVersion ,
303
+ this .clientCapabilities , this .clientInfo );
304
+
305
+ Mono <McpSchema .InitializeResult > result = mcpClientSession .sendRequest (McpSchema .METHOD_INITIALIZE ,
306
+ initializeRequest , McpAsyncClient .INITIALIZE_RESULT_TYPE_REF );
307
+
308
+ return result .flatMap (initializeResult -> {
309
+ logger .info ("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}" ,
310
+ initializeResult .protocolVersion (), initializeResult .capabilities (), initializeResult .serverInfo (),
311
+ initializeResult .instructions ());
312
+
313
+ if (!this .protocolVersions .contains (initializeResult .protocolVersion ())) {
314
+ return Mono .error (new McpError (
315
+ "Unsupported protocol version from the server: " + initializeResult .protocolVersion ()));
316
+ }
317
+
318
+ return mcpClientSession .sendNotification (McpSchema .METHOD_NOTIFICATION_INITIALIZED , null )
319
+ .thenReturn (initializeResult );
320
+ }).doOnNext (initialization ::complete ).onErrorResume (ex -> {
321
+ initialization .error (ex );
322
+ return Mono .error (ex );
323
+ });
324
+ }
325
+
326
+ /**
327
+ * Closes the current initialization if it exists.
328
+ */
329
+ public void close () {
330
+ DefaultInitialization current = this .initializationRef .getAndSet (null );
331
+ if (current != null ) {
332
+ current .close ();
333
+ }
334
+ }
335
+
336
+ /**
337
+ * Gracefully closes the current initialization if it exists.
338
+ * @return A Mono that completes when the connection is closed
339
+ */
340
+ public Mono <?> closeGracefully () {
341
+ return Mono .defer (() -> {
342
+ DefaultInitialization current = this .initializationRef .getAndSet (null );
343
+ Mono <?> sessionClose = current != null ? current .closeGracefully () : Mono .empty ();
344
+ return sessionClose ;
345
+ });
346
+ }
347
+
348
+ }
0 commit comments