Skip to content

Commit 10578b1

Browse files
committed
♻️ Refactoring code.
1 parent 652cb34 commit 10578b1

File tree

3 files changed

+74
-62
lines changed

3 files changed

+74
-62
lines changed

lib/src/listenable_to_stream.dart

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import 'dart:async';
22

3-
import 'package:flutter/foundation.dart';
3+
import 'package:flutter/foundation.dart' show Listenable, VoidCallback;
44

55
/// Convert this [Listenable] to a [Stream].
66
extension ListenableToStream<T extends Listenable> on T {
@@ -11,12 +11,15 @@ extension ListenableToStream<T extends Listenable> on T {
1111

1212
controller = StreamController<T>(
1313
sync: true,
14-
onListen: () => addListener(listener = () => controller.add(this)),
14+
onListen: () {
15+
listener = () => controller.add(this);
16+
addListener(listener);
17+
},
1518
onCancel: () {
1619
try {
1720
removeListener(listener);
1821
listener = null;
19-
} catch (_) {}
22+
} catch (_ /*Ignore*/) {}
2023
},
2124
);
2225

lib/src/streamx.dart

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import 'dart:async'
2+
show MultiStreamController, Stream, StreamController, StreamSubscription;
3+
4+
/// @private
5+
/// Prepends a value to the source [Stream].
6+
extension StartWithExtension<T> on Stream<T> {
7+
/// @private
8+
/// Prepends a value to the source [Stream].
9+
Stream<T> startWith(T Function() seededProvider) {
10+
MultiStreamController<T> controller;
11+
StreamSubscription<T> subscription;
12+
13+
final listenUpStream = () => listen(
14+
controller.addSync,
15+
onError: controller.addErrorSync,
16+
onDone: () {
17+
subscription = null;
18+
controller.closeSync();
19+
},
20+
);
21+
22+
final onListen = (MultiStreamController<T> c) {
23+
if (controller != null) {
24+
return;
25+
}
26+
27+
controller = c;
28+
controller.addSync(seededProvider());
29+
30+
subscription = listenUpStream();
31+
controller.onCancel = () {
32+
subscription?.cancel();
33+
subscription = null;
34+
};
35+
};
36+
37+
return Stream.multi(onListen, isBroadcast: false).toSingleSubscription();
38+
}
39+
}
40+
41+
/// @private
42+
/// Convert a [Stream] to Single-Subscription [Stream].
43+
extension ToSingleSubscriptionStreamExtension<T> on Stream<T> {
44+
/// @private
45+
/// Convert a [Stream] to Single-Subscription [Stream].
46+
Stream<T> toSingleSubscription() {
47+
StreamController<T> controller;
48+
StreamSubscription<T> subscription;
49+
50+
controller = StreamController<T>(
51+
sync: true,
52+
onListen: () {
53+
subscription = listen(
54+
controller.add,
55+
onError: controller.addError,
56+
onDone: controller.close,
57+
);
58+
},
59+
onPause: () => subscription.pause(),
60+
onResume: () => subscription.resume(),
61+
onCancel: () => subscription.cancel(),
62+
);
63+
64+
return controller.stream;
65+
}
66+
}
Lines changed: 2 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import 'dart:async'
2-
show MultiStreamController, Stream, StreamController, StreamSubscription;
1+
import 'dart:async' show Stream, StreamSubscription;
32

43
import 'package:flutter/foundation.dart' show ValueListenable;
54
import 'package:rxdart/rxdart.dart' show ErrorAndStackTrace, ValueStream;
65

76
import 'listenable_to_stream.dart';
7+
import 'streamx.dart';
88

99
/// Convert this [ValueListenable] to a [ValueStream].
1010
/// The returned [ValueStream] is a Single-Subscription [Stream].
@@ -68,60 +68,3 @@ class ValueListenableStream<T> extends Stream<T> implements ValueStream<T> {
6868
);
6969
}
7070
}
71-
72-
/// TODO
73-
extension StartWithExtension<T> on Stream<T> {
74-
/// TODO
75-
Stream<T> startWith(T Function() seededProvider) {
76-
MultiStreamController<T> multiController;
77-
StreamSubscription<T> upstreamSubscription;
78-
79-
final listenUpStream = () => listen(
80-
multiController.addSync,
81-
onError: multiController.addErrorSync,
82-
onDone: () {
83-
upstreamSubscription = null;
84-
multiController.closeSync();
85-
},
86-
);
87-
88-
final onListen = (MultiStreamController<T> c) {
89-
if (multiController != null) {
90-
return;
91-
}
92-
93-
multiController = c;
94-
multiController.addSync(seededProvider());
95-
96-
upstreamSubscription = listenUpStream();
97-
multiController.onCancel = () {
98-
upstreamSubscription?.cancel();
99-
upstreamSubscription = null;
100-
};
101-
};
102-
103-
return Stream.multi(onListen, isBroadcast: false).toSingleSubscription();
104-
}
105-
106-
/// TODO
107-
Stream<T> toSingleSubscription() {
108-
StreamController<T> controller;
109-
StreamSubscription<T> subscription;
110-
111-
controller = StreamController<T>(
112-
sync: true,
113-
onListen: () {
114-
subscription = listen(
115-
controller.add,
116-
onError: controller.addError,
117-
onDone: controller.close,
118-
);
119-
},
120-
onPause: () => subscription.pause(),
121-
onResume: () => subscription.resume(),
122-
onCancel: () => subscription.cancel(),
123-
);
124-
125-
return controller.stream;
126-
}
127-
}

0 commit comments

Comments
 (0)