Skip to content

Commit 4f1a176

Browse files
Start RxJava 3 support by copying RxJava 2 module.
1 parent 0345d22 commit 4f1a176

File tree

9 files changed

+566
-1
lines changed

9 files changed

+566
-1
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ def projectNamesToPublish = [
6464
'objectbox-java-api',
6565
'objectbox-java',
6666
'objectbox-kotlin',
67-
'objectbox-rxjava'
67+
'objectbox-rxjava',
68+
'objectbox-rxjava3'
6869
]
6970

7071
def hasSigningProperties() {

objectbox-rxjava3/README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
RxJava 3 APIs for ObjectBox
2+
===========================
3+
While ObjectBox has [data observers and reactive extensions](https://docs.objectbox.io/data-observers-and-rx) built-in,
4+
this project adds RxJava 3 support.
5+
6+
For general object changes, you can use `RxBoxStore` to create an `Observable`.
7+
8+
`RxQuery` allows you to interact with ObjectBox `Query` objects using:
9+
* Flowable
10+
* Observable
11+
* Single
12+
13+
For example to get query results and subscribe to future updates (Object changes will automatically emmit new data):
14+
15+
```java
16+
Query query = box.query().build();
17+
RxQuery.observable(query).subscribe(this);
18+
```
19+
20+
Adding the library to your project
21+
-----------------
22+
Grab via Gradle:
23+
```gradle
24+
implementation "io.objectbox:objectbox-rxjava3:$objectboxVersion"
25+
```
26+
27+
Links
28+
-----
29+
[Data Observers and Rx Documentation](https://docs.objectbox.io/data-observers-and-rx)
30+
31+
[Note App example](https://github.com/objectbox/objectbox-examples/blob/master/objectbox-example/src/main/java/io/objectbox/example/ReactiveNoteActivity.java)

objectbox-rxjava3/build.gradle

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
apply plugin: 'java'
2+
3+
sourceCompatibility = JavaVersion.VERSION_1_8
4+
targetCompatibility = JavaVersion.VERSION_1_8
5+
6+
dependencies {
7+
compile project(':objectbox-java')
8+
compile 'io.reactivex.rxjava2:rxjava:2.2.18'
9+
10+
testCompile "junit:junit:$junit_version"
11+
// Mockito 3.x requires Java 8.
12+
testCompile 'org.mockito:mockito-core:2.28.2'
13+
}
14+
15+
task javadocJar(type: Jar, dependsOn: javadoc) {
16+
classifier = 'javadoc'
17+
from 'build/docs/javadoc'
18+
}
19+
20+
task sourcesJar(type: Jar) {
21+
from sourceSets.main.allSource
22+
classifier = 'sources'
23+
}
24+
25+
artifacts {
26+
// java plugin adds jar.
27+
archives javadocJar
28+
archives sourcesJar
29+
}
30+
31+
uploadArchives {
32+
repositories {
33+
mavenDeployer {
34+
// Basic definitions are defined in root project
35+
pom.project {
36+
name 'ObjectBox RxJava 3 API'
37+
description 'RxJava 3 extensions for ObjectBox'
38+
39+
licenses {
40+
license {
41+
name 'The Apache Software License, Version 2.0'
42+
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
43+
distribution 'repo'
44+
}
45+
}
46+
}
47+
}
48+
}
49+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2017 ObjectBox Ltd. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox.rx;
18+
19+
import io.objectbox.BoxStore;
20+
import io.objectbox.reactive.DataObserver;
21+
import io.objectbox.reactive.DataSubscription;
22+
import io.reactivex.Observable;
23+
import io.reactivex.ObservableEmitter;
24+
import io.reactivex.ObservableOnSubscribe;
25+
import io.reactivex.functions.Cancellable;
26+
27+
/**
28+
* Static methods to Rx-ify ObjectBox queries.
29+
*/
30+
public abstract class RxBoxStore {
31+
/**
32+
* Using the returned Observable, you can be notified about data changes.
33+
* Once a transaction is committed, you will get info on classes with changed Objects.
34+
*/
35+
public static <T> Observable<Class> observable(final BoxStore boxStore) {
36+
return Observable.create(new ObservableOnSubscribe<Class>() {
37+
@Override
38+
public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
39+
final DataSubscription dataSubscription = boxStore.subscribe().observer(new DataObserver<Class>() {
40+
@Override
41+
public void onData(Class data) {
42+
if (!emitter.isDisposed()) {
43+
emitter.onNext(data);
44+
}
45+
}
46+
});
47+
emitter.setCancellable(new Cancellable() {
48+
@Override
49+
public void cancel() throws Exception {
50+
dataSubscription.cancel();
51+
}
52+
});
53+
}
54+
});
55+
}
56+
57+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright 2017 ObjectBox Ltd. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox.rx;
18+
19+
import java.util.List;
20+
21+
import io.objectbox.query.Query;
22+
import io.objectbox.reactive.DataObserver;
23+
import io.objectbox.reactive.DataSubscription;
24+
import io.reactivex.BackpressureStrategy;
25+
import io.reactivex.Flowable;
26+
import io.reactivex.FlowableEmitter;
27+
import io.reactivex.FlowableOnSubscribe;
28+
import io.reactivex.Observable;
29+
import io.reactivex.ObservableEmitter;
30+
import io.reactivex.ObservableOnSubscribe;
31+
import io.reactivex.Single;
32+
import io.reactivex.SingleEmitter;
33+
import io.reactivex.SingleOnSubscribe;
34+
import io.reactivex.functions.Cancellable;
35+
36+
/**
37+
* Static methods to Rx-ify ObjectBox queries.
38+
*/
39+
public abstract class RxQuery {
40+
/**
41+
* The returned Flowable emits Query results one by one. Once all results have been processed, onComplete is called.
42+
* Uses BackpressureStrategy.BUFFER.
43+
*/
44+
public static <T> Flowable<T> flowableOneByOne(final Query<T> query) {
45+
return flowableOneByOne(query, BackpressureStrategy.BUFFER);
46+
}
47+
48+
/**
49+
* The returned Flowable emits Query results one by one. Once all results have been processed, onComplete is called.
50+
* Uses given BackpressureStrategy.
51+
*/
52+
public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) {
53+
return Flowable.create(new FlowableOnSubscribe<T>() {
54+
@Override
55+
public void subscribe(final FlowableEmitter<T> emitter) throws Exception {
56+
createListItemEmitter(query, emitter);
57+
}
58+
59+
}, strategy);
60+
}
61+
62+
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
63+
final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
64+
@Override
65+
public void onData(List<T> data) {
66+
for (T datum : data) {
67+
if (emitter.isCancelled()) {
68+
return;
69+
} else {
70+
emitter.onNext(datum);
71+
}
72+
}
73+
if (!emitter.isCancelled()) {
74+
emitter.onComplete();
75+
}
76+
}
77+
});
78+
emitter.setCancellable(new Cancellable() {
79+
@Override
80+
public void cancel() throws Exception {
81+
dataSubscription.cancel();
82+
}
83+
});
84+
}
85+
86+
/**
87+
* The returned Observable emits Query results as Lists.
88+
* Never completes, so you will get updates when underlying data changes
89+
* (see {@link Query#subscribe()} for details).
90+
*/
91+
public static <T> Observable<List<T>> observable(final Query<T> query) {
92+
return Observable.create(new ObservableOnSubscribe<List<T>>() {
93+
@Override
94+
public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
95+
final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
96+
@Override
97+
public void onData(List<T> data) {
98+
if (!emitter.isDisposed()) {
99+
emitter.onNext(data);
100+
}
101+
}
102+
});
103+
emitter.setCancellable(new Cancellable() {
104+
@Override
105+
public void cancel() throws Exception {
106+
dataSubscription.cancel();
107+
}
108+
});
109+
}
110+
});
111+
}
112+
113+
/**
114+
* The returned Single emits one Query result as a List.
115+
*/
116+
public static <T> Single<List<T>> single(final Query<T> query) {
117+
return Single.create(new SingleOnSubscribe<List<T>>() {
118+
@Override
119+
public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
120+
query.subscribe().single().observer(new DataObserver<List<T>>() {
121+
@Override
122+
public void onData(List<T> data) {
123+
if (!emitter.isDisposed()) {
124+
emitter.onSuccess(data);
125+
}
126+
}
127+
});
128+
// no need to cancel, single never subscribes
129+
}
130+
});
131+
}
132+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2017 ObjectBox Ltd. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox.query;
18+
19+
import io.objectbox.reactive.DataObserver;
20+
import io.objectbox.reactive.DataPublisher;
21+
import io.objectbox.reactive.DataPublisherUtils;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Set;
25+
import java.util.concurrent.CopyOnWriteArraySet;
26+
27+
public class FakeQueryPublisher<T> implements DataPublisher<List<T>> {
28+
29+
private final Set<DataObserver<List<T>>> observers = new CopyOnWriteArraySet();
30+
31+
private List<T> queryResult = Collections.emptyList();
32+
33+
public List<T> getQueryResult() {
34+
return queryResult;
35+
}
36+
37+
public void setQueryResult(List<T> queryResult) {
38+
this.queryResult = queryResult;
39+
}
40+
41+
@Override
42+
public synchronized void subscribe(DataObserver<List<T>> observer, Object param) {
43+
observers.add(observer);
44+
}
45+
46+
@Override
47+
public void publishSingle(final DataObserver<List<T>> observer, Object param) {
48+
observer.onData(queryResult);
49+
}
50+
51+
public void publish() {
52+
for (DataObserver<List<T>> observer : observers) {
53+
observer.onData(queryResult);
54+
}
55+
}
56+
57+
@Override
58+
public synchronized void unsubscribe(DataObserver<List<T>> observer, Object param) {
59+
DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
60+
}
61+
62+
}

0 commit comments

Comments
 (0)