Skip to content

Commit c5b77ba

Browse files
authored
Merge pull request #77 from bosch-io/feature/e2e-live
Add support for e2e acknowledgements also for live messages/commands
2 parents 0ef4408 + d2457f8 commit c5b77ba

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1573
-1368
lines changed

java/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@
437437
</overrideCompatibilityChangeParameters>
438438
<excludes>
439439
<!-- Don't add excludes here before checking with the whole Ditto team -->
440+
<exclude>org.eclipse.ditto.client.ack.internal</exclude>
440441
<exclude>org.eclipse.ditto.client.changes.internal</exclude>
441442
<exclude>org.eclipse.ditto.client.internal</exclude>
442443
<exclude>org.eclipse.ditto.client.live.internal</exclude>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (c) 2020 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.eclipse.ditto.client.ack;
14+
15+
import java.util.Collection;
16+
import java.util.function.Consumer;
17+
18+
import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
19+
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
20+
21+
/**
22+
* A signal that can be acknowledged.
23+
*
24+
* @since 1.2.0
25+
*/
26+
public interface Acknowledgeable {
27+
28+
/**
29+
* Handles {@code AcknowledgementRequest}s issued by the Ditto backend for a received signal
30+
* translated into this Acknowledgeable by invoking the passed {@code acknowledgementHandles} consumer with
31+
* client side {@code AcknowledgementHandle}s.
32+
*
33+
* @param acknowledgementHandles the consumer to invoke with a collection of {@code AcknowledgementHandle}s used to
34+
* send back {@code Acknowledgements}.
35+
* @since 1.2.0
36+
*/
37+
void handleAcknowledgementRequests(Consumer<Collection<AcknowledgementRequestHandle>> acknowledgementHandles);
38+
39+
/**
40+
* Handles an {@code AcknowledgementRequest} identified by the passed {@code acknowledgementLabel} issued by the
41+
* Ditto backend for a received signal translated into this Acknowledgeable by invoking the passed
42+
* {@code acknowledgementHandle} consumer with a client side {@code AcknowledgementHandle} - if the passed
43+
* acknowledgementLabel was present in the requested acknowledgements.
44+
*
45+
* @param acknowledgementLabel the {@code AcknowledgementLabel} which should be handled - if present - by the passed
46+
* {@code acknowledgementHandle}.
47+
* @param acknowledgementHandle the consumer to invoke with a {@code AcknowledgementHandle} used to
48+
* send back an {@code Acknowledgement}.
49+
* @since 1.2.0
50+
*/
51+
void handleAcknowledgementRequest(AcknowledgementLabel acknowledgementLabel,
52+
Consumer<AcknowledgementRequestHandle> acknowledgementHandle);
53+
54+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2020 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.eclipse.ditto.client.ack;
14+
15+
import java.util.function.BiConsumer;
16+
17+
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
18+
19+
/**
20+
* Interface encapsulating a {@link java.util.function.BiConsumer} which is notified about responses with either the
21+
* response of type {@link R} (if it was successful) or with an {@link Throwable} if there occurred an error.
22+
* Does also hold the type of the expected Message response.
23+
*
24+
* @param <R> the type of the expected response.
25+
*/
26+
public interface ResponseConsumer<R> {
27+
28+
/**
29+
* Returns the type of the expected response.
30+
*
31+
* @return the type of the expected response.
32+
*/
33+
Class<R> getResponseType();
34+
35+
/**
36+
* The BiConsumer which is notified about responses with either
37+
* the response of type {@link R} (if it was successful) or with an {@link Throwable} if there occurred an error.
38+
*
39+
* @return the BiConsumer notified about responses.
40+
*/
41+
BiConsumer<R, Throwable> getResponseConsumer();
42+
43+
/**
44+
* Type-check the argument against the response type and call the response consumer with the right type or
45+
* with an exception.
46+
*
47+
* @param argument the argument to consume.
48+
*/
49+
default void accept(final Object argument) {
50+
if (getResponseType().isInstance(argument)) {
51+
getResponseConsumer().accept(getResponseType().cast(argument), null);
52+
} else if (argument instanceof ThingErrorResponse) {
53+
getResponseConsumer().accept(null, ((ThingErrorResponse) argument).getDittoRuntimeException());
54+
} else if (argument != null) {
55+
getResponseConsumer().accept(null, new ClassCastException(
56+
"Expected: " + getResponseType().getCanonicalName() +
57+
"; Actual: " + argument.getClass().getCanonicalName() +
58+
" (" + argument + ")"
59+
));
60+
} else {
61+
getResponseConsumer().accept(null, new NullPointerException(
62+
"Expected: " + getResponseType().getCanonicalName() + "; Actual: null")
63+
);
64+
}
65+
}
66+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2020 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.eclipse.ditto.client.ack.internal;
14+
15+
import java.util.Set;
16+
17+
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
18+
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
19+
20+
/**
21+
* Validate an acknowledgement request from the client.
22+
*/
23+
public final class AcknowledgementRequestsValidator {
24+
25+
private AcknowledgementRequestsValidator() {}
26+
27+
/**
28+
* Validate acknowledgement requests from the client.
29+
*
30+
* @param acknowledgementRequests the acknowledgement requests of a signal sent by the client.
31+
* @param mandatoryLabel the mandatory acknowledgement label for the channel.
32+
*/
33+
public static void validate(final Set<AcknowledgementRequest> acknowledgementRequests,
34+
final AcknowledgementLabel mandatoryLabel) {
35+
36+
if (!acknowledgementRequests.isEmpty() &&
37+
!acknowledgementRequests.contains(AcknowledgementRequest.of(mandatoryLabel))) {
38+
throw new IllegalArgumentException("Expected acknowledgement request for label '" +
39+
mandatoryLabel +
40+
"' not found. Please make sure to always request the '" +
41+
mandatoryLabel +
42+
"' Acknowledgement if you need to process the response in the client.");
43+
}
44+
}
45+
46+
/**
47+
* Create an {@code IllegalStateException} when receiving acknowledgements from the back-end not containing
48+
* an expected acknowledgement label.
49+
*
50+
* @param mandatoryLabel the mandatory acknowledgement label.
51+
* @return the {@code IllegalStateException}.
52+
*/
53+
public static IllegalStateException didNotReceiveAcknowledgement(final AcknowledgementLabel mandatoryLabel) {
54+
return new IllegalStateException("Didn't receive an Acknowledgement for label '" +
55+
mandatoryLabel + "'. Please make sure to always request the '" +
56+
mandatoryLabel + "' Acknowledgement if you need to process the " +
57+
"response in the client.");
58+
}
59+
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
*
1111
* SPDX-License-Identifier: EPL-2.0
1212
*/
13-
package org.eclipse.ditto.client.changes.internal;
13+
package org.eclipse.ditto.client.ack.internal;
1414

1515
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
1616

@@ -35,7 +35,7 @@
3535
* @since 1.1.0
3636
*/
3737
@Immutable
38-
final class ImmutableAcknowledgementRequestHandle implements AcknowledgementRequestHandle {
38+
public final class ImmutableAcknowledgementRequestHandle implements AcknowledgementRequestHandle {
3939

4040
private final AcknowledgementLabel acknowledgementLabel;
4141
private final EntityIdWithType entityId;
@@ -50,7 +50,7 @@ final class ImmutableAcknowledgementRequestHandle implements AcknowledgementRequ
5050
* @param dittoHeaders the ditto headers which were contained in the acknowledgement request to handle.
5151
* @param acknowledgementPublisher the consumer for publishing built acknowledgements to the Ditto backend.
5252
*/
53-
ImmutableAcknowledgementRequestHandle(final AcknowledgementLabel acknowledgementLabel,
53+
public ImmutableAcknowledgementRequestHandle(final AcknowledgementLabel acknowledgementLabel,
5454
final EntityIdWithType entityId,
5555
final DittoHeaders dittoHeaders,
5656
final Consumer<Acknowledgement> acknowledgementPublisher) {

java/src/main/java/org/eclipse/ditto/client/changes/Change.java

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,14 @@
1313
package org.eclipse.ditto.client.changes;
1414

1515
import java.time.Instant;
16-
import java.util.Collection;
1716
import java.util.Optional;
18-
import java.util.function.Consumer;
1917

2018
import javax.annotation.Nullable;
2119

20+
import org.eclipse.ditto.client.ack.Acknowledgeable;
2221
import org.eclipse.ditto.json.JsonObject;
2322
import org.eclipse.ditto.json.JsonPointer;
2423
import org.eclipse.ditto.json.JsonValue;
25-
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
2624
import org.eclipse.ditto.model.base.entity.type.WithEntityType;
2725
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
2826
import org.eclipse.ditto.signals.base.WithId;
@@ -32,7 +30,7 @@
3230
*
3331
* @since 1.0.0
3432
*/
35-
public interface Change extends WithId, WithEntityType, WithDittoHeaders<Change> {
33+
public interface Change extends WithId, WithEntityType, WithDittoHeaders<Change>, Acknowledgeable {
3634

3735
/**
3836
* Returns the {@link ChangeAction} which caused this change.
@@ -118,30 +116,4 @@ default boolean isFull() {
118116
*/
119117
Change withPathAndValue(JsonPointer path, @Nullable JsonValue value);
120118

121-
/**
122-
* Handles {@code AcknowledgementRequest}s issued by the Ditto backend for a received event translated into this
123-
* change by invoking the passed {@code acknowledgementHandles} consumer with client side
124-
* {@code AcknowledgementHandle}s.
125-
*
126-
* @param acknowledgementHandles the consumer to invoke with a collection of {@code AcknowledgementHandle}s used to
127-
* send back {@code Acknowledgements}.
128-
* @since 1.1.0
129-
*/
130-
void handleAcknowledgementRequests(Consumer<Collection<AcknowledgementRequestHandle>> acknowledgementHandles);
131-
132-
/**
133-
* Handles an {@code AcknowledgementRequest} identified by the passed {@code acknowledgementLabel} issued by the
134-
* Ditto backend for a received event translated into this change by invoking the passed
135-
* {@code acknowledgementHandle} consumer with a client side {@code AcknowledgementHandle} - if the passed
136-
* acknowledgementLabel was present in the requested acknowledgements.
137-
*
138-
* @param acknowledgementLabel the {@code AcknowledgementLabel} which should be handled - if present - by the passed
139-
* {@code acknowledgementHandle}.
140-
* @param acknowledgementHandle the consumer to invoke with a {@code AcknowledgementHandle} used to
141-
* send back an {@code Acknowledgement}.
142-
* @since 1.1.0
143-
*/
144-
void handleAcknowledgementRequest(AcknowledgementLabel acknowledgementLabel,
145-
Consumer<AcknowledgementRequestHandle> acknowledgementHandle);
146-
147119
}

java/src/main/java/org/eclipse/ditto/client/changes/internal/ImmutableChange.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import javax.annotation.Nullable;
2626
import javax.annotation.concurrent.Immutable;
2727

28+
import org.eclipse.ditto.client.ack.internal.ImmutableAcknowledgementRequestHandle;
2829
import org.eclipse.ditto.client.changes.AcknowledgementRequestHandle;
2930
import org.eclipse.ditto.client.changes.Change;
3031
import org.eclipse.ditto.client.changes.ChangeAction;

java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@
2525

2626
import javax.annotation.Nonnull;
2727

28+
import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator;
2829
import org.eclipse.ditto.client.internal.bus.Classification;
2930
import org.eclipse.ditto.client.management.AcknowledgementsFailedException;
3031
import org.eclipse.ditto.client.messaging.MessagingProvider;
3132
import org.eclipse.ditto.json.JsonField;
3233
import org.eclipse.ditto.json.JsonObject;
3334
import org.eclipse.ditto.json.JsonPointer;
3435
import org.eclipse.ditto.json.JsonValue;
35-
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
36+
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
3637
import org.eclipse.ditto.model.base.common.HttpStatusCode;
3738
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
3839
import org.eclipse.ditto.model.base.headers.DittoHeaders;
@@ -87,6 +88,13 @@ protected AbstractHandle(final MessagingProvider messagingProvider,
8788
this.channel = channel;
8889
}
8990

91+
/**
92+
* Get the label of built-in acknowledgements for this channel.
93+
*
94+
* @return the label.
95+
*/
96+
protected abstract AcknowledgementLabel getThingResponseAcknowledgementLabel();
97+
9098
/**
9199
* Convenience method to turn anything into {@code Void} due to ubiquitous use of {@code CompletableFuture<Void>}
92100
* in the existing API.
@@ -156,7 +164,7 @@ protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> Completio
156164
final T command,
157165
final Class<S> expectedResponse,
158166
final Function<S, R> onSuccess) {
159-
final ThingCommand<?> commandWithChannel = setChannel(command, channel);
167+
final ThingCommand<?> commandWithChannel = validateAckRequests(setChannel(command, channel));
160168
return sendSignalAndExpectResponse(commandWithChannel, expectedResponse, onSuccess, ErrorResponse.class,
161169
ErrorResponse::getDittoRuntimeException);
162170
}
@@ -246,36 +254,40 @@ protected Adaptable adaptOutgoingLiveSignal(final Signal<?> liveSignal) {
246254
return PROTOCOL_ADAPTER.toAdaptable(adjustHeadersForLiveSignal(liveSignal));
247255
}
248256

249-
@SuppressWarnings("unchecked")
257+
@SuppressWarnings({"unchecked", "rawtypes"})
250258
protected static Signal<?> adjustHeadersForLiveSignal(final Signal<?> signal) {
251259
return adjustHeadersForLive((Signal) signal);
252260
}
253261

254-
static CommandResponse<?> extractCommandResponseFromAcknowledgements(final Signal<?> signal,
262+
private ThingCommand<?> validateAckRequests(final ThingCommand<?> thingCommand) {
263+
AcknowledgementRequestsValidator.validate(thingCommand.getDittoHeaders().getAcknowledgementRequests(),
264+
getThingResponseAcknowledgementLabel());
265+
return thingCommand;
266+
}
267+
268+
private CommandResponse<?> extractCommandResponseFromAcknowledgements(final Signal<?> signal,
255269
final Acknowledgements acknowledgements) {
256270
if (areFailedAcknowledgements(acknowledgements.getStatusCode())) {
257271
throw AcknowledgementsFailedException.of(acknowledgements);
258272
} else {
273+
final AcknowledgementLabel expectedLabel = getThingResponseAcknowledgementLabel();
259274
return acknowledgements.stream()
260-
.filter(ack -> ack.getLabel().equals(DittoAcknowledgementLabel.TWIN_PERSISTED))
275+
.filter(ack -> ack.getLabel().equals(expectedLabel))
261276
.findFirst()
262277
.map(ack -> createThingModifyCommandResponseFromAcknowledgement(signal, ack))
263-
.orElseThrow(() -> new IllegalStateException("Didn't receive an Acknowledgement for label '" +
264-
DittoAcknowledgementLabel.TWIN_PERSISTED + "'. Please make sure to always request the '" +
265-
DittoAcknowledgementLabel.TWIN_PERSISTED + "' Acknowledgement if you need to process the " +
266-
"response in the client."));
278+
.orElseThrow(() -> AcknowledgementRequestsValidator.didNotReceiveAcknowledgement(expectedLabel));
267279
}
268280
}
269281

270282
private static boolean areFailedAcknowledgements(final HttpStatusCode statusCode) {
271283
return statusCode.isClientError() || statusCode.isInternalError();
272284
}
273285

274-
private static ThingModifyCommandResponse<ThingModifyCommandResponse<?>>
286+
private static <T extends ThingModifyCommandResponse<T>> ThingModifyCommandResponse<T>
275287
createThingModifyCommandResponseFromAcknowledgement(
276288
final Signal<?> signal,
277289
final Acknowledgement ack) {
278-
return new ThingModifyCommandResponse<ThingModifyCommandResponse<?>>() {
290+
return new ThingModifyCommandResponse<T>() {
279291
@Override
280292
public JsonPointer getResourcePath() {
281293
return signal.getResourcePath();
@@ -308,8 +320,8 @@ public Optional<JsonValue> getEntity(final JsonSchemaVersion schemaVersion) {
308320
}
309321

310322
@Override
311-
public ThingModifyCommandResponse<?> setDittoHeaders(final DittoHeaders dittoHeaders) {
312-
return this;
323+
public T setDittoHeaders(final DittoHeaders dittoHeaders) {
324+
return (T) this;
313325
}
314326

315327
@Override

java/src/main/java/org/eclipse/ditto/client/internal/HandlerRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
public final class HandlerRegistry<T extends ThingHandle<F>, F extends FeatureHandle> {
4343

4444
private final PointerBus bus;
45-
private final ConcurrentHashMap<String, Registration<Consumer<PointerWithData>>> registry;
45+
private final Map<String, Registration<Consumer<PointerWithData>>> registry;
4646
private final Map<ThingId, T> thingHandles;
4747
private final Map<String, F> featureHandles;
4848

0 commit comments

Comments
 (0)