Skip to content

Commit 52f6bc7

Browse files
authored
Merge pull request #98 from bosch-io/feature/declared-acks
Allow declaration of acknowledgements for Ditto Java Client
2 parents 066c9c0 + c973112 commit 52f6bc7

File tree

10 files changed

+208
-7
lines changed

10 files changed

+208
-7
lines changed

java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414

1515
import java.net.URI;
1616
import java.time.Duration;
17+
import java.util.Collection;
1718
import java.util.Optional;
19+
import java.util.Set;
1820
import java.util.function.Consumer;
1921

2022
import javax.annotation.Nullable;
2123

24+
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
2225
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
2326

2427
/**
@@ -49,6 +52,14 @@ public interface MessagingConfiguration {
4952
*/
5053
URI getEndpointUri();
5154

55+
/**
56+
* Returns the labels of all acknowledgements that are declared to be provided by this connection.
57+
*
58+
* @return the acknowledgment labels.
59+
* @since 1.4.0
60+
*/
61+
Set<AcknowledgementLabel> getDeclaredAcknowledgements();
62+
5263
/**
5364
* @return {@code true} if client should try to reconnect when connection is lost.
5465
*/
@@ -114,6 +125,15 @@ interface Builder {
114125
*/
115126
Builder endpoint(String endpoint);
116127

128+
/**
129+
* Sets the labels of all acknowledgements that are declared to be provided by this client session/connection.
130+
*
131+
* @param acknowledgementLabels the acknowledgement labels
132+
* @return this builder.
133+
* @since 1.4.0
134+
*/
135+
Builder declaredAcknowledgements(Collection<AcknowledgementLabel> acknowledgementLabels);
136+
117137
/**
118138
* Sets if {@code reconnectEnabled}.
119139
* <p> Default is enabled. If a connection was established once, the client tries to reconnect <em>every 5
@@ -157,7 +177,7 @@ interface Builder {
157177
* @param handler the handler that will be called with the cause of the connection error.
158178
* @since 1.2.0
159179
*/
160-
Builder connectionErrorHandler(final Consumer<Throwable> handler);
180+
Builder connectionErrorHandler(@Nullable final Consumer<Throwable> handler);
161181

162182
/**
163183
* Creates a new instance of {@code MessagingConfiguration}.

java/src/main/java/org/eclipse/ditto/client/configuration/WebSocketMessagingConfiguration.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919
import java.text.MessageFormat;
2020
import java.time.Duration;
2121
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.Collections;
24+
import java.util.HashSet;
2225
import java.util.List;
2326
import java.util.Optional;
27+
import java.util.Set;
2428
import java.util.function.Consumer;
2529
import java.util.regex.Matcher;
2630
import java.util.regex.Pattern;
2731

2832
import javax.annotation.Nullable;
2933

34+
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
3035
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
3136

3237
/**
@@ -44,6 +49,7 @@ public final class WebSocketMessagingConfiguration implements MessagingConfigura
4449
@Nullable private final ProxyConfiguration proxyConfiguration;
4550
@Nullable private final TrustStoreConfiguration trustStoreConfiguration;
4651
@Nullable private final Consumer<Throwable> connectionErrorHandler;
52+
private final Set<AcknowledgementLabel> declaredAcknowledgements;
4753

4854
public WebSocketMessagingConfiguration(final WebSocketMessagingConfigurationBuilder builder,
4955
final URI endpointUri) {
@@ -55,6 +61,7 @@ public WebSocketMessagingConfiguration(final WebSocketMessagingConfigurationBuil
5561
trustStoreConfiguration = builder.trustStoreConfiguration;
5662
connectionErrorHandler = builder.connectionErrorHandler;
5763
this.timeout = builder.timeout;
64+
this.declaredAcknowledgements = Collections.unmodifiableSet(builder.declaredAcknowledgements);
5865
this.endpointUri = endpointUri;
5966
}
6067

@@ -77,6 +84,11 @@ public URI getEndpointUri() {
7784
return endpointUri;
7885
}
7986

87+
@Override
88+
public Set<AcknowledgementLabel> getDeclaredAcknowledgements() {
89+
return declaredAcknowledgements;
90+
}
91+
8092
@Override
8193
public boolean isReconnectEnabled() {
8294
return reconnectEnabled;
@@ -116,6 +128,7 @@ private static final class WebSocketMessagingConfigurationBuilder implements Mes
116128
@Nullable private ProxyConfiguration proxyConfiguration;
117129
private TrustStoreConfiguration trustStoreConfiguration;
118130
@Nullable private Consumer<Throwable> connectionErrorHandler;
131+
private final Set<AcknowledgementLabel> declaredAcknowledgements = new HashSet<>();
119132

120133
private WebSocketMessagingConfigurationBuilder() {
121134
jsonSchemaVersion = JsonSchemaVersion.LATEST;
@@ -150,6 +163,13 @@ public MessagingConfiguration.Builder endpoint(final String endpoint) {
150163
return this;
151164
}
152165

166+
@Override
167+
public Builder declaredAcknowledgements(final Collection<AcknowledgementLabel> acknowledgementLabels) {
168+
this.declaredAcknowledgements.clear();
169+
this.declaredAcknowledgements.addAll(acknowledgementLabels);
170+
return this;
171+
}
172+
153173
@Override
154174
public MessagingConfiguration.Builder reconnectEnabled(final boolean reconnectEnabled) {
155175
this.reconnectEnabled = reconnectEnabled;
@@ -177,7 +197,7 @@ public MessagingConfiguration.Builder trustStoreConfiguration(
177197
}
178198

179199
@Override
180-
public Builder connectionErrorHandler(final Consumer<Throwable> handler) {
200+
public Builder connectionErrorHandler(@Nullable final Consumer<Throwable> handler) {
181201
this.connectionErrorHandler = handler;
182202
return this;
183203
}

java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package org.eclipse.ditto.client.internal;
1414

1515
import java.text.MessageFormat;
16+
import java.util.Optional;
1617
import java.util.concurrent.CompletableFuture;
1718
import java.util.concurrent.CompletionStage;
1819
import java.util.function.Consumer;
@@ -24,7 +25,9 @@
2425
import org.eclipse.ditto.client.changes.internal.ImmutableFeatureChange;
2526
import org.eclipse.ditto.client.changes.internal.ImmutableFeaturesChange;
2627
import org.eclipse.ditto.client.changes.internal.ImmutableThingChange;
28+
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
2729
import org.eclipse.ditto.client.internal.bus.BusFactory;
30+
import org.eclipse.ditto.client.internal.bus.Classification;
2831
import org.eclipse.ditto.client.internal.bus.JsonPointerSelectors;
2932
import org.eclipse.ditto.client.internal.bus.PointerBus;
3033
import org.eclipse.ditto.client.internal.bus.SelectorUtil;
@@ -37,6 +40,8 @@
3740
import org.eclipse.ditto.client.twin.Twin;
3841
import org.eclipse.ditto.client.twin.internal.TwinImpl;
3942
import org.eclipse.ditto.json.JsonPointer;
43+
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotDeclaredException;
44+
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
4045
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
4146
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
4247
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
@@ -47,9 +52,11 @@
4752
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
4853
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
4954
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
55+
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
5056
import org.eclipse.ditto.protocoladapter.TopicPath;
5157
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
5258
import org.eclipse.ditto.signals.base.Signal;
59+
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
5360
import org.eclipse.ditto.signals.events.things.AclEntryCreated;
5461
import org.eclipse.ditto.signals.events.things.AclEntryDeleted;
5562
import org.eclipse.ditto.signals.events.things.AclEntryModified;
@@ -107,6 +114,7 @@ private DefaultDittoClient(final TwinImpl twin, final LiveImpl live, final Polic
107114
this.live = live;
108115
this.policies = policies;
109116
logVersionInformation();
117+
handleSpontaneousErrors();
110118
}
111119

112120
/**
@@ -587,4 +595,48 @@ public CompletionStage<DittoClient> connect() {
587595
.thenCompose(result -> policies.messagingProvider.initializeAsync())
588596
.thenApply(result -> this);
589597
}
598+
599+
private void handleSpontaneousErrors() {
600+
handleSpontaneousErrors(twin.messagingProvider);
601+
if (live.messagingProvider != twin.messagingProvider) {
602+
handleSpontaneousErrors(live.messagingProvider);
603+
}
604+
if (policies.messagingProvider != twin.messagingProvider) {
605+
handleSpontaneousErrors(policies.messagingProvider);
606+
}
607+
}
608+
609+
/**
610+
* Handle {@code DittoRuntimeException}s from the back-end that are not replies of anything.
611+
*
612+
* @param provider the messaging provider.
613+
*/
614+
private static void handleSpontaneousErrors(final MessagingProvider provider) {
615+
final Optional<Consumer<Throwable>> connectionErrorHandler =
616+
provider.getMessagingConfiguration().getConnectionErrorHandler();
617+
if (connectionErrorHandler.isPresent()) {
618+
final AdaptableBus adaptableBus = provider.getAdaptableBus();
619+
final Consumer<Throwable> consumer = connectionErrorHandler.get();
620+
621+
final Classification ackLabelNotUnique =
622+
Classification.forErrorCode(AcknowledgementLabelNotUniqueException.ERROR_CODE);
623+
final Classification ackLabelNotDeclared =
624+
Classification.forErrorCode(AcknowledgementLabelNotDeclaredException.ERROR_CODE);
625+
626+
adaptableBus.subscribeForAdaptableExclusively(ackLabelNotUnique,
627+
adaptable -> consumer.accept(asDittoRuntimeException(adaptable)));
628+
adaptableBus.subscribeForAdaptableExclusively(ackLabelNotDeclared,
629+
adaptable -> consumer.accept(asDittoRuntimeException(adaptable)));
630+
}
631+
}
632+
633+
private static Throwable asDittoRuntimeException(final Adaptable adaptable) {
634+
final Signal<?> signal = AbstractHandle.PROTOCOL_ADAPTER.fromAdaptable(adaptable);
635+
if (signal instanceof ErrorResponse) {
636+
return ((ErrorResponse<?>) signal).getDittoRuntimeException();
637+
} else {
638+
return new ClassCastException("Expect an error response, got: " +
639+
ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString());
640+
}
641+
}
590642
}

java/src/main/java/org/eclipse/ditto/client/internal/bus/AdaptableBus.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import java.time.Duration;
1616
import java.util.concurrent.CompletionStage;
17+
import java.util.concurrent.ScheduledExecutorService;
1718
import java.util.function.Consumer;
1819
import java.util.function.Predicate;
1920

@@ -83,6 +84,16 @@ public interface AdaptableBus {
8384
*/
8485
SubscriptionId subscribeForAdaptable(Classification tag, Consumer<Adaptable> adaptableConsumer);
8586

87+
/**
88+
* Add a persistent subscriber for an adaptable message and remove all other subscribers.
89+
* Only effective if no one-time string or adaptable subscriber matches.
90+
*
91+
* @param tag the adaptable classification.
92+
* @param adaptableConsumer the consumer of the adaptable message.
93+
* @return the subscription ID.
94+
*/
95+
SubscriptionId subscribeForAdaptableExclusively(Classification tag, Consumer<Adaptable> adaptableConsumer);
96+
8697
/**
8798
* Add a persistent subscriber for an adaptable message that are removed after a timeout.
8899
* If tag requires sequentialization, take care that all consumer and predicate parameters are fast,
@@ -109,6 +120,11 @@ SubscriptionId subscribeForAdaptableWithTimeout(Classification tag,
109120
*/
110121
boolean unsubscribe(@Nullable SubscriptionId subscriptionId);
111122

123+
/**
124+
* @return the scheduled executor service of this adaptable bus.
125+
*/
126+
ScheduledExecutorService getScheduledExecutor();
127+
112128
/**
113129
* Closes the executor of the adaptable bus .
114130
*/

java/src/main/java/org/eclipse/ditto/client/internal/bus/BusFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public static AdaptableBus createAdaptableBus() {
5151
.addStringClassifier(Classifiers.identity())
5252
.addAdaptableClassifier(Classifiers.correlationId())
5353
.addAdaptableClassifier(Classifiers.streamingType())
54-
.addAdaptableClassifier(Classifiers.thingsSearch());
54+
.addAdaptableClassifier(Classifiers.thingsSearch())
55+
.addAdaptableClassifier(Classifiers.errorCode());
5556
}
5657
}

java/src/main/java/org/eclipse/ditto/client/internal/bus/Classification.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,16 @@ static Classification forThingsSearch(final String searchSubscriptionId) {
7070
return new SearchSubscriptionId(searchSubscriptionId);
7171
}
7272

73+
/**
74+
* Create an error-code classification key.
75+
*
76+
* @param errorCode the error code.
77+
* @return the key.
78+
*/
79+
static Classification forErrorCode(final String errorCode) {
80+
return new ErrorCode(errorCode);
81+
}
82+
7383
/**
7484
* Check whether subscribers for this classification requires sequential dispatching.
7585
*
@@ -192,4 +202,11 @@ static <T> Optional<Classification> of(final T value) {
192202
return Optional.of(new Identity<>(value));
193203
}
194204
}
205+
206+
final class ErrorCode extends Literal<String> {
207+
208+
private ErrorCode(final String errorCode) {
209+
super(errorCode);
210+
}
211+
}
195212
}

java/src/main/java/org/eclipse/ditto/client/internal/bus/Classifiers.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Optional;
1717

1818
import org.eclipse.ditto.json.JsonValue;
19+
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
1920
import org.eclipse.ditto.protocoladapter.Adaptable;
2021
import org.eclipse.ditto.protocoladapter.TopicPath;
2122
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionEvent;
@@ -66,6 +67,15 @@ public static Classifier<Adaptable> thingsSearch() {
6667
return Instances.THINGS_SEARCH_CLASSIFIER;
6768
}
6869

70+
/**
71+
* Classify Ditto protocol errors by error codes.
72+
*
73+
* @return the error code classifier.
74+
*/
75+
public static Classifier<Adaptable> errorCode() {
76+
return Instances.ERROR_CODE_CLASSIFIER;
77+
}
78+
6979
private static final class StreamingTypeClassifier implements Classifier<Adaptable> {
7080

7181
@Override
@@ -116,6 +126,22 @@ public Optional<Classification> classify(final Adaptable message) {
116126
}
117127
}
118128

129+
private static final class ErrorCodeClassifier implements Classifier<Adaptable> {
130+
131+
@Override
132+
public Optional<Classification> classify(final Adaptable message) {
133+
if (message.getTopicPath().getCriterion() == TopicPath.Criterion.ERRORS) {
134+
return message.getPayload()
135+
.getValue()
136+
.filter(JsonValue::isObject)
137+
.flatMap(value -> value.asObject().getValue(DittoRuntimeException.JsonFields.ERROR_CODE))
138+
.map(Classification::forErrorCode);
139+
} else {
140+
return Optional.empty();
141+
}
142+
}
143+
}
144+
119145
private static final class Instances {
120146

121147
private static final Classifier<Adaptable> CORRELATION_ID_CLASSIFIER = adaptable ->
@@ -126,5 +152,7 @@ private static final class Instances {
126152
private static final Classifier<Adaptable> STREAMING_TYPE_CLASSIFIER = new StreamingTypeClassifier();
127153

128154
private static final Classifier<Adaptable> THINGS_SEARCH_CLASSIFIER = new ThingsSearchClassifier();
155+
156+
private static final Classifier<Adaptable> ERROR_CODE_CLASSIFIER = new ErrorCodeClassifier();
129157
}
130158
}

0 commit comments

Comments
 (0)