4
4
package com .microsoft .signalr ;
5
5
6
6
import java .util .Map ;
7
+ import java .util .concurrent .locks .ReentrantLock ;
7
8
8
9
import org .slf4j .Logger ;
9
10
import org .slf4j .LoggerFactory ;
@@ -26,6 +27,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
26
27
private WebSocketOnClosedCallback onClose ;
27
28
private CompletableSubject startSubject = CompletableSubject .create ();
28
29
private CompletableSubject closeSubject = CompletableSubject .create ();
30
+ private final ReentrantLock closeLock = new ReentrantLock ();
29
31
30
32
private final Logger logger = LoggerFactory .getLogger (OkHttpWebSocketWrapper .class );
31
33
@@ -87,14 +89,29 @@ public void onMessage(WebSocket webSocket, String message) {
87
89
@ Override
88
90
public void onClosing (WebSocket webSocket , int code , String reason ) {
89
91
onClose .invoke (code , reason );
90
- closeSubject .onComplete ();
92
+ try {
93
+ closeLock .lock ();
94
+ closeSubject .onComplete ();
95
+ }
96
+ finally {
97
+ closeLock .unlock ();
98
+ }
91
99
checkStartFailure ();
92
100
}
93
101
94
102
@ Override
95
103
public void onFailure (WebSocket webSocket , Throwable t , Response response ) {
96
104
logger .error ("WebSocket closed from an error: {}." , t .getMessage ());
97
- closeSubject .onError (new RuntimeException (t ));
105
+
106
+ try {
107
+ closeLock .lock ();
108
+ if (!closeSubject .hasComplete ()) {
109
+ closeSubject .onError (new RuntimeException (t ));
110
+ }
111
+ }
112
+ finally {
113
+ closeLock .unlock ();
114
+ }
98
115
onClose .invoke (null , t .getMessage ());
99
116
checkStartFailure ();
100
117
}
0 commit comments