Skip to content

Commit 0e54c8b

Browse files
committed
buffer values when paused
1 parent 1d7cdf3 commit 0e54c8b

File tree

2 files changed

+17
-27
lines changed

2 files changed

+17
-27
lines changed

lib/src/common.dart

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ Stream<R> toStreamWithTransform<T extends Listenable, R>(
88
T listenable,
99
R Function(T) transform,
1010
) {
11-
late StreamController<R> controller;
11+
final controller = StreamController<R>();
1212
VoidCallback? listener;
1313

14-
final onListenOrOnResume = () {
14+
controller.onListen = () {
1515
assert(listener == null);
1616
try {
1717
listenable
@@ -21,26 +21,15 @@ Stream<R> toStreamWithTransform<T extends Listenable, R>(
2121
}
2222
};
2323

24-
final createOnPauseOrOnCancel = ([bool closeOnError = false]) {
24+
controller.onCancel = () {
2525
return () {
2626
assert(listener != null);
2727
try {
2828
listenable.removeListener(listener!);
2929
listener = null;
30-
} catch (_ /*Ignore*/) {
31-
if (identical(closeOnError, true)) {
32-
controller.close();
33-
}
34-
}
30+
} catch (_ /*Ignore*/) {}
3531
};
3632
};
3733

38-
controller = StreamController<R>(
39-
onListen: onListenOrOnResume,
40-
onPause: createOnPauseOrOnCancel(true),
41-
onResume: onListenOrOnResume,
42-
onCancel: createOnPauseOrOnCancel(),
43-
);
44-
4534
return controller.stream;
4635
}

test/listenable_stream_test.dart

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@ void main() {
6969
final subscription = stream.listen(
7070
expectAsync1(
7171
(v) => expect(v, changeNotifier),
72-
count: 1,
72+
count: 4,
7373
),
7474
)..pause();
7575

76-
// no effect
76+
// buffer
7777
changeNotifier.notifyListeners();
7878
changeNotifier.notifyListeners();
7979
changeNotifier.notifyListeners();
@@ -181,30 +181,32 @@ void main() {
181181
test('not replay', () async {
182182
final valueNotifier = ValueNotifier(0);
183183
final stream = valueNotifier.toValueStream();
184-
final expected = 4;
184+
final expected = [1, 2, 3, 4, 5];
185185

186+
var i = 0;
186187
final subscription = stream.listen(
187188
expectAsync1(
188-
(v) => expect(v, expected),
189-
count: 1,
189+
(v) => expect(v, expected[i++]),
190+
count: expected.length,
190191
),
191192
)..pause();
192193

193-
// no effect
194+
// buffer
194195
valueNotifier.value = 1;
195196
valueNotifier.value = 2;
196197
valueNotifier.value = 3;
197198

198199
await Future<void>.delayed(const Duration(milliseconds: 50));
199200
subscription.resume();
200201

201-
valueNotifier.value = expected;
202+
valueNotifier.value = 4;
203+
valueNotifier.value = 5;
202204
});
203205

204206
test('replay + pause immediately', () async {
205207
final valueNotifier = ValueNotifier(0);
206208
final stream = valueNotifier.toValueStream(replayValue: true);
207-
final expected = [0, 4, 5];
209+
final expected = [0, 1, 2, 3, 4, 5];
208210

209211
var i = 0;
210212
final subscription = stream.listen(
@@ -215,22 +217,21 @@ void main() {
215217
),
216218
)..pause();
217219

218-
// no effect
220+
// buffer
219221
valueNotifier.value = 1;
220222
valueNotifier.value = 2;
221223
valueNotifier.value = 3;
222224

223225
subscription.resume();
224226

225-
await pumpEventQueue();
226227
valueNotifier.value = 4;
227228
valueNotifier.value = 5;
228229
});
229230

230231
test('replay + pause after events queue.', () async {
231232
final valueNotifier = ValueNotifier(0);
232233
final stream = valueNotifier.toValueStream(replayValue: true);
233-
final expected = [0, 4, 5];
234+
final expected = [0, 1, 2, 3, 4, 5];
234235

235236
var i = 0;
236237
final subscription = stream.listen(
@@ -244,7 +245,7 @@ void main() {
244245
await pumpEventQueue();
245246
subscription.pause();
246247

247-
// no effect
248+
// buffer
248249
valueNotifier.value = 1;
249250
valueNotifier.value = 2;
250251
valueNotifier.value = 3;

0 commit comments

Comments
 (0)