1
1
using System ;
2
+ using System . Collections . Concurrent ;
2
3
using System . Collections . Generic ;
3
4
using System . Linq ;
5
+ using System . Net . WebSockets ;
6
+ using System . Text ;
7
+ using System . Threading ;
4
8
using System . Threading . Tasks ;
9
+ using JsonServices . Sessions ;
5
10
using JsonServices . Transport ;
6
11
7
12
namespace JsonServices . Sample . CoreServer
@@ -12,19 +17,99 @@ public class JsonServicesConnectionManager : IServer
12
17
13
18
public void Start ( ) => Started = true ;
14
19
15
- public void Dispose ( ) => Started = false ;
20
+ public void Dispose ( )
21
+ {
22
+ Started = false ;
23
+
24
+ // shutdown all running web sockets? hmm, can't do it synchronously
25
+ foreach ( var conn in WebSocketConnections )
26
+ {
27
+ var ws = conn . Value . WebSocket ;
28
+ if ( ws . State == WebSocketState . Open )
29
+ {
30
+ // there is no ws.Close
31
+ }
32
+ }
33
+ }
16
34
17
- public IConnection TryGetConnection ( string connectionId )
35
+ public async Task HandleIncomingMessages ( WebSocket webSocket )
18
36
{
19
- throw new NotImplementedException ( ) ;
37
+ // 1. create connection
38
+ var connection = new WebSocketConnection ( webSocket ) ;
39
+ WebSocketConnections . TryAdd ( connection . ConnectionId , connection ) ;
40
+ ClientConnected ? . Invoke ( this , new MessageEventArgs
41
+ {
42
+ ConnectionId = connection . ConnectionId ,
43
+ } ) ;
44
+
45
+ // 2. TODO: handle incoming messages in a loop until the socket is closed
46
+ await ReceiveMessages ( connection ) ;
47
+
48
+ // 3. disconnect the client
49
+ ClientDisconnected ? . Invoke ( this , new MessageEventArgs
50
+ {
51
+ ConnectionId = connection . ConnectionId ,
52
+ } ) ;
20
53
}
21
54
22
- public Task SendAsync ( string connectionId , string data )
55
+ private async Task ReceiveMessages ( WebSocketConnection connection )
56
+ {
57
+ var webSocket = connection . WebSocket ;
58
+ var buffer = new byte [ 1024 * 32 ] ;
59
+ var stringBuilder = new StringBuilder ( ) ;
60
+
61
+ while ( webSocket . State == WebSocketState . Open )
62
+ {
63
+ var result = await webSocket . ReceiveAsync ( buffer , CancellationToken . None ) ;
64
+ if ( result . MessageType == WebSocketMessageType . Close )
65
+ {
66
+ return ;
67
+ }
68
+
69
+ // if the message is too big to fit in the buffer, assemble it
70
+ // TODO: message parts can be invalid UTF8 chars (broken into parts)
71
+ var part = Encoding . UTF8 . GetString ( buffer ) ;
72
+ stringBuilder . Append ( part ) ;
73
+ if ( result . EndOfMessage )
74
+ {
75
+ MessageReceived ? . Invoke ( this , new MessageEventArgs
76
+ {
77
+ ConnectionId = connection . ConnectionId ,
78
+ Data = stringBuilder . ToString ( ) ,
79
+ } ) ;
80
+
81
+ stringBuilder . Clear ( ) ;
82
+ }
83
+ }
84
+ }
85
+
86
+ private class WebSocketConnection : IConnection
87
+ {
88
+ public WebSocketConnection ( WebSocket ws ) => WebSocket = ws ;
89
+
90
+ public string ConnectionId { get ; } = Guid . NewGuid ( ) . ToString ( ) ;
91
+
92
+ public WebSocket WebSocket { get ; }
93
+
94
+ public Session Session { get ; set ; }
95
+ }
96
+
97
+ private ConcurrentDictionary < string , WebSocketConnection > WebSocketConnections { get ; } =
98
+ new ConcurrentDictionary < string , WebSocketConnection > ( ) ;
99
+
100
+ public IConnection TryGetConnection ( string connectionId ) =>
101
+ WebSocketConnections . TryGetValue ( connectionId , out var value ) ? value : null ;
102
+
103
+ public async Task SendAsync ( string connectionId , string data )
23
104
{
24
- throw new NotImplementedException ( ) ;
105
+ if ( WebSocketConnections . TryGetValue ( connectionId , out var connection ) )
106
+ {
107
+ var buffer = Encoding . UTF8 . GetBytes ( data ) ;
108
+ await connection . WebSocket . SendAsync ( buffer , WebSocketMessageType . Text , true , CancellationToken . None ) ;
109
+ }
25
110
}
26
111
27
- public IEnumerable < IConnection > Connections => throw new NotImplementedException ( ) ;
112
+ public IEnumerable < IConnection > Connections => WebSocketConnections . Values ;
28
113
29
114
public event EventHandler < MessageEventArgs > MessageReceived ;
30
115
0 commit comments