Skip to content

Commit 1efc86d

Browse files
authored
Websocket message queue using streams and Delay between messages (#335)
1 parent 5e75f3a commit 1efc86d

File tree

4 files changed

+29
-9
lines changed

4 files changed

+29
-9
lines changed

lib/src/config.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,12 @@ class Settings {
6161

6262
/// ICE Gathering Timeout (in millisecond).
6363
int ice_gathering_timeout = 500;
64+
65+
/// Sip Message Delay (in millisecond) ( default 0 ).
66+
int sip_message_delay = 0;
6467
}
6568

69+
6670
// Configuration checks.
6771
class Checks {
6872
Map<String, Null Function(Settings src, Settings? dst)> mandatory =

lib/src/sip_ua_helper.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,10 @@ class SIPUAHelper extends EventManager {
116116
// Reset settings
117117
_settings = Settings();
118118
WebSocketInterface socket = WebSocketInterface(
119-
uaSettings.webSocketUrl, uaSettings.webSocketSettings);
119+
uaSettings.webSocketUrl, messageDelay: _settings.sip_message_delay, webSocketSettings: uaSettings.webSocketSettings);
120120
_settings.sockets = <WebSocketInterface>[socket];
121121
_settings.uri = uaSettings.uri;
122+
_settings.sip_message_delay = uaSettings.sip_message_delay;
122123
_settings.realm = uaSettings.realm;
123124
_settings.password = uaSettings.password;
124125
_settings.ha1 = uaSettings.ha1;
@@ -698,7 +699,8 @@ class UaSettings {
698699

699700
/// ICE Gathering Timeout, default 500ms
700701
int iceGatheringTimeout = 500;
701-
702+
/// Sip Message Delay (in millisecond) (default 0).
703+
int sip_message_delay = 0;
702704
List<Map<String, String>> iceServers = <Map<String, String>>[
703705
<String, String>{'url': 'stun:stun.l.google.com:19302'},
704706
// turn server configuration example.

lib/src/transports/websocket_dart_impl.dart

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,18 @@ typedef OnCloseCallback = void Function(int? code, String? reason);
1111
typedef OnOpenCallback = void Function();
1212

1313
class WebSocketImpl {
14-
WebSocketImpl(this._url);
14+
WebSocketImpl(this._url, this.messageDelay);
1515

1616
final String _url;
1717
WebSocket? _socket;
1818
OnOpenCallback? onOpen;
1919
OnMessageCallback? onMessage;
2020
OnCloseCallback? onClose;
21-
21+
final int messageDelay;
2222
void connect(
2323
{Iterable<String>? protocols,
2424
required WebSocketSettings webSocketSettings}) async {
25+
handleQueue();
2526
logger.i('connect $_url, ${webSocketSettings.extraHeaders}, $protocols');
2627
try {
2728
if (webSocketSettings.allowBadCertificate) {
@@ -42,11 +43,20 @@ class WebSocketImpl {
4243
onClose?.call(500, e.toString());
4344
}
4445
}
46+
final StreamController<dynamic> queue = StreamController<dynamic>.broadcast();
47+
void handleQueue() async {
48+
queue.stream.asyncMap((dynamic event) async {
49+
await Future<void>.delayed(Duration(milliseconds: messageDelay));
50+
return event;
51+
}).listen((dynamic event) async {
52+
_socket!.add(event);
53+
logger.d('send: \n\n$event');
54+
});
55+
}
4556

46-
void send(dynamic data) {
57+
void send(dynamic data) async {
4758
if (_socket != null) {
48-
_socket!.add(data);
49-
logger.d('send: \n\n$data');
59+
queue.add(data);
5060
}
5161
}
5262

lib/src/transports/websocket_interface.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import 'websocket_dart_impl.dart'
77
if (dart.library.js) 'websocket_web_impl.dart';
88

99
class WebSocketInterface implements Socket {
10-
WebSocketInterface(String url, [WebSocketSettings? webSocketSettings]) {
10+
final int _messageDelay;
11+
12+
WebSocketInterface(String url,
13+
{required int messageDelay, WebSocketSettings? webSocketSettings})
14+
: _messageDelay = messageDelay {
1115
logger.d('new() [url:$url]');
1216
_url = url;
1317
dynamic parsed_url = Grammar.parse(url, 'absoluteURI');
@@ -83,7 +87,7 @@ class WebSocketInterface implements Socket {
8387
}
8488
logger.d('connecting to WebSocket $_url');
8589
try {
86-
_ws = WebSocketImpl(_url!);
90+
_ws = WebSocketImpl(_url!, _messageDelay);
8791

8892
_ws!.onOpen = () {
8993
_closed = false;

0 commit comments

Comments
 (0)