Skip to content

Commit 3b06401

Browse files
Move objectbox-rxjava into this repository.
1 parent 733cdae commit 3b06401

File tree

8 files changed

+549
-0
lines changed

8 files changed

+549
-0
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def projectNamesToPublish =
4242
'objectbox-java-api',
4343
'objectbox-java',
4444
'objectbox-kotlin',
45+
'objectbox-rxjava',
4546
'objectbox-generator',
4647
'objectbox-daocompat',
4748
'linux',
@@ -139,6 +140,7 @@ task installAll {
139140
dependsOn ':objectbox-java-api:install'
140141
dependsOn ':objectbox-java:install'
141142
dependsOn ':objectbox-kotlin:install'
143+
dependsOn ':objectbox-rxjava:install'
142144
doLast {
143145
println("Installed version $version")
144146
}
@@ -150,6 +152,7 @@ task deployAll {
150152
dependsOn ':objectbox-java-api:uploadArchives'
151153
dependsOn ':objectbox-java:uploadArchives'
152154
dependsOn ':objectbox-kotlin:uploadArchives'
155+
dependsOn ':objectbox-rxjava:uploadArchives'
153156
}
154157

155158
// this task is also used by the composite build ('objectbox-deploy'), check before making changes

objectbox-rxjava/build.gradle

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
apply plugin: 'java'
2+
3+
group = 'io.objectbox'
4+
version= rootProject.version
5+
6+
sourceCompatibility = 1.7
7+
8+
dependencies {
9+
compile project(':objectbox-java')
10+
compile 'io.reactivex.rxjava2:rxjava:2.0.6'
11+
12+
testCompile 'junit:junit:4.12'
13+
testCompile 'org.mockito:mockito-core:2.7.10'
14+
}
15+
16+
javadoc {
17+
failOnError = false
18+
title = "ObjectBox RxJava2 ${version} API"
19+
excludes = [] // Unfinished APIs if any
20+
options.bottom = 'Available under the Apache License, Version 2.0 - <i>Copyright &#169; 2017 <a href="http://objectbox.io/">ObjectBox Ltd</a>. All Rights Reserved.</i>'
21+
doLast {
22+
copy {
23+
from '../javadoc-style/'
24+
into "build/docs/javadoc/"
25+
}
26+
}
27+
}
28+
29+
task javadocJar(type: Jar, dependsOn: javadoc) {
30+
classifier = 'javadoc'
31+
from 'build/docs/javadoc'
32+
}
33+
34+
task sourcesJar(type: Jar) {
35+
from sourceSets.main.allSource
36+
classifier = 'sources'
37+
}
38+
39+
artifacts {
40+
archives jar
41+
archives javadocJar
42+
archives sourcesJar
43+
}
44+
45+
uploadArchives {
46+
repositories {
47+
mavenDeployer {
48+
// Basic definitions are defined in root project
49+
pom.project {
50+
name 'ObjectBox RxJava API'
51+
description 'RxJava extension for ObjectBox'
52+
53+
licenses {
54+
license {
55+
name 'The Apache Software License, Version 2.0'
56+
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
57+
distribution 'repo'
58+
}
59+
}
60+
}
61+
}
62+
}
63+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2017 ObjectBox Ltd. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox.rx;
18+
19+
import io.objectbox.BoxStore;
20+
import io.objectbox.reactive.DataObserver;
21+
import io.objectbox.reactive.DataSubscription;
22+
import io.reactivex.Observable;
23+
import io.reactivex.ObservableEmitter;
24+
import io.reactivex.ObservableOnSubscribe;
25+
import io.reactivex.functions.Cancellable;
26+
27+
/**
28+
* Static methods to Rx-ify ObjectBox queries.
29+
*/
30+
public abstract class RxBoxStore {
31+
/**
32+
* Using the returned Observable, you can be notified about data changes.
33+
* Once a transaction is committed, you will get info on classes with changed Objects.
34+
*/
35+
public static <T> Observable<Class> observable(final BoxStore boxStore) {
36+
return Observable.create(new ObservableOnSubscribe<Class>() {
37+
@Override
38+
public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
39+
final DataSubscription dataSubscription = boxStore.subscribe().observer(new DataObserver<Class>() {
40+
@Override
41+
public void onData(Class data) {
42+
if (!emitter.isDisposed()) {
43+
emitter.onNext(data);
44+
}
45+
}
46+
});
47+
emitter.setCancellable(new Cancellable() {
48+
@Override
49+
public void cancel() throws Exception {
50+
dataSubscription.cancel();
51+
}
52+
});
53+
}
54+
});
55+
}
56+
57+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2017 ObjectBox Ltd. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox.rx;
18+
19+
import java.util.List;
20+
21+
import io.objectbox.query.Query;
22+
import io.objectbox.reactive.DataObserver;
23+
import io.objectbox.reactive.DataSubscription;
24+
import io.reactivex.BackpressureStrategy;
25+
import io.reactivex.Flowable;
26+
import io.reactivex.FlowableEmitter;
27+
import io.reactivex.FlowableOnSubscribe;
28+
import io.reactivex.Observable;
29+
import io.reactivex.ObservableEmitter;
30+
import io.reactivex.ObservableOnSubscribe;
31+
import io.reactivex.Single;
32+
import io.reactivex.SingleEmitter;
33+
import io.reactivex.SingleOnSubscribe;
34+
import io.reactivex.functions.Cancellable;
35+
36+
/**
37+
* Static methods to Rx-ify ObjectBox queries.
38+
*/
39+
public abstract class RxQuery {
40+
/**
41+
* The returned Flowable emits Query results one by one. Once all results have been processed, onComplete is called.
42+
* Uses BackpressureStrategy.BUFFER.
43+
*/
44+
public static <T> Flowable<T> flowableOneByOne(final Query<T> query) {
45+
return flowableOneByOne(query, BackpressureStrategy.BUFFER);
46+
}
47+
48+
/**
49+
* The returned Flowable emits Query results one by one. Once all results have been processed, onComplete is called.
50+
* Uses given BackpressureStrategy.
51+
*/
52+
public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) {
53+
return Flowable.create(new FlowableOnSubscribe<T>() {
54+
@Override
55+
public void subscribe(final FlowableEmitter<T> emitter) throws Exception {
56+
createListItemEmitter(query, emitter);
57+
}
58+
59+
}, strategy);
60+
}
61+
62+
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
63+
final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
64+
@Override
65+
public void onData(List<T> data) {
66+
for (T datum : data) {
67+
if (emitter.isCancelled()) {
68+
return;
69+
} else {
70+
emitter.onNext(datum);
71+
}
72+
}
73+
if (!emitter.isCancelled()) {
74+
emitter.onComplete();
75+
}
76+
}
77+
});
78+
emitter.setCancellable(new Cancellable() {
79+
@Override
80+
public void cancel() throws Exception {
81+
dataSubscription.cancel();
82+
}
83+
});
84+
}
85+
86+
/**
87+
* The returned Observable emits Query results as Lists.
88+
* Never completes, so you will get updates when underlying data changes.
89+
*/
90+
public static <T> Observable<List<T>> observable(final Query<T> query) {
91+
return Observable.create(new ObservableOnSubscribe<List<T>>() {
92+
@Override
93+
public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
94+
final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
95+
@Override
96+
public void onData(List<T> data) {
97+
if (!emitter.isDisposed()) {
98+
emitter.onNext(data);
99+
}
100+
}
101+
});
102+
emitter.setCancellable(new Cancellable() {
103+
@Override
104+
public void cancel() throws Exception {
105+
dataSubscription.cancel();
106+
}
107+
});
108+
}
109+
});
110+
}
111+
112+
/**
113+
* The returned Single emits one Query result as a List.
114+
*/
115+
public static <T> Single<List<T>> single(final Query<T> query) {
116+
return Single.create(new SingleOnSubscribe<List<T>>() {
117+
@Override
118+
public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
119+
query.subscribe().single().observer(new DataObserver<List<T>>() {
120+
@Override
121+
public void onData(List<T> data) {
122+
if (!emitter.isDisposed()) {
123+
emitter.onSuccess(data);
124+
}
125+
}
126+
});
127+
// no need to cancel, single never subscribes
128+
}
129+
});
130+
}
131+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2017 ObjectBox Ltd. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox.query;
18+
19+
import io.objectbox.reactive.DataObserver;
20+
import io.objectbox.reactive.DataPublisher;
21+
import io.objectbox.reactive.DataPublisherUtils;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Set;
25+
import java.util.concurrent.CopyOnWriteArraySet;
26+
27+
public class FakeQueryPublisher<T> implements DataPublisher<List<T>> {
28+
29+
private final Set<DataObserver<List<T>>> observers = new CopyOnWriteArraySet();
30+
31+
private List<T> queryResult = Collections.emptyList();
32+
33+
public List<T> getQueryResult() {
34+
return queryResult;
35+
}
36+
37+
public void setQueryResult(List<T> queryResult) {
38+
this.queryResult = queryResult;
39+
}
40+
41+
@Override
42+
public synchronized void subscribe(DataObserver<List<T>> observer, Object param) {
43+
observers.add(observer);
44+
}
45+
46+
@Override
47+
public void publishSingle(final DataObserver<List<T>> observer, Object param) {
48+
observer.onData(queryResult);
49+
}
50+
51+
public void publish() {
52+
for (DataObserver<List<T>> observer : observers) {
53+
observer.onData(queryResult);
54+
}
55+
}
56+
57+
@Override
58+
public synchronized void unsubscribe(DataObserver<List<T>> observer, Object param) {
59+
DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
60+
}
61+
62+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2017 ObjectBox Ltd. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.objectbox.query;
18+
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.when;
21+
22+
import io.objectbox.Box;
23+
import io.objectbox.BoxStore;
24+
import io.objectbox.reactive.SubscriptionBuilder;
25+
26+
public class MockQuery<T> {
27+
private Box box;
28+
private BoxStore boxStore;
29+
private final Query query;
30+
private final FakeQueryPublisher fakeQueryPublisher;
31+
32+
public MockQuery(boolean hasOrder) {
33+
// box = mock(Box.class);
34+
// boxStore = mock(BoxStore.class);
35+
// when(box.getStore()).thenReturn(boxStore);
36+
query = mock(Query.class);
37+
fakeQueryPublisher = new FakeQueryPublisher();
38+
SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(fakeQueryPublisher, null, null);
39+
when(query.subscribe()).thenReturn(subscriptionBuilder);
40+
}
41+
42+
public Box getBox() {
43+
return box;
44+
}
45+
46+
public BoxStore getBoxStore() {
47+
return boxStore;
48+
}
49+
50+
public Query getQuery() {
51+
return query;
52+
}
53+
54+
public FakeQueryPublisher getFakeQueryPublisher() {
55+
return fakeQueryPublisher;
56+
}
57+
}

0 commit comments

Comments
 (0)