Skip to content

Commit 7310229

Browse files
authored
Merge pull request #201 from bosch-io/bugfix/reconnect-behaviour
Adds a ClientReconnectingException which is thrown if the client is attempting a reconnect while a message should be sent.
2 parents 48725f0 + 69bccb6 commit 7310229

19 files changed

+357
-115
lines changed

java/src/main/java/org/eclipse/ditto/client/DittoClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ public interface DittoClient {
5252
* Directly sends a Ditto Protocol {@link Adaptable} message to the established Ditto backend connection.
5353
*
5454
* @param dittoProtocolAdaptable the adaptable to send
55-
* @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable}
55+
* @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} or
56+
* which failed with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client is
57+
* in a reconnecting state.
5658
* @throws IllegalStateException when no twin/live connection was configured for this client
5759
*/
5860
CompletionStage<Adaptable> sendDittoProtocol(Adaptable dittoProtocolAdaptable);

java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ interface Builder {
182182
* Register a consumer of errors which occur during opening the connection initially and on reconnects.
183183
*
184184
* @param handler the handler that will be called with the cause of the connection error.
185+
* @return this builder.
185186
* @since 1.2.0
186187
*/
187188
Builder connectionErrorHandler(@Nullable Consumer<Throwable> handler);
@@ -190,6 +191,7 @@ interface Builder {
190191
* Register a contextListener which is notified whenever the connection is disconnected.
191192
*
192193
* @param contextListener the handler that will be called with details about the disconnection.
194+
* @return this builder.
193195
* @since 2.1.0
194196
*/
195197
Builder disconnectedListener(@Nullable Consumer<DisconnectedContext> contextListener);

java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.time.Duration;
1919
import java.util.Map;
2020
import java.util.Optional;
21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.CompletionStage;
2223
import java.util.function.Function;
2324
import java.util.function.Predicate;
@@ -40,6 +41,7 @@
4041
import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator;
4142
import org.eclipse.ditto.client.internal.bus.Classification;
4243
import org.eclipse.ditto.client.management.AcknowledgementsFailedException;
44+
import org.eclipse.ditto.client.management.ClientReconnectingException;
4345
import org.eclipse.ditto.client.messaging.MessagingProvider;
4446
import org.eclipse.ditto.json.JsonField;
4547
import org.eclipse.ditto.json.JsonObject;
@@ -138,6 +140,8 @@ protected Signal signalFromAdaptable(final Adaptable adaptable) {
138140
* @param <R> type of the result.
139141
* @return future of the result if the expected response arrives or a failed future on error.
140142
* Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention.
143+
* If the client is reconnecting while this method is called the future fails with a
144+
* {@link ClientReconnectingException}.
141145
*/
142146
protected <T extends PolicyCommand<T>, S extends PolicyCommandResponse<?>, R> CompletionStage<R> askPolicyCommand(
143147
final T command,
@@ -159,11 +163,14 @@ protected <T extends PolicyCommand<T>, S extends PolicyCommandResponse<?>, R> Co
159163
* @param <R> type of the result.
160164
* @return future of the result if the expected response arrives or a failed future on error.
161165
* Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention.
166+
* If the client is reconnecting while this method is called the future fails with a
167+
* {@link ClientReconnectingException}.
162168
*/
163169
protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> CompletionStage<R> askThingCommand(
164170
final T command,
165171
final Class<S> expectedResponse,
166172
final Function<S, R> onSuccess) {
173+
167174
final ThingCommand<?> commandWithChannel = validateAckRequests(setChannel(command, channel));
168175
return sendSignalAndExpectResponse(commandWithChannel, expectedResponse, onSuccess, ErrorResponse.class,
169176
ErrorResponse::getDittoRuntimeException);
@@ -180,33 +187,42 @@ protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> Completio
180187
* @param <S> type of the expected success response.
181188
* @param <E> type of the expected error response.
182189
* @param <R> type of the result.
183-
* @return future of the result.
190+
* @return future of the result. The future can be exceptional with a {@link ClientReconnectingException} if the
191+
* client is reconnecting while this method is called.
184192
*/
185193
protected <S, E, R> CompletionStage<R> sendSignalAndExpectResponse(final Signal<?> signal,
186194
final Class<S> expectedResponseClass,
187195
final Function<S, R> onSuccess,
188196
final Class<E> expectedErrorResponseClass,
189197
final Function<E, ? extends RuntimeException> onError) {
190198

191-
final CompletionStage<Adaptable> responseFuture = messagingProvider.getAdaptableBus()
192-
.subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout());
193-
194-
messagingProvider.emit(signalToJsonString(signal));
195-
return responseFuture.thenApply(responseAdaptable -> {
196-
final Signal<?> response = signalFromAdaptable(responseAdaptable);
197-
if (expectedErrorResponseClass.isInstance(response)) {
198-
// extracted runtime exception will be wrapped in CompletionException.
199-
throw onError.apply(expectedErrorResponseClass.cast(response));
200-
} else if (response instanceof Acknowledgements) {
201-
final CommandResponse<?> commandResponse =
202-
extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response);
203-
return onSuccess.apply(expectedResponseClass.cast(commandResponse));
204-
} else if (expectedResponseClass.isInstance(response)) {
205-
return onSuccess.apply(expectedResponseClass.cast(response));
206-
} else {
207-
throw new ClassCastException("Expect " + expectedResponseClass.getSimpleName() + ", got: " + response);
208-
}
209-
});
199+
try {
200+
final CompletionStage<Adaptable> responseFuture = messagingProvider.getAdaptableBus()
201+
.subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout());
202+
203+
messagingProvider.emit(signalToJsonString(signal));
204+
return responseFuture.thenApply(responseAdaptable -> {
205+
final Signal<?> response = signalFromAdaptable(responseAdaptable);
206+
if (expectedErrorResponseClass.isInstance(response)) {
207+
// extracted runtime exception will be wrapped in CompletionException.
208+
throw onError.apply(expectedErrorResponseClass.cast(response));
209+
} else if (response instanceof Acknowledgements) {
210+
final CommandResponse<?> commandResponse =
211+
extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response);
212+
return onSuccess.apply(expectedResponseClass.cast(commandResponse));
213+
} else if (expectedResponseClass.isInstance(response)) {
214+
return onSuccess.apply(expectedResponseClass.cast(response));
215+
} else {
216+
throw new ClassCastException(
217+
"Expect " + expectedResponseClass.getSimpleName() + ", got: " + response);
218+
}
219+
});
220+
} catch (final ClientReconnectingException cre) {
221+
return CompletableFuture.supplyAsync(() -> {
222+
throw cre;
223+
});
224+
}
225+
210226
}
211227

212228
/**

java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ public CompletionStage<Void> startConsumption(final Option<?>... consumptionOpti
163163
* Starts the consumption of twin events / messages / live events and commands.
164164
*
165165
* @param consumptionConfig the configuration Map to apply for the consumption.
166-
* @return a CompletionStage that terminates when the start operation was successful.
166+
* @return a CompletionStage that terminates when the start operation was successful or fails if the client is in
167+
* a reconnecting state
167168
*/
168169
protected abstract CompletionStage<Void> doStartConsumption(Map<String, String> consumptionConfig);
169170

@@ -648,6 +649,7 @@ public void registerForThingChanges(final String registrationId, final Consumer<
648649
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
649650
* or not.
650651
* @return the subscription ID.
652+
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting.
651653
*/
652654
protected AdaptableBus.SubscriptionId subscribe(
653655
@Nullable final AdaptableBus.SubscriptionId previousSubscriptionId,
@@ -730,6 +732,7 @@ private static String appendCorrelationIdParameter(final String protocolCommand,
730732
* @param protocolCommandAck the expected acknowledgement.
731733
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
732734
* or not.
735+
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting.
733736
*/
734737
protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscriptionId,
735738
final String protocolCommand,

java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ private static PoliciesImpl configurePolicyClient(final MessagingProvider messag
234234
final String busName = TopicPath.Channel.NONE.getName();
235235
final PointerBus bus = BusFactory.createPointerBus(busName, messagingProvider.getExecutorService());
236236
init(bus, messagingProvider);
237-
final MessagingConfiguration messagingConfiguration = messagingProvider.getMessagingConfiguration();
238237
final OutgoingMessageFactory messageFactory = getOutgoingMessageFactoryForPolicies(messagingProvider);
239238
return PoliciesImpl.newInstance(messagingProvider, messageFactory, bus);
240239
}

java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public interface LiveCommandProcessor {
4141
* Publish a signal.
4242
*
4343
* @param signal the signal to publish.
44+
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting state.
4445
*/
4546
void publishLiveSignal(Signal<?> signal);
4647

java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public interface LiveCommandHandler<L extends LiveCommand<L, B>, B extends LiveC
3434
* @param type the type of live commands. MUST be an interface satisfying the recursive type bound.
3535
* @param commandHandler constructor of any response or event to publish.
3636
* @param <L> type of live commands.
37+
* @param <B> type of live command answers.
3738
* @return the live command handler.
3839
*/
3940
static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCommandHandler<L, B> of(
@@ -49,6 +50,7 @@ static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCom
4950
* @param type the type of live commands. MUST be an interface satisfying the recursive type bound.
5051
* @param commandHandler constructor of any response or event to publish and sender of any acknowledgements.
5152
* @param <L> type of live commands.
53+
* @param <B> type of live command answers.
5254
* @return the live command handler.
5355
*/
5456
static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCommandHandler<L, B> withAcks(
@@ -77,6 +79,7 @@ static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCom
7779
* To be called after runtime type check of the live command.
7880
*
7981
* @param liveCommand the live command.
82+
* @param signalPublisher the signal publisher.
8083
* @return the result of calling the command handler on the command.
8184
*/
8285
default LiveCommandAnswerBuilder.BuildStep castAndApply(final LiveCommand<?, ?> liveCommand,

java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public interface EventEmitter<F extends EventFactory> {
3030
*
3131
* @param eventFunction Function providing a EventFactory and requiring a Event as result.
3232
* @throws NullPointerException if {@code eventFunction} is {@code null}.
33+
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting.
3334
*/
3435
void emitEvent(Function<F, Event<?>> eventFunction);
3536

java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
5151
import org.eclipse.ditto.client.live.messages.PendingMessage;
5252
import org.eclipse.ditto.client.live.messages.RepliableMessage;
53+
import org.eclipse.ditto.client.management.ClientReconnectingException;
5354
import org.eclipse.ditto.client.messaging.MessagingProvider;
5455
import org.eclipse.ditto.json.JsonKey;
5556
import org.eclipse.ditto.messages.model.KnownMessageSubjects;
@@ -158,44 +159,58 @@ protected CompletionStage<Void> doStartConsumption(final Map<String, String> con
158159
CompletableFuture.allOf(completableFutureEvents, completableFutureMessages,
159160
completableFutureLiveCommands);
160161

161-
// register message handler which handles live events:
162-
subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, (streamingType, previousSubscriptionId) -> {
163-
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
164-
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
165-
return subscribe(previousSubscriptionId,
166-
streamingType,
167-
subscriptionMessage,
168-
streamingType.startAck(),
169-
completableFutureEvents
170-
);
171-
});
172-
173-
// register message handler which handles incoming messages:
174-
subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, (streamingType, previousSubscriptionId) -> {
175-
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
176-
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
177-
return subscribeAndPublishMessage(previousSubscriptionId,
178-
streamingType,
179-
subscriptionMessage,
180-
streamingType.startAck(),
181-
completableFutureMessages,
182-
adaptable -> bus -> bus.notify(getPointerBusKey(adaptable), adaptableAsLiveMessage(adaptable)));
183-
});
184-
185-
// register message handler which handles live commands:
186-
subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, (streamingType, previousSubscriptionId) -> {
187-
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
188-
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
189-
190-
return subscribeAndPublishMessage(previousSubscriptionId,
191-
streamingType,
192-
subscriptionMessage,
193-
streamingType.startAck(),
194-
completableFutureLiveCommands,
195-
adaptable -> bus -> bus.getExecutor().submit(() -> handleLiveCommandOrResponse(adaptable))
196-
);
197-
});
198-
return completableFutureCombined;
162+
try {
163+
// register message handler which handles live events:
164+
subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT,
165+
(streamingType, previousSubscriptionId) -> {
166+
final String subscriptionMessage =
167+
buildProtocolCommand(streamingType.start(), consumptionConfig);
168+
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
169+
return subscribe(previousSubscriptionId,
170+
streamingType,
171+
subscriptionMessage,
172+
streamingType.startAck(),
173+
completableFutureEvents
174+
);
175+
});
176+
177+
// register message handler which handles incoming messages:
178+
subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE,
179+
(streamingType, previousSubscriptionId) -> {
180+
final String subscriptionMessage =
181+
buildProtocolCommand(streamingType.start(), consumptionConfig);
182+
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
183+
return subscribeAndPublishMessage(previousSubscriptionId,
184+
streamingType,
185+
subscriptionMessage,
186+
streamingType.startAck(),
187+
completableFutureMessages,
188+
adaptable -> bus -> bus.notify(getPointerBusKey(adaptable),
189+
adaptableAsLiveMessage(adaptable)));
190+
});
191+
192+
// register message handler which handles live commands:
193+
subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND,
194+
(streamingType, previousSubscriptionId) -> {
195+
final String subscriptionMessage =
196+
buildProtocolCommand(streamingType.start(), consumptionConfig);
197+
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
198+
199+
return subscribeAndPublishMessage(previousSubscriptionId,
200+
streamingType,
201+
subscriptionMessage,
202+
streamingType.startAck(),
203+
completableFutureLiveCommands,
204+
adaptable -> bus -> bus.getExecutor()
205+
.submit(() -> handleLiveCommandOrResponse(adaptable))
206+
);
207+
});
208+
return completableFutureCombined;
209+
} catch (final ClientReconnectingException cre) {
210+
return CompletableFuture.supplyAsync(() -> {
211+
throw cre;
212+
});
213+
}
199214
}
200215

201216
/*

java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ interface MessageSendable<T> {
228228
* by its potential targets. </p>
229229
*
230230
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
231+
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
232+
* state.
231233
*/
232234
void send();
233235

@@ -238,6 +240,8 @@ interface MessageSendable<T> {
238240
* @param responseConsumer the Consumer which should be notified with the response ot the Throwable in case of
239241
* an error.
240242
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
243+
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
244+
* state.
241245
*/
242246
default void send(final BiConsumer<Message<ByteBuffer>, Throwable> responseConsumer) {
243247
send(ByteBuffer.class, responseConsumer);
@@ -253,6 +257,8 @@ default void send(final BiConsumer<Message<ByteBuffer>, Throwable> responseConsu
253257
* an error.
254258
* @param <R> the type of the response message's payload.
255259
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
260+
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
261+
* state.
256262
* @since 1.0.0
257263
*/
258264
<R> void send(Class<R> responseType, BiConsumer<Message<R>, Throwable> responseConsumer);

0 commit comments

Comments
 (0)