Skip to content

Commit b3b608d

Browse files
author
Stefan Maute
authored
Merge pull request #103 from bosch-io/bugfix/revert-consumption-fix
Revert consumption fix
2 parents 14b9665 + e27e434 commit b3b608d

File tree

7 files changed

+20
-160
lines changed

7 files changed

+20
-160
lines changed

java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,10 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Optional;
27-
import java.util.Set;
2827
import java.util.concurrent.CompletableFuture;
2928
import java.util.concurrent.CompletionStage;
3029
import java.util.function.Consumer;
3130
import java.util.function.Function;
32-
import java.util.stream.Collectors;
33-
import java.util.stream.Stream;
3431

3532
import javax.annotation.Nullable;
3633

@@ -52,7 +49,6 @@
5249
import org.eclipse.ditto.client.options.Option;
5350
import org.eclipse.ditto.client.options.OptionName;
5451
import org.eclipse.ditto.client.options.internal.OptionsEvaluator;
55-
import org.eclipse.ditto.client.twin.internal.UncompletedConsumptionRequestException;
5652
import org.eclipse.ditto.json.JsonFieldSelector;
5753
import org.eclipse.ditto.json.JsonObject;
5854
import org.eclipse.ditto.json.JsonPointer;
@@ -645,15 +641,6 @@ protected AdaptableBus.SubscriptionId subscribeAndPublishMessage(
645641

646642
LOGGER.trace("Sending {} and waiting for {}", protocolCommand, protocolCommandAck);
647643
final AdaptableBus adaptableBus = messagingProvider.getAdaptableBus();
648-
649-
try {
650-
if (previousSubscriptionId != null
651-
&& checkIfTwinEventIsInsertedTwiceElseThrow(adaptableBus, futureToCompleteOrFailAfterAck)) {
652-
return previousSubscriptionId;
653-
}
654-
} catch (UncompletedConsumptionRequestException e) {
655-
LOGGER.error(e.getMessage());
656-
}
657644
if (previousSubscriptionId != null) {
658645
// remove previous subscription without going through back-end because subscription will be replaced
659646
adaptableBus.unsubscribe(previousSubscriptionId);
@@ -664,28 +651,9 @@ && checkIfTwinEventIsInsertedTwiceElseThrow(adaptableBus, futureToCompleteOrFail
664651
final Classification tag = Classification.forString(protocolCommandAck);
665652
adjoin(adaptableBus.subscribeOnceForString(tag, getTimeout()), futureToCompleteOrFailAfterAck);
666653
messagingProvider.emit(protocolCommand);
667-
668654
return subscriptionId;
669655
}
670656

671-
private boolean checkIfTwinEventIsInsertedTwiceElseThrow(final AdaptableBus adaptableBus,
672-
final CompletableFuture<Void> futureToCompleteOrFailAfterAck) {
673-
674-
final Set<Classification> stringList = Stream.of(
675-
Classification.forString(Classification.StreamingType.TWIN_EVENT.startAck()),
676-
Classification.forString(Classification.StreamingType.LIVE_COMMAND.startAck()),
677-
Classification.forString(Classification.StreamingType.LIVE_EVENT.startAck()),
678-
Classification.forString(Classification.StreamingType.LIVE_MESSAGE.startAck()))
679-
.collect(Collectors.toSet());
680-
681-
if (adaptableBus.getUnmodifiableOneTimeStringConsumers().keySet().stream().anyMatch(stringList::contains)) {
682-
LOGGER.warn("First consumption request on this channel must be completed first");
683-
futureToCompleteOrFailAfterAck.completeExceptionally(new UncompletedConsumptionRequestException());
684-
return true;
685-
}
686-
return false;
687-
}
688-
689657
/**
690658
* Remove a subscription.
691659
*

java/src/main/java/org/eclipse/ditto/client/internal/bus/AdaptableBus.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
package org.eclipse.ditto.client.internal.bus;
1414

1515
import java.time.Duration;
16-
import java.util.Map;
17-
import java.util.Set;
1816
import java.util.concurrent.CompletionStage;
1917
import java.util.concurrent.ScheduledExecutorService;
2018
import java.util.function.Consumer;
@@ -25,7 +23,7 @@
2523
import org.eclipse.ditto.protocoladapter.Adaptable;
2624

2725
/**
28-
* Event bus for messages that are either {@code String} or {@code Adaptable}.
26+
* Event bus for messages that are either {@code String} or {@code} Adaptable.
2927
* On publication of a message as {@code String}, subscribers are notified as follows:
3028
* <ol>
3129
* <li>Message is classified as {@code String}. If a matching one-time subscriber is found, the subscriber is notified
@@ -55,14 +53,6 @@ public interface AdaptableBus {
5553
*/
5654
AdaptableBus addAdaptableClassifier(Classifier<Adaptable> adaptableClassifier);
5755

58-
59-
/**
60-
* Get oneTimeStringConsumers but unmodifiable to only grant read access.
61-
*
62-
* @return a {@code UnmodifiableMap}
63-
*/
64-
Map<Classification, Set<Entry<Consumer<String>>>> getUnmodifiableOneTimeStringConsumers();
65-
6656
/**
6757
* Add a one-time subscriber for a string message.
6858
*

java/src/main/java/org/eclipse/ditto/client/internal/bus/DefaultAdaptableBus.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.time.Duration;
1616
import java.time.Instant;
1717
import java.util.Collection;
18-
import java.util.Collections;
1918
import java.util.List;
2019
import java.util.Map;
2120
import java.util.Optional;
@@ -85,11 +84,6 @@ public AdaptableBus addAdaptableClassifier(final Classifier<Adaptable> adaptable
8584
return this;
8685
}
8786

88-
@Override
89-
public final Map<Classification, Set<Entry<Consumer<String>>>> getUnmodifiableOneTimeStringConsumers() {
90-
return Collections.unmodifiableMap(oneTimeStringConsumers);
91-
}
92-
9387
@Override
9488
public CompletionStage<String> subscribeOnceForString(final Classification tag, final Duration timeout) {
9589
return subscribeOnce(oneTimeStringConsumers, tag, timeout);
@@ -262,7 +256,7 @@ private boolean publishToPersistentAdaptableSubscribers(final Adaptable adaptabl
262256
if (persistentConsumers != null && !persistentConsumers.isEmpty()) {
263257
publishedToPersistentSubscribers = true;
264258
for (final Entry<Consumer<Adaptable>> entry : persistentConsumers) {
265-
runConsumerAsync(entry.getValue(), adaptable, tag);
259+
runConsumerAsync(entry.value, adaptable, tag);
266260
}
267261
}
268262
}
@@ -314,7 +308,7 @@ private <T> void removeAfterIdle(
314308

315309
private static <T> void addEntry(final Map<Classification, Set<Entry<T>>> registry,
316310
final Entry<T> entry) {
317-
registry.compute(entry.getKey(), (key, previousSet) -> {
311+
registry.compute(entry.key, (key, previousSet) -> {
318312
final Set<Entry<T>> concurrentHashSet =
319313
previousSet != null ? previousSet : ConcurrentHashMap.newKeySet();
320314
concurrentHashSet.add(entry);
@@ -325,7 +319,7 @@ private static <T> void addEntry(final Map<Classification, Set<Entry<T>>> regist
325319
private static <T> void replaceEntry(final Map<Classification, Set<Entry<T>>> registry, final Entry<T> entry) {
326320
final Set<Entry<T>> set = ConcurrentHashMap.newKeySet();
327321
set.add(entry);
328-
registry.put(entry.getKey(), set);
322+
registry.put(entry.key, set);
329323
}
330324

331325
private Optional<Adaptable> parseAsAdaptable(final String message) {
@@ -345,7 +339,7 @@ private Optional<Adaptable> parseAsAdaptable(final String message) {
345339
private <T> void removeEntry(final Map<Classification, Set<Entry<T>>> registry,
346340
final Entry<?> entry,
347341
final Runnable onRemove) {
348-
registry.computeIfPresent(entry.getKey(), (key, set) -> {
342+
registry.computeIfPresent(entry.key, (key, set) -> {
349343
if (set.remove(entry)) {
350344
onRemove.run();
351345
}
@@ -365,7 +359,7 @@ private static <T> T removeOne(final Map<Classification, Set<Entry<T>>> registry
365359
.findAny()
366360
.map(entry -> {
367361
if (set.remove(entry)) {
368-
result.set(entry.getValue());
362+
result.set(entry.value);
369363
}
370364
return set.isEmpty() ? null : set;
371365
})
@@ -377,4 +371,18 @@ private static Throwable timeout(final Duration duration) {
377371
return new TimeoutException("Timed out after " + duration);
378372
}
379373

374+
/**
375+
* Similar to Map.Entry but with object reference identity and fixed key type to act as identifier for
376+
* a subscription.
377+
*/
378+
private static final class Entry<T> implements SubscriptionId {
379+
380+
private final Classification key;
381+
private final T value;
382+
383+
private Entry(final Classification key, final T value) {
384+
this.key = key;
385+
this.value = value;
386+
}
387+
}
380388
}

java/src/main/java/org/eclipse/ditto/client/internal/bus/Entry.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

java/src/main/java/org/eclipse/ditto/client/twin/internal/UncompletedConsumptionRequestException.java

Lines changed: 0 additions & 27 deletions
This file was deleted.

java/src/test/java/org/eclipse/ditto/client/DittoClientTwinTest.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,11 @@
1313
package org.eclipse.ditto.client;
1414

1515
import static org.assertj.core.api.Assertions.assertThat;
16-
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
1716
import static org.eclipse.ditto.client.TestConstants.Thing.THING_ID;
1817
import static org.eclipse.ditto.model.base.acks.AcknowledgementRequest.parseAcknowledgementRequest;
1918

20-
import java.util.concurrent.CompletableFuture;
21-
import java.util.concurrent.ExecutionException;
22-
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.TimeoutException;
24-
2519
import org.assertj.core.api.Assertions;
2620
import org.eclipse.ditto.client.internal.AbstractDittoClientTest;
27-
import org.eclipse.ditto.client.twin.internal.UncompletedConsumptionRequestException;
2821
import org.eclipse.ditto.json.JsonPointer;
2922
import org.eclipse.ditto.json.JsonValue;
3023
import org.eclipse.ditto.model.base.common.HttpStatusCode;
@@ -104,38 +97,6 @@ public void testAttributeEventAcknowledgement() {
10497
.isEqualTo(HttpStatusCode.INTERNAL_SERVER_ERROR);
10598
}
10699

107-
@Test
108-
public void startConsumptionParallelOnSameTwinChannelShouldThrowException()
109-
throws InterruptedException, ExecutionException, TimeoutException {
110-
111-
final CompletableFuture<Void> voidCompletableFuture1 = client.twin().startConsumption();
112-
final CompletableFuture<Void> voidCompletableFuture2 = client.twin().startConsumption();
113-
114-
messaging.receivePlainString("START-SEND-EVENTS:ACK");
115-
116-
voidCompletableFuture1.get(10, TimeUnit.SECONDS);
117-
assertThatExceptionOfType(ExecutionException.class)
118-
.isThrownBy(() -> voidCompletableFuture2.get(10, TimeUnit.SECONDS))
119-
.withCauseInstanceOf(UncompletedConsumptionRequestException.class);
120-
}
121-
122-
@Test
123-
public void startConsumptionParallelOnSameLiveChannelShouldThrowException()
124-
throws InterruptedException, ExecutionException, TimeoutException {
125-
126-
final CompletableFuture<Void> voidCompletableFuture1 = client.live().startConsumption();
127-
final CompletableFuture<Void> voidCompletableFuture2 = client.live().startConsumption();
128-
129-
messaging.receivePlainString("START-SEND-LIVE-COMMANDS:ACK");
130-
messaging.receivePlainString("START-SEND-LIVE-EVENTS:ACK");
131-
messaging.receivePlainString("START-SEND-MESSAGES:ACK");
132-
133-
voidCompletableFuture1.get(10, TimeUnit.SECONDS);
134-
assertThatExceptionOfType(ExecutionException.class)
135-
.isThrownBy(() -> voidCompletableFuture2.get(10, TimeUnit.SECONDS))
136-
.withCauseInstanceOf(UncompletedConsumptionRequestException.class);
137-
}
138-
139100
@Test
140101
public void testFeatureEventAcknowledgement() {
141102
client.twin().startConsumption();

java/src/test/java/org/eclipse/ditto/client/messaging/internal/MockMessagingProvider.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,6 @@ public void receiveEvent(final Message<ThingEvent> message) {
150150
adaptableBus.publish(ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString());
151151
}
152152

153-
public void receivePlainString(final String plain) {
154-
adaptableBus.publish(plain);
155-
}
156-
157153
@Override
158154
public void close() {
159155
executor.shutdownNow();

0 commit comments

Comments
 (0)