@@ -258,11 +258,11 @@ public class SSLSecurity {
258
258
259
259
return keys
260
260
}
261
-
261
+
262
262
return keys
263
263
}
264
-
265
-
264
+
265
+
266
266
}
267
267
268
268
@@ -291,6 +291,10 @@ import Foundation
291
291
import CoreFoundation
292
292
import Security
293
293
294
+ public let WebsocketDidConnectNotification = " WebsocketDidConnectNotification "
295
+ public let WebsocketDidDisconnectNotification = " WebsocketDidDisconnectNotification "
296
+ public let WebsocketDisconnectionErrorKeyName = " WebsocketDisconnectionErrorKeyName "
297
+
294
298
public protocol WebSocketDelegate : class {
295
299
func websocketDidConnect( socket: WebSocket )
296
300
func websocketDidDisconnect( socket: WebSocket , error: NSError ? )
@@ -380,6 +384,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
380
384
public var security : SSLSecurity ?
381
385
public var enabledSSLCipherSuites : [ SSLCipherSuite ] ?
382
386
public var origin : String ?
387
+ public var timeout = 5
383
388
public var isConnected : Bool {
384
389
return connected
385
390
}
@@ -397,6 +402,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
397
402
private var didDisconnect = false
398
403
private var readyToWrite = false
399
404
private let mutex = NSLock ( )
405
+ private let notificationCenter = NSNotificationCenter . defaultCenter ( )
400
406
private var canDispatch : Bool {
401
407
mutex. lock ( )
402
408
let canWork = readyToWrite
@@ -589,12 +595,12 @@ public class WebSocket : NSObject, NSStreamDelegate {
589
595
self . mutex. unlock ( )
590
596
591
597
let bytes = UnsafePointer < UInt8 > ( data. bytes)
592
- var timeout = 5000000 //wait 5 seconds before giving up
598
+ var out = timeout * 1000000 //wait 5 seconds before giving up
593
599
writeQueue. addOperationWithBlock { [ weak self] in
594
600
while !outStream. hasSpaceAvailable {
595
601
usleep ( 100 ) //wait until the socket is ready
596
- timeout -= 100
597
- if timeout < 0 {
602
+ out -= 100
603
+ if out < 0 {
598
604
self ? . cleanupStream ( )
599
605
self ? . doDisconnect ( self ? . errorWithDetail ( " write wait timed out " , code: 2 ) )
600
606
return
@@ -675,25 +681,24 @@ public class WebSocket : NSObject, NSStreamDelegate {
675
681
}
676
682
///dequeue the incoming input so it is processed in order
677
683
private func dequeueInput( ) {
678
- guard !inputQueue. isEmpty else { return }
679
-
680
- let data = inputQueue [ 0 ]
681
- var work = data
682
- if let fragBuffer = fragBuffer {
683
- let combine = NSMutableData ( data: fragBuffer)
684
- combine. appendData ( data)
685
- work = combine
686
- self . fragBuffer = nil
687
- }
688
- let buffer = UnsafePointer < UInt8 > ( work. bytes)
689
- let length = work. length
690
- if !connected {
691
- processTCPHandshake ( buffer, bufferLen: length)
692
- } else {
693
- processRawMessage ( buffer, bufferLen: length)
684
+ while !inputQueue. isEmpty {
685
+ let data = inputQueue [ 0 ]
686
+ var work = data
687
+ if let fragBuffer = fragBuffer {
688
+ let combine = NSMutableData ( data: fragBuffer)
689
+ combine. appendData ( data)
690
+ work = combine
691
+ self . fragBuffer = nil
692
+ }
693
+ let buffer = UnsafePointer < UInt8 > ( work. bytes)
694
+ let length = work. length
695
+ if !connected {
696
+ processTCPHandshake ( buffer, bufferLen: length)
697
+ } else {
698
+ processRawMessagesInBuffer ( buffer, bufferLen: length)
699
+ }
700
+ inputQueue = inputQueue. filter { $0 != data}
694
701
}
695
- inputQueue = inputQueue. filter { $0 != data}
696
- dequeueInput ( )
697
702
}
698
703
699
704
//handle checking the inital connection status
@@ -707,6 +712,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
707
712
guard let s = self else { return }
708
713
s. onConnect ? ( )
709
714
s. delegate? . websocketDidConnect ( s)
715
+ s. notificationCenter. postNotificationName ( WebsocketDidConnectNotification, object: self )
710
716
}
711
717
case - 1 :
712
718
fragBuffer = NSData ( bytes: buffer, length: bufferLen)
@@ -739,7 +745,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
739
745
totalSize += 1 //skip the last \n
740
746
let restSize = bufferLen - totalSize
741
747
if restSize > 0 {
742
- processRawMessage ( ( buffer+ totalSize) , bufferLen: restSize)
748
+ processRawMessagesInBuffer ( buffer + totalSize, bufferLen: restSize)
743
749
}
744
750
return 0 //success
745
751
}
@@ -792,12 +798,15 @@ public class WebSocket : NSObject, NSStreamDelegate {
792
798
}
793
799
}
794
800
795
- ///process the websocket data
796
- private func processRawMessage( buffer: UnsafePointer < UInt8 > , bufferLen: Int) {
801
+ /// Process one message at the start of `buffer`. Return another buffer (sharing storage) that contains the leftover contents of `buffer` that I didn't process.
802
+ @warn_unused_result
803
+ private func processOneRawMessage( inBuffer buffer: UnsafeBufferPointer < UInt8 > ) -> UnsafeBufferPointer< UInt8 > {
797
804
let response = readStack. last
805
+ let baseAddress = buffer. baseAddress
806
+ let bufferLen = buffer. count
798
807
if response != nil && bufferLen < 2 {
799
- fragBuffer = NSData ( bytes : buffer, length : bufferLen )
800
- return
808
+ fragBuffer = NSData ( buffer: buffer )
809
+ return emptyBuffer
801
810
}
802
811
if let response = response where response. bytesLeft > 0 {
803
812
var len = response. bytesLeft
@@ -807,79 +816,77 @@ public class WebSocket : NSObject, NSStreamDelegate {
807
816
extra = 0
808
817
}
809
818
response. bytesLeft -= len
810
- response. buffer? . appendData ( NSData ( bytes: buffer , length: len) )
819
+ response. buffer? . appendData ( NSData ( bytes: baseAddress , length: len) )
811
820
processResponse ( response)
812
- let offset = bufferLen - extra
813
- if extra > 0 {
814
- processExtra ( ( buffer+ offset) , bufferLen: extra)
815
- }
816
- return
821
+ return buffer. fromOffset ( bufferLen - extra)
817
822
} else {
818
- let isFin = ( FinMask & buffer [ 0 ] )
819
- let receivedOpcode = OpCode ( rawValue: ( OpCodeMask & buffer [ 0 ] ) )
820
- let isMasked = ( MaskMask & buffer [ 1 ] )
821
- let payloadLen = ( PayloadLenMask & buffer [ 1 ] )
823
+ let isFin = ( FinMask & baseAddress [ 0 ] )
824
+ let receivedOpcode = OpCode ( rawValue: ( OpCodeMask & baseAddress [ 0 ] ) )
825
+ let isMasked = ( MaskMask & baseAddress [ 1 ] )
826
+ let payloadLen = ( PayloadLenMask & baseAddress [ 1 ] )
822
827
var offset = 2
823
- if ( isMasked > 0 || ( RSVMask & buffer [ 0 ] ) > 0 ) && receivedOpcode != . Pong {
828
+ if ( isMasked > 0 || ( RSVMask & baseAddress [ 0 ] ) > 0 ) && receivedOpcode != . Pong {
824
829
let errCode = CloseCode . ProtocolError. rawValue
825
830
doDisconnect ( errorWithDetail ( " masked and rsv data is not currently supported " , code: errCode) )
826
831
writeError ( errCode)
827
- return
832
+ return emptyBuffer
828
833
}
829
834
let isControlFrame = ( receivedOpcode == . ConnectionClose || receivedOpcode == . Ping)
830
835
if !isControlFrame && ( receivedOpcode != . BinaryFrame && receivedOpcode != . ContinueFrame &&
831
836
receivedOpcode != . TextFrame && receivedOpcode != . Pong) {
832
837
let errCode = CloseCode . ProtocolError. rawValue
833
838
doDisconnect ( errorWithDetail ( " unknown opcode: \( receivedOpcode) " , code: errCode) )
834
839
writeError ( errCode)
835
- return
840
+ return emptyBuffer
836
841
}
837
842
if isControlFrame && isFin == 0 {
838
843
let errCode = CloseCode . ProtocolError. rawValue
839
844
doDisconnect ( errorWithDetail ( " control frames can't be fragmented " , code: errCode) )
840
845
writeError ( errCode)
841
- return
846
+ return emptyBuffer
842
847
}
843
848
if receivedOpcode == . ConnectionClose {
844
849
var code = CloseCode . Normal. rawValue
845
850
if payloadLen == 1 {
846
851
code = CloseCode . ProtocolError. rawValue
847
852
} else if payloadLen > 1 {
848
- code = WebSocket . readUint16 ( buffer , offset: offset)
853
+ code = WebSocket . readUint16 ( baseAddress , offset: offset)
849
854
if code < 1000 || ( code > 1003 && code < 1007 ) || ( code > 1011 && code < 3000 ) {
850
855
code = CloseCode . ProtocolError. rawValue
851
856
}
852
857
offset += 2
853
858
}
859
+ var closeReason = " connection closed by server "
854
860
if payloadLen > 2 {
855
- let len = Int ( payloadLen- 2 )
861
+ let len = Int ( payloadLen - 2 )
856
862
if len > 0 {
857
- let bytes = UnsafePointer < UInt8 > ( ( buffer+ offset) )
858
- let str : NSString ? = NSString ( data: NSData ( bytes: bytes, length: len) , encoding: NSUTF8StringEncoding)
859
- if str == nil {
863
+ let bytes = baseAddress + offset
864
+ if let customCloseReason = String ( data: NSData ( bytes: bytes, length: len) , encoding: NSUTF8StringEncoding) {
865
+ closeReason = customCloseReason
866
+ } else {
860
867
code = CloseCode . ProtocolError. rawValue
861
868
}
862
869
}
863
870
}
864
- doDisconnect ( errorWithDetail ( " connection closed by server " , code: code) )
871
+ doDisconnect ( errorWithDetail ( closeReason , code: code) )
865
872
writeError ( code)
866
- return
873
+ return emptyBuffer
867
874
}
868
875
if isControlFrame && payloadLen > 125 {
869
876
writeError ( CloseCode . ProtocolError. rawValue)
870
- return
877
+ return emptyBuffer
871
878
}
872
879
var dataLength = UInt64 ( payloadLen)
873
880
if dataLength == 127 {
874
- dataLength = WebSocket . readUint64 ( buffer , offset: offset)
881
+ dataLength = WebSocket . readUint64 ( baseAddress , offset: offset)
875
882
offset += sizeof ( UInt64)
876
883
} else if dataLength == 126 {
877
- dataLength = UInt64 ( WebSocket . readUint16 ( buffer , offset: offset) )
884
+ dataLength = UInt64 ( WebSocket . readUint16 ( baseAddress , offset: offset) )
878
885
offset += sizeof ( UInt16)
879
886
}
880
887
if bufferLen < offset || UInt64 ( bufferLen - offset) < dataLength {
881
- fragBuffer = NSData ( bytes: buffer , length: bufferLen)
882
- return
888
+ fragBuffer = NSData ( bytes: baseAddress , length: bufferLen)
889
+ return emptyBuffer
883
890
}
884
891
var len = dataLength
885
892
if dataLength > UInt64 ( bufferLen) {
@@ -890,7 +897,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
890
897
len = 0
891
898
data = NSData ( )
892
899
} else {
893
- data = NSData ( bytes: UnsafePointer < UInt8 > ( ( buffer + offset) ) , length: Int ( len) )
900
+ data = NSData ( bytes: baseAddress + offset, length: Int ( len) )
894
901
}
895
902
if receivedOpcode == . Pong {
896
903
if canDispatch {
@@ -900,12 +907,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
900
907
s. pongDelegate? . websocketDidReceivePong ( s)
901
908
}
902
909
}
903
- let step = Int ( offset+ numericCast( len) )
904
- let extra = bufferLen- step
905
- if extra > 0 {
906
- processRawMessage ( ( buffer+ step) , bufferLen: extra)
907
- }
908
- return
910
+ return buffer. fromOffset ( offset + Int( len) )
909
911
}
910
912
var response = readStack. last
911
913
if isControlFrame {
@@ -915,7 +917,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
915
917
let errCode = CloseCode . ProtocolError. rawValue
916
918
doDisconnect ( errorWithDetail ( " continue frame before a binary or text frame " , code: errCode) )
917
919
writeError ( errCode)
918
- return
920
+ return emptyBuffer
919
921
}
920
922
var isNew = false
921
923
if response == nil {
@@ -924,7 +926,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
924
926
doDisconnect ( errorWithDetail ( " first frame can't be a continue frame " ,
925
927
code: errCode) )
926
928
writeError ( errCode)
927
- return
929
+ return emptyBuffer
928
930
}
929
931
isNew = true
930
932
response = WSResponse ( )
@@ -939,7 +941,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
939
941
doDisconnect ( errorWithDetail ( " second and beyond of fragment message must be a continue frame " ,
940
942
code: errCode) )
941
943
writeError ( errCode)
942
- return
944
+ return emptyBuffer
943
945
}
944
946
response!. buffer!. appendData ( data)
945
947
}
@@ -954,20 +956,18 @@ public class WebSocket : NSObject, NSStreamDelegate {
954
956
}
955
957
956
958
let step = Int ( offset+ numericCast( len) )
957
- let extra = bufferLen- step
958
- if extra > 0 {
959
- processExtra ( ( buffer+ step) , bufferLen: extra)
960
- }
959
+ return buffer. fromOffset ( step)
961
960
}
962
-
963
961
}
964
962
965
- ///process the extra of a buffer
966
- private func processExtra( buffer: UnsafePointer < UInt8 > , bufferLen: Int) {
967
- if bufferLen < 2 {
968
- fragBuffer = NSData ( bytes: buffer, length: bufferLen)
969
- } else {
970
- processRawMessage ( buffer, bufferLen: bufferLen)
963
+ /// Process all messages in the buffer if possible.
964
+ private func processRawMessagesInBuffer( pointer: UnsafePointer < UInt8 > , bufferLen: Int) {
965
+ var buffer = UnsafeBufferPointer ( start: pointer, count: bufferLen)
966
+ repeat {
967
+ buffer = processOneRawMessage ( inBuffer: buffer)
968
+ } while buffer. count >= 2
969
+ if buffer. count > 0 {
970
+ fragBuffer = NSData ( buffer: buffer)
971
971
}
972
972
}
973
973
@@ -1093,6 +1093,8 @@ public class WebSocket : NSObject, NSStreamDelegate {
1093
1093
guard let s = self else { return }
1094
1094
s. onDisconnect ? ( error)
1095
1095
s. delegate? . websocketDidDisconnect ( s, error: error)
1096
+ let userInfo = error. map ( { [ WebsocketDisconnectionErrorKeyName: $0] } )
1097
+ s. notificationCenter. postNotificationName ( WebsocketDidDisconnectNotification, object: self , userInfo: userInfo)
1096
1098
}
1097
1099
}
1098
1100
@@ -1103,4 +1105,22 @@ public class WebSocket : NSObject, NSStreamDelegate {
1103
1105
cleanupStream ( )
1104
1106
}
1105
1107
1106
- }
1108
+ }
1109
+
1110
+ private extension NSData {
1111
+
1112
+ convenience init( buffer: UnsafeBufferPointer < UInt8 > ) {
1113
+ self . init ( bytes: buffer. baseAddress, length: buffer. count)
1114
+ }
1115
+
1116
+ }
1117
+
1118
+ private extension UnsafeBufferPointer {
1119
+
1120
+ func fromOffset( offset: Int ) -> UnsafeBufferPointer < Element > {
1121
+ return UnsafeBufferPointer < Element > ( start: baseAddress. advancedBy ( offset) , count: count - offset)
1122
+ }
1123
+
1124
+ }
1125
+
1126
+ private let emptyBuffer = UnsafeBufferPointer < UInt8 > ( start: nil , count: 0 )
0 commit comments