Skip to content

Commit e9d3720

Browse files
author
Yufei Cai
authored
Merge pull request #53 from bosch-io/feature/search-protocol
Add search support and restructure messaging providers.
2 parents 21efb53 + 9c1743f commit e9d3720

File tree

67 files changed

+5405
-3369
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+5405
-3369
lines changed

java/pom.xml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@
208208
<mockito.version>3.1.0</mockito.version>
209209
<jsonassert.version>1.2.3</jsonassert.version>
210210

211+
<!-- reactive streams versions -->
212+
<reactive-streams.version>1.0.3</reactive-streams.version>
213+
<reactive-streams-tck.version>${reactive-streams.version}</reactive-streams-tck.version>
214+
<!-- carefully set testng version to one whose junit dependency is equal to ${junit.version} -->
215+
<reactive-streams-tck.testng.version>6.14.3</reactive-streams-tck.testng.version>
216+
211217
<json-unit.version>1.28.1</json-unit.version>
212218
<pax.exam.version>4.13.0</pax.exam.version>
213219
<felix.version>6.0.3</felix.version>
@@ -622,6 +628,10 @@
622628
<plugin>
623629
<groupId>org.apache.maven.plugins</groupId>
624630
<artifactId>maven-compiler-plugin</artifactId>
631+
<configuration>
632+
<source>8</source>
633+
<target>8</target>
634+
</configuration>
625635
</plugin>
626636
<plugin>
627637
<groupId>org.codehaus.mojo</groupId>
@@ -759,6 +769,11 @@
759769
<artifactId>slf4j-api</artifactId>
760770
<version>${slf4j.version}</version>
761771
</dependency>
772+
<dependency>
773+
<groupId>org.reactivestreams</groupId>
774+
<artifactId>reactive-streams</artifactId>
775+
<version>${reactive-streams.version}</version>
776+
</dependency>
762777

763778
<!-- ### Compile - OSGi ### -->
764779
<dependency>
@@ -799,6 +814,11 @@
799814
<artifactId>ditto-model-things</artifactId>
800815
<version>${ditto.version}</version>
801816
</dependency>
817+
<dependency>
818+
<groupId>org.eclipse.ditto</groupId>
819+
<artifactId>ditto-model-thingsearch</artifactId>
820+
<version>${ditto.version}</version>
821+
</dependency>
802822

803823
<dependency>
804824
<groupId>org.eclipse.ditto</groupId>
@@ -825,6 +845,11 @@
825845
<artifactId>ditto-signals-commands-messages</artifactId>
826846
<version>${ditto.version}</version>
827847
</dependency>
848+
<dependency>
849+
<groupId>org.eclipse.ditto</groupId>
850+
<artifactId>ditto-signals-commands-thingsearch</artifactId>
851+
<version>${ditto.version}</version>
852+
</dependency>
828853
<dependency>
829854
<groupId>org.eclipse.ditto</groupId>
830855
<artifactId>ditto-signals-events-base</artifactId>
@@ -835,6 +860,11 @@
835860
<artifactId>ditto-signals-events-things</artifactId>
836861
<version>${ditto.version}</version>
837862
</dependency>
863+
<dependency>
864+
<groupId>org.eclipse.ditto</groupId>
865+
<artifactId>ditto-signals-events-thingsearch</artifactId>
866+
<version>${ditto.version}</version>
867+
</dependency>
838868

839869
<dependency>
840870
<groupId>org.eclipse.ditto</groupId>
@@ -945,6 +975,20 @@
945975
<scope>test</scope>
946976
</dependency>
947977

978+
<!-- Testing - reactive streams compatibility -->
979+
<dependency>
980+
<groupId>org.reactivestreams</groupId>
981+
<artifactId>reactive-streams-tck</artifactId>
982+
<version>${reactive-streams-tck.version}</version>
983+
<scope>test</scope>
984+
</dependency>
985+
<dependency>
986+
<groupId>org.testng</groupId>
987+
<artifactId>testng</artifactId>
988+
<version>${reactive-streams-tck.testng.version}</version>
989+
<scope>test</scope>
990+
</dependency>
991+
948992
<!-- ### Testing - Ditto artifacts ### -->
949993
<dependency>
950994
<groupId>org.eclipse.ditto</groupId>

java/src/main/assembly/assembly.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,17 @@
3838
<include>org.eclipse.ditto:ditto-model-policies</include>
3939
<include>org.eclipse.ditto:ditto-model-messages</include>
4040
<include>org.eclipse.ditto:ditto-model-things</include>
41+
<include>org.eclipse.ditto:ditto-model-thingsearch</include>
4142
<include>org.eclipse.ditto:ditto-signals-base</include>
4243
<include>org.eclipse.ditto:ditto-signals-commands-base</include>
44+
<include>org.eclipse.ditto:ditto-signals-commands-thingsearch</include>
4345
<include>org.eclipse.ditto:ditto-signals-commands-things</include>
4446
<include>org.eclipse.ditto:ditto-signals-commands-policies</include>
4547
<include>org.eclipse.ditto:ditto-signals-commands-messages</include>
4648
<include>org.eclipse.ditto:ditto-signals-commands-live</include>
4749
<include>org.eclipse.ditto:ditto-signals-events-base</include>
4850
<include>org.eclipse.ditto:ditto-signals-events-things</include>
51+
<include>org.eclipse.ditto:ditto-signals-events-thingsearch</include>
4952
<include>org.eclipse.ditto:ditto-protocol-adapter</include>
5053
<include>org.eclipse.ditto:ditto-client</include>
5154
</includes>

java/src/main/java/org/eclipse/ditto/client/DittoClients.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import javax.annotation.concurrent.Immutable;
1717

1818
import org.eclipse.ditto.client.internal.DefaultDittoClient;
19-
import org.eclipse.ditto.client.internal.ResponseForwarder;
2019
import org.eclipse.ditto.client.live.internal.MessageSerializerFactory;
2120
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
2221
import org.eclipse.ditto.client.messaging.MessagingProvider;
@@ -83,11 +82,13 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
8382
* @since 1.1.0
8483
*/
8584
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
86-
final MessagingProvider liveMessagingProvider, final MessagingProvider policyMessagingProvider) {
85+
final MessagingProvider liveMessagingProvider,
86+
final MessagingProvider policyMessagingProvider) {
8787

8888
final MessageSerializerRegistry messageSerializerRegistry =
8989
MessageSerializerFactory.newInstance().getMessageSerializerRegistry();
90-
return newInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider, messageSerializerRegistry);
90+
return newInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider,
91+
messageSerializerRegistry);
9192
}
9293

9394
/**
@@ -103,9 +104,11 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
103104
* could not be established
104105
*/
105106
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
106-
final MessagingProvider liveMessagingProvider, final MessageSerializerRegistry messageSerializerRegistry) {
107+
final MessagingProvider liveMessagingProvider,
108+
final MessageSerializerRegistry messageSerializerRegistry) {
107109

108-
return newInstance(twinMessagingProvider, liveMessagingProvider, twinMessagingProvider, messageSerializerRegistry);
110+
return newInstance(twinMessagingProvider, liveMessagingProvider, twinMessagingProvider,
111+
messageSerializerRegistry);
109112
}
110113

111114
/**
@@ -126,9 +129,10 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
126129
final MessagingProvider liveMessagingProvider, final MessagingProvider policyMessagingProvider,
127130
final MessageSerializerRegistry messageSerializerRegistry) {
128131

129-
final ResponseForwarder responseForwarder = ResponseForwarder.getInstance();
130-
return DefaultDittoClient.newInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider,
131-
responseForwarder, messageSerializerRegistry);
132+
return DefaultDittoClient.newInstance(twinMessagingProvider,
133+
liveMessagingProvider,
134+
policyMessagingProvider,
135+
messageSerializerRegistry);
132136
}
133137

134138
}

java/src/main/java/org/eclipse/ditto/client/changes/internal/ImmutableThingChange.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ private static JsonValue getJsonValueForThing(@Nullable final Thing thing) {
8282
* deleted.
8383
* @param revision the revision (change counter) of the change.
8484
* @param timestamp the timestamp of the change.
85+
* @param extra the extra data to be included in the change.
8586
* @throws NullPointerException if any required argument is {@code null}.
8687
*/
8788
public ImmutableThingChange(final ThingId thingId,

java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package org.eclipse.ditto.client.configuration;
1414

1515
import java.net.URI;
16+
import java.time.Duration;
1617
import java.util.Optional;
1718

1819
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
@@ -24,6 +25,13 @@
2425
*/
2526
public interface MessagingConfiguration {
2627

28+
/**
29+
* Returns how long to wait for a response before giving up.
30+
*
31+
* @return the timeout.
32+
*/
33+
Duration getTimeout();
34+
2735
/**
2836
* Returns the JSON schema version to use for messaging.
2937
*
@@ -63,6 +71,14 @@ public interface MessagingConfiguration {
6371
*/
6472
interface Builder {
6573

74+
/**
75+
* Set the timeout waiting for a response.
76+
*
77+
* @param timeout the timeout.
78+
* @return this builder.
79+
*/
80+
Builder timeout(Duration timeout);
81+
6682
/**
6783
* Sets the {@code JSON schema version}.
6884
* <p>

java/src/main/java/org/eclipse/ditto/client/configuration/WebSocketMessagingConfiguration.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,21 @@
1212
*/
1313
package org.eclipse.ditto.client.configuration;
1414

15-
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
15+
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkArgument;
16+
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
1617

17-
import javax.annotation.Nullable;
1818
import java.net.URI;
1919
import java.text.MessageFormat;
20+
import java.time.Duration;
2021
import java.util.Arrays;
2122
import java.util.List;
2223
import java.util.Optional;
2324
import java.util.regex.Matcher;
2425
import java.util.regex.Pattern;
2526

26-
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkArgument;
27-
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
27+
import javax.annotation.Nullable;
28+
29+
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
2830

2931
/**
3032
* Provides Ditto WebSocket messaging specific configuration.
@@ -33,15 +35,18 @@
3335
*/
3436
public final class WebSocketMessagingConfiguration implements MessagingConfiguration {
3537

38+
private final Duration timeout;
3639
private final JsonSchemaVersion jsonSchemaVersion;
3740
private final URI endpointUri;
3841
private final boolean reconnectEnabled;
3942
@Nullable private final ProxyConfiguration proxyConfiguration;
4043
@Nullable private final TrustStoreConfiguration trustStoreConfiguration;
4144

42-
private WebSocketMessagingConfiguration(final JsonSchemaVersion jsonSchemaVersion, final URI endpointUri,
45+
private WebSocketMessagingConfiguration(final Duration timeout, final JsonSchemaVersion jsonSchemaVersion,
46+
final URI endpointUri,
4347
final boolean reconnectEnabled, @Nullable final ProxyConfiguration proxyConfiguration,
4448
@Nullable final TrustStoreConfiguration trustStoreConfiguration) {
49+
this.timeout = timeout;
4550
this.jsonSchemaVersion = jsonSchemaVersion;
4651
this.endpointUri = endpointUri;
4752
this.reconnectEnabled = reconnectEnabled;
@@ -53,6 +58,11 @@ public static MessagingConfiguration.Builder newBuilder() {
5358
return new WebSocketMessagingConfigurationBuilder();
5459
}
5560

61+
@Override
62+
public Duration getTimeout() {
63+
return timeout;
64+
}
65+
5666
@Override
5767
public JsonSchemaVersion getJsonSchemaVersion() {
5868
return jsonSchemaVersion;
@@ -82,14 +92,21 @@ private static final class WebSocketMessagingConfigurationBuilder implements Mes
8292

8393
private static final List<String> ALLOWED_URI_SCHEME = Arrays.asList("wss", "ws");
8494
private static final String WS_PATH = "/ws/";
85-
private static final String WS_PATH_REGEX = "/ws/(1|2)/?";
95+
private static final String WS_PATH_REGEX = "/ws/([12])/?";
8696

97+
private Duration timeout = Duration.ofSeconds(60L);
8798
private JsonSchemaVersion jsonSchemaVersion = JsonSchemaVersion.LATEST;
8899
private URI endpointUri;
89100
private boolean reconnectEnabled = true;
90101
private ProxyConfiguration proxyConfiguration;
91102
private TrustStoreConfiguration trustStoreConfiguration;
92103

104+
@Override
105+
public Builder timeout(final Duration timeout) {
106+
this.timeout = timeout;
107+
return this;
108+
}
109+
93110
@Override
94111
public MessagingConfiguration.Builder jsonSchemaVersion(final JsonSchemaVersion jsonSchemaVersion) {
95112
this.jsonSchemaVersion = checkNotNull(jsonSchemaVersion, "jsonSchemaVersion");
@@ -130,8 +147,8 @@ public MessagingConfiguration.Builder trustStoreConfiguration(
130147

131148
@Override
132149
public MessagingConfiguration build() {
133-
final URI wsEndpointUri= appendWsPathIfNecessary(this.endpointUri, jsonSchemaVersion);
134-
return new WebSocketMessagingConfiguration(jsonSchemaVersion, wsEndpointUri, reconnectEnabled,
150+
final URI wsEndpointUri = appendWsPathIfNecessary(this.endpointUri, jsonSchemaVersion);
151+
return new WebSocketMessagingConfiguration(timeout, jsonSchemaVersion, wsEndpointUri, reconnectEnabled,
135152
proxyConfiguration, trustStoreConfiguration);
136153
}
137154

@@ -152,13 +169,15 @@ private static boolean needToAppendWsPath(final URI baseUri) {
152169
return !matcher.find();
153170
}
154171

155-
private static void checkIfBaseUriAndSchemaVersionMatch(final URI baseUri, final JsonSchemaVersion schemaVersion) {
172+
private static void checkIfBaseUriAndSchemaVersionMatch(final URI baseUri,
173+
final JsonSchemaVersion schemaVersion) {
156174
final String path = removeTrailingSlashFromPath(baseUri.getPath());
157-
final String apiVersion = path.substring(path.length() - 1, path.length());
175+
final String apiVersion = path.substring(path.length() - 1);
158176
if (!schemaVersion.toString().equals(apiVersion)) {
159-
throw new IllegalArgumentException("The jsonSchemaVersion and apiVersion of the endpoint do not match. " +
160-
"Either remove the ws path from the endpoint or " +
161-
"use the same jsonSchemaVersion as in the ws path of the endpoint.");
177+
throw new IllegalArgumentException(
178+
"The jsonSchemaVersion and apiVersion of the endpoint do not match. " +
179+
"Either remove the ws path from the endpoint or " +
180+
"use the same jsonSchemaVersion as in the ws path of the endpoint.");
162181
}
163182
}
164183

0 commit comments

Comments
 (0)