-
Notifications
You must be signed in to change notification settings - Fork 37
Health support #130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Health support #130
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1331,6 +1331,27 @@ The connector is responsible for the acknowledgment (positive or negative) of th | |
* An outgoing connector must acknowledge the incoming `org.eclipse.microprofile.reactive.messaging.Message` once it has successfully dispatched the message. | ||
* An outgoing connector must acknowledge negatively the incoming `org.eclipse.microprofile.reactive.messaging.Message` if it cannot be dispatched. | ||
|
||
== Health | ||
|
||
When MicroProfile Reactive Messaging is used in an environment where MicroProfile Health is enabled, implicit readiness and liveness checks are produced. | ||
|
||
=== Readiness | ||
danielkec marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Readiness check `messaging` gets UP only when subscribers of all messaging channels are subscribed to theirs respective publishers and `onSubscribe` signal is detected on all channels. | ||
danielkec marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
=== Liveness | ||
|
||
Liveness check `messaging` gets DOWN only when `cancel` or `onError` signals are detected on any of the messaging channels. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to Readiness - define if it starts in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What should happen if a message gets There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't believe acknowledgement should be related to the ability of the channel to send messages as messaging itself doesn't leverage acking/nacking anyhow. It should be responsibility of the publishers and subscribers(connectors) as that requirement can change case by case. In other words such checks should be present in the connectors or business code if or when needed. |
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering about "all channels" while it seems to only require checking the final subscribers (subscriber methods, outgoing channels). Also, about Maybe it should focus on:
Intermediate channels can recover from failures, and implement retry logic, so it should not be reported as a DOWN. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be quite a different angle than the one I was trying to take. There are not much parts of reactive pipelines we are actually managing in the MP Messaging impls and those are the channels. Channels are basically processors which are not mutating the stream (except for un/wrapping) so if I am not sure that the health of connectors, methods and unmanaged streams should be in the scope of MP Messaging implicit health check. I would count those as business code where health and readiness can be monitored with standard MP Health API. Also implicit checks should be consistent, if we differentiate its behavior by its pub/sub kinds, we risk user confusion. Again custom check with exclusion/inclusion can help in such cases. |
||
=== Excluding channels | ||
|
||
For channels which are not desired to be checked for liveness or readiness, exclusion is configurable with `mp.health.live.exclude-channel` or `mp.health.live.exclude-channel` config keys. | ||
danielkec marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
[source, properties] | ||
---- | ||
mp.health.live.exclude-channel=<channel-name> | ||
danielkec marked this conversation as resolved.
Show resolved
Hide resolved
danielkec marked this conversation as resolved.
Show resolved
Hide resolved
|
||
mp.health.ready.exclude-channel=<channel-name> | ||
---- | ||
|
||
== Metrics | ||
|
||
When MicroProfile Reactive Messaging is used in an environment where MicroProfile Metrics is enabled, the Reactive Messaging implementation automatically produces metrics. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
/** | ||
* Copyright (c) 2021 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* You may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.eclipse.microprofile.reactive.messaging.tck.health; | ||
|
||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
|
||
import org.junit.Assert; | ||
|
||
@ApplicationScoped | ||
public class ChannelRegister { | ||
|
||
private final Map<String, Channel<?>> channelMap = new HashMap<>(); | ||
private final ReentrantLock mapLock = new ReentrantLock(); | ||
|
||
@SuppressWarnings("unchecked") | ||
<P> Channel<P> get(String channelName) { | ||
try { | ||
mapLock.lock(); | ||
channelMap.putIfAbsent(channelName, new Channel<P>()); | ||
return (Channel<P>) channelMap.get(channelName); | ||
} | ||
finally { | ||
mapLock.unlock(); | ||
} | ||
} | ||
|
||
Collection<Channel<?>> getAllChannels() { | ||
try { | ||
mapLock.lock(); | ||
return Collections.unmodifiableCollection(channelMap.values()); | ||
} | ||
finally { | ||
mapLock.unlock(); | ||
} | ||
} | ||
|
||
/** | ||
* Iterate over all registered channels, on each's publisher invoke onSubscribe signal | ||
* and block until any onSubscribe signal is received on its subscriber. | ||
*/ | ||
void readyAll() { | ||
getAllChannels().forEach(Channel::ready); | ||
} | ||
|
||
/** | ||
* Iterate over all registered channels, on each's publisher invoke onError signal | ||
* and block until any error signal is received on its subscriber. | ||
*/ | ||
void failAll() { | ||
getAllChannels().forEach(Channel::fail); | ||
} | ||
|
||
/** | ||
* Iterate over all registered channels, on each's publisher invoke onComplete signal | ||
* and block until any complete signal is received on its subscriber. | ||
*/ | ||
void completeAll() { | ||
getAllChannels().forEach(Channel::complete); | ||
} | ||
|
||
/** | ||
* Iterate over all registered channels, on each's subscriber subscription invoke cancel signal | ||
* and block until any cancel signal is received on its publisher. | ||
*/ | ||
void cancelAll() { | ||
getAllChannels().forEach(Channel::cancel); | ||
} | ||
|
||
public static class Channel<PAYLOAD> { | ||
private final TestPublisher<PAYLOAD> publisher; | ||
private final TestSubscriber<PAYLOAD> subscriber; | ||
|
||
public Channel() { | ||
publisher = new TestPublisher<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); | ||
subscriber = new TestSubscriber<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
/** | ||
* Access this channel's publisher. | ||
* | ||
* @return this channel's test publisher | ||
*/ | ||
public TestPublisher<PAYLOAD> publisher() { | ||
return publisher; | ||
} | ||
|
||
/** | ||
* Access this channel's subscriber. | ||
* | ||
* @return this channel's test subscriber | ||
*/ | ||
public TestSubscriber<PAYLOAD> subscriber() { | ||
return subscriber; | ||
} | ||
|
||
/** | ||
* Trigger onSubscribe signal and block until received by subscriber. | ||
* | ||
* @return this channel | ||
*/ | ||
public Channel<PAYLOAD> ready() { | ||
this.publisher().ready(); | ||
this.subscriber().awaitSubscription(); | ||
return this; | ||
} | ||
|
||
/** | ||
* Trigger onComplete signal and block until received by subscriber. | ||
* | ||
* @return this channel | ||
*/ | ||
public Channel<PAYLOAD> complete() { | ||
this.publisher().complete(); | ||
this.subscriber().awaitCompletion(); | ||
return this; | ||
} | ||
|
||
/** | ||
* Trigger onError signal and block until received by subscriber. | ||
* | ||
* @return this channel | ||
*/ | ||
public Channel<PAYLOAD> fail() { | ||
Exception exception = new Exception("BOOM!!!"); | ||
this.publisher().fail(exception); | ||
Assert.assertEquals(exception.getMessage(), this.subscriber().awaitError().getMessage()); | ||
return this; | ||
} | ||
|
||
/** | ||
* Trigger cancel signal and block until received by publisher. | ||
* | ||
* @return this channel | ||
*/ | ||
public Channel<PAYLOAD> cancel() { | ||
this.subscriber().awaitSubscription().cancel(); | ||
this.publisher().awaitCancel(); | ||
return this; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/** | ||
* Copyright (c) 2021 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* You may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.eclipse.microprofile.reactive.messaging.tck.health; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
import javax.inject.Inject; | ||
|
||
import org.eclipse.microprofile.reactive.messaging.Incoming; | ||
import org.eclipse.microprofile.reactive.messaging.Outgoing; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
|
||
@ApplicationScoped | ||
public class HealthAllExcludedTestBean { | ||
|
||
public static final String ALL_EXCLUDED_CHANNEL = "excluded-channel"; | ||
|
||
@Inject | ||
private ChannelRegister channelRegisterBean; | ||
|
||
|
||
@Incoming(ALL_EXCLUDED_CHANNEL) | ||
public Subscriber<String> consumeInnerChannel() { | ||
return channelRegisterBean.<String>get(ALL_EXCLUDED_CHANNEL).subscriber(); | ||
} | ||
|
||
@Outgoing(ALL_EXCLUDED_CHANNEL) | ||
public Publisher<String> produceInnerChannel() { | ||
return channelRegisterBean.<String>get(ALL_EXCLUDED_CHANNEL).publisher(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/** | ||
* Copyright (c) 2021 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* You may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.eclipse.microprofile.reactive.messaging.tck.health; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.net.HttpURLConnection; | ||
import java.net.URL; | ||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
import javax.json.Json; | ||
import javax.json.JsonArray; | ||
import javax.json.JsonObject; | ||
import javax.json.JsonReader; | ||
import javax.json.JsonValue; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
|
||
class HealthAssertions { | ||
|
||
private int responseCode; | ||
private JsonObject jsonResponse; | ||
|
||
static HealthAssertions create(String url) { | ||
try { | ||
HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to use HTTP to verify the checks? Doesn't Microprofile Health expose an API (asking the question as I never checked). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would be great, I didn't found other way. MP Health TCKs use same approach |
||
con.setRequestMethod("GET"); | ||
con.setConnectTimeout(HealthBase.TIMEOUT_MILLIS); | ||
con.setReadTimeout(HealthBase.TIMEOUT_MILLIS); | ||
|
||
con.connect(); | ||
HealthAssertions healthAssertions = new HealthAssertions(); | ||
healthAssertions.responseCode = con.getResponseCode(); | ||
|
||
InputStream is = con.getErrorStream(); | ||
if(is == null){ | ||
is = con.getInputStream(); | ||
} | ||
JsonReader jsonReader = Json.createReader(is); | ||
healthAssertions.jsonResponse = jsonReader.readObject(); | ||
is.close(); | ||
con.disconnect(); | ||
return healthAssertions; | ||
} | ||
catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
HealthAssertions assertResponseCode(int expected){ | ||
assertEquals(expected, this.responseCode); | ||
return this; | ||
} | ||
|
||
HealthAssertions assertResponseCodeUp(){ | ||
assertResponseCode(200); | ||
return this; | ||
} | ||
|
||
HealthAssertions assertResponseCodeDown(){ | ||
assertResponseCode(503); | ||
return this; | ||
} | ||
|
||
HealthAssertions assertMessagingStatus(String expectedStatus){ | ||
JsonArray checks = this.jsonResponse.getJsonArray("checks"); | ||
List<JsonObject> messagingObjects = checks.stream() | ||
.map(JsonValue::asJsonObject) | ||
.filter(o -> o.containsKey("name") && "messaging".equals(o.getString("name"))) | ||
.collect(Collectors.toList()); | ||
assertEquals(1, messagingObjects.size()); | ||
JsonObject messagingObject = messagingObjects.get(0); | ||
assertEquals(expectedStatus, messagingObject.getString("status")); | ||
return this; | ||
} | ||
|
||
HealthAssertions assertMessagingStatusUp(){ | ||
assertMessagingStatus("UP"); | ||
return this; | ||
} | ||
|
||
HealthAssertions assertMessagingStatusDown(){ | ||
assertMessagingStatus("DOWN"); | ||
return this; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case, we don't pass the checks, what extra data should we provide? None won't be very useful (even if these checks tend to be consumed by machines ignoring the extra data). Should we list the unsatisfied channels? Is it implementation specific?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was playing with that idea but couldn't come with any sufficient reason for requiring channel listing in the checks