Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,10 @@ public void testAgentNodesInTwoStores() {

await().until(() -> {
Map<AgentMemberAddr, AgentMemberMetadata> agentMembers = agentOnS1.membership().blockingFirst();
log.info("S1: {}", agentMembers);
return agentMembers.size() == 1;
});
await().until(() -> {
Map<AgentMemberAddr, AgentMemberMetadata> agentMembers = agentOnS2.membership().blockingFirst();
log.info("S2: {}", agentMembers);
return agentMembers.size() == 1;
});

Expand Down Expand Up @@ -229,20 +227,16 @@ public void testAgentNodesInThreeStores() {
await().until(() -> agentOnS3.membership().blockingFirst().size() == 4);

// unhost agentNode2 from s1
log.info("Stop hosting agentNode11 from s1");
agentOnS1.deregister(agentMember11OnS1);
await().until(() -> agentOnS1.membership().blockingFirst().size() == 3);
await().until(() -> agentOnS2.membership().blockingFirst().size() == 3);
await().until(() -> agentOnS3.membership().blockingFirst().size() == 3);

log.info("Stop hosting agentNode1 from s1");
// unhost agentNode 1 from s1
agentOnS1.deregister(agentMember1OnS1);
await().until(() -> agentOnS2.membership().blockingFirst().size() == 2);
await().until(() -> agentOnS3.membership().blockingFirst().size() == 2);


log.info("Re-hosting agentNode1 from s1 with different metadata attached");
// re-host agentNode1 in s1 with different metadata
agentOnS1.register("agentNode1");
agentMember1OnS1 = agentOnS1.register("agentNode1");
Expand Down Expand Up @@ -344,7 +338,6 @@ public void testHostClusterPartitionAndHealing() {
await().until(() -> host3.membership().blockingFirst().size() == 3);

// isolate s1 from others
log.info("isolate s1");
storeMgr.isolate("s1");
await().forever().until(() -> host1.membership().blockingFirst().size() == 1);
await().forever().until(() -> host2.membership().blockingFirst().size() == 2);
Expand Down Expand Up @@ -382,13 +375,11 @@ public void testAgentClusterPartitionAndHealing() {
await().until(() -> agentOnS3.membership().blockingFirst().size() == 4);

// isolate s2 from others
log.info("isolate s1");
storeMgr.isolate("s1");
await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 2);
await().forever().until(() -> agentOnS2.membership().blockingFirst().size() == 2);
await().forever().until(() -> agentOnS3.membership().blockingFirst().size() == 2);

log.info("integrate s1");
// integrate s1 into the cluster
storeMgr.integrate("s1");
await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.agent.proto.AgentMemberAddr;
Expand Down Expand Up @@ -101,6 +102,9 @@ public void crash(String hostId) {
crashedHostMap.put(crashedEndpoint, crashedAgentHost);
ITransport transport = hostTransportMap.remove(hostId);
crashedHostTransportMap.put(hostId, transport);
AgentHostMeta meta = hostMetaMap.get(hostId);
// bump crdt store id
meta.options.crdtStoreOptions().id(UUID.randomUUID().toString());
}

public void integrate(String hostId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public void teardown(Method method) {
if (storeMgr != null) {
log.info("Shutting down test cluster");
AgentTestCluster lastStoreMgr = this.storeMgr;
// run in a separate thread to avoid blocking the test thread
new Thread(lastStoreMgr::shutdown).start();
lastStoreMgr.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.memberlist;
Expand All @@ -34,10 +34,6 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.apache.bifromq.basecluster.membership.proto.HostEndpoint;
import org.apache.bifromq.basecluster.messenger.IMessenger;
import org.apache.bifromq.basecluster.messenger.MessageEnvelope;
import org.apache.bifromq.basecluster.proto.ClusterMessage;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.schedulers.Timed;
Expand All @@ -51,6 +47,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.membership.proto.HostEndpoint;
import org.apache.bifromq.basecluster.messenger.IMessenger;
import org.apache.bifromq.basecluster.messenger.MessageEnvelope;
import org.apache.bifromq.basecluster.proto.ClusterMessage;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand Down Expand Up @@ -118,9 +118,11 @@ public void join() {
public void joinKnownEndpoint() {
AutoSeeder seeder =
new AutoSeeder(messenger, scheduler, memberList, addressResolver, joinTimeout, joinInterval);
membersSubject.onNext(new HashMap<>() {{
put(REMOTE_HOST_1_ENDPOINT, 0);
}});
membersSubject.onNext(new HashMap<>() {
{
put(REMOTE_HOST_1_ENDPOINT, 0);
}
});
try {
seeder.join(Collections.singleton(REMOTE_ADDR_1)).join();
verify(messenger, atMost(0)).send(any(), any(), anyBoolean());
Expand All @@ -131,14 +133,16 @@ public void joinKnownEndpoint() {

@Test
public void stopJoinEarlier() {
when(messenger.send(any(), any(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null));
AutoSeeder seeder =
new AutoSeeder(messenger, scheduler, memberList, addressResolver, joinTimeout, joinInterval);
CompletableFuture<Void> joinResult = seeder.join(Collections.singleton(REMOTE_ADDR_1));
when(messenger.send(any(), any(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null));
verify(messenger, timeout(200).atLeast(1)).send(any(), any(), anyBoolean());
membersSubject.onNext(new HashMap<>() {{
put(REMOTE_HOST_1_ENDPOINT, 0);
}});
membersSubject.onNext(new HashMap<>() {
{
put(REMOTE_HOST_1_ENDPOINT, 0);
}
});
joinResult.join();
}
}
Loading