Skip to content

Commit 19c2ec5

Browse files
DerSchwilkStanchev Aleksandar
authored andcommitted
Introduce options for java live message API
This allows setting the condition header to live messages. Co-authored-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io> Signed-off-by: David Schwilk <david.schwilk@bosch.io>
1 parent b4bb350 commit 19c2ec5

File tree

10 files changed

+148
-7
lines changed

10 files changed

+148
-7
lines changed

java/src/main/java/org/eclipse/ditto/client/internal/OutgoingMessageFactory.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.eclipse.ditto.messages.model.Message;
5353
import org.eclipse.ditto.messages.model.MessageBuilder;
5454
import org.eclipse.ditto.messages.model.MessageHeaders;
55+
import org.eclipse.ditto.messages.model.MessageHeadersBuilder;
5556
import org.eclipse.ditto.messages.model.MessagesModelFactory;
5657
import org.eclipse.ditto.policies.model.Policy;
5758
import org.eclipse.ditto.policies.model.PolicyId;
@@ -544,11 +545,16 @@ public DeleteFeatureProperties deleteFeatureProperties(final ThingId thingId,
544545
* @param <T> the type of the payload.
545546
* @return a sendMessage message.
546547
*/
547-
public <T> Message<T> sendMessage(final MessageSerializerRegistry registry, final Message<T> message) {
548+
public <T> Message<T> sendMessage(final MessageSerializerRegistry registry, final Message<T> message,
549+
final Option<?>... options) {
548550

549-
final MessageHeaders messageHeaders = message.getHeaders().toBuilder()
550-
.correlationId(message.getHeaders().getCorrelationId().orElseGet(() -> UUID.randomUUID().toString()))
551-
.build();
551+
final DittoHeaders dittoHeaders = buildDittoHeaders(EnumSet.of(CONDITION), options);
552+
final MessageHeadersBuilder messageHeadersBuilder = message.getHeaders().toBuilder()
553+
.correlationId(message.getHeaders().getCorrelationId().orElseGet(() -> UUID.randomUUID().toString()));
554+
if (dittoHeaders.getCondition().isPresent()) {
555+
messageHeadersBuilder.condition(dittoHeaders.getCondition().get());
556+
}
557+
MessageHeaders messageHeaders = messageHeadersBuilder.build();
552558

553559
final MessageBuilder<T> messageBuilder = message.getPayload()
554560
.map(payload -> {

java/src/main/java/org/eclipse/ditto/client/live/Live.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,27 @@ public interface Live extends CommonManagement<LiveThingHandle, LiveFeatureHandl
5757
*/
5858
<T> PendingMessage<T> message();
5959

60+
/**
61+
* Provides the functionality to create and send a new {@link org.eclipse.ditto.messages.model.Message}
62+
* <em>FROM</em> or <em>TO</em> a "Live" {@link Thing} or a "Live" Thing's {@link
63+
* org.eclipse.ditto.things.model.Feature Feature}. <p> Example: </p>
64+
* <pre>
65+
* client.live().message()
66+
* .from("org.eclipse.ditto:fireDetectionDevice")
67+
* .featureId("smokeDetector")
68+
* .subject("fireAlert")
69+
* .payload(JsonFactory.newObject("{\"action\" : \"call fire department\"}"))
70+
* .send();
71+
* </pre>
72+
*
73+
* @param <T> the type of the Message's payload.
74+
* @param options options sent to the outbound message.
75+
* @param options options sent to the outbound message.
76+
* @return a new message builder that offers the functionality to create and send the message.
77+
* @since 3.1.0
78+
*/
79+
<T> PendingMessage<T> message(Option<?>... options);
80+
6081
/**
6182
* Start consuming changes, messages and commands on this {@code live()} channel.
6283
*

java/src/main/java/org/eclipse/ditto/client/live/LiveFeatureHandle.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.eclipse.ditto.client.live.messages.MessageRegistration;
1919
import org.eclipse.ditto.client.live.messages.PendingMessageWithFeatureId;
2020
import org.eclipse.ditto.client.management.FeatureHandle;
21+
import org.eclipse.ditto.client.options.Option;
2122

2223
/**
2324
* A {@code LiveFeatureHandle} provides management and registration functionality for specific {@code Live Thing}
@@ -48,4 +49,26 @@ public interface LiveFeatureHandle extends FeatureHandle, MessageRegistration, F
4849
*/
4950
<T> PendingMessageWithFeatureId<T> message();
5051

52+
/**
53+
* Provides the functionality to create and send a new {@link org.eclipse.ditto.messages.model.Message} <em>TO</em>
54+
* or <em>FROM</em> the {@code Feature} handled by this {@code LiveFeatureHandle}. <p> Example: </p>
55+
* <pre>
56+
* client.live()
57+
* .forId("org.eclipse.ditto:fireDetectionDevice")
58+
* .forFeature("smokeDetector")
59+
* .message()
60+
* .from()
61+
* .subject("fireAlert")
62+
* .payload("{\"action\" : \"call fire department\"}")
63+
* .contentType("application/json")
64+
* .send();
65+
* </pre>
66+
*
67+
* @param <T> the type of the Message's payload.
68+
* @param options options sent to the outbound message.
69+
* @return a new message builder that offers the functionality to create and send the message.
70+
* @since 3.1.0
71+
*/
72+
<T> PendingMessageWithFeatureId<T> message(Option<?>... options);
73+
5174
}

java/src/main/java/org/eclipse/ditto/client/live/LiveThingHandle.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.eclipse.ditto.client.live.messages.MessageRegistration;
2323
import org.eclipse.ditto.client.live.messages.PendingMessageWithThingId;
2424
import org.eclipse.ditto.client.management.ThingHandle;
25+
import org.eclipse.ditto.client.options.Option;
2526

2627
/**
2728
* A {@code LiveThingHandle} provides management and registration functionality for specific <em>Live Things</em>.
@@ -50,4 +51,23 @@ public interface LiveThingHandle
5051
*/
5152
<T> PendingMessageWithThingId<T> message();
5253

54+
/**
55+
* Provides the functionality to create and send a new {@link org.eclipse.ditto.messages.model.Message}
56+
* <em>FROM</em> or <em>TO</em> the {@code Thing} handled by this {@code LiveThingHandle}. <p> Example: </p>
57+
* <pre>
58+
* client.live().forId("org.eclipse.ditto:fireDetectionDevice").message()
59+
* .from()
60+
* .subject("fireAlert")
61+
* .payload("{\"action\" : \"call fire department\"}")
62+
* .contentType("application/json")
63+
* .send();
64+
* </pre>
65+
*
66+
* @param <T> the type of the Message's payload.
67+
* @param options options sent to the outbound message.
68+
* @return a new message builder that offers the functionality to create and send the message.
69+
* @since 3.1.0
70+
*/
71+
<T> PendingMessageWithThingId<T> message(Option<?>... options);
72+
5373
}

java/src/main/java/org/eclipse/ditto/client/live/internal/LiveFeatureHandleImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.eclipse.ditto.client.live.messages.RepliableMessage;
4343
import org.eclipse.ditto.client.management.internal.FeatureHandleImpl;
4444
import org.eclipse.ditto.client.messaging.MessagingProvider;
45+
import org.eclipse.ditto.client.options.Option;
4546
import org.eclipse.ditto.protocol.TopicPath;
4647
import org.eclipse.ditto.things.model.ThingId;
4748
import org.slf4j.Logger;
@@ -89,6 +90,12 @@ public <T> PendingMessageWithFeatureId<T> message() {
8990
messagingProvider).withThingAndFeatureIds(getEntityId(), getFeatureId());
9091
}
9192

93+
@Override
94+
public <T> PendingMessageWithFeatureId<T> message(final Option<?>... options) {
95+
return PendingMessageImpl.<T>of(LOGGER, outgoingMessageFactory, messageSerializerRegistry, PROTOCOL_ADAPTER,
96+
messagingProvider).withThingAndFeatureIds(getEntityId(), getFeatureId());
97+
}
98+
9299
@Override
93100
public <T, U> void registerForMessage(final String registrationId,
94101
final String subject,

java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.eclipse.ditto.client.live.messages.RepliableMessage;
5353
import org.eclipse.ditto.client.management.ClientReconnectingException;
5454
import org.eclipse.ditto.client.messaging.MessagingProvider;
55+
import org.eclipse.ditto.client.options.Option;
5556
import org.eclipse.ditto.json.JsonKey;
5657
import org.eclipse.ditto.messages.model.KnownMessageSubjects;
5758
import org.eclipse.ditto.messages.model.Message;
@@ -224,6 +225,12 @@ public <T> PendingMessage<T> message() {
224225
messagingProvider);
225226
}
226227

228+
@Override
229+
public <T> PendingMessage<T> message(final Option<?>... options) {
230+
return PendingMessageImpl.of(LOGGER, outgoingMessageFactory, messageSerializerRegistry, PROTOCOL_ADAPTER,
231+
messagingProvider);
232+
}
233+
227234
@Override
228235
public <T, U> void registerForMessage(final String registrationId,
229236
final String subject,

java/src/main/java/org/eclipse/ditto/client/live/internal/LiveThingHandleImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.eclipse.ditto.client.live.messages.RepliableMessage;
4343
import org.eclipse.ditto.client.management.internal.ThingHandleImpl;
4444
import org.eclipse.ditto.client.messaging.MessagingProvider;
45+
import org.eclipse.ditto.client.options.Option;
4546
import org.eclipse.ditto.messages.model.KnownMessageSubjects;
4647
import org.eclipse.ditto.protocol.TopicPath;
4748
import org.eclipse.ditto.things.model.ThingId;
@@ -98,6 +99,12 @@ public <T> PendingMessageWithThingId<T> message() {
9899
messagingProvider).withThingId(getEntityId());
99100
}
100101

102+
@Override
103+
public <T> PendingMessageWithThingId<T> message(final Option<?>... options) {
104+
return PendingMessageImpl.<T>of(LOGGER, outgoingMessageFactory, messageSerializerRegistry, PROTOCOL_ADAPTER,
105+
messagingProvider).withThingId(getEntityId());
106+
}
107+
101108
@Override
102109
public <T, U> void registerForMessage(final String registrationId,
103110
final String subject,

java/src/main/java/org/eclipse/ditto/client/live/internal/PendingMessageImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.eclipse.ditto.client.live.messages.PendingMessageWithThingId;
2727
import org.eclipse.ditto.client.live.messages.internal.ImmutableMessageSender;
2828
import org.eclipse.ditto.client.messaging.MessagingProvider;
29+
import org.eclipse.ditto.client.options.Option;
30+
import org.eclipse.ditto.client.options.Options;
2931
import org.eclipse.ditto.messages.model.Message;
3032
import org.eclipse.ditto.things.model.ThingId;
3133
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
@@ -41,17 +43,21 @@ final class PendingMessageImpl<T> implements PendingMessage<T> {
4143
private final MessageSerializerRegistry messageSerializerRegistry;
4244
private final ProtocolAdapter protocolAdapter;
4345
private final MessagingProvider messagingProvider;
46+
private final Option<?>[] options;
4447

4548
private PendingMessageImpl(final Logger logger,
4649
final OutgoingMessageFactory outgoingMessageFactory,
4750
final MessageSerializerRegistry messageSerializerRegistry,
4851
final ProtocolAdapter protocolAdapter,
49-
final MessagingProvider messagingProvider) {
52+
final MessagingProvider messagingProvider,
53+
final Option<?>... options) {
54+
5055
this.logger = logger;
5156
this.outgoingMessageFactory = outgoingMessageFactory;
5257
this.messageSerializerRegistry = messageSerializerRegistry;
5358
this.protocolAdapter = protocolAdapter;
5459
this.messagingProvider = messagingProvider;
60+
this.options = options;
5561
}
5662

5763
static <T> PendingMessageImpl<T> of(final Logger logger,
@@ -120,7 +126,7 @@ public MessageSender.SetFeatureIdOrSubject<T> to(final ThingId thingId) {
120126

121127
private void sendMessage(final Message<T> message, @Nullable final ResponseConsumer<?> responseConsumer) {
122128
final Message<?> toBeSentMessage =
123-
outgoingMessageFactory.sendMessage(messageSerializerRegistry, message);
129+
outgoingMessageFactory.sendMessage(messageSerializerRegistry, message, options);
124130
logger.trace("Message about to send: {}", toBeSentMessage);
125131
if (responseConsumer != null) {
126132
toBeSentMessage.getCorrelationId().ifPresent(correlationId ->

java/src/main/java/org/eclipse/ditto/client/options/internal/AbstractOptionVisitor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.eclipse.ditto.client.options.Option;
2424
import org.eclipse.ditto.client.options.OptionName;
2525

26-
2726
/**
2827
* This abstract implementation of {@link OptionVisitor} implements the parts which are common for all option visitors
2928
* like comparing the name of the option with the expected name, getting the value from the option and handling

java/src/test/java/org/eclipse/ditto/client/internal/OutgoingMessageFactoryTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,18 @@
1919
import org.assertj.core.api.JUnitSoftAssertions;
2020
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
2121
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
22+
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
23+
import org.eclipse.ditto.client.live.messages.MessageSerializers;
24+
import org.eclipse.ditto.client.live.messages.internal.DefaultMessageSerializerRegistry;
2225
import org.eclipse.ditto.client.options.OptionName;
2326
import org.eclipse.ditto.client.options.Options;
27+
import org.eclipse.ditto.json.JsonObject;
28+
import org.eclipse.ditto.messages.model.Message;
29+
import org.eclipse.ditto.messages.model.MessageDirection;
30+
import org.eclipse.ditto.messages.model.MessageHeaders;
31+
import org.eclipse.ditto.messages.model.MessagesModelFactory;
32+
import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage;
33+
import org.eclipse.ditto.things.model.ThingId;
2434
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeature;
2535
import org.junit.Before;
2636
import org.junit.Rule;
@@ -81,4 +91,39 @@ public void deleteThingWithLiveChannelConditionExpressionThrowsException() {
8191
.withNoCause();
8292
}
8393

94+
@Test
95+
public void LiveMessageWithOnlyAllowedOptionsReturnsExpected() {
96+
final Message<?> liveMessage = underTest.sendMessage(new DefaultMessageSerializerRegistry(), getMessage(),
97+
Options.condition(CONDITION_EXPRESSION));
98+
99+
softly.assertThat((CharSequence) liveMessage.getEntityId())
100+
.as("entity ID")
101+
.isEqualTo(THING_ID);
102+
softly.assertThat(liveMessage.getHeaders())
103+
.as("Ditto headers")
104+
.satisfies(dittoHeaders -> {
105+
softly.assertThat(dittoHeaders)
106+
.as("condition expression")
107+
.containsEntry(DittoHeaderDefinition.CONDITION.getKey(), CONDITION_EXPRESSION);
108+
});
109+
}
110+
111+
@Test
112+
public void liverMessageWithLiveChannelConditionExpressionThrowsException() {
113+
Assertions.assertThatIllegalArgumentException()
114+
.isThrownBy(() -> underTest.sendMessage(new DefaultMessageSerializerRegistry(), getMessage(),
115+
Options.liveChannelCondition(LIVE_CHANNEL_CONDITION_EXPRESSION)))
116+
.withMessage("Option '%s' is not allowed. This operation only allows [%s, %s, %s].",
117+
OptionName.Global.LIVE_CHANNEL_CONDITION,
118+
OptionName.Global.CONDITION,
119+
OptionName.Global.DITTO_HEADERS,
120+
OptionName.Modify.RESPONSE_REQUIRED)
121+
.withNoCause();
122+
}
123+
124+
private static Message<?> getMessage() {
125+
return MessagesModelFactory.newMessageBuilder(MessageHeaders.newBuilder(MessageDirection.TO,
126+
THING_ID, "subject").build()).build();
127+
}
128+
84129
}

0 commit comments

Comments
 (0)