diff --git a/bifromq-dist/bifromq-dist-coproc-proto/pom.xml b/bifromq-dist/bifromq-dist-coproc-proto/pom.xml
index 6c0ecc7b5..ec34f3778 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/pom.xml
+++ b/bifromq-dist/bifromq-dist-coproc-proto/pom.xml
@@ -34,6 +34,11 @@
org.apache.bifromq
bifromq-common-type
+
+ com.github.ben-manes.caffeine
+ caffeine
+ ${caffeine.version}
+
org.apache.bifromq
bifromq-dist-worker-schema
@@ -47,6 +52,10 @@
org.openjdk.jmh
jmh-generator-annprocess
+
+ org.awaitility
+ awaitility
+
org.apache.logging.log4j
log4j-api
@@ -71,4 +80,4 @@
-
\ No newline at end of file
+
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ITopicFilterIterator.java b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ITopicFilterIterator.java
index a79402bc9..1286a83af 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ITopicFilterIterator.java
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ITopicFilterIterator.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.trie;
@@ -29,7 +29,19 @@
*
* @param the value type for topic associated value
*/
-public interface ITopicFilterIterator<V> {
+public interface ITopicFilterIterator<V> extends AutoCloseable {
+ /**
+ * Init the iterator with the given root node.
+ *
+ * @param root the root node of the topic trie
+ */
+    void init(TopicTrieNode<V> root);
+
+ /**
+ * Reset the iterator after using.
+ */
+ void close();
+
/**
* Seek to the given topic filter levels, so that the next topic filter is greater or equals to the given topic
* filter levels.
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/MTopicFilterTrieNode.java b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/MTopicFilterTrieNode.java
index 104c4eac1..ce97b6f4d 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/MTopicFilterTrieNode.java
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/MTopicFilterTrieNode.java
@@ -14,18 +14,23 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.trie;
import static org.apache.bifromq.util.TopicConst.MULTI_WILDCARD;
-import static java.util.Collections.emptySet;
-import static java.util.Collections.singleton;
-import com.google.common.collect.Sets;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Multi-level topic filter trie node.
@@ -33,15 +38,81 @@
* @param value type
*/
final class MTopicFilterTrieNode extends TopicFilterTrieNode {
- private final Set> backingTopics;
+ private static final ConcurrentLinkedDeque KEYS = new ConcurrentLinkedDeque<>();
+ private static final AtomicLong SEQ = new AtomicLong();
+ private static volatile Ticker TICKER = Ticker.systemTicker();
+ private static final Cache> POOL = Caffeine.newBuilder()
+ .expireAfterAccess(EXPIRE_AFTER)
+ .recordStats()
+ .scheduler(Scheduler.systemScheduler())
+ .ticker(() -> TICKER.read())
+ .removalListener((Long key, MTopicFilterTrieNode> value, RemovalCause cause) -> {
+ KEYS.remove(key);
+ if (cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE) {
+ value.recycle();
+ }
+ })
+ .build();
+
+ private final Set> backingTopics = new HashSet<>();
+
+ MTopicFilterTrieNode() {
+ }
+
+ static MTopicFilterTrieNode borrow(TopicFilterTrieNode parent,
+ Set> siblingTopicTrieNodes) {
+ while (true) {
+ Long key = KEYS.pollFirst();
+ if (key == null) {
+ break;
+ }
+ @SuppressWarnings("unchecked")
+ MTopicFilterTrieNode pooled = (MTopicFilterTrieNode) POOL.asMap().remove(key);
+ if (pooled != null) {
+ return pooled.init(parent, siblingTopicTrieNodes);
+ }
+ }
+ MTopicFilterTrieNode node = new MTopicFilterTrieNode<>();
+ return node.init(parent, siblingTopicTrieNodes);
+ }
+
+ static void release(MTopicFilterTrieNode> node) {
+ node.recycle();
+ long key = SEQ.incrementAndGet();
+ KEYS.offerLast(key);
+ POOL.put(key, node);
+ }
+
+ // test hooks (package-private)
+ static void poolClear() {
+ POOL.invalidateAll();
+ POOL.cleanUp();
+ KEYS.clear();
+ }
+
+ static void poolCleanUp() {
+ POOL.cleanUp();
+ }
+
+ static int poolApproxSize() {
+ return KEYS.size();
+ }
+
+ static void setTicker(Ticker ticker) {
+ TICKER = ticker != null ? ticker : Ticker.systemTicker();
+ }
- MTopicFilterTrieNode(TopicFilterTrieNode parent, Set> siblingTopicTrieNodes) {
- super(parent);
- Set> topics = parent != null ? parent.backingTopics() : emptySet();
+ MTopicFilterTrieNode init(TopicFilterTrieNode parent, Set> siblingTopicTrieNodes) {
+ assert siblingTopicTrieNodes != null;
+ this.parent = parent;
+ backingTopics.clear();
+ if (parent != null) {
+ backingTopics.addAll(parent.backingTopics());
+ }
for (TopicTrieNode sibling : siblingTopicTrieNodes) {
- topics = collectTopics(sibling, topics);
+ collectTopics(sibling);
}
- backingTopics = topics;
+ return this;
}
@Override
@@ -54,17 +125,15 @@ Set> backingTopics() {
return backingTopics;
}
- private Set> collectTopics(TopicTrieNode node, Set> topics) {
+ private void collectTopics(TopicTrieNode node) {
if (node.isUserTopic()) {
- topics = Sets.union(topics, singleton(node));
+ backingTopics.add(node);
}
for (TopicTrieNode child : node.children().values()) {
- topics = collectTopics(child, topics);
+ collectTopics(child);
}
- return topics;
}
-
@Override
void seekChild(String childLevelName) {
@@ -104,4 +173,9 @@ void prevChild() {
TopicFilterTrieNode childNode() {
throw new NoSuchElementException();
}
+
+ private void recycle() {
+ parent = null;
+ backingTopics.clear();
+ }
}
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/NTopicFilterTrieNode.java b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/NTopicFilterTrieNode.java
index 9149e6238..72b40ec88 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/NTopicFilterTrieNode.java
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/NTopicFilterTrieNode.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.trie;
@@ -22,6 +22,11 @@
import static org.apache.bifromq.util.TopicConst.MULTI_WILDCARD;
import static org.apache.bifromq.util.TopicConst.SINGLE_WILDCARD;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.github.benmanes.caffeine.cache.Ticker;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
@@ -30,6 +35,8 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Normal level topic filter trie node.
@@ -37,24 +44,89 @@
* @param value type
*/
final class NTopicFilterTrieNode extends TopicFilterTrieNode {
- private final String levelName;
- private final NavigableSet subLevelNames;
- private final NavigableMap>> subTopicTrieNodes;
- private final Set> subWildcardMatchableTopicTrieNodes;
- private final Set> backingTopics;
-
+ private static final ConcurrentLinkedDeque KEYS = new ConcurrentLinkedDeque<>();
+ private static final AtomicLong SEQ = new AtomicLong();
+ private static volatile Ticker TICKER = Ticker.systemTicker();
+ private static final Cache> POOL = Caffeine.newBuilder()
+ .expireAfterAccess(EXPIRE_AFTER)
+ .recordStats()
+ .scheduler(Scheduler.systemScheduler())
+ .ticker(() -> TICKER.read())
+ .removalListener((Long key, NTopicFilterTrieNode> value, RemovalCause cause) -> {
+ KEYS.remove(key);
+ if (cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE) {
+ value.recycle();
+ }
+ })
+ .build();
+
+ private final NavigableSet subLevelNames = new TreeSet<>();
+ private final NavigableMap>> subTopicTrieNodes = new TreeMap<>();
+ private final Set> subWildcardMatchableTopicTrieNodes = new HashSet<>();
+ private final Set> backingTopics = new HashSet<>();
+ private String levelName;
// point to the sub node during iteration
private String subLevelName;
- NTopicFilterTrieNode(TopicFilterTrieNode parent, String levelName, Set> siblingTopicTrieNodes) {
- super(parent);
+ NTopicFilterTrieNode() {
+ }
+
+ static NTopicFilterTrieNode borrow(TopicFilterTrieNode parent,
+ String levelName,
+ Set> siblingTopicTrieNodes) {
+ while (true) {
+ Long key = KEYS.pollFirst();
+ if (key == null) {
+ break;
+ }
+ @SuppressWarnings("unchecked")
+ NTopicFilterTrieNode pooled = (NTopicFilterTrieNode) POOL.asMap().remove(key);
+ if (pooled != null) {
+ return pooled.init(parent, levelName, siblingTopicTrieNodes);
+ }
+ }
+ NTopicFilterTrieNode node = new NTopicFilterTrieNode<>();
+ return node.init(parent, levelName, siblingTopicTrieNodes);
+ }
+
+ static void release(NTopicFilterTrieNode> node) {
+ node.recycle();
+ long key = SEQ.incrementAndGet();
+ KEYS.offerLast(key);
+ POOL.put(key, node);
+ }
+
+ // test hooks (package-private)
+ static void poolClear() {
+ POOL.invalidateAll();
+ POOL.cleanUp();
+ KEYS.clear();
+ }
+
+ static void poolCleanUp() {
+ POOL.cleanUp();
+ }
+
+ static int poolApproxSize() {
+ return KEYS.size();
+ }
+
+ static void setTicker(Ticker ticker) {
+ TICKER = ticker != null ? ticker : Ticker.systemTicker();
+ }
+
+ NTopicFilterTrieNode init(TopicFilterTrieNode parent, String levelName,
+ Set> siblingTopicTrieNodes) {
assert levelName != null;
+ assert siblingTopicTrieNodes != null;
assert siblingTopicTrieNodes.stream().allMatch(node -> node.levelName().equals(levelName));
- this.subTopicTrieNodes = new TreeMap<>();
- this.subLevelNames = new TreeSet<>();
- this.subWildcardMatchableTopicTrieNodes = new HashSet<>();
+ this.parent = parent;
this.levelName = levelName;
- this.backingTopics = new HashSet<>();
+ subLevelName = null;
+ subLevelNames.clear();
+ subTopicTrieNodes.clear();
+ subWildcardMatchableTopicTrieNodes.clear();
+ backingTopics.clear();
for (TopicTrieNode sibling : siblingTopicTrieNodes) {
if (sibling.isUserTopic()) {
backingTopics.add(sibling);
@@ -77,6 +149,7 @@ final class NTopicFilterTrieNode extends TopicFilterTrieNode {
subLevelNames.add(SINGLE_WILDCARD);
}
seekChild("");
+ return this;
}
@Override
@@ -142,9 +215,19 @@ TopicFilterTrieNode childNode() {
throw new NoSuchElementException();
}
return switch (subLevelName) {
- case MULTI_WILDCARD -> new MTopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
- case SINGLE_WILDCARD -> new STopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
- default -> new NTopicFilterTrieNode<>(this, subLevelName, subTopicTrieNodes.get(subLevelName));
+ case MULTI_WILDCARD -> MTopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
+ case SINGLE_WILDCARD -> STopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
+ default -> NTopicFilterTrieNode.borrow(this, subLevelName, subTopicTrieNodes.get(subLevelName));
};
}
+
+ private void recycle() {
+ parent = null;
+ levelName = null;
+ subLevelName = null;
+ subLevelNames.clear();
+ subTopicTrieNodes.clear();
+ subWildcardMatchableTopicTrieNodes.clear();
+ backingTopics.clear();
+ }
}
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/STopicFilterTrieNode.java b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/STopicFilterTrieNode.java
index 0ac8285dd..a6da05494 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/STopicFilterTrieNode.java
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/STopicFilterTrieNode.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.trie;
@@ -22,6 +22,11 @@
import static org.apache.bifromq.util.TopicConst.MULTI_WILDCARD;
import static org.apache.bifromq.util.TopicConst.SINGLE_WILDCARD;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.github.benmanes.caffeine.cache.Ticker;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
@@ -30,6 +35,8 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Single level topic filter trie node.
@@ -37,20 +44,84 @@
* @param value type
*/
final class STopicFilterTrieNode extends TopicFilterTrieNode {
- private final NavigableSet subLevelNames;
- private final NavigableMap>> subTopicTrieNodes;
- private final Set> subWildcardMatchableTopicTrieNodes;
- private final Set> backingTopics;
+ private static final ConcurrentLinkedDeque KEYS = new ConcurrentLinkedDeque<>();
+ private static final AtomicLong SEQ = new AtomicLong();
+ private static volatile Ticker TICKER = Ticker.systemTicker();
+ private static final Cache> POOL = Caffeine.newBuilder()
+ .expireAfterAccess(EXPIRE_AFTER)
+ .recordStats()
+ .scheduler(Scheduler.systemScheduler())
+ .ticker(() -> TICKER.read())
+ .removalListener((Long key, STopicFilterTrieNode> value, RemovalCause cause) -> {
+ KEYS.remove(key);
+ if (cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE) {
+ value.recycle();
+ }
+ })
+ .build();
+
+ private final NavigableSet subLevelNames = new TreeSet<>();
+ private final NavigableMap>> subTopicTrieNodes = new TreeMap<>();
+ private final Set> subWildcardMatchableTopicTrieNodes = new HashSet<>();
+ private final Set> backingTopics = new HashSet<>();
// point to the sub node during iteration
private String subLevelName;
- STopicFilterTrieNode(TopicFilterTrieNode parent, Set> siblingTopicTrieNodes) {
- super(parent);
- this.subLevelNames = new TreeSet<>();
- this.subTopicTrieNodes = new TreeMap<>();
- this.subWildcardMatchableTopicTrieNodes = new HashSet<>();
- this.backingTopics = new HashSet<>();
+ STopicFilterTrieNode() {
+ }
+
+ static STopicFilterTrieNode borrow(TopicFilterTrieNode parent,
+ Set> siblingTopicTrieNodes) {
+ while (true) {
+ Long key = KEYS.pollFirst();
+ if (key == null) {
+ break;
+ }
+ @SuppressWarnings("unchecked")
+ STopicFilterTrieNode pooled = (STopicFilterTrieNode) POOL.asMap().remove(key);
+ if (pooled != null) {
+ return pooled.init(parent, siblingTopicTrieNodes);
+ }
+ }
+ STopicFilterTrieNode node = new STopicFilterTrieNode<>();
+ return node.init(parent, siblingTopicTrieNodes);
+ }
+
+ static void release(STopicFilterTrieNode> node) {
+ node.recycle();
+ long key = SEQ.incrementAndGet();
+ KEYS.offerLast(key);
+ POOL.put(key, node);
+ }
+
+ // test hooks (package-private)
+ static void poolClear() {
+ POOL.invalidateAll();
+ POOL.cleanUp();
+ KEYS.clear();
+ }
+
+ static void poolCleanUp() {
+ POOL.cleanUp();
+ }
+
+ static int poolApproxSize() {
+ return KEYS.size();
+ }
+
+ static void setTicker(Ticker ticker) {
+ TICKER = ticker != null ? ticker : Ticker.systemTicker();
+ }
+
+ STopicFilterTrieNode init(TopicFilterTrieNode parent, Set> siblingTopicTrieNodes) {
+ assert siblingTopicTrieNodes != null;
+ this.parent = parent;
+ subLevelName = null;
+ subLevelNames.clear();
+ subTopicTrieNodes.clear();
+ subWildcardMatchableTopicTrieNodes.clear();
+ backingTopics.clear();
for (TopicTrieNode sibling : siblingTopicTrieNodes) {
if (sibling.isUserTopic()) {
backingTopics.add(sibling);
@@ -63,7 +134,6 @@ final class STopicFilterTrieNode extends TopicFilterTrieNode {
subTopicTrieNodes.computeIfAbsent(subNode.levelName(), k -> new HashSet<>()).add(subNode);
subLevelNames.add(subNode.levelName());
}
-
}
// # match parent
if (!backingTopics.isEmpty()) {
@@ -73,8 +143,8 @@ final class STopicFilterTrieNode extends TopicFilterTrieNode {
subLevelNames.add(MULTI_WILDCARD);
subLevelNames.add(SINGLE_WILDCARD);
}
- // point to first child after init
seekChild("");
+ return this;
}
@Override
@@ -140,9 +210,18 @@ TopicFilterTrieNode childNode() {
throw new NoSuchElementException();
}
return switch (subLevelName) {
- case MULTI_WILDCARD -> new MTopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
- case SINGLE_WILDCARD -> new STopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
- default -> new NTopicFilterTrieNode<>(this, subLevelName, subTopicTrieNodes.get(subLevelName));
+ case MULTI_WILDCARD -> MTopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
+ case SINGLE_WILDCARD -> STopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
+ default -> NTopicFilterTrieNode.borrow(this, subLevelName, subTopicTrieNodes.get(subLevelName));
};
}
+
+ private void recycle() {
+ parent = null;
+ subLevelName = null;
+ subLevelNames.clear();
+ subTopicTrieNodes.clear();
+ subWildcardMatchableTopicTrieNodes.clear();
+ backingTopics.clear();
+ }
}
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ThreadLocalTopicFilterIterator.java b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ThreadLocalTopicFilterIterator.java
new file mode 100644
index 000000000..f02c6ee65
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ThreadLocalTopicFilterIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.trie;
+
+import static java.lang.ThreadLocal.withInitial;
+
+public class ThreadLocalTopicFilterIterator {
+    private static final ThreadLocal<ITopicFilterIterator<?>> INSTANCE = withInitial(TopicFilterIterator::new);
+
+    public static <V> ITopicFilterIterator<V> get(TopicTrieNode<V> root) {
+        @SuppressWarnings("unchecked")
+        ITopicFilterIterator<V> itr = ((ITopicFilterIterator<V>) INSTANCE.get());
+        itr.init(root);
+        return itr;
+    }
+}
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterIterator.java b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterIterator.java
index 4e9fac883..1f42aa419 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterIterator.java
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterIterator.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.trie;
@@ -36,21 +36,32 @@
* @param the value type for topic associated value
*/
public class TopicFilterIterator implements ITopicFilterIterator {
- private final TopicTrieNode topicTrieRoot;
// the invariant of the stack:
// empty: no valid topic filter to iterator from expansion set
// non-empty: always point to a valid topic filter in expansion set when there is no operation
private final Stack> traverseStack = new Stack<>();
- public TopicFilterIterator(TopicTrieNode root) {
+    private TopicTrieNode<V> topicTrieRoot;
+
+ public TopicFilterIterator() {
+ }
+
+ @Override
+    public void init(TopicTrieNode<V> root) {
this.topicTrieRoot = root;
seek(Collections.emptyList());
}
+ @Override
+ public void close() {
+ clearTraverseStack();
+ topicTrieRoot = null;
+ }
+
@Override
public void seek(List filterLevels) {
- traverseStack.clear();
- traverseStack.add(TopicFilterTrieNode.from(topicTrieRoot));
+ clearTraverseStack();
+ traverseStack.push(TopicFilterTrieNode.from(topicTrieRoot));
int i = -1;
out:
while (!traverseStack.isEmpty() && i < filterLevels.size()) {
@@ -73,7 +84,7 @@ public void seek(List filterLevels) {
traverseStack.push(node.childNode());
} else {
// backtrace
- traverseStack.pop();
+ popAndRelease();
if (traverseStack.isEmpty()) {
break;
}
@@ -85,7 +96,7 @@ public void seek(List filterLevels) {
traverseStack.push(parent.childNode());
break out;
} else {
- traverseStack.pop();
+ popAndRelease();
}
}
}
@@ -94,7 +105,7 @@ public void seek(List filterLevels) {
// no least next topicfilter exists in expansion set for the given topic filter
// drain the stack
while (!traverseStack.isEmpty()) {
- traverseStack.pop();
+ popAndRelease();
}
}
}
@@ -112,8 +123,8 @@ public void seek(List filterLevels) {
@Override
public void seekPrev(List filterLevels) {
- traverseStack.clear();
- traverseStack.add(TopicFilterTrieNode.from(topicTrieRoot));
+ clearTraverseStack();
+ traverseStack.push(TopicFilterTrieNode.from(topicTrieRoot));
int i = -1;
out:
while (!traverseStack.isEmpty() && i < filterLevels.size()) {
@@ -129,7 +140,7 @@ public void seekPrev(List filterLevels) {
// levelNameToSeek == levelName
if (i == filterLevels.size()) {
// backtrace to find strictly greatest prev topic filter
- traverseStack.pop();
+ popAndRelease();
if (traverseStack.isEmpty()) {
break;
}
@@ -142,7 +153,7 @@ public void seekPrev(List filterLevels) {
break out;
} else if (parent.backingTopics().isEmpty()) {
// if current stack do not represent a topicfilter in expansion set, backtrace one level up
- traverseStack.pop();
+ popAndRelease();
} else {
// current stack represents the greatest previous topic filter
// make sure it points to valid child position
@@ -164,7 +175,7 @@ public void seekPrev(List filterLevels) {
return;
}
// backtrace
- traverseStack.pop();
+ popAndRelease();
if (traverseStack.isEmpty()) {
break;
}
@@ -178,7 +189,7 @@ public void seekPrev(List filterLevels) {
break out;
} else if (parent.backingTopics().isEmpty()) {
// if current stack not represent a valid topic, backtrace one level up
- traverseStack.pop();
+ popAndRelease();
} else {
// current stack represents a topicfilter in expansion set
// make sure it points to valid child position
@@ -193,7 +204,7 @@ public void seekPrev(List filterLevels) {
// no greatest prev topicfilter exists in expansion set for the given topic filter
// drain the stack
while (!traverseStack.isEmpty()) {
- traverseStack.pop();
+ popAndRelease();
}
}
}
@@ -220,7 +231,7 @@ public boolean isValid() {
public void prev() {
assert traverseStack.isEmpty() || !traverseStack.peek().backingTopics().isEmpty();
while (!traverseStack.isEmpty()) {
- traverseStack.pop();
+ popAndRelease();
if (traverseStack.isEmpty()) {
return;
}
@@ -257,7 +268,7 @@ public void next() {
break;
}
} else {
- traverseStack.pop();
+ popAndRelease();
if (!traverseStack.isEmpty()) {
traverseStack.peek().nextChild();
}
@@ -287,4 +298,14 @@ public Map, Set> value() {
}
return value;
}
+
+ private void clearTraverseStack() {
+ while (!traverseStack.isEmpty()) {
+ popAndRelease();
+ }
+ }
+
+ private void popAndRelease() {
+ TopicFilterTrieNode.release(traverseStack.pop());
+ }
}
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterTrieNode.java b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterTrieNode.java
index 32cdb5654..8b46240bd 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterTrieNode.java
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/TopicFilterTrieNode.java
@@ -22,6 +22,7 @@
import static org.apache.bifromq.util.TopicConst.NUL;
import com.google.common.collect.Lists;
+import java.time.Duration;
import java.util.List;
import java.util.Set;
@@ -31,14 +32,25 @@
* @param value type
*/
abstract class TopicFilterTrieNode {
- protected final TopicFilterTrieNode parent;
+ static final Duration EXPIRE_AFTER = Duration.ofMinutes(1);
- protected TopicFilterTrieNode(TopicFilterTrieNode parent) {
- this.parent = parent;
+ protected TopicFilterTrieNode parent;
+
+ protected TopicFilterTrieNode() {
}
static TopicFilterTrieNode from(TopicTrieNode root) {
- return new NTopicFilterTrieNode<>(null, root.levelName(), Set.of(root));
+ return NTopicFilterTrieNode.borrow(null, root.levelName(), Set.of(root));
+ }
+
+ static void release(TopicFilterTrieNode> node) {
+ if (node instanceof MTopicFilterTrieNode> mNode) {
+ MTopicFilterTrieNode.release(mNode);
+ } else if (node instanceof STopicFilterTrieNode> sNode) {
+ STopicFilterTrieNode.release(sNode);
+ } else if (node instanceof NTopicFilterTrieNode> nNode) {
+ NTopicFilterTrieNode.release(nNode);
+ }
}
/**
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/test/java/org/apache/bifromq/dist/trie/TopicFilterIteratorTest.java b/bifromq-dist/bifromq-dist-coproc-proto/src/test/java/org/apache/bifromq/dist/trie/TopicFilterIteratorTest.java
index 530a9be6d..fc1ea4df0 100644
--- a/bifromq-dist/bifromq-dist-coproc-proto/src/test/java/org/apache/bifromq/dist/trie/TopicFilterIteratorTest.java
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/test/java/org/apache/bifromq/dist/trie/TopicFilterIteratorTest.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.trie;
@@ -28,9 +28,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import org.apache.bifromq.dist.TestUtil;
-import org.apache.bifromq.util.TopicConst;
-import org.apache.bifromq.util.TopicUtil;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -38,6 +35,9 @@
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bifromq.dist.TestUtil;
+import org.apache.bifromq.util.TopicConst;
+import org.apache.bifromq.util.TopicUtil;
import org.testng.annotations.Test;
@Slf4j
@@ -47,7 +47,8 @@ public void expandRandomLocalTopic() {
String topic = randomTopic();
TopicTrieNode topicTrie =
TopicTrieNode.builder(false).addTopic(parse(topic, false), "v1").build();
- TopicFilterIterator itr = new TopicFilterIterator<>(topicTrie);
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(topicTrie);
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -72,7 +73,8 @@ public void seekExistAndIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -94,7 +96,8 @@ public void randomSeekAndIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -117,7 +120,8 @@ public void seekExistAndBackwardIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -142,7 +146,8 @@ public void randomSeekAndBackwardIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -171,7 +176,8 @@ public void seekPrevExistAndIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -198,7 +204,8 @@ public void randomSeekPrevAndIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -227,7 +234,8 @@ public void seekPrevExistAndBackwardIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -252,7 +260,8 @@ public void randomSeekPrevAndBackwardIteration() {
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
builder.addTopic(parse(randomTopic(), false), "_");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
List generated = new ArrayList<>();
for (; itr.isValid(); itr.next()) {
generated.add(fastJoin(NUL, itr.key()));
@@ -276,7 +285,8 @@ public void associatedValues() {
builder.addTopic(parse("a", false), "v1");
builder.addTopic(parse("a/b", false), "v2");
builder.addTopic(parse("c", false), "v3");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
itr.seek(List.of("#"));
assertEquals(itr.value().size(), 3);
assertEquals(itr.value().get(List.of("a")), Set.of("v1"));
@@ -309,7 +319,8 @@ public void localSysTopicMatch() {
builder.addTopic(parse("$sys/a", false), "v1");
builder.addTopic(parse("a/b", false), "v2");
builder.addTopic(parse("c", false), "v3");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
itr.seek(List.of("#"));
assertEquals(itr.value().size(), 2);
assertEquals(itr.value().get(List.of("a", "b")), Set.of("v2"));
@@ -322,7 +333,8 @@ public void globalSysTopicMatch() {
builder.addTopic(parse("tenant/$sys/a", false), "v1");
builder.addTopic(parse("tenant/a/b", false), "v2");
builder.addTopic(parse("tenant/c", false), "v3");
- TopicFilterIterator itr = new TopicFilterIterator<>(builder.build());
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
itr.seek(List.of("tenant", "#"));
assertEquals(itr.value().size(), 2);
assertEquals(itr.value().get(List.of("tenant", "a", "b")), Set.of("v2"));
@@ -337,7 +349,8 @@ private void expandTopics(Map> topicToFilters, boolean isGl
allTopicFilters.addAll(topicToFilters.get(topic));
}
List sortedTopicFilters = allTopicFilters.stream().map(TopicUtil::escape).sorted().toList();
- TopicFilterIterator iterator = new TopicFilterIterator<>(topicTrieBuilder.build());
+ TopicFilterIterator iterator = new TopicFilterIterator<>();
+ iterator.init(topicTrieBuilder.build());
List generated = new ArrayList<>();
for (; iterator.isValid(); iterator.next()) {
generated.add(fastJoin(NUL, iterator.key()));
diff --git a/bifromq-dist/bifromq-dist-coproc-proto/src/test/java/org/apache/bifromq/dist/trie/TopicFilterPoolTest.java b/bifromq-dist/bifromq-dist-coproc-proto/src/test/java/org/apache/bifromq/dist/trie/TopicFilterPoolTest.java
new file mode 100644
index 000000000..7be9ee385
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-coproc-proto/src/test/java/org/apache/bifromq/dist/trie/TopicFilterPoolTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.trie;
+
+import static org.awaitility.Awaitility.await;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TopicFilterPoolTest {
+
+ @BeforeMethod
+ public void resetPools() {
+ NTopicFilterTrieNode.poolClear();
+ STopicFilterTrieNode.poolClear();
+ MTopicFilterTrieNode.poolClear();
+ // reset ticker back to system
+ NTopicFilterTrieNode.setTicker(null);
+ STopicFilterTrieNode.setTicker(null);
+ MTopicFilterTrieNode.setTicker(null);
+ }
+
+ @AfterMethod
+ public void cleanup() {
+ NTopicFilterTrieNode.poolClear();
+ STopicFilterTrieNode.poolClear();
+ MTopicFilterTrieNode.poolClear();
+ }
+
+ @Test
+ public void reusePoolEntryForNTopicNode() {
+ TopicTrieNode root = buildLocalTrie();
+ TopicTrieNode aNode = root.child("a");
+
+ NTopicFilterTrieNode n1 = NTopicFilterTrieNode.borrow(null, "a", Set.of(aNode));
+ int id1 = System.identityHashCode(n1);
+ // basic behavior sanity
+ assertEquals(n1.levelName(), "a");
+ assertTrue(n1.backingTopics().contains(aNode));
+ NTopicFilterTrieNode.release(n1);
+
+ NTopicFilterTrieNode n2 = NTopicFilterTrieNode.borrow(null, "a", Set.of(aNode));
+ int id2 = System.identityHashCode(n2);
+ assertSame(n1, n2);
+ assertEquals(id1, id2);
+ // re-init takes effect
+ assertEquals(n2.levelName(), "a");
+ NTopicFilterTrieNode.release(n2);
+ }
+
+ @Test
+ public void reusePoolEntryForSTopicNode() {
+ TopicTrieNode root = buildLocalTrie();
+ Set> wildcardMatchables = new HashSet<>();
+ for (TopicTrieNode child : root.children().values()) {
+ if (child.wildcardMatchable()) {
+ wildcardMatchables.add(child);
+ }
+ }
+
+ STopicFilterTrieNode s1 = STopicFilterTrieNode.borrow(null, wildcardMatchables);
+ STopicFilterTrieNode.release(s1);
+ STopicFilterTrieNode s2 = STopicFilterTrieNode.borrow(null, wildcardMatchables);
+ assertSame(s1, s2);
+ STopicFilterTrieNode.release(s2);
+ }
+
+ @Test
+ public void reusePoolEntryForMTopicNode() {
+ TopicTrieNode root = buildLocalTrie();
+ Set> wildcardMatchables = new HashSet<>();
+ for (TopicTrieNode child : root.children().values()) {
+ if (child.wildcardMatchable()) {
+ wildcardMatchables.add(child);
+ }
+ }
+ MTopicFilterTrieNode m1 = MTopicFilterTrieNode.borrow(null, wildcardMatchables);
+ assertTrue(m1.backingTopics().stream().anyMatch(TopicTrieNode::isUserTopic));
+ MTopicFilterTrieNode.release(m1);
+ MTopicFilterTrieNode m2 = MTopicFilterTrieNode.borrow(null, wildcardMatchables);
+ assertSame(m1, m2);
+ MTopicFilterTrieNode.release(m2);
+ }
+
+ @Test
+ public void ttlExpiryEvictsEntries() {
+ FakeTicker ticker = new FakeTicker();
+ NTopicFilterTrieNode.setTicker(ticker);
+ STopicFilterTrieNode.setTicker(ticker);
+ MTopicFilterTrieNode.setTicker(ticker);
+
+ TopicTrieNode root = buildLocalTrie();
+ TopicTrieNode aNode = root.child("a");
+ Set> wildcardMatchables = new HashSet<>();
+ for (TopicTrieNode child : root.children().values()) {
+ if (child.wildcardMatchable()) {
+ wildcardMatchables.add(child);
+ }
+ }
+
+ // put one entry in each pool
+ NTopicFilterTrieNode.release(NTopicFilterTrieNode.borrow(null, "a", Set.of(aNode)));
+ STopicFilterTrieNode.release(STopicFilterTrieNode.borrow(null, wildcardMatchables));
+ MTopicFilterTrieNode.release(MTopicFilterTrieNode.borrow(null, wildcardMatchables));
+
+ await().until(() -> NTopicFilterTrieNode.poolApproxSize() == 1
+ && STopicFilterTrieNode.poolApproxSize() == 1
+ && MTopicFilterTrieNode.poolApproxSize() == 1);
+
+ // advance beyond TTL and force cleanup
+ ticker.advance(Duration.ofMinutes(2));
+ NTopicFilterTrieNode.poolCleanUp();
+ STopicFilterTrieNode.poolCleanUp();
+ MTopicFilterTrieNode.poolCleanUp();
+
+ await().until(() -> NTopicFilterTrieNode.poolApproxSize() == 0
+ && STopicFilterTrieNode.poolApproxSize() == 0
+ && MTopicFilterTrieNode.poolApproxSize() == 0);
+ }
+
+ @Test
+ public void iteratorBacktraceReleasesNodesNoLeak() {
+ TopicTrieNode.Builder builder = TopicTrieNode.builder(false);
+ for (int i = 0; i < 10; i++) {
+ builder.addTopic(List.of("a" + i), "v" + i);
+ for (int j = 0; j < 3; j++) {
+ builder.addTopic(List.of("a" + i, "b" + j), "v" + i + j);
+ }
+ }
+ builder.addTopic(List.of("$sys", "x"), "sys");
+ TopicFilterIterator itr = new TopicFilterIterator<>();
+ itr.init(builder.build());
+
+ for (int round = 0; round < 6; round++) {
+ if ((round & 1) == 0) {
+ // forward full scan
+ itr.seek(List.of());
+ for (; itr.isValid(); itr.next()) {
+ // no-op
+ }
+ } else {
+ // backward full scan
+ itr.seekPrev(List.of());
+ for (; itr.isValid(); itr.prev()) {
+ // no-op
+ }
+ }
+ // after each round, pools should not grow unbounded
+ assertTrue(NTopicFilterTrieNode.poolApproxSize() < 256);
+ assertTrue(STopicFilterTrieNode.poolApproxSize() < 256);
+ assertTrue(MTopicFilterTrieNode.poolApproxSize() < 256);
+ }
+ }
+
+ private TopicTrieNode buildLocalTrie() {
+ return TopicTrieNode.builder(false)
+ .addTopic(List.of("a"), "v1")
+ .addTopic(List.of("a", "b"), "v2")
+ .addTopic(List.of("c"), "v3")
+ .addTopic(List.of("$sys", "x"), "sys")
+ .build();
+ }
+
+ static class FakeTicker implements Ticker {
+ private final AtomicLong nanos = new AtomicLong(System.nanoTime());
+
+ @Override
+ public long read() {
+ return nanos.get();
+ }
+
+ void advance(Duration d) {
+ nanos.addAndGet(d.toNanos());
+ }
+ }
+}
+
diff --git a/bifromq-dist/bifromq-dist-server/src/main/java/org/apache/bifromq/dist/server/scheduler/BatchDistServerCall.java b/bifromq-dist/bifromq-dist-server/src/main/java/org/apache/bifromq/dist/server/scheduler/BatchDistServerCall.java
index 6cb199b09..292870448 100644
--- a/bifromq-dist/bifromq-dist-server/src/main/java/org/apache/bifromq/dist/server/scheduler/BatchDistServerCall.java
+++ b/bifromq-dist/bifromq-dist-server/src/main/java/org/apache/bifromq/dist/server/scheduler/BatchDistServerCall.java
@@ -59,7 +59,8 @@
import org.apache.bifromq.dist.rpc.proto.DistPack;
import org.apache.bifromq.dist.rpc.proto.DistServiceROCoProcInput;
import org.apache.bifromq.dist.rpc.proto.Fact;
-import org.apache.bifromq.dist.trie.TopicFilterIterator;
+import org.apache.bifromq.dist.trie.ITopicFilterIterator;
+import org.apache.bifromq.dist.trie.ThreadLocalTopicFilterIterator;
import org.apache.bifromq.dist.trie.TopicTrieNode;
import org.apache.bifromq.type.ClientInfo;
import org.apache.bifromq.type.Message;
@@ -231,36 +232,39 @@ private Collection rangeLookup() {
batch.keySet()
.forEach(topic -> topicTrieBuilder.addTopic(TopicUtil.parse(batcherKey.tenantId(), topic, false), topic));
- TopicFilterIterator topicFilterIterator = new TopicFilterIterator<>(topicTrieBuilder.build());
- List finalCandidates = new LinkedList<>();
- for (KVRangeSetting candidate : allCandidates) {
- Optional factOpt = candidate.getFact(Fact.class);
- if (factOpt.isEmpty()) {
- finalCandidates.add(candidate);
- continue;
- }
- Fact fact = factOpt.get();
- if (!fact.hasFirstGlobalFilterLevels() || !fact.hasLastGlobalFilterLevels()) {
- // range is empty
- continue;
- }
- List firstFilterLevels = fact.getFirstGlobalFilterLevels().getFilterLevelList();
- List lastFilterLevels = fact.getLastGlobalFilterLevels().getFilterLevelList();
- topicFilterIterator.seek(firstFilterLevels);
- if (topicFilterIterator.isValid()) {
- // firstTopicFilter <= nextTopicFilter
- if (topicFilterIterator.key().equals(firstFilterLevels) ||
- fastJoin(NUL, topicFilterIterator.key()).compareTo(fastJoin(NUL, lastFilterLevels)) <= 0) {
- // if firstTopicFilter == nextTopicFilter || nextFilterLevels <= lastFilterLevels
- // add to finalCandidates
+        TopicTrieNode topicTrie = topicTrieBuilder.build();
+        try (ITopicFilterIterator topicFilterIterator = ThreadLocalTopicFilterIterator.get(topicTrie)) {
+            topicFilterIterator.init(topicTrie);
+ List finalCandidates = new LinkedList<>();
+ for (KVRangeSetting candidate : allCandidates) {
+ Optional factOpt = candidate.getFact(Fact.class);
+ if (factOpt.isEmpty()) {
finalCandidates.add(candidate);
+ continue;
+ }
+ Fact fact = factOpt.get();
+ if (!fact.hasFirstGlobalFilterLevels() || !fact.hasLastGlobalFilterLevels()) {
+ // range is empty
+ continue;
+ }
+ List firstFilterLevels = fact.getFirstGlobalFilterLevels().getFilterLevelList();
+ List lastFilterLevels = fact.getLastGlobalFilterLevels().getFilterLevelList();
+ topicFilterIterator.seek(firstFilterLevels);
+ if (topicFilterIterator.isValid()) {
+ // firstTopicFilter <= nextTopicFilter
+ if (topicFilterIterator.key().equals(firstFilterLevels)
+ || fastJoin(NUL, topicFilterIterator.key()).compareTo(fastJoin(NUL, lastFilterLevels)) <= 0) {
+ // if firstTopicFilter == nextTopicFilter || nextFilterLevels <= lastFilterLevels
+ // add to finalCandidates
+ finalCandidates.add(candidate);
+ }
+ } else {
+ // endTopicFilter < firstTopicFilter, stop
+ break;
}
- } else {
- // endTopicFilter < firstTopicFilter, stop
- break;
}
+ return finalCandidates;
}
- return finalCandidates;
}
private boolean hasSingleItem(Collection collection) {
diff --git a/bifromq-dist/bifromq-dist-worker-schema/pom.xml b/bifromq-dist/bifromq-dist-worker-schema/pom.xml
index 1c2cf307d..91bcbf872 100644
--- a/bifromq-dist/bifromq-dist-worker-schema/pom.xml
+++ b/bifromq-dist/bifromq-dist-worker-schema/pom.xml
@@ -30,6 +30,10 @@
bifromq-dist-worker-schema
+
+ com.github.ben-manes.caffeine
+ caffeine
+
org.apache.bifromq
bifromq-util
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/GroupMatching.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/GroupMatching.java
index ff745eeef..ee9351bd4 100644
--- a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/GroupMatching.java
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/GroupMatching.java
@@ -29,7 +29,7 @@
/**
* Represent a group matching route.
*/
-@EqualsAndHashCode(callSuper = true)
+@EqualsAndHashCode(callSuper = true, cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
@ToString
public final class GroupMatching extends Matching {
@EqualsAndHashCode.Exclude
@@ -38,11 +38,12 @@ public final class GroupMatching extends Matching {
public final List receiverList;
private final Map receivers;
- GroupMatching(String tenantId, RouteMatcher matcher, Map members) {
+ public GroupMatching(String tenantId, RouteMatcher matcher, Map members) {
super(tenantId, matcher);
assert matcher.getType() != RouteMatcher.Type.Normal;
- this.ordered = matcher.getType() == RouteMatcher.Type.OrderedShare;
this.receivers = members;
+
+ this.ordered = matcher.getType() == RouteMatcher.Type.OrderedShare;
this.receiverList = members.entrySet().stream()
.map(e -> new NormalMatching(tenantId, matcher, e.getKey(), e.getValue()))
.collect(Collectors.toList());
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/KVSchemaConstants.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/KVSchemaConstants.java
new file mode 100644
index 000000000..7f5db3851
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/KVSchemaConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema;
+
+import com.google.protobuf.ByteString;
+
+public class KVSchemaConstants {
+ public static final ByteString SCHEMA_VER = ByteString.copyFrom(new byte[] {0x00});
+ public static final int MAX_RECEIVER_BUCKETS = 0xFF; // one byte
+ public static final byte FLAG_NORMAL = 0x01;
+ public static final byte FLAG_UNORDERED = 0x02;
+ public static final byte FLAG_ORDERED = 0x03;
+ public static final ByteString SEPARATOR_BYTE = ByteString.copyFrom(new byte[] {0x00});
+ public static final ByteString FLAG_NORMAL_VAL = ByteString.copyFrom(new byte[] {FLAG_NORMAL});
+ public static final ByteString FLAG_UNORDERED_VAL = ByteString.copyFrom(new byte[] {FLAG_UNORDERED});
+ public static final ByteString FLAG_ORDERED_VAL = ByteString.copyFrom(new byte[] {FLAG_ORDERED});
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtil.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtil.java
index f1c0117c4..630dbdcc2 100644
--- a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtil.java
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtil.java
@@ -14,43 +14,38 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.worker.schema;
-import static org.apache.bifromq.util.BSUtil.toByteString;
-import static org.apache.bifromq.util.BSUtil.toShort;
-import static org.apache.bifromq.util.TopicConst.DELIMITER;
-import static org.apache.bifromq.util.TopicConst.NUL;
-import static org.apache.bifromq.util.TopicConst.ORDERED_SHARE;
-import static org.apache.bifromq.util.TopicConst.UNORDERED_SHARE;
-import static org.apache.bifromq.util.TopicUtil.parse;
-import static org.apache.bifromq.util.TopicUtil.unescape;
import static com.google.protobuf.ByteString.copyFromUtf8;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.FLAG_NORMAL_VAL;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.FLAG_ORDERED_VAL;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.FLAG_UNORDERED_VAL;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.MAX_RECEIVER_BUCKETS;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.SCHEMA_VER;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.SEPARATOR_BYTE;
+import static org.apache.bifromq.dist.worker.schema.cache.RouteGroupCache.get;
+import static org.apache.bifromq.util.BSUtil.toByteString;
+import static org.apache.bifromq.util.TopicConst.NUL;
+import com.google.protobuf.ByteString;
+import java.util.List;
import org.apache.bifromq.dist.rpc.proto.MatchRoute;
import org.apache.bifromq.dist.rpc.proto.RouteGroup;
+import org.apache.bifromq.dist.worker.schema.cache.GroupMatchingCache;
+import org.apache.bifromq.dist.worker.schema.cache.NormalMatchingCache;
+import org.apache.bifromq.dist.worker.schema.cache.ReceiverCache;
+import org.apache.bifromq.dist.worker.schema.cache.RouteDetailCache;
import org.apache.bifromq.type.RouteMatcher;
import org.apache.bifromq.util.BSUtil;
-import com.google.protobuf.ByteString;
-import java.util.List;
/**
* Utility for working with the data stored in dist worker.
*/
public class KVSchemaUtil {
- public static final ByteString SCHEMA_VER = ByteString.copyFrom(new byte[] {0x00});
- private static final int MAX_RECEIVER_BUCKETS = 0xFF; // one byte
- private static final byte FLAG_NORMAL = 0x01;
- private static final byte FLAG_UNORDERED = 0x02;
- private static final byte FLAG_ORDERED = 0x03;
- private static final ByteString SEPARATOR_BYTE = ByteString.copyFrom(new byte[] {0x00});
- private static final ByteString FLAG_NORMAL_VAL = ByteString.copyFrom(new byte[] {FLAG_NORMAL});
- private static final ByteString FLAG_UNORDERED_VAL = ByteString.copyFrom(new byte[] {FLAG_UNORDERED});
- private static final ByteString FLAG_ORDERED_VAL = ByteString.copyFrom(new byte[] {FLAG_ORDERED});
-
public static String toReceiverUrl(MatchRoute route) {
return toReceiverUrl(route.getBrokerId(), route.getReceiverId(), route.getDelivererKey());
}
@@ -60,25 +55,25 @@ public static String toReceiverUrl(int subBrokerId, String receiverId, String de
}
public static Receiver parseReceiver(String receiverUrl) {
- String[] parts = receiverUrl.split(NUL);
- return new Receiver(Integer.parseInt(parts[0]), parts[1], parts[2]);
+ return ReceiverCache.get(receiverUrl);
}
public static Matching buildMatchRoute(ByteString routeKey, ByteString routeValue) {
RouteDetail routeDetail = parseRouteDetail(routeKey);
- try {
- if (routeDetail.matcher().getType() == RouteMatcher.Type.Normal) {
- return new NormalMatching(routeDetail.tenantId(),
- routeDetail.matcher(),
- routeDetail.receiverUrl(),
- BSUtil.toLong(routeValue));
- }
- return new GroupMatching(routeDetail.tenantId(),
- routeDetail.matcher(),
- RouteGroup.parseFrom(routeValue).getMembersMap());
- } catch (Exception e) {
- throw new IllegalStateException("Unable to parse matching record", e);
+ if (routeDetail.matcher().getType() == RouteMatcher.Type.Normal) {
+ return buildNormalMatchRoute(routeDetail, BSUtil.toLong(routeValue));
}
+ return buildGroupMatchRoute(routeDetail, get(routeValue));
+ }
+
+ public static Matching buildNormalMatchRoute(RouteDetail routeDetail, long incarnation) {
+ assert routeDetail.matcher().getType() == RouteMatcher.Type.Normal;
+ return NormalMatchingCache.get(routeDetail, incarnation);
+ }
+
+ public static Matching buildGroupMatchRoute(RouteDetail routeDetail, RouteGroup group) {
+ assert routeDetail.matcher().getType() != RouteMatcher.Type.Normal;
+ return GroupMatchingCache.get(routeDetail, group);
}
public static ByteString tenantBeginKey(String tenantId) {
@@ -107,67 +102,13 @@ public static ByteString toNormalRouteKey(String tenantId, RouteMatcher routeMat
public static ByteString toGroupRouteKey(String tenantId, RouteMatcher routeMatcher) {
assert routeMatcher.getType() != RouteMatcher.Type.Normal;
- return tenantRouteBucketStartKey(tenantId, routeMatcher.getFilterLevelList(),
- bucket(routeMatcher.getGroup()))
+ return tenantRouteBucketStartKey(tenantId, routeMatcher.getFilterLevelList(), bucket(routeMatcher.getGroup()))
.concat(routeMatcher.getType() == RouteMatcher.Type.OrderedShare ? FLAG_ORDERED_VAL : FLAG_UNORDERED_VAL)
.concat(toReceiverBytes(routeMatcher.getGroup()));
}
public static RouteDetail parseRouteDetail(ByteString routeKey) {
- //
- short tenantIdLen = tenantIdLen(routeKey);
- int tenantIdStartIdx = SCHEMA_VER.size() + Short.BYTES;
- int escapedTopicFilterStartIdx = tenantIdStartIdx + tenantIdLen;
- int receiverBytesLen = receiverBytesLen(routeKey);
- int receiverBytesStartIdx = routeKey.size() - Short.BYTES - receiverBytesLen;
- int receiverBytesEndIdx = routeKey.size() - Short.BYTES;
- int flagByteIdx = receiverBytesStartIdx - 1;
- int separatorBytesIdx = flagByteIdx - 1 - 2; // 2 bytes separator
- String receiverInfo = routeKey.substring(receiverBytesStartIdx, receiverBytesEndIdx).toStringUtf8();
- byte flag = routeKey.byteAt(flagByteIdx);
-
- String tenantId = routeKey.substring(tenantIdStartIdx, escapedTopicFilterStartIdx).toStringUtf8();
- String escapedTopicFilter =
- routeKey.substring(escapedTopicFilterStartIdx, separatorBytesIdx).toStringUtf8();
- switch (flag) {
- case FLAG_NORMAL -> {
- RouteMatcher matcher = RouteMatcher.newBuilder()
- .setType(RouteMatcher.Type.Normal)
- .addAllFilterLevel(parse(escapedTopicFilter, true))
- .setMqttTopicFilter(unescape(escapedTopicFilter))
- .build();
- return new RouteDetail(tenantId, matcher, receiverInfo);
- }
- case FLAG_UNORDERED -> {
- RouteMatcher matcher = RouteMatcher.newBuilder()
- .setType(RouteMatcher.Type.UnorderedShare)
- .addAllFilterLevel(parse(escapedTopicFilter, true))
- .setGroup(receiverInfo)
- .setMqttTopicFilter(
- UNORDERED_SHARE + DELIMITER + receiverInfo + DELIMITER + unescape(escapedTopicFilter))
- .build();
- return new RouteDetail(tenantId, matcher, null);
- }
- case FLAG_ORDERED -> {
- RouteMatcher matcher = RouteMatcher.newBuilder()
- .setType(RouteMatcher.Type.OrderedShare)
- .addAllFilterLevel(parse(escapedTopicFilter, true))
- .setGroup(receiverInfo)
- .setMqttTopicFilter(
- ORDERED_SHARE + DELIMITER + receiverInfo + DELIMITER + unescape(escapedTopicFilter))
- .build();
- return new RouteDetail(tenantId, matcher, null);
- }
- default -> throw new UnsupportedOperationException("Unknown route type: " + flag);
- }
- }
-
- private static short tenantIdLen(ByteString routeKey) {
- return toShort(routeKey.substring(SCHEMA_VER.size(), SCHEMA_VER.size() + Short.BYTES));
- }
-
- private static short receiverBytesLen(ByteString routeKey) {
- return toShort(routeKey.substring(routeKey.size() - Short.BYTES));
+ return RouteDetailCache.get(routeKey);
}
private static ByteString toReceiverBytes(String receiver) {
@@ -179,8 +120,4 @@ private static byte bucket(String receiver) {
int hash = receiver.hashCode();
return (byte) ((hash ^ (hash >>> 16)) & MAX_RECEIVER_BUCKETS);
}
-
- public record Receiver(int subBrokerId, String receiverId, String delivererKey) {
-
- }
}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Matching.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Matching.java
index 60b8d9aa5..56d659fe1 100644
--- a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Matching.java
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Matching.java
@@ -26,7 +26,7 @@
/**
* The abstract class of matching route.
*/
-@EqualsAndHashCode
+@EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
@ToString
public abstract class Matching {
@EqualsAndHashCode.Exclude
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/NormalMatching.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/NormalMatching.java
index 07b95f223..3090ba954 100644
--- a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/NormalMatching.java
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/NormalMatching.java
@@ -27,7 +27,7 @@
/**
* Represent a normal matching route.
*/
-@EqualsAndHashCode(callSuper = true)
+@EqualsAndHashCode(callSuper = true, cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
@ToString
public class NormalMatching extends Matching {
private final String receiverUrl;
@@ -38,9 +38,9 @@ public class NormalMatching extends Matching {
private final MatchInfo matchInfo;
@EqualsAndHashCode.Exclude
- private final KVSchemaUtil.Receiver receiver;
+ private final Receiver receiver;
- NormalMatching(String tenantId, RouteMatcher matcher, String receiverUrl, long incarnation) {
+ public NormalMatching(String tenantId, RouteMatcher matcher, String receiverUrl, long incarnation) {
super(tenantId, matcher);
this.receiverUrl = receiverUrl;
this.receiver = KVSchemaUtil.parseReceiver(receiverUrl);
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Receiver.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Receiver.java
new file mode 100644
index 000000000..8c7a61b80
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Receiver.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema;
+
+public record Receiver(int subBrokerId, String receiverId, String delivererKey) {
+
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/GroupMatchingCache.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/GroupMatchingCache.java
new file mode 100644
index 000000000..17924f0a1
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/GroupMatchingCache.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Interner;
+import org.apache.bifromq.dist.rpc.proto.RouteGroup;
+import org.apache.bifromq.dist.worker.schema.GroupMatching;
+import org.apache.bifromq.dist.worker.schema.Matching;
+import org.apache.bifromq.dist.worker.schema.RouteDetail;
+import org.apache.bifromq.type.RouteMatcher;
+
+public class GroupMatchingCache {
+ private static final Interner CACHE_KEY_INTERNER = Interner.newWeakInterner();
+ private static final Cache GROUP_MATCHING_CACHE = Caffeine.newBuilder().weakKeys()
+ .build();
+ private static final Interner MATCHING_INTERNER = Interner.newWeakInterner();
+
+ public static Matching get(RouteDetail routeDetail, RouteGroup group) {
+ assert routeDetail.matcher().getType() != RouteMatcher.Type.Normal;
+ CacheKey key = CACHE_KEY_INTERNER.intern(new CacheKey(routeDetail, group));
+        return GROUP_MATCHING_CACHE.get(key, k ->
+            MATCHING_INTERNER.intern(new GroupMatching(k.routeDetail().tenantId(), k.routeDetail().matcher(),
+                k.routeGroup().getMembersMap())));
+ }
+
+ private record CacheKey(RouteDetail routeDetail, RouteGroup routeGroup) {
+ }
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/NormalMatchingCache.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/NormalMatchingCache.java
new file mode 100644
index 000000000..712364b4b
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/NormalMatchingCache.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Interner;
+import org.apache.bifromq.dist.worker.schema.Matching;
+import org.apache.bifromq.dist.worker.schema.NormalMatching;
+import org.apache.bifromq.dist.worker.schema.RouteDetail;
+import org.apache.bifromq.type.RouteMatcher;
+
+public class NormalMatchingCache {
+ private static final Interner CACHE_KEY_INTERNER = Interner.newWeakInterner();
+ private static final Cache NORMAL_MATCHING_CACHE = Caffeine.newBuilder().weakKeys()
+ .build();
+ private static final Interner MATCHING_INTERNER = Interner.newWeakInterner();
+
+ public static Matching get(RouteDetail routeDetail, long incarnation) {
+ assert routeDetail.matcher().getType() == RouteMatcher.Type.Normal;
+ CacheKey key = CACHE_KEY_INTERNER.intern(new CacheKey(routeDetail, incarnation));
+ return NORMAL_MATCHING_CACHE.get(key, k -> MATCHING_INTERNER.intern(new NormalMatching(k.routeDetail.tenantId(),
+ k.routeDetail.matcher(),
+ k.routeDetail.receiverUrl(), k.incarnation)));
+ }
+
+ private record CacheKey(RouteDetail routeDetail, long incarnation) {
+ }
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/ReceiverCache.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/ReceiverCache.java
new file mode 100644
index 000000000..de3a84151
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/ReceiverCache.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import static org.apache.bifromq.util.TopicConst.NUL;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Interner;
+import org.apache.bifromq.dist.worker.schema.Receiver;
+
+public class ReceiverCache {
+ private static final Interner RECEIVER_URL_INTERNER = Interner.newWeakInterner();
+ private static final Interner RECEIVER_INTERNER = Interner.newWeakInterner();
+ private static final Cache RECEIVER_CACHE = Caffeine.newBuilder().weakKeys().build();
+
+ public static Receiver get(String receiverUrl) {
+ receiverUrl = RECEIVER_URL_INTERNER.intern(receiverUrl);
+ return RECEIVER_CACHE.get(receiverUrl, k -> {
+ String[] parts = k.split(NUL);
+ return RECEIVER_INTERNER.intern(new Receiver(Integer.parseInt(parts[0]), parts[1], parts[2]));
+ });
+ }
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/RouteDetailCache.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/RouteDetailCache.java
new file mode 100644
index 000000000..19f67a47f
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/RouteDetailCache.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.FLAG_NORMAL;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.FLAG_ORDERED;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.FLAG_UNORDERED;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaConstants.SCHEMA_VER;
+import static org.apache.bifromq.util.BSUtil.toShort;
+import static org.apache.bifromq.util.TopicConst.DELIMITER;
+import static org.apache.bifromq.util.TopicConst.ORDERED_SHARE;
+import static org.apache.bifromq.util.TopicConst.UNORDERED_SHARE;
+import static org.apache.bifromq.util.TopicUtil.parse;
+import static org.apache.bifromq.util.TopicUtil.unescape;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Interner;
+import com.google.protobuf.ByteString;
+import org.apache.bifromq.dist.worker.schema.RouteDetail;
+import org.apache.bifromq.type.RouteMatcher;
+
+public class RouteDetailCache {
+ private static final Interner TENANTID_INTERNER = Interner.newWeakInterner();
+ private static final Interner MQTT_TOPIC_FILTER_INTERNER = Interner.newWeakInterner();
+ private static final Interner ROUTE_KEY_INTERNER = Interner.newWeakInterner();
+ private static final Interner ROUTE_MATCHER_INTERNER = Interner.newWeakInterner();
+ private static final Cache ROUTE_MATCHER_CACHE = Caffeine.newBuilder().weakKeys().build();
+ private static final Cache ROUTE_DETAIL_CACHE = Caffeine.newBuilder().weakKeys().build();
+
+ public static RouteDetail get(ByteString routeKey) {
+ //
+ routeKey = ROUTE_KEY_INTERNER.intern(routeKey);
+ return ROUTE_DETAIL_CACHE.get(routeKey, k -> {
+ short tenantIdLen = tenantIdLen(k);
+ int tenantIdStartIdx = SCHEMA_VER.size() + Short.BYTES;
+ int escapedTopicFilterStartIdx = tenantIdStartIdx + tenantIdLen;
+ int receiverBytesLen = receiverBytesLen(k);
+ int receiverBytesStartIdx = k.size() - Short.BYTES - receiverBytesLen;
+ int receiverBytesEndIdx = k.size() - Short.BYTES;
+ int flagByteIdx = receiverBytesStartIdx - 1;
+ int separatorBytesIdx = flagByteIdx - 1 - 2; // 2 bytes separator
+ String receiverInfo = k.substring(receiverBytesStartIdx, receiverBytesEndIdx).toStringUtf8();
+ byte flag = k.byteAt(flagByteIdx);
+
+ String tenantId = TENANTID_INTERNER.intern(
+ k.substring(tenantIdStartIdx, escapedTopicFilterStartIdx).toStringUtf8());
+ String escapedTopicFilter = k.substring(escapedTopicFilterStartIdx, separatorBytesIdx)
+ .toStringUtf8();
+ switch (flag) {
+ case FLAG_NORMAL -> {
+ String mqttTopicFilter = MQTT_TOPIC_FILTER_INTERNER.intern(unescape(escapedTopicFilter));
+ RouteMatcher matcher = ROUTE_MATCHER_CACHE.get(mqttTopicFilter,
+ t -> ROUTE_MATCHER_INTERNER.intern(RouteMatcher.newBuilder()
+ .setType(RouteMatcher.Type.Normal)
+ .addAllFilterLevel(parse(escapedTopicFilter, true))
+ .setMqttTopicFilter(t)
+ .build()));
+ return new RouteDetail(tenantId, matcher, receiverInfo); // receiverInfo is the receiverUrl
+ }
+ case FLAG_UNORDERED -> {
+ String mqttTopicFilter = MQTT_TOPIC_FILTER_INTERNER.intern(
+ UNORDERED_SHARE + DELIMITER + receiverInfo + DELIMITER + unescape(escapedTopicFilter));
+ RouteMatcher matcher = ROUTE_MATCHER_CACHE.get(mqttTopicFilter,
+ t -> ROUTE_MATCHER_INTERNER.intern(RouteMatcher.newBuilder()
+ .setType(RouteMatcher.Type.UnorderedShare)
+ .addAllFilterLevel(parse(escapedTopicFilter, true))
+ .setGroup(receiverInfo) // receiverInfo is the group name
+ .setMqttTopicFilter(t)
+ .build()));
+ return new RouteDetail(tenantId, matcher, null);
+ }
+ case FLAG_ORDERED -> {
+ String mqttTopicFilter = MQTT_TOPIC_FILTER_INTERNER.intern(
+ ORDERED_SHARE + DELIMITER + receiverInfo + DELIMITER + unescape(escapedTopicFilter));
+ RouteMatcher matcher = ROUTE_MATCHER_CACHE.get(mqttTopicFilter, t -> {
+ RouteMatcher m = RouteMatcher.newBuilder()
+ .setType(RouteMatcher.Type.OrderedShare)
+ .addAllFilterLevel(parse(escapedTopicFilter, true))
+ .setGroup(receiverInfo) // group name
+ .setMqttTopicFilter(t)
+ .build();
+ return ROUTE_MATCHER_INTERNER.intern(m);
+ });
+ return new RouteDetail(tenantId, matcher, null);
+ }
+ default -> throw new UnsupportedOperationException("Unknown route type: " + flag);
+ }
+ });
+ }
+
+ private static short tenantIdLen(ByteString routeKey) {
+ return toShort(routeKey.substring(SCHEMA_VER.size(), SCHEMA_VER.size() + Short.BYTES));
+ }
+
+ private static short receiverBytesLen(ByteString routeKey) {
+ return toShort(routeKey.substring(routeKey.size() - Short.BYTES));
+ }
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/RouteGroupCache.java b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/RouteGroupCache.java
new file mode 100644
index 000000000..8e2de8a19
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/cache/RouteGroupCache.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Interner;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.bifromq.dist.rpc.proto.RouteGroup;
+
+public class RouteGroupCache {
+ private static final Interner ROUTE_GROUP_BYTES_INTERNER = Interner.newWeakInterner();
+ private static final Interner ROUTE_GROUP_INTERNER = Interner.newWeakInterner();
+ private static final Cache ROUTE_GROUP_CACHE = Caffeine.newBuilder().weakKeys().build();
+
+ public static RouteGroup get(ByteString routeGroupBytes) {
+ routeGroupBytes = ROUTE_GROUP_BYTES_INTERNER.intern(routeGroupBytes);
+ return ROUTE_GROUP_CACHE.get(routeGroupBytes, k -> {
+ try {
+ return ROUTE_GROUP_INTERNER.intern(RouteGroup.parseFrom(k));
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException("Unable to parse matching record", e);
+ }
+ });
+ }
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtilTest.java b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtilTest.java
index 2c52d5e57..9d469cffe 100644
--- a/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtilTest.java
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/KVSchemaUtilTest.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.dist.worker.schema;
@@ -24,15 +24,16 @@
import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toNormalRouteKey;
import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toReceiverUrl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
+import com.google.protobuf.ByteString;
import org.apache.bifromq.dist.rpc.proto.MatchRoute;
import org.apache.bifromq.dist.rpc.proto.RouteGroup;
import org.apache.bifromq.type.MatchInfo;
import org.apache.bifromq.type.RouteMatcher;
import org.apache.bifromq.util.BSUtil;
import org.apache.bifromq.util.TopicUtil;
-import com.google.protobuf.ByteString;
import org.testng.annotations.Test;
public class KVSchemaUtilTest {
@@ -40,6 +41,46 @@ public class KVSchemaUtilTest {
private static final int InboxService = 1;
private static final int RuleEngine = 2;
    // Verifies the interning behavior for normal routes: parsing the same route
    // key (with the same incarnation bytes) twice must return the exact same
    // Matching instance, not merely an equal one.
    @Test
    public void testNormalMatchingIntern() {
        String tenantId = "tenantId";
        String topicFilter = "/a/b/c";
        RouteMatcher matcher = TopicUtil.from(topicFilter);
        MatchRoute route = MatchRoute.newBuilder()
            .setMatcher(matcher)
            .setBrokerId(MqttBroker)
            .setReceiverId("inbox1")
            .setDelivererKey("delivererKey1")
            .setIncarnation(1L)
            .build();
        ByteString key = toNormalRouteKey(tenantId, matcher, toReceiverUrl(route));
        Matching matching1 = buildMatchRoute(key, BSUtil.toByteString(route.getIncarnation()));
        Matching matching2 = buildMatchRoute(key, BSUtil.toByteString(route.getIncarnation()));
        // assertSame checks reference identity, proving the cache deduplicates.
        assertSame(matching1, matching2);
    }
+
    // Verifies the interning behavior for shared-subscription (group) routes:
    // building a match route twice from the same group key and identical
    // membership bytes must return the exact same Matching instance.
    @Test
    public void testGroupMatchingIntern() {
        String origTopicFilter = "$share/group//a/b/c";
        RouteMatcher matcher = TopicUtil.from(origTopicFilter);
        MatchRoute route = MatchRoute.newBuilder()
            .setMatcher(matcher)
            .setBrokerId(MqttBroker)
            .setReceiverId("inbox1")
            .setDelivererKey("server1")
            .setIncarnation(1L)
            .build();

        String scopedReceiverId = toReceiverUrl(MqttBroker, "inbox1", "server1");
        ByteString key = toGroupRouteKey("tenantId", matcher);
        RouteGroup groupMembers = RouteGroup.newBuilder()
            .putMembers(scopedReceiverId, route.getIncarnation())
            .build();
        Matching matching1 = buildMatchRoute(key, groupMembers.toByteString());
        Matching matching2 = buildMatchRoute(key, groupMembers.toByteString());
        // assertSame checks reference identity, proving the cache deduplicates.
        assertSame(matching1, matching2);
    }
+
@Test
public void testParseNormalMatchRecord() {
String tenantId = "tenantId";
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/GroupMatchingCacheTest.java b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/GroupMatchingCacheTest.java
new file mode 100644
index 000000000..100530e40
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/GroupMatchingCacheTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toReceiverUrl;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.bifromq.dist.rpc.proto.RouteGroup;
+import org.apache.bifromq.dist.worker.schema.GroupMatching;
+import org.apache.bifromq.dist.worker.schema.NormalMatching;
+import org.apache.bifromq.dist.worker.schema.RouteDetail;
+import org.apache.bifromq.type.RouteMatcher;
+import org.apache.bifromq.util.TopicUtil;
+import org.testng.annotations.Test;
+
+public class GroupMatchingCacheTest {
+
+ private static RouteDetail detail(String tenantId, RouteMatcher matcher) {
+ return new RouteDetail(tenantId, matcher, null);
+ }
+
+ private static RouteGroup groupOf(Map members) {
+ RouteGroup.Builder b = RouteGroup.newBuilder();
+ for (Map.Entry e : members.entrySet()) {
+ b.putMembers(e.getKey(), e.getValue());
+ }
+ return b.build();
+ }
+
+ @Test
+ public void sameKeyIntern() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topic = "$share/group-" + UUID.randomUUID() + "/home/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(topic);
+
+ Map members = new HashMap<>();
+ members.put(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 1L);
+ members.put(toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d2"), 2L);
+
+ RouteDetail d = detail(tenantId, matcher);
+ RouteGroup g = groupOf(members);
+
+ GroupMatching m1 = (GroupMatching) GroupMatchingCache.get(d, g);
+ GroupMatching m2 = (GroupMatching) GroupMatchingCache.get(d, g);
+ assertSame(m1, m2);
+ }
+
+ @Test
+ public void membershipChangeCreatesNewInstance() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topic = "$share/group-" + UUID.randomUUID() + "/home/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(topic);
+
+ String url1 = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1");
+ String url2 = toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d2");
+
+ RouteDetail d = detail(tenantId, matcher);
+ RouteGroup g1 = RouteGroup.newBuilder().putMembers(url1, 1L).putMembers(url2, 2L).build();
+ RouteGroup g2 = RouteGroup.newBuilder().putMembers(url1, 1L).build(); // remove one member
+
+ GroupMatching m1 = (GroupMatching) GroupMatchingCache.get(d, g1);
+ GroupMatching m2 = (GroupMatching) GroupMatchingCache.get(d, g2);
+ assertNotSame(m1, m2);
+ }
+
+ @Test
+ public void orderedFlagPropagates() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ RouteMatcher unordered = TopicUtil.from("$share/g" + UUID.randomUUID() + "/a/b");
+ RouteMatcher ordered = TopicUtil.from("$oshare/g" + UUID.randomUUID() + "/a/b");
+ RouteGroup g = RouteGroup.newBuilder()
+ .putMembers(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 1L)
+ .build();
+
+ GroupMatching mu = (GroupMatching) GroupMatchingCache.get(detail(tenantId, unordered), g);
+ GroupMatching mo = (GroupMatching) GroupMatchingCache.get(detail(tenantId, ordered), g);
+
+ // unordered share => ordered=false; ordered share => ordered=true
+ assertEquals(mu.ordered, false);
+ assertEquals(mo.ordered, true);
+ }
+
+ @Test
+ public void receiverListConsistency() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from("$share/g" + UUID.randomUUID() + "/a/b");
+
+ String url1 = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1");
+ String url2 = toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d2");
+ long inc1 = 1L, inc2 = 2L;
+
+ RouteGroup g = RouteGroup.newBuilder().putMembers(url1, inc1).putMembers(url2, inc2).build();
+ GroupMatching m = (GroupMatching) GroupMatchingCache.get(detail(tenantId, matcher), g);
+
+ Map expected = g.getMembersMap();
+ assertEquals(m.receiverList.size(), expected.size());
+
+ Map actual = new HashMap<>();
+ for (NormalMatching n : m.receiverList) {
+ actual.put(n.receiverUrl(), n.incarnation());
+ // Each sub matching preserves matcher mqttTopicFilter
+ assertEquals(n.matchInfo().getMatcher().getMqttTopicFilter(), matcher.getMqttTopicFilter());
+ }
+ assertEquals(actual, expected);
+ }
+
+ @Test
+ public void concurrentGetSameKey() throws Exception {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from("$share/g" + UUID.randomUUID() + "/a/b");
+ RouteGroup g = RouteGroup.newBuilder()
+ .putMembers(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 1L)
+ .putMembers(toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d2"), 2L)
+ .build();
+ RouteDetail d = detail(tenantId, matcher);
+
+ int threads = 16;
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ try {
+ CountDownLatch start = new CountDownLatch(1);
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(pool.submit(() -> {
+ start.await();
+ return (GroupMatching) GroupMatchingCache.get(d, g);
+ }));
+ }
+ start.countDown();
+ GroupMatching first = futures.get(0).get();
+ for (Future f : futures) {
+ assertSame(first, f.get(), "Concurrent gets should return same GroupMatching instance");
+ }
+ } finally {
+ pool.shutdownNow();
+ }
+ }
+}
+
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/NormalMatchingCacheTest.java b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/NormalMatchingCacheTest.java
new file mode 100644
index 000000000..60750a7cf
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/NormalMatchingCacheTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toReceiverUrl;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.bifromq.dist.worker.schema.NormalMatching;
+import org.apache.bifromq.dist.worker.schema.RouteDetail;
+import org.apache.bifromq.type.MatchInfo;
+import org.apache.bifromq.type.RouteMatcher;
+import org.apache.bifromq.util.TopicUtil;
+import org.testng.annotations.Test;
+
+public class NormalMatchingCacheTest {
+
+ private static RouteDetail detail(String tenantId, String topicFilter, String receiverUrl) {
+ RouteMatcher matcher = TopicUtil.from(topicFilter);
+ return new RouteDetail(tenantId, matcher, receiverUrl);
+ }
+
+ @Test
+ public void sameKeySameIncarnationIntern() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/s/" + UUID.randomUUID();
+ String receiverUrl = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+ RouteDetail d = detail(tenantId, topicFilter, receiverUrl);
+
+ NormalMatching m1 = (NormalMatching) NormalMatchingCache.get(d, 1L);
+ NormalMatching m2 = (NormalMatching) NormalMatchingCache.get(d, 1L);
+ assertSame(m1, m2);
+ }
+
+ @Test
+ public void differentIncarnationNotSame() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/s/" + UUID.randomUUID();
+ String receiverUrl = toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+ RouteDetail d = detail(tenantId, topicFilter, receiverUrl);
+
+ NormalMatching m1 = (NormalMatching) NormalMatchingCache.get(d, 1L);
+ NormalMatching m2 = (NormalMatching) NormalMatchingCache.get(d, 2L);
+ assertNotSame(m1, m2);
+ }
+
+ @Test
+ public void differentReceiverNotSame() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/s/" + UUID.randomUUID();
+ String receiverUrl1 = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1");
+ String receiverUrl2 = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1");
+ NormalMatching m1 = (NormalMatching) NormalMatchingCache.get(detail(tenantId, topicFilter, receiverUrl1), 1L);
+ NormalMatching m2 = (NormalMatching) NormalMatchingCache.get(detail(tenantId, topicFilter, receiverUrl2), 1L);
+ assertNotSame(m1, m2);
+ }
+
+ @Test
+ public void differentMatcherNotSame() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String receiverUrl = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1");
+ NormalMatching m1 = (NormalMatching) NormalMatchingCache.get(detail(tenantId, "/a/#", receiverUrl), 1L);
+ NormalMatching m2 = (NormalMatching) NormalMatchingCache.get(detail(tenantId, "/a/+", receiverUrl), 1L);
+ assertNotSame(m1, m2);
+ }
+
+ @Test
+ public void fieldCorrectness() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/s/" + UUID.randomUUID();
+ int subBrokerId = 7;
+ String receiverId = "inbox-" + UUID.randomUUID();
+ String deliverer = "d-" + UUID.randomUUID();
+ String receiverUrl = toReceiverUrl(subBrokerId, receiverId, deliverer);
+ NormalMatching m = (NormalMatching) NormalMatchingCache.get(detail(tenantId, topicFilter, receiverUrl), 42L);
+
+ assertEquals(m.receiverUrl(), receiverUrl);
+ assertEquals(m.subBrokerId(), subBrokerId);
+ assertEquals(m.delivererKey(), deliverer);
+ MatchInfo info = m.matchInfo();
+ assertEquals(info.getReceiverId(), receiverId);
+ assertEquals(info.getIncarnation(), 42L);
+ assertEquals(info.getMatcher().getMqttTopicFilter(), topicFilter);
+ }
+
+ @Test
+ public void concurrentGetSameKey() throws Exception {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/s/" + UUID.randomUUID();
+ String receiverUrl = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+ RouteDetail d = detail(tenantId, topicFilter, receiverUrl);
+
+ int threads = 16;
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ try {
+ CountDownLatch start = new CountDownLatch(1);
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(pool.submit(() -> {
+ start.await();
+ return (NormalMatching) NormalMatchingCache.get(d, 1L);
+ }));
+ }
+ start.countDown();
+ NormalMatching first = futures.get(0).get();
+ for (Future f : futures) {
+ assertSame(first, f.get());
+ }
+ } finally {
+ pool.shutdownNow();
+ }
+ }
+}
+
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/ReceiverCacheTest.java b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/ReceiverCacheTest.java
new file mode 100644
index 000000000..c045a58a9
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/ReceiverCacheTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toReceiverUrl;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.bifromq.dist.worker.schema.Receiver;
+import org.testng.annotations.Test;
+
+public class ReceiverCacheTest {
+
+ @Test
+ public void parseAndCache() {
+ int brokerId = (int) (System.nanoTime() & 0x7fffffff);
+ String receiverId = "inbox-" + UUID.randomUUID();
+ String delivererKey = "deliverer-" + UUID.randomUUID();
+ String url = toReceiverUrl(brokerId, receiverId, delivererKey);
+
+ Receiver r1 = ReceiverCache.get(url);
+ Receiver r2 = ReceiverCache.get(url);
+
+ assertSame(r1, r2);
+ assertEquals(r1.subBrokerId(), brokerId);
+ assertEquals(r1.receiverId(), receiverId);
+ assertEquals(r1.delivererKey(), delivererKey);
+ }
+
+ @Test
+ public void differentReceiverNotSame() {
+ String url1 = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1");
+ String url2 = toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d2");
+
+ Receiver r1 = ReceiverCache.get(url1);
+ Receiver r2 = ReceiverCache.get(url2);
+
+ assertNotSame(r1, r2);
+ }
+
+ @Test
+ public void concurrentGetSameKey() throws Exception {
+ int threads = 16;
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ try {
+ String url = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+ CountDownLatch start = new CountDownLatch(1);
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(pool.submit(() -> {
+ start.await();
+ return ReceiverCache.get(url);
+ }));
+ }
+ start.countDown();
+ Receiver first = futures.get(0).get();
+ for (Future f : futures) {
+ assertSame(first, f.get());
+ }
+ } finally {
+ pool.shutdownNow();
+ }
+ }
+}
+
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/RouteDetailCacheTest.java b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/RouteDetailCacheTest.java
new file mode 100644
index 000000000..f233ffa57
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/RouteDetailCacheTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toGroupRouteKey;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toNormalRouteKey;
+import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toReceiverUrl;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.bifromq.dist.worker.schema.RouteDetail;
+import org.apache.bifromq.type.RouteMatcher;
+import org.apache.bifromq.util.TopicUtil;
+import org.testng.annotations.Test;
+
+public class RouteDetailCacheTest {
+
+ @Test
+ public void parseNormalRouteDetailAndIntern() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/home/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(topicFilter);
+ String receiverUrl = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+
+ ByteString routeKey = toNormalRouteKey(tenantId, matcher, receiverUrl);
+ RouteDetail d1 = RouteDetailCache.get(routeKey);
+ RouteDetail d2 = RouteDetailCache.get(routeKey);
+
+ assertSame(d1, d2);
+ assertEquals(d1.tenantId(), tenantId);
+ assertEquals(d1.receiverUrl(), receiverUrl);
+ assertEquals(d1.matcher().getType(), RouteMatcher.Type.Normal);
+ assertEquals(d1.matcher().getMqttTopicFilter(), topicFilter);
+ }
+
+ @Test
+ public void parseUnorderedShareRouteDetail() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String origTopicFilter = "$share/group-" + UUID.randomUUID() + "/s/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(origTopicFilter);
+
+ ByteString routeKey = toGroupRouteKey(tenantId, matcher);
+ RouteDetail d = RouteDetailCache.get(routeKey);
+
+ assertEquals(d.tenantId(), tenantId);
+ assertEquals(d.receiverUrl(), null);
+ assertEquals(d.matcher().getType(), RouteMatcher.Type.UnorderedShare);
+ assertEquals(d.matcher().getGroup(), matcher.getGroup());
+ assertEquals(d.matcher().getMqttTopicFilter(), origTopicFilter);
+ }
+
+ @Test
+ public void parseOrderedShareRouteDetail() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String origTopicFilter = "$oshare/group-" + UUID.randomUUID() + "/s/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(origTopicFilter);
+
+ ByteString routeKey = toGroupRouteKey(tenantId, matcher);
+ RouteDetail d = RouteDetailCache.get(routeKey);
+
+ assertEquals(d.tenantId(), tenantId);
+ assertEquals(d.receiverUrl(), null);
+ assertEquals(d.matcher().getType(), RouteMatcher.Type.OrderedShare);
+ assertEquals(d.matcher().getGroup(), matcher.getGroup());
+ assertEquals(d.matcher().getMqttTopicFilter(), origTopicFilter);
+ }
+
+ @Test
+ public void equalContentDifferentByteStringInstancesInterned() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/room/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(topicFilter);
+ String receiverUrl = toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+
+ ByteString k1 = toNormalRouteKey(tenantId, matcher, receiverUrl);
+ ByteString k2 = ByteString.copyFrom(k1.toByteArray());
+
+ RouteDetail d1 = RouteDetailCache.get(k1);
+ RouteDetail d2 = RouteDetailCache.get(k2);
+ assertSame(d1, d2);
+ }
+
+ @Test
+ public void differentKeysProduceDifferentDetails() {
+ String tenantId1 = "tenant-" + UUID.randomUUID();
+ String tenantId2 = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/a/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(topicFilter);
+ String receiverUrl = toReceiverUrl(3, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+
+ ByteString k1 = toNormalRouteKey(tenantId1, matcher, receiverUrl);
+ ByteString k2 = toNormalRouteKey(tenantId2, matcher, receiverUrl);
+
+ RouteDetail d1 = RouteDetailCache.get(k1);
+ RouteDetail d2 = RouteDetailCache.get(k2);
+ assertNotSame(d1, d2);
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void unsupportedFlagThrows() {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/x/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(topicFilter);
+ String receiverUrl = toReceiverUrl(9, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+
+ ByteString normalKey = toNormalRouteKey(tenantId, matcher, receiverUrl);
+ // Rewrite flag byte to an unsupported value (e.g., 0x7F)
+ short receiverBytesLen = normalKey.substring(normalKey.size() - Short.BYTES).asReadOnlyByteBuffer().getShort();
+ int receiverBytesStartIdx = normalKey.size() - Short.BYTES - receiverBytesLen;
+ int flagByteIdx = receiverBytesStartIdx - 1;
+ ByteString prefix = normalKey.substring(0, flagByteIdx);
+ ByteString invalidFlag = ByteString.copyFrom(new byte[] {(byte) 0x7F});
+ ByteString suffix = normalKey.substring(flagByteIdx + 1);
+ ByteString invalidKey = prefix.concat(invalidFlag).concat(suffix);
+
+ // Should throw UnsupportedOperationException
+ RouteDetailCache.get(invalidKey);
+ }
+
+ @Test
+ public void concurrentGetSameKey() throws Exception {
+ String tenantId = "tenant-" + UUID.randomUUID();
+ String topicFilter = "/home/" + UUID.randomUUID();
+ RouteMatcher matcher = TopicUtil.from(topicFilter);
+ String receiverUrl = toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d-" + UUID.randomUUID());
+ ByteString k = toNormalRouteKey(tenantId, matcher, receiverUrl);
+
+ int threads = 16;
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ try {
+ CountDownLatch start = new CountDownLatch(1);
+ List<Future<RouteDetail>> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(pool.submit(() -> {
+ start.await();
+ return RouteDetailCache.get(k);
+ }));
+ }
+ start.countDown();
+ RouteDetail first = futures.get(0).get();
+ for (Future<RouteDetail> f : futures) {
+ assertSame(first, f.get());
+ }
+ } finally {
+ pool.shutdownNow();
+ }
+ }
+}
diff --git a/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/RouteGroupCacheTest.java b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/RouteGroupCacheTest.java
new file mode 100644
index 000000000..644453b6e
--- /dev/null
+++ b/bifromq-dist/bifromq-dist-worker-schema/src/test/java/org/apache/bifromq/dist/worker/schema/cache/RouteGroupCacheTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bifromq.dist.worker.schema.cache;
+
+import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toReceiverUrl;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.bifromq.dist.rpc.proto.RouteGroup;
+import org.testng.annotations.Test;
+
+public class RouteGroupCacheTest {
+
+ @Test
+ public void sameByteStringInstanceIsInterned() {
+ RouteGroup group = RouteGroup.newBuilder()
+ .putMembers(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 1L)
+ .putMembers(toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d2"), 2L)
+ .build();
+ ByteString bytes = group.toByteString();
+
+ RouteGroup g1 = RouteGroupCache.get(bytes);
+ RouteGroup g2 = RouteGroupCache.get(bytes);
+
+ assertSame(g1, g2);
+ }
+
+ @Test
+ public void equalContentDifferentByteString() {
+ RouteGroup group = RouteGroup.newBuilder()
+ .putMembers(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 1L)
+ .build();
+ ByteString bs1 = group.toByteString();
+ ByteString bs2 = ByteString.copyFrom(bs1.toByteArray());
+
+ RouteGroup g1 = RouteGroupCache.get(bs1);
+ RouteGroup g2 = RouteGroupCache.get(bs2);
+
+ // Content equality must hold
+ assertEquals(g1, g2);
+ // Cache keys use ByteString content equality, so equal bytes must resolve to the same interned instance
+ assertSame(g1, g2);
+ }
+
+ @Test
+ public void differentContent() {
+ ByteString bs1 = RouteGroup.newBuilder()
+ .putMembers(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 1L)
+ .build().toByteString();
+ ByteString bs2 = RouteGroup.newBuilder()
+ .putMembers(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 2L) // different incarnation
+ .build().toByteString();
+
+ RouteGroup g1 = RouteGroupCache.get(bs1);
+ RouteGroup g2 = RouteGroupCache.get(bs2);
+
+ assertNotEquals(g1, g2);
+ }
+
+ @Test
+ public void invalidBytes() {
+ ByteString invalid = ByteString.copyFromUtf8("not-a-route-group");
+ assertThrows(RuntimeException.class, () -> RouteGroupCache.get(invalid));
+ }
+
+ @Test
+ public void concurrentGetSameKey() throws Exception {
+ RouteGroup group = RouteGroup.newBuilder()
+ .putMembers(toReceiverUrl(1, "inbox-" + UUID.randomUUID(), "d1"), 1L)
+ .putMembers(toReceiverUrl(2, "inbox-" + UUID.randomUUID(), "d2"), 2L)
+ .build();
+ ByteString bytes = group.toByteString();
+
+ int threads = 16;
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ try {
+ CountDownLatch start = new CountDownLatch(1);
+ List