Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6220a66
1. Reduce query stuttering effect during range topology change
popduke Sep 27, 2025
a185edc
Refactor BatchPubCall and related classes to improve fan-out handling…
popduke Sep 27, 2025
0a8b025
Don't report redundant ServerBusy event
popduke Sep 27, 2025
239e7e5
Reduce loading time by making DistWorkerCoProc/InboxStoreCoProc reset…
popduke Sep 27, 2025
f297a9f
Reduce memory overhead by doing contract to root after remove
popduke Sep 17, 2025
65622a0
1. Refactor base-kv to support report leader change to CoProc
popduke Sep 29, 2025
f0f80fb
Optimize retry logic for pushing qos1/2 messages and adjust related e…
popduke Oct 1, 2025
ba00d47
New event for matching retained messages successfully and related tests
popduke Oct 1, 2025
d307161
Add createdAt field to InboxMetadata and implement drop event reporti…
popduke Oct 1, 2025
5ab3bbc
Add an api to retrieve inbox state
popduke Oct 2, 2025
7bd8c4f
Reduce memory overhead during resetting
popduke Oct 2, 2025
d9774d4
1. fixed the bug causing DistWorkerCleaner stop running
popduke Oct 2, 2025
7952cc8
Conditional clear batch call state during reset to avoid task leaking
popduke Oct 3, 2025
80949f4
Prevent concurrent update non-thread safe result proto builder
popduke Oct 3, 2025
700b9a8
Improved child removal logic and add benchmarking for performance eva…
popduke Oct 5, 2025
b91e437
Add set session type to ClientInfo before making attach request
popduke Oct 6, 2025
44a78bd
Enhance DistWorkerCleaner with heuristic interval and step
popduke Oct 6, 2025
7ded554
Implemented dynamic sending window for confirmable messages
popduke Oct 6, 2025
6f8b8ca
Improve gossiping:
popduke Oct 6, 2025
5bb788f
Optimize child branch detachment and compression logic in TopicLevelTrie
popduke Oct 10, 2025
78f576b
1. Enhance TopicIndex to support custom value equality strategy
popduke Oct 10, 2025
a2e0483
Update tenant stats on inbox clearance and add integration test for s…
popduke Oct 10, 2025
aed5691
Refactor InboxMetaCache to reduce refresh/seek operation
popduke Oct 10, 2025
d6e496b
Correct range lookup key for InboxCheckSubScheduler
popduke Oct 10, 2025
49f451f
1. ensure strict fifo order for check permission call
popduke Oct 10, 2025
335b46c
Schedule an on-demand hint/confirm timeout to ensure fetch loop non-stop
popduke Oct 11, 2025
5c18f87
Add @JsonMerge annotation to service configuration classes to reduce …
popduke Oct 12, 2025
cb27fa2
Reduce IO overhead for InboxStoreCoProc
popduke Oct 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.messenger;

import org.apache.bifromq.basecluster.messenger.proto.GossipMessage;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -41,59 +40,20 @@
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.messenger.proto.GossipMessage;

@Slf4j
final class Gossiper {

@Builder
private static class GossipState {
public final GossipMessage message;

public final long infectionPeriod;

private final CompletableFuture<Duration> spreadSuccessSignal = new CompletableFuture<>();

private final long start = System.nanoTime();

private final Set<InetSocketAddress> infected = new HashSet<>();

@Getter
public boolean confirmed;

void addInfectedAddress(InetSocketAddress address) {
infected.add(address);
}

boolean isInfected(InetSocketAddress address) {
return infected.contains(address);
}

CompletableFuture<Duration> spreadSuccessSignal() {
return spreadSuccessSignal;
}

void confirmSpreadSuccess() {
spreadSuccessSignal.complete(Duration.ofNanos(System.nanoTime() - start));
confirmed = true;
}
}

private long currentPeriod = 0;
private long gossipCounter = 0;
// nanos
private long prevPeriodTime = -1;

private final Cache<String, GossipState> currentGossips;

private final String id;

private final int retransmitMultiplier;

private final Duration spreadPeriod;

private final Subject<GossipMessage> gossipPublisher;

private final Observable<Timed<GossipMessage>> gossipSink;
private long currentPeriod = 0;
private long gossipCounter = 0;
// nanos
private long prevPeriodTime = -1;

Gossiper(String id, int retransmitMultiplier, Duration spreadPeriod, Scheduler scheduler) {
// global unique id of the gossiper
Expand Down Expand Up @@ -140,28 +100,33 @@ public long nextPeriod(int totalGossipers) {
prevPeriodTime = currentPeriodTime;
// If time interval between current and previous period is too long, do confirm or sweep in advance
if (elapsedPeriod > periodsToSweep) {
log.warn("Too many elapsed periods, sweep gossips in advance, currentPeriod={}, elapsedPeriod={}",
currentPeriod, elapsedPeriod);
sweepSpreadGossips(gossipState -> true);
// Avoid sweeping when isolated to prevent information loss
// in case the node is temporarily partitioned.
if (totalGossipers > 1) {
log.warn("Too many elapsed periods, sweep gossips in advance, currentPeriod={}, elapsedPeriod={}",
currentPeriod, elapsedPeriod);
sweepSpreadGossips(gossipState -> true);
}
return currentPeriod;
}
if (elapsedPeriod > periodsToSpread) {
log.warn("Some gossips are too old to spread, confirm gossips in advance, "
+ "currentPeriod={}, elapsedPeriod={}",
currentPeriod, elapsedPeriod);
double confirmRatio = elapsedPeriod / (double) periodsToSweep;
confirmGossipsSpread(gossipState ->
!gossipState.confirmed && (
ThreadLocalRandom.current().nextDouble() < confirmRatio ||
currentPeriod > gossipState.infectionPeriod + periodsToSpread)
);
// Do not early-confirm when isolated (no other gossipers).
if (totalGossipers > 1) {
log.warn("Some gossips are too old to spread, confirm gossips in advance, "
+ "currentPeriod={}, elapsedPeriod={}", currentPeriod, elapsedPeriod);
double confirmRatio = elapsedPeriod / (double) periodsToSweep;
confirmGossipsSpread(gossipState ->
!gossipState.confirmed && (ThreadLocalRandom.current().nextDouble() < confirmRatio
|| currentPeriod > gossipState.infectionPeriod + periodsToSpread)
);
}
return currentPeriod;
}
}
// do normal period action
sweepSpreadGossips(gossipState -> currentPeriod > gossipState.infectionPeriod + periodsToSweep);
confirmGossipsSpread(gossipState -> !gossipState.confirmed &&
currentPeriod > gossipState.infectionPeriod + periodsToSpread);
confirmGossipsSpread(gossipState -> !gossipState.confirmed
&& currentPeriod > gossipState.infectionPeriod + periodsToSpread);
prevPeriodTime = currentPeriodTime;
return currentPeriod;
}
Expand Down Expand Up @@ -217,4 +182,37 @@ int gossipPeriodsToSweep(int retransmitMultiplier, int totalGossipers) {
int ceilLog2(int num) {
return num <= 1 ? 1 : 32 - Integer.numberOfLeadingZeros(num - 1);
}

@Builder
private static class GossipState {
public final GossipMessage message;

public final long infectionPeriod;

private final CompletableFuture<Duration> spreadSuccessSignal = new CompletableFuture<>();

private final long start = System.nanoTime();

private final Set<InetSocketAddress> infected = new HashSet<>();

@Getter
public boolean confirmed;

void addInfectedAddress(InetSocketAddress address) {
infected.add(address);
}

boolean isInfected(InetSocketAddress address) {
return infected.contains(address);
}

CompletableFuture<Duration> spreadSuccessSignal() {
return spreadSuccessSignal;
}

void confirmSpreadSuccess() {
spreadSuccessSignal.complete(Duration.ofNanos(System.nanoTime() - start));
confirmed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.messenger;

import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
import java.net.InetSocketAddress;
import lombok.Builder;
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;

@Builder
class MessengerMessageEnvelope {
public final MessengerMessage message;
public final InetSocketAddress recipient;
public final InetSocketAddress sender; // null if gossip
public final InetSocketAddress sender;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.messenger;

import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
import org.apache.bifromq.basecluster.transport.ITransport;
import org.apache.bifromq.basecluster.transport.PacketEnvelope;
import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.InvalidProtocolBufferException;
import io.reactivex.rxjava3.core.Observable;
Expand All @@ -33,6 +30,9 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
import org.apache.bifromq.basecluster.transport.ITransport;
import org.apache.bifromq.basecluster.transport.PacketEnvelope;

@Slf4j
final class MessengerTransport {
Expand Down Expand Up @@ -61,25 +61,13 @@ Observable<Timed<MessengerMessageEnvelope>> receive() {

private Observable<Timed<MessengerMessageEnvelope>> convert(PacketEnvelope packetEnvelope) {
return Observable.fromIterable(packetEnvelope.data.stream().map(b -> {
MessengerMessageEnvelope.MessengerMessageEnvelopeBuilder messageEnvelopeBuilder =
MessengerMessageEnvelope.builder()
.recipient(packetEnvelope.recipient);
try {
MessengerMessage mm = MessengerMessage.parseFrom(b);
messageEnvelopeBuilder.message(mm);
switch (mm.getMessengerMessageTypeCase()) {
case DIRECT:
return new Timed<MessengerMessageEnvelope>(
messageEnvelopeBuilder.sender(packetEnvelope.sender).build(),
System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
case GOSSIP:
default:
return new Timed<MessengerMessageEnvelope>(
messageEnvelopeBuilder.build(),
System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
MessengerMessageEnvelope mmEnvelop = MessengerMessageEnvelope.builder()
.recipient(packetEnvelope.recipient)
.message(MessengerMessage.parseFrom(b))
.sender(packetEnvelope.sender)
.build();
return new Timed<>(mmEnvelop, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (InvalidProtocolBufferException e) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import io.reactivex.rxjava3.observers.TestObserver;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -362,10 +363,10 @@ public void testAgentClusterPartitionAndHealing() {
IAgentMember agentMember1OnS1 = agentOnS1.register("agentNode1OnS1");
agentMember1OnS1.metadata(copyFromUtf8("agentNode1OnS1"));
IAgentMember agentMember2OnS1 = agentOnS1.register("agentNode2OnS1");
agentMember1OnS1.metadata(copyFromUtf8("agentNode2OnS1"));
agentMember2OnS1.metadata(copyFromUtf8("agentNode2OnS1"));

IAgentMember agentMemberOnS2 = agentOnS2.register("agentNodeOnS2");
agentMember2OnS1.metadata(copyFromUtf8("agentNodeOnS2"));
agentMemberOnS2.metadata(copyFromUtf8("agentNodeOnS2"));

IAgentMember agentMemberOnS3 = agentOnS3.register("agentNodeOnS3");
agentMemberOnS3.metadata(copyFromUtf8("agentNodeOnS3"));
Expand All @@ -374,17 +375,17 @@ public void testAgentClusterPartitionAndHealing() {
await().until(() -> agentOnS2.membership().blockingFirst().size() == 4);
await().until(() -> agentOnS3.membership().blockingFirst().size() == 4);

// isolate s2 from others
// isolate s1 from others
storeMgr.isolate("s1");
await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 2);
await().forever().until(() -> agentOnS2.membership().blockingFirst().size() == 2);
await().forever().until(() -> agentOnS3.membership().blockingFirst().size() == 2);
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS1.membership().blockingFirst().size() == 2);
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS2.membership().blockingFirst().size() == 2);
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS3.membership().blockingFirst().size() == 2);

// integrate s1 into the cluster
storeMgr.integrate("s1");
await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 4);
await().forever().until(() -> agentOnS2.membership().blockingFirst().size() == 4);
await().forever().until(() -> agentOnS3.membership().blockingFirst().size() == 4);
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS1.membership().blockingFirst().size() == 4);
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS2.membership().blockingFirst().size() == 4);
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS3.membership().blockingFirst().size() == 4);
}

@StoreCfgs(stores = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,24 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basekv.raft;

import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.bifromq.basekv.raft.event.CommitEvent;
import org.apache.bifromq.basekv.raft.event.ElectionEvent;
import org.apache.bifromq.basekv.raft.event.SnapshotRestoredEvent;
Expand All @@ -34,19 +47,6 @@
import org.apache.bifromq.basekv.raft.proto.RequestVoteReply;
import org.apache.bifromq.basekv.raft.proto.Snapshot;
import org.apache.bifromq.basekv.raft.proto.Voting;
import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.slf4j.Logger;

abstract class RaftNodeState implements IRaftNodeState {
Expand Down Expand Up @@ -248,7 +248,7 @@ protected void submitSnapshot(ByteString requested, String fromLeader) {
(installed, ex) -> onSnapshotInstalled.done(requested, installed, ex));
}

protected void notifyCommit() {
protected void notifyCommit(boolean isLeader) {
log.trace("Notify commit index[{}]", commitIndex);
for (Iterator<Map.Entry<Long, ProposeTask>> it = uncommittedProposals.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<Long, ProposeTask> entry = it.next();
Expand Down Expand Up @@ -282,7 +282,7 @@ protected void notifyCommit() {
task.future.completeExceptionally(DropProposalException.overridden());
}
}
listener.onEvent(new CommitEvent(id, commitIndex));
listener.onEvent(new CommitEvent(id, commitIndex, isLeader));
}

protected void notifyLeaderElected(String leaderId, long term) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ RaftNodeState stableTo(long stabledIndex) {
if (task.committed) {
commitIndex = index;
log.trace("Advanced commitIndex[{}]", commitIndex);
notifyCommit();
notifyCommit(false);
}
toRemove.add(index);
} else {
Expand Down Expand Up @@ -531,7 +531,7 @@ private void handleAppendEntries(String fromLeader, AppendEntries appendEntries)
log.trace("Advanced commitIndex[from:{},to:{}]", commitIndex, newCommitIndex);
commitIndex = newCommitIndex;
// report to application
notifyCommit();
notifyCommit(false);
} else {
// entries between lastIndex and newCommitIndex missing, probably because the channel between
// leader and follower is lossy.
Expand All @@ -554,7 +554,7 @@ private void handleAppendEntries(String fromLeader, AppendEntries appendEntries)
stabilizingIndexes.firstKey(), commitIndex, newCommitIndex);
commitIndex = newCommitIndex;
// report to application
notifyCommit();
notifyCommit(false);
} else {
if (newCommitIndex > stabilizingIndexes.lastKey()) {
// if the newCommitIndex is greater than the largest local stabilizing index
Expand Down
Loading
Loading