Skip to content

Commit e64727d

Browse files
committed
Merge branch 'master' into nnbd
2 parents f97e213 + 48a77b5 commit e64727d

File tree

7 files changed

+241
-26
lines changed

7 files changed

+241
-26
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 0.0.2 - Jan 12, 2021
2+
3+
- Internal refactor
4+
15
## 0.0.1 - Dec 15, 2020
26

37
- Initial version.

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
[![codecov](https://codecov.io/gh/Flutter-Dart-Open-Source/listenable_stream/branch/master/graph/badge.svg?token=6eORcR6Web)](https://codecov.io/gh/Flutter-Dart-Open-Source/listenable_stream)
88
[![Flutter Tests](https://github.com/Flutter-Dart-Open-Source/listenable_stream/workflows/Flutter%20Tests/badge.svg)](https://github.com/Flutter-Dart-Open-Source/listenable_stream.git)
99

10+
- [x] `Listenable``Stream<Listenable>`
11+
- [x] `ValueListenable<T>``ValueStream<T>`
12+
1013
## Listenable.toStream()
1114
```dart
1215
final ChangeNotifier changeNotifier = ChangeNotifier();

lib/src/common.dart

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import 'dart:async';
2+
3+
import 'package:flutter/foundation.dart' show Listenable, VoidCallback;
4+
5+
/// @private
6+
/// Convert this [Listenable] to a [Stream].
7+
Stream<R> toStreamWithTransform<T extends Listenable, R>(
8+
T listenable,
9+
R Function(T) transform,
10+
) {
11+
late StreamController<R> controller;
12+
VoidCallback? listener;
13+
14+
final onListenOrOnResume = () {
15+
assert(listener == null);
16+
try {
17+
listenable
18+
.addListener(listener = () => controller.add(transform(listenable)));
19+
} catch (_ /*Ignore*/) {
20+
controller.close();
21+
}
22+
};
23+
24+
final createOnPauseOrOnCancel = ([bool closeOnError = false]) {
25+
return () {
26+
assert(listener != null);
27+
try {
28+
listenable.removeListener(listener!);
29+
listener = null;
30+
} catch (_ /*Ignore*/) {
31+
if (identical(closeOnError, true)) {
32+
controller.close();
33+
}
34+
}
35+
};
36+
};
37+
38+
controller = StreamController<R>(
39+
onListen: onListenOrOnResume,
40+
onPause: createOnPauseOrOnCancel(true),
41+
onResume: onListenOrOnResume,
42+
onCancel: createOnPauseOrOnCancel(),
43+
);
44+
45+
return controller.stream;
46+
}

lib/src/listenable_to_stream.dart

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,11 @@
11
import 'dart:async';
22

3-
import 'package:flutter/foundation.dart' show Listenable, VoidCallback;
3+
import 'package:flutter/foundation.dart' show Listenable;
4+
5+
import 'common.dart';
46

57
/// Convert this [Listenable] to a [Stream].
68
extension ListenableToStream<T extends Listenable> on T {
79
/// Convert this [Listenable] to a [Stream].
8-
Stream<T> toStream() {
9-
late StreamController<T> controller;
10-
VoidCallback? listener;
11-
12-
controller = StreamController<T>(
13-
sync: true,
14-
onListen: () => addListener(listener = () => controller.add(this)),
15-
onCancel: () {
16-
try {
17-
removeListener(listener!);
18-
listener = null;
19-
} catch (_ /*Ignore*/) {}
20-
},
21-
);
22-
23-
return controller.stream;
24-
}
10+
Stream<T> toStream() => toStreamWithTransform<T, T>(this, (t) => t);
2511
}

lib/src/value_listenable_to_value_stream.dart

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
import 'dart:async' show Stream, StreamSubscription;
22

33
import 'package:flutter/foundation.dart' show ValueListenable;
4-
import 'package:rxdart/rxdart.dart' show ErrorAndStackTrace, ValueStream;
4+
import 'package:rxdart/rxdart.dart' show ValueStream;
55
import 'package:rxdart/src/utils/value_wrapper.dart';
66

7-
import 'listenable_to_stream.dart';
7+
import 'common.dart';
88
import 'streamx.dart';
99

10+
extension _ValueListenableToStreamExtension<T> on ValueListenable<T> {
11+
Stream<T> toStream() =>
12+
toStreamWithTransform<ValueListenable<T>, T>(this, (l) => l.value);
13+
}
14+
1015
/// Convert this [ValueListenable] to a [ValueStream].
1116
/// The returned [ValueStream] is a Single-Subscription [Stream].
1217
///
@@ -28,8 +33,6 @@ class ValueListenableStream<T> extends Stream<T> implements ValueStream<T> {
2833
final bool _replayValue;
2934
Stream<T>? _stream;
3035

31-
T _getValue([void _]) => _valueListenable.value;
32-
3336
/// Construct a [ValueListenableStream] from [ValueListenable].
3437
ValueListenableStream(this._valueListenable, this._replayValue);
3538

@@ -42,9 +45,9 @@ class ValueListenableStream<T> extends Stream<T> implements ValueStream<T> {
4245
}) {
4346
if (_replayValue) {
4447
_stream ??=
45-
_valueListenable.toStream().map(_getValue).startWith(_getValue);
48+
_valueListenable.toStream().startWith(() => _valueListenable.value);
4649
} else {
47-
_stream ??= _valueListenable.toStream().map(_getValue);
50+
_stream ??= _valueListenable.toStream();
4851
}
4952

5053
return _stream!.listen(
@@ -63,5 +66,5 @@ class ValueListenableStream<T> extends Stream<T> implements ValueStream<T> {
6366
throw StateError('This Stream always has no error!');
6467

6568
@override
66-
ValueWrapper<T> get valueWrapper => ValueWrapper(_getValue());
69+
ValueWrapper<T> get valueWrapper => ValueWrapper(_valueListenable.value);
6770
}

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: listenable_stream
22
description: Convert Listenable (eg. ChangeNotifier), ValueListenable(eg. ValueNotifier) to Stream / ValueStream.
3-
version: 0.0.1
3+
version: 1.0.0-nullsafety
44
author: Petrus Nguyễn Thái Học <hoc081098@gmail.com>
55
homepage: https://github.com/Flutter-Dart-Open-Source/listenable_stream.git
66
repository: https://github.com/Flutter-Dart-Open-Source/listenable_stream.git

test/listenable_stream_test.dart

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:async';
2+
13
import 'package:flutter/foundation.dart';
24
import 'package:flutter_test/flutter_test.dart';
35
import 'package:listenable_stream/listenable_stream.dart';
@@ -37,6 +39,50 @@ void main() {
3739
final stream = ChangeNotifier().toStream();
3840
_isSingleSubscriptionStream(stream);
3941
});
42+
43+
test('Cancel', () async {
44+
final changeNotifier = ChangeNotifier();
45+
final stream = changeNotifier.toStream();
46+
47+
final subscription = stream.listen(
48+
expectAsync1(
49+
(e) => expect(e, changeNotifier),
50+
count: 3,
51+
),
52+
);
53+
54+
changeNotifier.notifyListeners();
55+
changeNotifier.notifyListeners();
56+
changeNotifier.notifyListeners();
57+
58+
await pumpEventQueue();
59+
await subscription.cancel();
60+
61+
changeNotifier.notifyListeners();
62+
changeNotifier.notifyListeners();
63+
});
64+
65+
test('Pause resume', () async {
66+
final changeNotifier = ChangeNotifier();
67+
final stream = changeNotifier.toStream();
68+
69+
final subscription = stream.listen(
70+
expectAsync1(
71+
(v) => expect(v, changeNotifier),
72+
count: 1,
73+
),
74+
)..pause();
75+
76+
// no effect
77+
changeNotifier.notifyListeners();
78+
changeNotifier.notifyListeners();
79+
changeNotifier.notifyListeners();
80+
81+
await Future<void>.delayed(const Duration(milliseconds: 50));
82+
subscription.resume();
83+
84+
changeNotifier.notifyListeners();
85+
});
4086
});
4187

4288
group('ValueListenableToStream', () {
@@ -82,5 +128,132 @@ void main() {
82128
_isSingleSubscriptionStream(stream);
83129
}
84130
});
131+
132+
test('Cancel', () async {
133+
{
134+
final valueNotifier = ValueNotifier(0);
135+
final stream = valueNotifier.toValueStream();
136+
137+
var i = 1;
138+
final subscription = stream.listen(
139+
expectAsync1(
140+
(e) => expect(e, i++),
141+
count: 3,
142+
),
143+
);
144+
145+
valueNotifier.value = 1;
146+
valueNotifier.value = 2;
147+
valueNotifier.value = 3;
148+
149+
await pumpEventQueue();
150+
await subscription.cancel();
151+
152+
valueNotifier.value = 4;
153+
valueNotifier.value = 5;
154+
}
155+
156+
{
157+
final valueNotifier = ValueNotifier(0);
158+
final stream = valueNotifier.toValueStream(replayValue: true);
159+
160+
var i = 0;
161+
final subscription = stream.listen(
162+
expectAsync1(
163+
(e) => expect(e, i++),
164+
count: 4,
165+
),
166+
);
167+
168+
valueNotifier.value = 1;
169+
valueNotifier.value = 2;
170+
valueNotifier.value = 3;
171+
172+
await pumpEventQueue();
173+
await subscription.cancel();
174+
175+
valueNotifier.value = 4;
176+
valueNotifier.value = 5;
177+
}
178+
});
179+
180+
group('Pause resume', () {
181+
test('not replay', () async {
182+
final valueNotifier = ValueNotifier(0);
183+
final stream = valueNotifier.toValueStream();
184+
final expected = 4;
185+
186+
final subscription = stream.listen(
187+
expectAsync1(
188+
(v) => expect(v, expected),
189+
count: 1,
190+
),
191+
)..pause();
192+
193+
// no effect
194+
valueNotifier.value = 1;
195+
valueNotifier.value = 2;
196+
valueNotifier.value = 3;
197+
198+
await Future<void>.delayed(const Duration(milliseconds: 50));
199+
subscription.resume();
200+
201+
valueNotifier.value = expected;
202+
});
203+
204+
test('replay + pause immediately', () async {
205+
final valueNotifier = ValueNotifier(0);
206+
final stream = valueNotifier.toValueStream(replayValue: true);
207+
final expected = [0, 4, 5];
208+
209+
var i = 0;
210+
final subscription = stream.listen(
211+
expectAsync1(
212+
(v) => expect(v, expected[i++]),
213+
count: expected.length,
214+
max: expected.length,
215+
),
216+
)..pause();
217+
218+
// no effect
219+
valueNotifier.value = 1;
220+
valueNotifier.value = 2;
221+
valueNotifier.value = 3;
222+
223+
subscription.resume();
224+
225+
await pumpEventQueue();
226+
valueNotifier.value = 4;
227+
valueNotifier.value = 5;
228+
});
229+
230+
test('replay + pause after events queue.', () async {
231+
final valueNotifier = ValueNotifier(0);
232+
final stream = valueNotifier.toValueStream(replayValue: true);
233+
final expected = [0, 4, 5];
234+
235+
var i = 0;
236+
final subscription = stream.listen(
237+
expectAsync1(
238+
(v) => expect(v, expected[i++]),
239+
count: expected.length,
240+
max: expected.length,
241+
),
242+
);
243+
244+
await pumpEventQueue();
245+
subscription.pause();
246+
247+
// no effect
248+
valueNotifier.value = 1;
249+
valueNotifier.value = 2;
250+
valueNotifier.value = 3;
251+
252+
subscription.resume();
253+
254+
valueNotifier.value = 4;
255+
valueNotifier.value = 5;
256+
});
257+
});
85258
});
86259
}

0 commit comments

Comments
 (0)