Skip to content

Commit 3760cd3

Browse files
committed
updated
1 parent 8507708 commit 3760cd3

File tree

2 files changed

+75
-51
lines changed

2 files changed

+75
-51
lines changed
Lines changed: 50 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import 'dart:async';
1+
import 'dart:async'
2+
show MultiStreamController, Stream, StreamController, StreamSubscription;
23

3-
import 'package:flutter/foundation.dart';
4-
import 'package:rxdart/rxdart.dart';
4+
import 'package:flutter/foundation.dart' show ValueListenable;
5+
import 'package:rxdart/rxdart.dart' show ErrorAndStackTrace, ValueStream;
56

67
import 'listenable_to_stream.dart';
78

@@ -51,14 +52,12 @@ class ValueListenableStream<T> extends Stream<T> implements ValueStream<T> {
5152
void Function() onDone,
5253
bool cancelOnError,
5354
}) {
55+
final getValue = ([void _]) => _valueListenable.value;
56+
5457
if (_replayValue) {
55-
_stream ??= _valueListenable
56-
.toStream()
57-
.map((_) => _valueListenable.value)
58-
.shareBehavior(value);
58+
_stream ??= _valueListenable.toStream().map(getValue).startWith(getValue);
5959
} else {
60-
_stream ??=
61-
_valueListenable.toStream().map((_) => _valueListenable.value);
60+
_stream ??= _valueListenable.toStream().map(getValue);
6261
}
6362

6463
return _stream.listen(
@@ -70,59 +69,59 @@ class ValueListenableStream<T> extends Stream<T> implements ValueStream<T> {
7069
}
7170
}
7271

73-
extension _ShareValueExtension<T> on Stream<T> {
74-
Stream<T> shareBehavior(T seeded) {
75-
final controllers = <MultiStreamController<T>>[];
76-
StreamSubscription<T> subscription;
77-
78-
var latestValue = seeded;
79-
var cancel = false;
80-
var done = false;
72+
/// TODO
73+
extension StartWithExtension<T> on Stream<T> {
74+
/// TODO
75+
Stream<T> startWith(T Function() seededProvider) {
76+
MultiStreamController<T> multiController;
77+
StreamSubscription<T> upstreamSubscription;
8178

8279
final listenUpStream = () => listen(
83-
(event) {
84-
latestValue = event;
85-
controllers.forEach((c) => c.addSync(event));
86-
},
87-
onError: (e, StackTrace st) =>
88-
controllers.forEach((c) => c.addErrorSync(e, st)),
80+
multiController.addSync,
81+
onError: multiController.addErrorSync,
8982
onDone: () {
90-
done = true;
91-
subscription = null;
92-
93-
controllers.forEach((c) {
94-
c.onCancel = null;
95-
c.closeSync();
96-
});
97-
controllers.clear();
83+
upstreamSubscription = null;
84+
multiController.closeSync();
9885
},
9986
);
10087

101-
final onListen = (MultiStreamController<T> controller) {
102-
if (cancel) {
103-
return controller.closeSync();
104-
}
105-
controller.addSync(latestValue);
106-
if (done) {
107-
return controller.closeSync();
88+
final onListen = (MultiStreamController<T> c) {
89+
if (multiController != null) {
90+
return;
10891
}
10992

110-
final wasEmpty = controllers.isEmpty;
111-
controllers.add(controller);
112-
if (wasEmpty) {
113-
subscription = listenUpStream();
114-
}
93+
multiController = c;
94+
multiController.addSync(seededProvider());
11595

116-
controller.onCancel = () {
117-
controllers.remove(controller);
118-
if (controllers.isEmpty) {
119-
subscription?.cancel();
120-
subscription = null;
121-
cancel = true;
122-
}
96+
upstreamSubscription = listenUpStream();
97+
multiController.onCancel = () {
98+
upstreamSubscription?.cancel();
99+
upstreamSubscription = null;
123100
};
124101
};
125102

126-
return Stream.multi(onListen, isBroadcast: true);
103+
return Stream.multi(onListen, isBroadcast: false).toSingleSubscription();
104+
}
105+
106+
/// TODO
107+
Stream<T> toSingleSubscription() {
108+
StreamController<T> controller;
109+
StreamSubscription<T> subscription;
110+
111+
controller = StreamController<T>(
112+
sync: true,
113+
onListen: () {
114+
subscription = listen(
115+
controller.add,
116+
onError: controller.addError,
117+
onDone: controller.close,
118+
);
119+
},
120+
onPause: () => subscription.pause(),
121+
onResume: () => subscription.resume(),
122+
onCancel: () => subscription.cancel(),
123+
);
124+
125+
return controller.stream;
127126
}
128127
}

test/listenable_stream_test.dart

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@ import 'package:flutter/foundation.dart';
22
import 'package:flutter_test/flutter_test.dart';
33
import 'package:listenable_stream/listenable_stream.dart';
44

5+
void _isSingleSubscriptionStream(Stream<dynamic> stream) {
6+
expect(stream.isBroadcast, isFalse);
7+
8+
final listen = () => stream.listen(null);
9+
listen();
10+
expect(listen, throwsStateError);
11+
}
12+
513
void main() {
614
group('ListenableToStream', () {
715
test('Emit self when calling `notifyListeners()`', () {
@@ -23,6 +31,11 @@ void main() {
2331
changeNotifier.notifyListeners();
2432
changeNotifier.notifyListeners();
2533
});
34+
35+
test('Single-Subscription Stream', () {
36+
final stream = ChangeNotifier().toStream();
37+
_isSingleSubscriptionStream(stream);
38+
});
2639
});
2740

2841
group('ValueListenableToStream', () {
@@ -56,5 +69,17 @@ void main() {
5669
valueNotifier.value = 2;
5770
valueNotifier.value = 3;
5871
});
72+
73+
test('Single-Subscription Stream', () {
74+
{
75+
final stream = ValueNotifier(0).toValueStream();
76+
_isSingleSubscriptionStream(stream);
77+
}
78+
79+
{
80+
final stream = ValueNotifier(0).toValueStream(replayValue: true);
81+
_isSingleSubscriptionStream(stream);
82+
}
83+
});
5984
});
6085
}

0 commit comments

Comments
 (0)