Skip to content

Commit 55d5ca2

Browse files
authored
Merge pull request #105 from bosch-io/bugfix/handle-consumption-error
Ditto-java-client bugfix: When starting consumption with invalid filter, wrongly timeout exception is propagate to the user
2 parents f724b82 + 6ac4dfb commit 55d5ca2

14 files changed

+596
-73
lines changed

java/pom.xml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@
207207
<equals-verifier.version>3.0.3</equals-verifier.version>
208208
<mockito.version>3.1.0</mockito.version>
209209
<jsonassert.version>1.2.3</jsonassert.version>
210+
<awaitility.version>4.0.3</awaitility.version>
210211

211212
<!-- reactive streams versions -->
212213
<reactive-streams.version>1.0.3</reactive-streams.version>
@@ -332,12 +333,7 @@
332333
<plugin>
333334
<groupId>org.apache.maven.plugins</groupId>
334335
<artifactId>maven-surefire-plugin</artifactId>
335-
<version>3.0.0-M4</version>
336-
<configuration>
337-
<systemProperties>
338-
<kamon.auto-start>true</kamon.auto-start>
339-
</systemProperties>
340-
</configuration>
336+
<version>3.0.0-M5</version>
341337
</plugin>
342338
<plugin>
343339
<groupId>org.apache.maven.plugins</groupId>
@@ -920,6 +916,12 @@
920916
<version>${assertj.version}</version>
921917
<scope>test</scope>
922918
</dependency>
919+
<dependency>
920+
<groupId>org.awaitility</groupId>
921+
<artifactId>awaitility</artifactId>
922+
<version>${awaitility.version}</version>
923+
<scope>test</scope>
924+
</dependency>
923925
<dependency>
924926
<groupId>org.mutabilitydetector</groupId>
925927
<artifactId>MutabilityDetector</artifactId>

java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Optional;
27+
import java.util.UUID;
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CompletionStage;
30+
import java.util.concurrent.atomic.AtomicBoolean;
2931
import java.util.function.Consumer;
3032
import java.util.function.Function;
3133

@@ -53,6 +55,9 @@
5355
import org.eclipse.ditto.json.JsonObject;
5456
import org.eclipse.ditto.json.JsonPointer;
5557
import org.eclipse.ditto.json.JsonValue;
58+
import org.eclipse.ditto.model.base.common.HttpStatusCode;
59+
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
60+
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
5661
import org.eclipse.ditto.model.messages.Message;
5762
import org.eclipse.ditto.model.messages.MessageDirection;
5863
import org.eclipse.ditto.model.messages.MessageHeaders;
@@ -67,6 +72,7 @@
6772
import org.eclipse.ditto.signals.base.Signal;
6873
import org.eclipse.ditto.signals.base.WithOptionalEntity;
6974
import org.eclipse.ditto.signals.commands.base.CommandResponse;
75+
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
7076
import org.eclipse.ditto.signals.commands.things.modify.CreateThing;
7177
import org.eclipse.ditto.signals.commands.things.modify.DeleteThing;
7278
import org.eclipse.ditto.signals.commands.things.modify.ModifyThing;
@@ -93,6 +99,7 @@ public abstract class CommonManagementImpl<T extends ThingHandle<F>, F extends F
9399

94100
protected final OutgoingMessageFactory outgoingMessageFactory;
95101

102+
private final AtomicBoolean subscriptionRequestPending = new AtomicBoolean(false);
96103
private final HandlerRegistry<T, F> handlerRegistry;
97104
private final PointerBus bus;
98105

@@ -111,12 +118,20 @@ protected CommonManagementImpl(
111118

112119
@Override
113120
public CompletableFuture<Void> startConsumption() {
114-
return doStartConsumption(Collections.emptyMap());
121+
// do not call doStartConsumption directly
122+
return startConsumption(new Option[]{});
115123
}
116124

117125
@Override
118126
public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOptions) {
119127

128+
// concurrent consumption requests can have strange effects, so better avoid it
129+
if (!subscriptionRequestPending.compareAndSet(false, true)) {
130+
final CompletableFuture<Void> failedFuture = new CompletableFuture<>();
131+
failedFuture.completeExceptionally(new ConcurrentConsumptionRequestException());
132+
return failedFuture;
133+
}
134+
120135
// only accept "Consumption" related options here:
121136
final Optional<Option<?>> unknownOptionIncluded = Arrays.stream(consumptionOptions)
122137
.filter(option -> !option.getName().equals(OptionName.Consumption.NAMESPACES))
@@ -139,7 +154,8 @@ public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOp
139154
options.getExtraFields().ifPresent(extraFields ->
140155
subscriptionConfig.put(CONSUMPTION_PARAM_EXTRA_FIELDS, extraFields.toString()));
141156

142-
return doStartConsumption(subscriptionConfig);
157+
// make sure to reset the flag when consumption request completes
158+
return doStartConsumption(subscriptionConfig).whenComplete((v, t) -> subscriptionRequestPending.set(false));
143159
}
144160

145161
/**
@@ -639,8 +655,11 @@ protected AdaptableBus.SubscriptionId subscribeAndPublishMessage(
639655
final CompletableFuture<Void> futureToCompleteOrFailAfterAck,
640656
final Function<Adaptable, NotifyMessage> adaptableToNotifier) {
641657

642-
LOGGER.trace("Sending {} and waiting for {}", protocolCommand, protocolCommandAck);
658+
final String correlationId = UUID.randomUUID().toString();
659+
final String protocolCommandWithCorrelationId = appendCorrelationIdParameter(protocolCommand, correlationId);
660+
LOGGER.trace("Sending {} and waiting for {}", protocolCommandWithCorrelationId, protocolCommandAck);
643661
final AdaptableBus adaptableBus = messagingProvider.getAdaptableBus();
662+
644663
if (previousSubscriptionId != null) {
645664
// remove previous subscription without going through back-end because subscription will be replaced
646665
adaptableBus.unsubscribe(previousSubscriptionId);
@@ -649,11 +668,42 @@ protected AdaptableBus.SubscriptionId subscribeAndPublishMessage(
649668
adaptableBus.subscribeForAdaptable(streamingType,
650669
adaptable -> adaptableToNotifier.apply(adaptable).accept(getBus()));
651670
final Classification tag = Classification.forString(protocolCommandAck);
652-
adjoin(adaptableBus.subscribeOnceForString(tag, getTimeout()), futureToCompleteOrFailAfterAck);
653-
messagingProvider.emit(protocolCommand);
671+
672+
// subscribe exclusively because we allow only one request at a time
673+
final CompletionStage<String> ackStage = adaptableBus.subscribeOnceForStringExclusively(tag, getTimeout());
674+
final CompletableFuture<String> ackFuture = ackStage.toCompletableFuture();
675+
676+
// subscribe for possible error responses by correlationId
677+
final Classification correlationIdTag = Classification.forCorrelationId(correlationId);
678+
adaptableBus.subscribeOnceForAdaptable(correlationIdTag, getTimeout())
679+
.thenAccept(adaptable -> {
680+
final Signal<?> signal = AbstractHandle.PROTOCOL_ADAPTER.fromAdaptable(adaptable);
681+
if (signal instanceof ThingErrorResponse) {
682+
ackFuture.completeExceptionally(((ThingErrorResponse) signal).getDittoRuntimeException());
683+
} else {
684+
ackFuture.completeExceptionally(getUnexpectedSignalException(signal));
685+
}
686+
});
687+
688+
adjoin(ackFuture, futureToCompleteOrFailAfterAck);
689+
messagingProvider.emit(protocolCommandWithCorrelationId);
690+
654691
return subscriptionId;
655692
}
656693

694+
private DittoRuntimeException getUnexpectedSignalException(final Signal<?> signal) {
695+
return DittoRuntimeException
696+
.newBuilder("signal.unexpected", HttpStatusCode.BAD_REQUEST)
697+
.message(() -> String.format("Received unexpected response of type '%s'.", signal.getType()))
698+
.build();
699+
}
700+
701+
private String appendCorrelationIdParameter(final String protocolCommand, final String correlationId) {
702+
final String separator = protocolCommand.contains("?") ? "&" : "?";
703+
return String.format("%s%s%s=%s", protocolCommand, separator,
704+
DittoHeaderDefinition.CORRELATION_ID.getKey(), correlationId);
705+
}
706+
657707
/**
658708
* Remove a subscription.
659709
*
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2020 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.eclipse.ditto.client.internal;
14+
15+
/**
16+
* This exception may be thrown if concurrent consumption requests are detected.
17+
*/
18+
public class ConcurrentConsumptionRequestException extends RuntimeException {
19+
20+
private static final long serialVersionUID = -565137801315595348L;
21+
private static final String MESSAGE = "Concurrent consumption requests are not allowed on one channel.";
22+
23+
/**
24+
* Constructs a new {@code UncompletedTwinConsumptionRequestException} object.
25+
*/
26+
public ConcurrentConsumptionRequestException() {
27+
super(MESSAGE, null);
28+
}
29+
30+
}

java/src/main/java/org/eclipse/ditto/client/internal/bus/AdaptableBus.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,21 @@ public interface AdaptableBus {
5858
*
5959
* @param tag the string classification, usually itself.
6060
* @param timeout how long to wait for a match.
61-
* @return a future adaptable matching the tag according to the classifiers, or a failed future
62-
* if no adaptable is matched within the timeout.
61+
* @return a future string matching the tag according to the classifiers, or a failed future
62+
* if no string is matched within the timeout.
6363
*/
6464
CompletionStage<String> subscribeOnceForString(Classification tag, Duration timeout);
6565

66+
/**
67+
* Add a one-time subscriber for a string message by replacing an existing with the same string value.
68+
*
69+
* @param tag the string classification, usually itself.
70+
* @param timeout how long to wait for a match.
71+
* @return a future string matching the tag according to the classifiers, or a failed future
72+
* if no string is matched within the timeout.
73+
*/
74+
CompletionStage<String> subscribeOnceForStringExclusively(Classification tag, Duration timeout);
75+
6676
/**
6777
* Add a one-time subscriber for an adaptable message. Only effective if no one-time string subscriber matches.
6878
*

java/src/main/java/org/eclipse/ditto/client/internal/bus/DefaultAdaptableBus.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ public CompletionStage<String> subscribeOnceForString(final Classification tag,
8989
return subscribeOnce(oneTimeStringConsumers, tag, timeout);
9090
}
9191

92+
@Override
93+
public CompletionStage<String> subscribeOnceForStringExclusively(final Classification tag, final Duration timeout) {
94+
return subscribeOnce(oneTimeStringConsumers, tag, timeout, true);
95+
}
96+
9297
@Override
9398
public CompletionStage<Adaptable> subscribeOnceForAdaptable(final Classification tag,
9499
final Duration timeout) {
@@ -158,8 +163,14 @@ public void publish(final String message) {
158163
@Override
159164
public void shutdownExecutor() {
160165
LOGGER.trace("Shutting down AdaptableBus Executors");
161-
singleThreadedExecutorService.shutdownNow();
162-
scheduledExecutorService.shutdownNow();
166+
try {
167+
singleThreadedExecutorService.shutdownNow();
168+
scheduledExecutorService.shutdownNow();
169+
singleThreadedExecutorService.awaitTermination(2, TimeUnit.SECONDS);
170+
scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS);
171+
} catch (InterruptedException e) {
172+
LOGGER.info("Waiting for termination was interrupted.");
173+
}
163174
}
164175

165176
// call this in a single-threaded executor so that ordering is preserved
@@ -206,9 +217,21 @@ private <T> CompletionStage<T> subscribeOnce(
206217
final Map<Classification, Set<Entry<Consumer<T>>>> registry,
207218
final Classification tag,
208219
final Duration timeout) {
220+
return subscribeOnce(registry, tag, timeout, false);
221+
}
222+
223+
private <T> CompletionStage<T> subscribeOnce(
224+
final Map<Classification, Set<Entry<Consumer<T>>>> registry,
225+
final Classification tag,
226+
final Duration timeout,
227+
final boolean exclusively) {
209228
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
210229
final Entry<Consumer<T>> subscriber = new Entry<>(tag, resultFuture::complete);
211-
addEntry(registry, subscriber);
230+
if (exclusively) {
231+
replaceEntry(registry, subscriber);
232+
} else {
233+
addEntry(registry, subscriber);
234+
}
212235
removeAfter(registry, subscriber, timeout, resultFuture);
213236
return resultFuture;
214237
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright (c) 2020 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.eclipse.ditto.client;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
17+
import java.util.ArrayList;
18+
import java.util.Collections;
19+
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.stream.Collectors;
24+
25+
import org.eclipse.ditto.client.internal.AbstractDittoClientTest;
26+
import org.eclipse.ditto.client.internal.ConcurrentConsumptionRequestException;
27+
import org.junit.Test;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Tests different sequences of failing/succeeding startConsumption invocations.
33+
*/
34+
abstract class AbstractConsumptionDittoClientTest extends AbstractDittoClientTest {
35+
36+
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDittoClientTest.class);
37+
38+
@Test
39+
public void concurrentStartConsumptionFails() {
40+
try {
41+
final CompletableFuture<Void> firstRequest = startConsumptionRequest();
42+
final CompletableFuture<Void> concurrentRequest = startConsumptionRequest();
43+
44+
replyToConsumptionRequest();
45+
46+
assertCompletion(firstRequest);
47+
concurrentRequest.get(1, TimeUnit.SECONDS);
48+
} catch (final Exception e) {
49+
assertThat(e)
50+
.isInstanceOf(ExecutionException.class)
51+
.hasCauseInstanceOf(ConcurrentConsumptionRequestException.class);
52+
}
53+
}
54+
55+
protected abstract CompletableFuture<Void> startConsumptionRequest();
56+
57+
protected abstract void replyToConsumptionRequest();
58+
59+
@Test
60+
public void testStartConsumptionCombinations() {
61+
// test all combinations of startConsumption invocations expecting success or failure of the given length
62+
testStartConsumptionSequence(4, Collections.emptyList());
63+
}
64+
65+
protected void testStartConsumptionSequence(int remaining, final List<Boolean> sequence) {
66+
testSequence(sequence);
67+
if (remaining > 0) {
68+
testStartConsumptionSequence(remaining - 1, addElement(sequence, true));
69+
testStartConsumptionSequence(remaining - 1, addElement(sequence, false));
70+
}
71+
}
72+
73+
private void testSequence(final List<Boolean> sequence) {
74+
if (sequence.isEmpty()) {
75+
return;
76+
}
77+
LOGGER.info("Testing startConsumption sequence: {}",
78+
sequence.stream()
79+
.map(success -> success ? "expect success" : "expect failure")
80+
.collect(Collectors.joining(" -> ")));
81+
sequence.forEach(expectSuccess -> {
82+
messaging.clearEmitted();
83+
if (expectSuccess) {
84+
startConsumptionSucceeds();
85+
} else {
86+
startConsumptionAndExpectError();
87+
}
88+
});
89+
}
90+
91+
private List<Boolean> addElement(final List<Boolean> sequence, final boolean b) {
92+
final ArrayList<Boolean> success = new ArrayList<>(sequence);
93+
success.add(b);
94+
return success;
95+
}
96+
97+
protected void assertFailedCompletion(final CompletableFuture<Void> future,
98+
final Class<? extends Exception> exception) {
99+
try {
100+
future.get(1L, TimeUnit.SECONDS);
101+
} catch (final Exception e) {
102+
assertThat(e)
103+
.isInstanceOf(ExecutionException.class)
104+
.hasCauseInstanceOf(exception);
105+
}
106+
}
107+
108+
/**
109+
* Tests a succeeding startConsumption invocation.
110+
*/
111+
protected abstract void startConsumptionSucceeds();
112+
113+
/**
114+
* Tests a failing startConsumption invocation.
115+
*/
116+
protected abstract void startConsumptionAndExpectError();
117+
}

0 commit comments

Comments
 (0)