25
25
import io .reactivex .FlowableEmitter ;
26
26
import io .reactivex .FlowableOnSubscribe ;
27
27
import io .reactivex .Maybe ;
28
+ import io .reactivex .MaybeEmitter ;
29
+ import io .reactivex .MaybeOnSubscribe ;
30
+ import io .reactivex .MaybeSource ;
28
31
import io .reactivex .Single ;
29
32
import io .reactivex .SingleEmitter ;
30
33
import io .reactivex .SingleOnSubscribe ;
31
- import io .reactivex .SingleSource ;
32
34
import io .reactivex .functions .Cancellable ;
33
35
import io .reactivex .functions .Function ;
34
36
import io .reactivex .functions .Predicate ;
@@ -58,8 +60,9 @@ public void subscribe(final FlowableEmitter<DataSnapshot> emitter) throws Except
58
60
final ValueEventListener valueEventListener = new ValueEventListener () {
59
61
@ Override
60
62
public void onDataChange (DataSnapshot dataSnapshot ) {
61
- if (dataSnapshot .exists ())
63
+ if (dataSnapshot .exists ()) {
62
64
emitter .onNext (dataSnapshot );
65
+ }
63
66
}
64
67
65
68
@ Override
@@ -86,15 +89,16 @@ public void cancel() throws Exception {
86
89
* the given {@link DataSnapshot} exists.
87
90
*/
88
91
@ NonNull
89
- public static Single <DataSnapshot > observeSingleValueEvent (@ NonNull final Query query ) {
90
- return Single .create (new SingleOnSubscribe <DataSnapshot >() {
92
+ public static Maybe <DataSnapshot > observeSingleValueEvent (@ NonNull final Query query ) {
93
+ return Maybe .create (new MaybeOnSubscribe <DataSnapshot >() {
91
94
@ Override
92
- public void subscribe (final SingleEmitter <DataSnapshot > emitter ) throws Exception {
95
+ public void subscribe (final MaybeEmitter <DataSnapshot > emitter ) throws Exception {
93
96
query .addListenerForSingleValueEvent (new ValueEventListener () {
94
97
@ Override
95
98
public void onDataChange (DataSnapshot dataSnapshot ) {
96
- if (dataSnapshot .exists ())
99
+ if (dataSnapshot .exists ()) {
97
100
emitter .onSuccess (dataSnapshot );
101
+ }
98
102
emitter .onComplete ();
99
103
}
100
104
@@ -267,10 +271,10 @@ public void cancel() throws Exception {
267
271
*/
268
272
@ NonNull
269
273
public static Flowable <DataSnapshot > observeMultipleSingleValueEvent (@ NonNull DatabaseReference ... whereRefs ) {
270
- return Single .merge (Flowable .fromArray (whereRefs )
271
- .map (new Function <DatabaseReference , SingleSource <? extends DataSnapshot >>() {
274
+ return Maybe .merge (Flowable .fromArray (whereRefs )
275
+ .map (new Function <DatabaseReference , MaybeSource <? extends DataSnapshot >>() {
272
276
@ Override
273
- public SingleSource <? extends DataSnapshot > apply (@ io .reactivex .annotations .NonNull DatabaseReference databaseReference ) throws
277
+ public MaybeSource <? extends DataSnapshot > apply (@ io .reactivex .annotations .NonNull DatabaseReference databaseReference ) throws
274
278
Exception {
275
279
return observeSingleValueEvent (databaseReference );
276
280
}
0 commit comments