From d0f0c9c46962619f87252fd182d00cde075a40df Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Wed, 24 Feb 2021 16:34:00 +0100 Subject: [PATCH 1/4] Implicit health check #47 Signed-off-by: Daniel Kec --- spec/src/main/asciidoc/architecture.asciidoc | 21 +++ tck/pom.xml | 20 ++- .../messaging/tck/health/ChannelRegister.java | 164 ++++++++++++++++++ .../tck/health/HealthAllExcludedTestBean.java | 48 +++++ .../tck/health/HealthAssertions.java | 103 +++++++++++ .../messaging/tck/health/HealthBase.java | 63 +++++++ ...HealthCancelledConnectorInChannelTest.java | 127 ++++++++++++++ ...ealthCancelledConnectorOutChannelTest.java | 127 ++++++++++++++ .../HealthCancelledInnerChannelTest.java | 126 ++++++++++++++ .../health/HealthErroredAndCancelledTest.java | 129 ++++++++++++++ .../HealthErroredConnectorInChannelTest.java | 126 ++++++++++++++ .../HealthErroredConnectorOutChannelTest.java | 126 ++++++++++++++ .../health/HealthErroredInnerChannelTest.java | 128 ++++++++++++++ .../tck/health/HealthExclusionTest.java | 125 +++++++++++++ .../health/HealthLiveExcludedTestBean.java | 48 +++++ .../tck/health/HealthLiveExclusionTest.java | 135 ++++++++++++++ .../health/HealthReadyExcludedTestBean.java | 48 +++++ .../tck/health/HealthReadyExclusionTest.java | 112 ++++++++++++ .../messaging/tck/health/HealthTestBean.java | 59 +++++++ .../tck/health/HealthTestConnector.java | 57 ++++++ .../messaging/tck/health/TestPublisher.java | 115 ++++++++++++ .../messaging/tck/health/TestSubscriber.java | 115 ++++++++++++ 22 files changed, 2119 insertions(+), 3 deletions(-) create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAllExcludedTestBean.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExcludedTestBean.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExcludedTestBean.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestBean.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestConnector.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java create mode 100644 tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java diff --git a/spec/src/main/asciidoc/architecture.asciidoc b/spec/src/main/asciidoc/architecture.asciidoc index c6b9c479..e9f6dc94 100644 --- a/spec/src/main/asciidoc/architecture.asciidoc +++ b/spec/src/main/asciidoc/architecture.asciidoc @@ -1331,6 +1331,27 @@ The connector is responsible for the acknowledgment (positive or negative) of th * An outgoing connector must acknowledge the incoming `org.eclipse.microprofile.reactive.messaging.Message` once it has successfully dispatched the message. * An outgoing connector must acknowledge negatively the incoming `org.eclipse.microprofile.reactive.messaging.Message` if it cannot be dispatched. +== Health + +When MicroProfile Reactive Messaging is used in an environment where MicroProfile Health is enabled, implicit readiness and liveness checks are produced. + +=== Readiness +Readiness check `messaging` gets UP only when subscribers of all messaging channels are subscribed to theirs respective publishers and `onSubscribe` signal is detected on all channels. + +=== Liveness + +Liveness check `messaging` gets DOWN only when `cancel` or `onError` signals are detected on any of the messaging channels. + +=== Excluding channels + +For channels which are not desired to be checked for liveness or readiness, exclusion is configurable with `mp.health.live.exclude-channel` or `mp.health.live.exclude-channel` config keys. + +[source, properties] +---- +mp.health.live.exclude-channel= +mp.health.ready.exclude-channel= +---- + == Metrics When MicroProfile Reactive Messaging is used in an environment where MicroProfile Metrics is enabled, the Reactive Messaging implementation automatically produces metrics. diff --git a/tck/pom.xml b/tck/pom.xml index 22d170ca..8325aae8 100644 --- a/tck/pom.xml +++ b/tck/pom.xml @@ -34,6 +34,7 @@ 1.4.1.Final 1.2.6 + 1.1.6 @@ -70,19 +71,19 @@ microprofile-reactive-streams-operators-api provided - + org.eclipse.microprofile.config microprofile-config-api provided - + jakarta.enterprise jakarta.enterprise.cdi-api provided - + jakarta.annotation jakarta.annotation-api @@ -120,6 +121,19 @@ awaitility + + jakarta.json + jakarta.json-api + ${version.glassfish.json} + provided + + + + org.glassfish + jakarta.json + ${version.glassfish.json} + + org.jboss.shrinkwrap shrinkwrap-api diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java new file mode 100644 index 00000000..91e96fde --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java @@ -0,0 +1,164 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import javax.enterprise.context.ApplicationScoped; + +import org.junit.Assert; + +@ApplicationScoped +public class ChannelRegister { + + private final Map> channelMap = new HashMap<>(); + private final ReentrantLock mapLock = new ReentrantLock(); + + @SuppressWarnings("unchecked") +

Channel

get(String channelName) { + try { + mapLock.lock(); + channelMap.putIfAbsent(channelName, new Channel

()); + return (Channel

) channelMap.get(channelName); + } + finally { + mapLock.unlock(); + } + } + + Collection> getAllChannels() { + try { + mapLock.lock(); + return Collections.unmodifiableCollection(channelMap.values()); + } + finally { + mapLock.unlock(); + } + } + + /** + * Iterate over all registered channels, on each's publisher invoke onSubscribe signal + * and block until any onSubscribe signal is received on its subscriber. + */ + void readyAll() { + getAllChannels().forEach(Channel::ready); + } + + /** + * Iterate over all registered channels, on each's publisher invoke onError signal + * and block until any error signal is received on its subscriber. + */ + void failAll() { + getAllChannels().forEach(Channel::fail); + } + + /** + * Iterate over all registered channels, on each's publisher invoke onComplete signal + * and block until any complete signal is received on its subscriber. + */ + void completeAll() { + getAllChannels().forEach(Channel::complete); + } + + /** + * Iterate over all registered channels, on each's subscriber subscription invoke cancel signal + * and block until any cancel signal is received on its publisher. + */ + void cancelAll() { + getAllChannels().forEach(Channel::cancel); + } + + public static class Channel { + private final TestPublisher publisher; + private final TestSubscriber subscriber; + + public Channel() { + publisher = new TestPublisher<>(100, TimeUnit.MILLISECONDS); + subscriber = new TestSubscriber<>(100, TimeUnit.MILLISECONDS); + } + + /** + * Access this channel's publisher. + * + * @return this channel's test publisher + */ + public TestPublisher publisher() { + return publisher; + } + + /** + * Access this channel's subscriber. + * + * @return this channel's test subscriber + */ + public TestSubscriber subscriber() { + return subscriber; + } + + /** + * Trigger onSubscribe signal and block until received by subscriber. + * + * @return this channel + */ + public Channel ready() { + this.publisher().ready(); + this.subscriber().awaitSubscription(); + return this; + } + + /** + * Trigger onComplete signal and block until received by subscriber. + * + * @return this channel + */ + public Channel complete() { + this.publisher().complete(); + this.subscriber().awaitCompletion(); + return this; + } + + /** + * Trigger onError signal and block until received by subscriber. + * + * @return this channel + */ + public Channel fail() { + Exception exception = new Exception("BOOM!!!"); + this.publisher().fail(exception); + Assert.assertEquals(exception.getMessage(), this.subscriber().awaitError().getMessage()); + return this; + } + + /** + * Trigger cancel signal and block until received by publisher. + * + * @return this channel + */ + public Channel cancel() { + this.subscriber().awaitSubscription().cancel(); + this.publisher().awaitCancel(); + return this; + } + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAllExcludedTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAllExcludedTestBean.java new file mode 100644 index 00000000..3ca26490 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAllExcludedTestBean.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +@ApplicationScoped +public class HealthAllExcludedTestBean { + + public static final String ALL_EXCLUDED_CHANNEL = "excluded-channel"; + + @Inject + private ChannelRegister channelRegisterBean; + + + @Incoming(ALL_EXCLUDED_CHANNEL) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(ALL_EXCLUDED_CHANNEL).subscriber(); + } + + @Outgoing(ALL_EXCLUDED_CHANNEL) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(ALL_EXCLUDED_CHANNEL).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java new file mode 100644 index 00000000..8f84648d --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; +import java.util.stream.Collectors; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonObject; +import javax.json.JsonReader; +import javax.json.JsonValue; + +import static org.junit.Assert.assertEquals; + +class HealthAssertions { + + private int responseCode; + private JsonObject jsonResponse; + + static HealthAssertions create(String url) { + try { + HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection(); + con.setRequestMethod("GET"); + con.setConnectTimeout(5000); + con.setReadTimeout(5000); + + con.connect(); + HealthAssertions healthAssertions = new HealthAssertions(); + healthAssertions.responseCode = con.getResponseCode(); + + InputStream is = con.getErrorStream(); + if(is == null){ + is = con.getInputStream(); + } + JsonReader jsonReader = Json.createReader(is); + healthAssertions.jsonResponse = jsonReader.readObject(); + is.close(); + con.disconnect(); + return healthAssertions; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + HealthAssertions assertResponseCode(int expected){ + assertEquals(expected, this.responseCode); + return this; + } + + HealthAssertions assertResponseCodeUp(){ + assertResponseCode(200); + return this; + } + + HealthAssertions assertResponseCodeDown(){ + assertResponseCode(503); + return this; + } + + HealthAssertions assertMessagingStatus(String expectedStatus){ + JsonArray checks = this.jsonResponse.getJsonArray("checks"); + List messagingObjects = checks.stream() + .map(JsonValue::asJsonObject) + .filter(o -> o.containsKey("name") && "messaging".equals(o.getString("name"))) + .collect(Collectors.toList()); + assertEquals(1, messagingObjects.size()); + JsonObject messagingObject = messagingObjects.get(0); + assertEquals(expectedStatus, messagingObject.getString("status")); + return this; + } + + HealthAssertions assertMessagingStatusUp(){ + assertMessagingStatus("UP"); + return this; + } + + HealthAssertions assertMessagingStatusDown(){ + assertMessagingStatus("DOWN"); + return this; + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java new file mode 100644 index 00000000..d70feff8 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java @@ -0,0 +1,63 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.net.URI; + +import org.eclipse.microprofile.reactive.messaging.tck.metrics.ConfigAsset; +import org.eclipse.microprofile.reactive.messaging.tck.metrics.TestConnector; +import org.jboss.arquillian.test.api.ArquillianResource; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.WebArchive; + +public class HealthBase { + + public static final String ALL_EXCLUDED_CHANNEL = "excluded-channel"; + public static final String LIVE_EXCLUDED_CHANNEL = "live-excluded-channel"; + public static final String READY_EXCLUDED_CHANNEL = "ready-excluded-channel"; + public static final String CHANNEL_CONNECTOR_IN = "channel-connector-in"; + public static final String CHANNEL_CONNECTOR_OUT = "channel-connector-out"; + public static final String CHANNEL_INNER = "inner-channel"; + + protected static WebArchive prepareArchive(){ + ConfigAsset config = new ConfigAsset() + .put("mp.messaging.incoming.channel-connector-in.connector", TestConnector.ID) + .put("mp.messaging.outgoing.channel-connector-out.connector", TestConnector.ID) + .put("mp.health.ready.exclude-channel", READY_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL) + .put("mp.health.live.exclude-channel", LIVE_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL); + + return ShrinkWrap.create(WebArchive.class, HealthExclusionTest.class.getName() + ".war") + .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml") + .addAsResource(config, "META-INF/microprofile-config.properties") + .addClasses(HealthTestBean.class, HealthTestConnector.class, ChannelRegister.class); + } + + @ArquillianResource + private URI uri; + + protected HealthAssertions getLiveness() { + return HealthAssertions.create(uri + "/health/live"); + } + + protected HealthAssertions getReadiness() { + return HealthAssertions.create(uri + "/health/ready"); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java new file mode 100644 index 00000000..1072d337 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java @@ -0,0 +1,127 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthCancelledConnectorInChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneCancelledChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.cancelAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java new file mode 100644 index 00000000..cfbdd37d --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java @@ -0,0 +1,127 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthCancelledConnectorOutChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneCancelledChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.cancelAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java new file mode 100644 index 00000000..030284e8 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java @@ -0,0 +1,126 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthCancelledInnerChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_INNER).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneCancelledChannelLiveness() { + channelRegister.get(CHANNEL_INNER).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.cancelAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java new file mode 100644 index 00000000..7bd401c6 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java @@ -0,0 +1,129 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthErroredAndCancelledTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_INNER).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_INNER).fail(); + channelRegister.get(CHANNEL_CONNECTOR_IN).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java new file mode 100644 index 00000000..29791727 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java @@ -0,0 +1,126 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthErroredConnectorInChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.failAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java new file mode 100644 index 00000000..279e6584 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java @@ -0,0 +1,126 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthErroredConnectorOutChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.failAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java new file mode 100644 index 00000000..03e7d0c4 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java @@ -0,0 +1,128 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthErroredInnerChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_INNER).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.failAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java new file mode 100644 index 00000000..1f06a2d8 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java @@ -0,0 +1,125 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthExclusionTest extends HealthBase{ + + @Inject + private ChannelRegister channelRegister; + + @Deployment(name = "HealthExclusionTest") + public static WebArchive deployment() { + return prepareArchive().addClass(HealthAllExcludedTestBean.class); + } + + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testReadiness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + channelRegister.get(CHANNEL_INNER).ready(); + // ALL_EXCLUDED_CHANNEL is excluded by config + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testLivenessWithFailedExcluded() { + channelRegister.get(ALL_EXCLUDED_CHANNEL).ready(); + channelRegister.get(ALL_EXCLUDED_CHANNEL).fail(); + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExcludedTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExcludedTestBean.java new file mode 100644 index 00000000..a5c7d840 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExcludedTestBean.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.LIVE_EXCLUDED_CHANNEL; + +@ApplicationScoped +public class HealthLiveExcludedTestBean { + + @Inject + private ChannelRegister channelRegisterBean; + + + @Incoming(LIVE_EXCLUDED_CHANNEL) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(LIVE_EXCLUDED_CHANNEL).subscriber(); + } + + @Outgoing(LIVE_EXCLUDED_CHANNEL) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(LIVE_EXCLUDED_CHANNEL).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java new file mode 100644 index 00000000..e540b809 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java @@ -0,0 +1,135 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthLiveExclusionTest extends HealthBase{ + + @Inject + private ChannelRegister channelRegister; + + @Deployment(name = "HealthLiveExclusionTest") + public static WebArchive deployment() { + return prepareArchive().addClass(HealthLiveExcludedTestBean.class); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testPartialReadiness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + channelRegister.get(CHANNEL_INNER).ready(); + // LIVE_EXCLUDED_CHANNEL is not excluded from readiness check + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testReadiness() { + channelRegister.get(LIVE_EXCLUDED_CHANNEL).ready(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLivenessWithFailedExcluded() { + channelRegister.get(LIVE_EXCLUDED_CHANNEL).fail(); + // LIVE_EXCLUDED_CHANNEL is excluded from liveness check + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExcludedTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExcludedTestBean.java new file mode 100644 index 00000000..24f212a6 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExcludedTestBean.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.READY_EXCLUDED_CHANNEL; + +@ApplicationScoped +public class HealthReadyExcludedTestBean { + + @Inject + private ChannelRegister channelRegisterBean; + + + @Incoming(READY_EXCLUDED_CHANNEL) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(READY_EXCLUDED_CHANNEL).subscriber(); + } + + @Outgoing(READY_EXCLUDED_CHANNEL) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(READY_EXCLUDED_CHANNEL).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java new file mode 100644 index 00000000..6c665443 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java @@ -0,0 +1,112 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(Arquillian.class) +public class HealthReadyExclusionTest extends HealthBase{ + + @Inject + private ChannelRegister channelRegister; + + @Deployment(name = "HealthReadyExclusionTest") + public static WebArchive deployment() { + return prepareArchive().addClass(HealthReadyExcludedTestBean.class); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testReadiness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + channelRegister.get(CHANNEL_INNER).ready(); + // READY_EXCLUDED_CHANNEL is excluded by config + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestBean.java new file mode 100644 index 00000000..4669342e --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestBean.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.CHANNEL_CONNECTOR_IN; +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.CHANNEL_CONNECTOR_OUT; +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.CHANNEL_INNER; + +@ApplicationScoped +public class HealthTestBean { + + @Inject + private ChannelRegister channelRegisterBean; + + @Incoming(CHANNEL_CONNECTOR_IN) + public Subscriber consumeConnectorChannel() { + return channelRegisterBean.get(CHANNEL_CONNECTOR_IN).subscriber(); + } + + @Outgoing(CHANNEL_CONNECTOR_OUT) + public Publisher produceConnectorChannel() { + return channelRegisterBean.get(CHANNEL_CONNECTOR_OUT).publisher(); + } + + @Incoming(CHANNEL_INNER) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(CHANNEL_INNER).subscriber(); + } + + @Outgoing(CHANNEL_INNER) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(CHANNEL_INNER).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestConnector.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestConnector.java new file mode 100644 index 00000000..5017ac95 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestConnector.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory; +import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory; +import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; +import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; + +import javax.inject.Inject; + +@Connector(HealthTestConnector.ID) +public class HealthTestConnector implements OutgoingConnectorFactory, IncomingConnectorFactory { + + public static final String ID = "test-connector"; + + @Inject + private ChannelRegister channelRegisterBean; + + @Override + public PublisherBuilder> getPublisherBuilder(final Config config) { + return ReactiveStreams.fromPublisher( + channelRegisterBean.get(config.getValue(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE, String.class)) + .publisher() + ).map(Message::of); + } + + @Override + public SubscriberBuilder, Void> getSubscriberBuilder(final Config config) { + return ReactiveStreams.>builder() + .map(Message::getPayload) + .onError(Throwable::printStackTrace) + .to(channelRegisterBean.get(config.getValue(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE, String.class)) + .subscriber()); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java new file mode 100644 index 00000000..6e4b4e86 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * Simple manual publisher with guarded state, not thread safe, sequential non-concurrent calls are expected. + * @param payload type + */ +public class TestPublisher implements Publisher { + + private final CompletableFuture> subscriberFuture = new CompletableFuture<>(); + private final CompletableFuture cancelFuture = new CompletableFuture<>(); + private final long timeout; + private final TimeUnit timeUnit; + private final AtomicReference state = new AtomicReference<>(State.INIT); + + private enum State { + INIT, READY, FAILED, CANCELLED, COMPLETED + } + + + public TestPublisher(long timeout, TimeUnit timeUnit) { + this.timeout = timeout; + this.timeUnit = timeUnit; + } + + @Override + public void subscribe(final Subscriber subscriber) { + subscriberFuture.complete(subscriber); + } + + public TestPublisher ready() { + if (state.compareAndSet(State.INIT, State.READY)) { + awaitSubscriber().onSubscribe(new Subscription() { + @Override + public void request(final long n) { + //noop + } + + @Override + public void cancel() { + if (state.getAndUpdate(s -> s != State.COMPLETED && s != State.FAILED ? State.CANCELLED : s) + != State.CANCELLED) { + cancelFuture.complete(null); + } + } + }); + } + return this; + } + + public TestPublisher emit(PAYLOAD payload) { + awaitSubscriber().onNext(payload); + return this; + } + + public TestPublisher fail(Throwable t) { + if (state.compareAndSet(State.READY, State.FAILED)) { + awaitSubscriber().onError(t); + } + return this; + } + + public TestPublisher complete() { + if (state.compareAndSet(State.READY, State.COMPLETED)) { + awaitSubscriber().onComplete(); + } + return this; + } + + public TestPublisher awaitCancel() { + try { + cancelFuture.get(timeout, timeUnit); + return this; + } + catch (InterruptedException | TimeoutException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Subscriber awaitSubscriber() { + try { + return subscriberFuture.get(timeout, timeUnit); + } + catch (InterruptedException | TimeoutException | ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java new file mode 100644 index 00000000..d01b25cd --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class TestSubscriber implements Subscriber { + + private CompletableFuture subscription = new CompletableFuture<>(); + private CompletableFuture error = new CompletableFuture<>(); + private CompletableFuture complete = new CompletableFuture<>(); + private List receivedItems = new CopyOnWriteArrayList<>(); + private long timeout; + private TimeUnit timeoutUnit; + + public TestSubscriber(long timeout, TimeUnit timeoutUnit) { + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + } + + @Override + public void onSubscribe(final Subscription subscription) { + this.subscription.complete(subscription); + } + + @Override + public void onNext(final PAYLOAD payload) { + receivedItems.add(payload); + } + + @Override + public void onError(final Throwable t) { + error.complete(t); + } + + @Override + public void onComplete() { + complete.complete(null); + } + + private Subscription getSubscription() { + try { + return this.subscription.get(timeout, timeoutUnit); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + public Throwable awaitError() { + try { + return this.error.get(timeout, timeoutUnit); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + public void awaitCompletion() { + try { + this.complete.get(timeout, timeoutUnit); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + public ReadyManualSubscriber awaitSubscription() { + this.getSubscription(); + return new ReadyManualSubscriber(this); + } + + public static class ReadyManualSubscriber { + + private final TestSubscriber testSubscriber; + + public ReadyManualSubscriber(final TestSubscriber testSubscriber) { + this.testSubscriber = testSubscriber; + } + + public ReadyManualSubscriber request(long n) { + this.testSubscriber.getSubscription().request(n); + return this; + } + + public ReadyManualSubscriber cancel() { + this.testSubscriber.getSubscription().cancel(); + return this; + } + } +} From 4cba0f51ceed30b768ae23fc2baae86fa1401d94 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Wed, 24 Feb 2021 16:41:30 +0100 Subject: [PATCH 2/4] Timeout cleanup Signed-off-by: Daniel Kec --- .../reactive/messaging/tck/health/ChannelRegister.java | 4 ++-- .../reactive/messaging/tck/health/HealthAssertions.java | 4 ++-- .../reactive/messaging/tck/health/HealthBase.java | 2 ++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java index 91e96fde..cdfaa4e7 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java @@ -94,8 +94,8 @@ public static class Channel { private final TestSubscriber subscriber; public Channel() { - publisher = new TestPublisher<>(100, TimeUnit.MILLISECONDS); - subscriber = new TestSubscriber<>(100, TimeUnit.MILLISECONDS); + publisher = new TestPublisher<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + subscriber = new TestSubscriber<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); } /** diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java index 8f84648d..bbf95755 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java @@ -42,8 +42,8 @@ static HealthAssertions create(String url) { try { HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection(); con.setRequestMethod("GET"); - con.setConnectTimeout(5000); - con.setReadTimeout(5000); + con.setConnectTimeout(HealthBase.TIMEOUT_MILLIS); + con.setReadTimeout(HealthBase.TIMEOUT_MILLIS); con.connect(); HealthAssertions healthAssertions = new HealthAssertions(); diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java index d70feff8..026b4c0e 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java @@ -29,6 +29,8 @@ public class HealthBase { + public static final int TIMEOUT_MILLIS = 5000; + public static final String ALL_EXCLUDED_CHANNEL = "excluded-channel"; public static final String LIVE_EXCLUDED_CHANNEL = "live-excluded-channel"; public static final String READY_EXCLUDED_CHANNEL = "ready-excluded-channel"; From 1e3caf6c54e426d06864c27a24fe1680ac40e991 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Thu, 25 Feb 2021 21:50:35 +0100 Subject: [PATCH 3/4] Review issues * Rewording definition of state transition description of liveness and readiness * Move config key to messaging context * Document test cases * Utilize ArchiveExtender Signed-off-by: Daniel Kec --- spec/src/main/asciidoc/architecture.asciidoc | 31 +++++-- .../messaging/tck/health/ChannelRegister.java | 20 +++++ .../tck/health/HealthAssertions.java | 80 ++++++++++++++++- .../messaging/tck/health/HealthBase.java | 26 ++++-- ...HealthCancelledConnectorInChannelTest.java | 18 ++++ ...ealthCancelledConnectorOutChannelTest.java | 18 ++++ .../HealthCancelledInnerChannelTest.java | 18 ++++ .../health/HealthErroredAndCancelledTest.java | 18 ++++ .../HealthErroredConnectorInChannelTest.java | 18 ++++ .../HealthErroredConnectorOutChannelTest.java | 18 ++++ .../health/HealthErroredInnerChannelTest.java | 18 ++++ .../tck/health/HealthExclusionTest.java | 22 ++++- .../tck/health/HealthLiveExclusionTest.java | 24 +++++- .../tck/health/HealthReadyExclusionTest.java | 21 ++++- .../messaging/tck/health/TestPublisher.java | 75 +++++++++++++--- .../messaging/tck/health/TestSubscriber.java | 85 ++++++++++++------- 16 files changed, 446 insertions(+), 64 deletions(-) diff --git a/spec/src/main/asciidoc/architecture.asciidoc b/spec/src/main/asciidoc/architecture.asciidoc index e9f6dc94..9c2a6d90 100644 --- a/spec/src/main/asciidoc/architecture.asciidoc +++ b/spec/src/main/asciidoc/architecture.asciidoc @@ -1,5 +1,5 @@ // -// Copyright (c) 2018, 2020 Contributors to the Eclipse Foundation +// Copyright (c) 2018, 2021 Contributors to the Eclipse Foundation // // See the NOTICE file(s) distributed with this work for additional // information regarding copyright ownership. @@ -1333,23 +1333,40 @@ The connector is responsible for the acknowledgment (positive or negative) of th == Health -When MicroProfile Reactive Messaging is used in an environment where MicroProfile Health is enabled, implicit readiness and liveness checks are produced. +When MicroProfile Reactive Messaging is used in an environment where MicroProfile Health is enabled, implicit readiness +and liveness checks `messaging` are produced. + +[source, json] +---- +{ + "status": "UP", + "checks": [ + { + "name": "messaging", + "status": "UP" + } + ] +} +---- === Readiness -Readiness check `messaging` gets UP only when subscribers of all messaging channels are subscribed to theirs respective publishers and `onSubscribe` signal is detected on all channels. +State of the readiness check `messaging` is DOWN until subscribers of all messaging channels are subscribed to their +respective publishers and `onSubscribe` signal is detected on all channels. === Liveness -Liveness check `messaging` gets DOWN only when `cancel` or `onError` signals are detected on any of the messaging channels. +State of the liveness check `messaging` is UP until `cancel` or `onError` signals are detected on any of the messaging +channels. === Excluding channels -For channels which are not desired to be checked for liveness or readiness, exclusion is configurable with `mp.health.live.exclude-channel` or `mp.health.live.exclude-channel` config keys. +For channels which are not desired to be checked for liveness or readiness, exclusion is configurable with +`mp.messaging.health.live.exclude` or `mp.messaging.health.ready.exclude` config keys. [source, properties] ---- -mp.health.live.exclude-channel= -mp.health.ready.exclude-channel= +mp.messaging.health.live.exclude=, +mp.messaging.health.ready.exclude=, ---- == Metrics diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java index cdfaa4e7..453c63ff 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java @@ -29,12 +29,23 @@ import org.junit.Assert; +/** + * Helper bean for keeping references of all the messaging channels + * used in the health test scenario. + */ @ApplicationScoped public class ChannelRegister { private final Map> channelMap = new HashMap<>(); private final ReentrantLock mapLock = new ReentrantLock(); + /** + * Return representation of messaging channel by its name. + * + * @param channelName name of the messaging channel + * @param

payload type of the channel + * @return the channel with references to its publisher and subscriber + */ @SuppressWarnings("unchecked")

Channel

get(String channelName) { try { @@ -47,6 +58,11 @@

Channel

get(String channelName) { } } + /** + * Return all the channels used in the health test scenario. + * + * @return all the channels + */ Collection> getAllChannels() { try { mapLock.lock(); @@ -89,6 +105,10 @@ void cancelAll() { getAllChannels().forEach(Channel::cancel); } + /** + * Represents a messaging channel used in the health test scenario. + * @param type of the channel's payload + */ public static class Channel { private final TestPublisher publisher; private final TestSubscriber subscriber; diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java index bbf95755..5b5ac4f3 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java @@ -33,11 +33,20 @@ import static org.junit.Assert.assertEquals; +/** + * Health check response representation with ability to assert it's state. + */ class HealthAssertions { private int responseCode; private JsonObject jsonResponse; + /** + * Connect health endpoint and download response to be able to assert it later. + * + * @param url of the health check endpoint + * @return new instance of the health assertions + */ static HealthAssertions create(String url) { try { HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection(); @@ -64,21 +73,54 @@ static HealthAssertions create(String url) { } } - HealthAssertions assertResponseCode(int expected){ + /** + * Assert health check to have desired HTTP status code. + * + * @param expectedStatus 200 or 503 + * @return this health response assertion + */ + HealthAssertions assertResponseCode(int expected) { assertEquals(expected, this.responseCode); return this; } + /** + * Assert health check to have HTTP status code 200. + * + * @return this health response assertion + */ HealthAssertions assertResponseCodeUp(){ assertResponseCode(200); return this; } + /** + * Assert health check to have HTTP status code 503. + * + * @return this health response assertion + */ HealthAssertions assertResponseCodeDown(){ assertResponseCode(503); return this; } + /** + * Assert messaging check to have desired status. + *

{@code
+     * {
+     *   "status": "UP",
+     *   "checks": [
+     *     {
+     *       "name": "messaging",
+     *       "status": "EXPECTED_STATUS"
+     *     }
+     *   ]
+     * }
+     * }
+ * + * @param expectedStatus UP or DOWN + * @return this health response assertion + */ HealthAssertions assertMessagingStatus(String expectedStatus){ JsonArray checks = this.jsonResponse.getJsonArray("checks"); List messagingObjects = checks.stream() @@ -91,12 +133,46 @@ HealthAssertions assertMessagingStatus(String expectedStatus){ return this; } + /** + * Assert messaging check to have desired status UP. + *
{@code
+     * {
+     *   "status": "UP",
+     *   "checks": [
+     *     {
+     *       "name": "messaging",
+     *       "status": "UP"
+     *     }
+     *   ]
+     * }
+     * }
+ * + * @param expectedStatus UP or DOWN + * @return this health response assertion + */ HealthAssertions assertMessagingStatusUp(){ assertMessagingStatus("UP"); return this; } - HealthAssertions assertMessagingStatusDown(){ + /** + * Assert messaging check to have desired status DOWN. + *
{@code
+     * {
+     *   "status": "DOWN",
+     *   "checks": [
+     *     {
+     *       "name": "messaging",
+     *       "status": "DOWN"
+     *     }
+     *   ]
+     * }
+     * }
+ * + * @param expectedStatus UP or DOWN + * @return this health response assertion + */ + HealthAssertions assertMessagingStatusDown() { assertMessagingStatus("DOWN"); return this; } diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java index 026b4c0e..7de9d6a4 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java @@ -19,12 +19,15 @@ package org.eclipse.microprofile.reactive.messaging.tck.health; import java.net.URI; +import java.util.ServiceLoader; +import org.eclipse.microprofile.reactive.messaging.tck.ArchiveExtender; import org.eclipse.microprofile.reactive.messaging.tck.metrics.ConfigAsset; import org.eclipse.microprofile.reactive.messaging.tck.metrics.TestConnector; import org.jboss.arquillian.test.api.ArquillianResource; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.jboss.shrinkwrap.api.spec.WebArchive; public class HealthBase { @@ -38,17 +41,26 @@ public class HealthBase { public static final String CHANNEL_CONNECTOR_OUT = "channel-connector-out"; public static final String CHANNEL_INNER = "inner-channel"; - protected static WebArchive prepareArchive(){ + protected static WebArchive prepareArchive(Class... classes) { ConfigAsset config = new ConfigAsset() .put("mp.messaging.incoming.channel-connector-in.connector", TestConnector.ID) .put("mp.messaging.outgoing.channel-connector-out.connector", TestConnector.ID) - .put("mp.health.ready.exclude-channel", READY_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL) - .put("mp.health.live.exclude-channel", LIVE_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL); + .put("mp.messaging.health.ready.exclude", READY_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL) + .put("mp.messaging.health.live.exclude", LIVE_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL); - return ShrinkWrap.create(WebArchive.class, HealthExclusionTest.class.getName() + ".war") - .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml") - .addAsResource(config, "META-INF/microprofile-config.properties") - .addClasses(HealthTestBean.class, HealthTestConnector.class, ChannelRegister.class); + JavaArchive testJar = ShrinkWrap + .create(JavaArchive.class, "healthTest.jar") + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml") + .addAsManifestResource(config, "microprofile-config.properties") + .addClasses(HealthTestBean.class, HealthTestConnector.class, ChannelRegister.class) + .addClasses(classes) + .as(JavaArchive.class); + + ServiceLoader.load(ArchiveExtender.class).iterator().forEachRemaining(ext -> ext.extend(testJar)); + + return ShrinkWrap + .create(WebArchive.class, "healthTest.war") + .addAsLibrary(testJar); } @ArquillianResource diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java index 1072d337..11b66ba2 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java @@ -28,6 +28,24 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Cancelled connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send cancel signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as cancel signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send cancel signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthCancelledConnectorInChannelTest extends HealthBase { diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java index cfbdd37d..984f3c87 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java @@ -28,6 +28,24 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Cancelled connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send cancel signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as cancel signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send cancel signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthCancelledConnectorOutChannelTest extends HealthBase { diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java index 030284e8..62907d9c 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java @@ -28,6 +28,24 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Cancelled inner channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send cancel signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as cancel signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send cancel signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthCancelledInnerChannelTest extends HealthBase { diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java index 7bd401c6..fe4cfbea 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java @@ -28,6 +28,24 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Combination of failed and cancelled channels scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to one of the channels and cancel to another
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to the remaining channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthErroredAndCancelledTest extends HealthBase { diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java index 29791727..474daefd 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java @@ -28,6 +28,24 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Failed connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to channel with publisher provided by connector
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthErroredConnectorInChannelTest extends HealthBase { diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java index 279e6584..467f56c4 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java @@ -28,6 +28,24 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Failed connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to channel with subscriber provided by connector
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthErroredConnectorOutChannelTest extends HealthBase { diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java index 03e7d0c4..f462484d 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java @@ -28,6 +28,24 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Failed inner channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthErroredInnerChannelTest extends HealthBase { diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java index 1f06a2d8..1cc1cb0b 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java @@ -28,15 +28,33 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Channel excluded from readiness and liveness check scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Send onSubscribe signal all channels except the excluded one
  • + *
  • 3. Assert {@code /health/ready} is UP as onSubscribe has been sent to all channels except the excluded one
  • + *
  • 4. Assert {@code /health/live} is UP as not onError nor cancel signal has been issued on any channel
  • + *
  • 5. Send onSubscribe signal to excluded channel
  • + *
  • 6. Send onError signal to excluded channel
  • + *
  • 7. Assert {@code /health/live} is UP as onError signal has been issued only on excluded channel
  • + *
  • 8. Send onError signal to one of the not excluded channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to the remaining channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthExclusionTest extends HealthBase{ @Inject private ChannelRegister channelRegister; - @Deployment(name = "HealthExclusionTest") + @Deployment public static WebArchive deployment() { - return prepareArchive().addClass(HealthAllExcludedTestBean.class); + return prepareArchive(HealthAllExcludedTestBean.class); } diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java index e540b809..ccd3dcf0 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java @@ -28,15 +28,35 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Channel excluded from liveness check scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Send onSubscribe signal all channels except the one excluded from liveness check
  • + *
  • 3. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued + * on all the channels, the one is excluded only from liveness
  • + *
  • 4. Send onSubscribe signal to remaining channel
  • + *
  • 5. Assert {@code /health/ready} is UP as onSubscribe has been sent to all channels
  • + *
  • 6. Assert {@code /health/live} is UP as not onError nor cancel signal has been issued on any channel
  • + *
  • 7. Send onError signal to excluded channel
  • + *
  • 8. Assert {@code /health/live} is UP as onError signal has been issued only on excluded channel
  • + *
  • 9. Send onError signal to one of the not excluded channels
  • + *
  • 10. Assert {@code /health/live} is DOWN as onError signal has been issued
  • + *
  • 11. Assert {@code /health/ready} is still UP
  • + *
  • 12. Send onError signal to the remaining channels
  • + *
  • 13. Assert {@code /health/live} is still DOWN
  • + *
  • 14. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthLiveExclusionTest extends HealthBase{ @Inject private ChannelRegister channelRegister; - @Deployment(name = "HealthLiveExclusionTest") + @Deployment public static WebArchive deployment() { - return prepareArchive().addClass(HealthLiveExcludedTestBean.class); + return prepareArchive(HealthLiveExcludedTestBean.class); } @Test diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java index 6c665443..7f23c5f5 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java @@ -28,15 +28,31 @@ import org.junit.Test; import org.junit.runner.RunWith; +/** + * Channel excluded from readiness check scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Send onSubscribe signal to all channels except the one excluded from readiness check
  • + *
  • 3. Assert {@code /health/ready} is UP as onSubscribe has been issued to all except the excluded channel
  • + *
  • 4. Assert {@code /health/live} is UP as not onError nor cancel signal has been issued on any channel
  • + *
  • 5. Send onSubscribe signal to remaining channel
  • + *
  • 6. Send onError signal to excluded channel
  • + *
  • 7. Assert {@code /health/live} is DOWN as onError signal has been issued channel excluded only from readiness
  • + *
  • 8. Assert {@code /health/ready} is UP as onSubscribe has been sent to all channels
  • + *
  • 9. Send onError signal to the remaining channels
  • + *
  • 10. Assert {@code /health/live} is still DOWN
  • + *
  • 11. Assert {@code /health/ready} is still UP
  • + *
+ */ @RunWith(Arquillian.class) public class HealthReadyExclusionTest extends HealthBase{ @Inject private ChannelRegister channelRegister; - @Deployment(name = "HealthReadyExclusionTest") + @Deployment public static WebArchive deployment() { - return prepareArchive().addClass(HealthReadyExcludedTestBean.class); + return prepareArchive(HealthReadyExcludedTestBean.class); } @Test @@ -74,6 +90,7 @@ public void testLiveness() { @InSequence(4) @RunAsClient public void testOneErroredChannelLiveness() { + channelRegister.get(READY_EXCLUDED_CHANNEL).ready(); channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); getLiveness() .assertResponseCodeDown() diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java index 6e4b4e86..17e0dcf7 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java @@ -32,7 +32,7 @@ * Simple manual publisher with guarded state, not thread safe, sequential non-concurrent calls are expected. * @param payload type */ -public class TestPublisher implements Publisher { +class TestPublisher implements Publisher { private final CompletableFuture> subscriberFuture = new CompletableFuture<>(); private final CompletableFuture cancelFuture = new CompletableFuture<>(); @@ -41,11 +41,36 @@ public class TestPublisher implements Publisher { private final AtomicReference state = new AtomicReference<>(State.INIT); private enum State { - INIT, READY, FAILED, CANCELLED, COMPLETED + /** + * Initial state until {@link TestPublisher#ready()} is called. + */ + INIT, + /** + * After {@link TestPublisher#ready()} is called until {@link TestPublisher#fail(Throwable)} or + * {@link TestPublisher#complete()} is invoked or cancel signal is received from downstream. + */ + READY, + /** + * After {@link TestPublisher#fail(Throwable)} is invoked. + */ + FAILED, + /** + * After cancel signal is received from downstream. + */ + CANCELLED, + /** + * After {@link TestPublisher#complete()} is invoked. + */ + COMPLETED } - - public TestPublisher(long timeout, TimeUnit timeUnit) { + /** + * Create new test publisher with timeout for all blocking operations. + * + * @param timeout timeout value + * @param timeUnit unit for evaluation of timeout value + */ + TestPublisher(long timeout, TimeUnit timeUnit) { this.timeout = timeout; this.timeUnit = timeUnit; } @@ -55,7 +80,13 @@ public void subscribe(final Subscriber subscriber) { subscriberFuture.complete(subscriber); } - public TestPublisher ready() { + /** + * Block until subscriber is available + * and send onSubscribe signal to downstream if publisher is in {@link State#INIT INIT} state, or do nothing. + * + * @return this test publisher + */ + TestPublisher ready() { if (state.compareAndSet(State.INIT, State.READY)) { awaitSubscriber().onSubscribe(new Subscription() { @Override @@ -75,26 +106,38 @@ public void cancel() { return this; } - public TestPublisher emit(PAYLOAD payload) { - awaitSubscriber().onNext(payload); - return this; - } - - public TestPublisher fail(Throwable t) { + /** + * Block until subscriber is available + * and send onError signal to downstream if publisher is in {@link State#READY READY} state, or do nothing. + * + * @return this test publisher + */ + TestPublisher fail(Throwable t) { if (state.compareAndSet(State.READY, State.FAILED)) { awaitSubscriber().onError(t); } return this; } - public TestPublisher complete() { + /** + * Block until subscriber is available + * and send onComplete signal to downstream if publisher is in {@link State#READY READY} state, or do nothing. + * + * @return this test publisher + */ + TestPublisher complete() { if (state.compareAndSet(State.READY, State.COMPLETED)) { awaitSubscriber().onComplete(); } return this; } - public TestPublisher awaitCancel() { + /** + * Block until cancel signal is received from the subscriber. + * + * @return this test publisher + */ + TestPublisher awaitCancel() { try { cancelFuture.get(timeout, timeUnit); return this; @@ -104,6 +147,12 @@ public TestPublisher awaitCancel() { } } + /** + * Wait for a subscriber, + * block till {@link TestPublisher#subscribe(org.reactivestreams.Subscriber) subscribe} method is invoked. + * + * @return subscriber of this publisher + */ private Subscriber awaitSubscriber() { try { return subscriberFuture.get(timeout, timeUnit); diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java index d01b25cd..24d5fbb0 100644 --- a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java @@ -18,9 +18,7 @@ */ package org.eclipse.microprofile.reactive.messaging.tck.health; -import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -28,16 +26,26 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -public class TestSubscriber implements Subscriber { - - private CompletableFuture subscription = new CompletableFuture<>(); - private CompletableFuture error = new CompletableFuture<>(); - private CompletableFuture complete = new CompletableFuture<>(); - private List receivedItems = new CopyOnWriteArrayList<>(); - private long timeout; - private TimeUnit timeoutUnit; - - public TestSubscriber(long timeout, TimeUnit timeoutUnit) { +/** + * Test subscriber able to block until desired signal from upstream is received. + * + * @param payload type + */ +class TestSubscriber implements Subscriber { + + private final CompletableFuture subscription = new CompletableFuture<>(); + private final CompletableFuture error = new CompletableFuture<>(); + private final CompletableFuture complete = new CompletableFuture<>(); + private final long timeout; + private final TimeUnit timeoutUnit; + + /** + * Create new test subscriber with timeout for all blocking operations. + * + * @param timeout timeout value + * @param timeoutUnit unit for evaluation of timeout value + */ + TestSubscriber(long timeout, TimeUnit timeoutUnit) { this.timeout = timeout; this.timeoutUnit = timeoutUnit; } @@ -49,7 +57,7 @@ public void onSubscribe(final Subscription subscription) { @Override public void onNext(final PAYLOAD payload) { - receivedItems.add(payload); + //noop } @Override @@ -62,15 +70,11 @@ public void onComplete() { complete.complete(null); } - private Subscription getSubscription() { - try { - return this.subscription.get(timeout, timeoutUnit); - } - catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new RuntimeException(e); - } - } - + /** + * Block until onError signal is received from upstream. + * + * @return cause of the error signal + */ public Throwable awaitError() { try { return this.error.get(timeout, timeoutUnit); @@ -80,6 +84,9 @@ public Throwable awaitError() { } } + /** + * Block until onComplete signal is received from upstream. + */ public void awaitCompletion() { try { this.complete.get(timeout, timeoutUnit); @@ -89,24 +96,44 @@ public void awaitCompletion() { } } + /** + * Block until onSubscribe signal is received from upstream. + * + * @return this + */ public ReadyManualSubscriber awaitSubscription() { this.getSubscription(); return new ReadyManualSubscriber(this); } - public static class ReadyManualSubscriber { + private Subscription getSubscription() { + try { + return this.subscription.get(timeout, timeoutUnit); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + /** + * Wrapped test subscriber with capability of sending cancel signal. + * + * @param payload type + */ + static class ReadyManualSubscriber { private final TestSubscriber testSubscriber; - public ReadyManualSubscriber(final TestSubscriber testSubscriber) { + ReadyManualSubscriber(final TestSubscriber testSubscriber) { this.testSubscriber = testSubscriber; } - public ReadyManualSubscriber request(long n) { - this.testSubscriber.getSubscription().request(n); - return this; - } - + /** + * Block until onSubscribe signal is received from upstream, + * then send cancel signal to upstream. + * + * @return this ready subscriber + */ public ReadyManualSubscriber cancel() { this.testSubscriber.getSubscription().cancel(); return this; From 923a1057c424724aa31f24b2f04b1221c430351c Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Thu, 25 Feb 2021 22:20:16 +0100 Subject: [PATCH 4/4] Review issues * Revert trailing whitespaces Signed-off-by: Daniel Kec --- tck/pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tck/pom.xml b/tck/pom.xml index 8325aae8..e83b0dff 100644 --- a/tck/pom.xml +++ b/tck/pom.xml @@ -71,19 +71,19 @@ microprofile-reactive-streams-operators-api provided
- + org.eclipse.microprofile.config microprofile-config-api provided - + jakarta.enterprise jakarta.enterprise.cdi-api provided - + jakarta.annotation jakarta.annotation-api