diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java index 973609d57..3ee15525b 100644 --- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java +++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java @@ -174,12 +174,10 @@ public void testAgentNodesInTwoStores() { await().until(() -> { Map agentMembers = agentOnS1.membership().blockingFirst(); - log.info("S1: {}", agentMembers); return agentMembers.size() == 1; }); await().until(() -> { Map agentMembers = agentOnS2.membership().blockingFirst(); - log.info("S2: {}", agentMembers); return agentMembers.size() == 1; }); @@ -229,20 +227,16 @@ public void testAgentNodesInThreeStores() { await().until(() -> agentOnS3.membership().blockingFirst().size() == 4); // unhost agentNode2 from s1 - log.info("Stop hosting agentNode11 from s1"); agentOnS1.deregister(agentMember11OnS1); await().until(() -> agentOnS1.membership().blockingFirst().size() == 3); await().until(() -> agentOnS2.membership().blockingFirst().size() == 3); await().until(() -> agentOnS3.membership().blockingFirst().size() == 3); - log.info("Stop hosting agentNode1 from s1"); // unhost agentNode 1 from s1 agentOnS1.deregister(agentMember1OnS1); await().until(() -> agentOnS2.membership().blockingFirst().size() == 2); await().until(() -> agentOnS3.membership().blockingFirst().size() == 2); - - log.info("Re-hosting agentNode1 from s1 with different metadata attached"); // re-host agentNode1 in s1 with different metadata agentOnS1.register("agentNode1"); agentMember1OnS1 = agentOnS1.register("agentNode1"); @@ -344,7 +338,6 @@ public void testHostClusterPartitionAndHealing() { await().until(() -> host3.membership().blockingFirst().size() == 3); // isolate s1 from others - log.info("isolate s1"); storeMgr.isolate("s1"); await().forever().until(() -> host1.membership().blockingFirst().size() == 1); await().forever().until(() -> host2.membership().blockingFirst().size() == 2); @@ -382,13 +375,11 @@ public void testAgentClusterPartitionAndHealing() { await().until(() -> agentOnS3.membership().blockingFirst().size() == 4); // isolate s2 from others - log.info("isolate s1"); storeMgr.isolate("s1"); await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 2); await().forever().until(() -> agentOnS2.membership().blockingFirst().size() == 2); await().forever().until(() -> agentOnS3.membership().blockingFirst().size() == 2); - log.info("integrate s1"); // integrate s1 into the cluster storeMgr.integrate("s1"); await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 4); diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java index 399dbd924..1c9fc09c0 100644 --- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java +++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bifromq.basecluster.agent.proto.AgentMemberAddr; @@ -101,6 +102,9 @@ public void crash(String hostId) { crashedHostMap.put(crashedEndpoint, crashedAgentHost); ITransport transport = hostTransportMap.remove(hostId); crashedHostTransportMap.put(hostId, transport); + AgentHostMeta meta = hostMetaMap.get(hostId); + // bump crdt store id + meta.options.crdtStoreOptions().id(UUID.randomUUID().toString()); } public void integrate(String hostId) { diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java index 369cd9415..e8ac5631b 100644 --- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java +++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java @@ -79,8 +79,7 @@ public void teardown(Method method) { if (storeMgr != null) { log.info("Shutting down test cluster"); AgentTestCluster lastStoreMgr = this.storeMgr; - // run in a separate thread to avoid blocking the test thread - new Thread(lastStoreMgr::shutdown).start(); + lastStoreMgr.shutdown(); } } diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java index 6aa8b9e48..f69bd5188 100644 --- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java +++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java @@ -14,7 +14,7 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.bifromq.basecluster.memberlist; @@ -34,10 +34,6 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import org.apache.bifromq.basecluster.membership.proto.HostEndpoint; -import org.apache.bifromq.basecluster.messenger.IMessenger; -import org.apache.bifromq.basecluster.messenger.MessageEnvelope; -import org.apache.bifromq.basecluster.proto.ClusterMessage; import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Timed; @@ -51,6 +47,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import lombok.extern.slf4j.Slf4j; +import org.apache.bifromq.basecluster.membership.proto.HostEndpoint; +import org.apache.bifromq.basecluster.messenger.IMessenger; +import org.apache.bifromq.basecluster.messenger.MessageEnvelope; +import org.apache.bifromq.basecluster.proto.ClusterMessage; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -118,9 +118,11 @@ public void join() { public void joinKnownEndpoint() { AutoSeeder seeder = new AutoSeeder(messenger, scheduler, memberList, addressResolver, joinTimeout, joinInterval); - membersSubject.onNext(new HashMap<>() {{ - put(REMOTE_HOST_1_ENDPOINT, 0); - }}); + membersSubject.onNext(new HashMap<>() { + { + put(REMOTE_HOST_1_ENDPOINT, 0); + } + }); try { seeder.join(Collections.singleton(REMOTE_ADDR_1)).join(); verify(messenger, atMost(0)).send(any(), any(), anyBoolean()); @@ -131,14 +133,16 @@ public void joinKnownEndpoint() { @Test public void stopJoinEarlier() { + when(messenger.send(any(), any(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null)); AutoSeeder seeder = new AutoSeeder(messenger, scheduler, memberList, addressResolver, joinTimeout, joinInterval); CompletableFuture joinResult = seeder.join(Collections.singleton(REMOTE_ADDR_1)); - when(messenger.send(any(), any(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null)); verify(messenger, timeout(200).atLeast(1)).send(any(), any(), anyBoolean()); - membersSubject.onNext(new HashMap<>() {{ - put(REMOTE_HOST_1_ENDPOINT, 0); - }}); + membersSubject.onNext(new HashMap<>() { + { + put(REMOTE_HOST_1_ENDPOINT, 0); + } + }); joinResult.join(); } }