Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion bifromq-dist/bifromq-dist-coproc-proto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<groupId>org.apache.bifromq</groupId>
<artifactId>bifromq-common-type</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bifromq</groupId>
<artifactId>bifromq-dist-worker-schema</artifactId>
Expand All @@ -47,6 +52,10 @@
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
Expand All @@ -71,4 +80,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.dist.trie;
Expand All @@ -29,7 +29,19 @@
*
* @param <V> the value type for topic associated value
*/
public interface ITopicFilterIterator<V> {
public interface ITopicFilterIterator<V> extends AutoCloseable {
/**
* Init the iterator with the given root node.
*
* @param root the root node of the topic trie
*/
void init(TopicTrieNode<V> root);

/**
* Reset the iterator after using.
*/
void close();

/**
* Seek to the given topic filter levels, so that the next topic filter is greater or equals to the given topic
* filter levels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,105 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.dist.trie;

import static org.apache.bifromq.util.TopicConst.MULTI_WILDCARD;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

import com.google.common.collect.Sets;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Ticker;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;

/**
* Multi-level topic filter trie node.
*
* @param <V> value type
*/
final class MTopicFilterTrieNode<V> extends TopicFilterTrieNode<V> {
private final Set<TopicTrieNode<V>> backingTopics;
private static final ConcurrentLinkedDeque<Long> KEYS = new ConcurrentLinkedDeque<>();
private static final AtomicLong SEQ = new AtomicLong();
private static volatile Ticker TICKER = Ticker.systemTicker();
private static final Cache<Long, MTopicFilterTrieNode<?>> POOL = Caffeine.newBuilder()
.expireAfterAccess(EXPIRE_AFTER)
.recordStats()
.scheduler(Scheduler.systemScheduler())
.ticker(() -> TICKER.read())
.removalListener((Long key, MTopicFilterTrieNode<?> value, RemovalCause cause) -> {
KEYS.remove(key);
if (cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE) {
value.recycle();
}
})
.build();

private final Set<TopicTrieNode<V>> backingTopics = new HashSet<>();

    // Instances are pooled and reused; state is assigned via init(), not here.
    MTopicFilterTrieNode() {
    }

    /**
     * Obtain an initialized node for the given parent and sibling topic trie nodes,
     * reusing a pooled instance when one is available.
     *
     * @param parent the parent topic filter trie node (nullable; see init())
     * @param siblingTopicTrieNodes sibling topic trie nodes backing this level
     * @return an initialized node, recycled from the pool or freshly allocated
     */
    static <V> MTopicFilterTrieNode<V> borrow(TopicFilterTrieNode<V> parent,
                                              Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
        while (true) {
            // Take the oldest pooled key; an empty deque means the pool is drained.
            Long key = KEYS.pollFirst();
            if (key == null) {
                break;
            }
            @SuppressWarnings("unchecked")
            MTopicFilterTrieNode<V> pooled = (MTopicFilterTrieNode<V>) POOL.asMap().remove(key);
            if (pooled != null) {
                return pooled.init(parent, siblingTopicTrieNodes);
            }
            // null: the entry already expired/was evicted from POOL - keep polling.
        }
        // Pool exhausted: fall back to a new instance.
        MTopicFilterTrieNode<V> node = new MTopicFilterTrieNode<>();
        return node.init(parent, siblingTopicTrieNodes);
    }

    /**
     * Return a node to the pool for later reuse.
     * The node is recycled (its per-use state cleared) before being made visible
     * to other borrowers via KEYS/POOL.
     *
     * @param node the node to release
     */
    static void release(MTopicFilterTrieNode<?> node) {
        node.recycle();
        // Monotonic key keeps FIFO ordering of pooled instances in KEYS.
        long key = SEQ.incrementAndGet();
        KEYS.offerLast(key);
        POOL.put(key, node);
    }

// test hooks (package-private)
    /** Drop all pooled instances and pending keys (test hook). */
    static void poolClear() {
        POOL.invalidateAll();
        // cleanUp() runs pending removal listeners before KEYS is cleared.
        POOL.cleanUp();
        KEYS.clear();
    }

    /** Force cache maintenance so pending expirations are processed (test hook). */
    static void poolCleanUp() {
        POOL.cleanUp();
    }

    /**
     * Approximate number of pooled instances (test hook).
     * Backed by ConcurrentLinkedDeque.size(), which is O(n) and only weakly
     * consistent under concurrent modification.
     */
    static int poolApproxSize() {
        return KEYS.size();
    }

static void setTicker(Ticker ticker) {
TICKER = ticker != null ? ticker : Ticker.systemTicker();
}

MTopicFilterTrieNode(TopicFilterTrieNode<V> parent, Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
super(parent);
Set<TopicTrieNode<V>> topics = parent != null ? parent.backingTopics() : emptySet();
MTopicFilterTrieNode<V> init(TopicFilterTrieNode<V> parent, Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
assert siblingTopicTrieNodes != null;
this.parent = parent;
backingTopics.clear();
if (parent != null) {
backingTopics.addAll(parent.backingTopics());
}
for (TopicTrieNode<V> sibling : siblingTopicTrieNodes) {
topics = collectTopics(sibling, topics);
collectTopics(sibling);
}
backingTopics = topics;
return this;
}

@Override
Expand All @@ -54,17 +125,15 @@ Set<TopicTrieNode<V>> backingTopics() {
return backingTopics;
}

private Set<TopicTrieNode<V>> collectTopics(TopicTrieNode<V> node, Set<TopicTrieNode<V>> topics) {
private void collectTopics(TopicTrieNode<V> node) {
if (node.isUserTopic()) {
topics = Sets.union(topics, singleton(node));
backingTopics.add(node);
}
for (TopicTrieNode<V> child : node.children().values()) {
topics = collectTopics(child, topics);
collectTopics(child);
}
return topics;
}


@Override
void seekChild(String childLevelName) {

Expand Down Expand Up @@ -104,4 +173,9 @@ void prevChild() {
TopicFilterTrieNode<V> childNode() {
throw new NoSuchElementException();
}

private void recycle() {
parent = null;
backingTopics.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.dist.trie;

import static org.apache.bifromq.util.TopicConst.MULTI_WILDCARD;
import static org.apache.bifromq.util.TopicConst.SINGLE_WILDCARD;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Ticker;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
Expand All @@ -30,31 +35,98 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;

/**
* Normal level topic filter trie node.
*
* @param <V> value type
*/
final class NTopicFilterTrieNode<V> extends TopicFilterTrieNode<V> {
private final String levelName;
private final NavigableSet<String> subLevelNames;
private final NavigableMap<String, Set<TopicTrieNode<V>>> subTopicTrieNodes;
private final Set<TopicTrieNode<V>> subWildcardMatchableTopicTrieNodes;
private final Set<TopicTrieNode<V>> backingTopics;

private static final ConcurrentLinkedDeque<Long> KEYS = new ConcurrentLinkedDeque<>();
private static final AtomicLong SEQ = new AtomicLong();
private static volatile Ticker TICKER = Ticker.systemTicker();
private static final Cache<Long, NTopicFilterTrieNode<?>> POOL = Caffeine.newBuilder()
.expireAfterAccess(EXPIRE_AFTER)
.recordStats()
.scheduler(Scheduler.systemScheduler())
.ticker(() -> TICKER.read())
.removalListener((Long key, NTopicFilterTrieNode<?> value, RemovalCause cause) -> {
KEYS.remove(key);
if (cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE) {
value.recycle();
}
})
.build();

private final NavigableSet<String> subLevelNames = new TreeSet<>();
private final NavigableMap<String, Set<TopicTrieNode<V>>> subTopicTrieNodes = new TreeMap<>();
private final Set<TopicTrieNode<V>> subWildcardMatchableTopicTrieNodes = new HashSet<>();
private final Set<TopicTrieNode<V>> backingTopics = new HashSet<>();
private String levelName;
// point to the sub node during iteration
private String subLevelName;

NTopicFilterTrieNode(TopicFilterTrieNode<V> parent, String levelName, Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
super(parent);
    // Instances are pooled and reused; state is assigned via init(), not here.
    NTopicFilterTrieNode() {
    }

    /**
     * Obtain an initialized node for the given parent, level name and sibling
     * topic trie nodes, reusing a pooled instance when one is available.
     *
     * @param parent the parent topic filter trie node (nullable; see init())
     * @param levelName the topic level name of this node
     * @param siblingTopicTrieNodes sibling topic trie nodes backing this level
     * @return an initialized node, recycled from the pool or freshly allocated
     */
    static <V> NTopicFilterTrieNode<V> borrow(TopicFilterTrieNode<V> parent,
                                              String levelName,
                                              Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
        while (true) {
            // Take the oldest pooled key; an empty deque means the pool is drained.
            Long key = KEYS.pollFirst();
            if (key == null) {
                break;
            }
            @SuppressWarnings("unchecked")
            NTopicFilterTrieNode<V> pooled = (NTopicFilterTrieNode<V>) POOL.asMap().remove(key);
            if (pooled != null) {
                return pooled.init(parent, levelName, siblingTopicTrieNodes);
            }
            // null: the entry already expired/was evicted from POOL - keep polling.
        }
        // Pool exhausted: fall back to a new instance.
        NTopicFilterTrieNode<V> node = new NTopicFilterTrieNode<>();
        return node.init(parent, levelName, siblingTopicTrieNodes);
    }

    /**
     * Return a node to the pool for later reuse.
     * The node is recycled (its per-use state cleared) before being made visible
     * to other borrowers via KEYS/POOL.
     *
     * @param node the node to release
     */
    static void release(NTopicFilterTrieNode<?> node) {
        node.recycle();
        // Monotonic key keeps FIFO ordering of pooled instances in KEYS.
        long key = SEQ.incrementAndGet();
        KEYS.offerLast(key);
        POOL.put(key, node);
    }

// test hooks (package-private)
    /** Drop all pooled instances and pending keys (test hook). */
    static void poolClear() {
        POOL.invalidateAll();
        // cleanUp() runs pending removal listeners before KEYS is cleared.
        POOL.cleanUp();
        KEYS.clear();
    }

    /** Force cache maintenance so pending expirations are processed (test hook). */
    static void poolCleanUp() {
        POOL.cleanUp();
    }

    /**
     * Approximate number of pooled instances (test hook).
     * Backed by ConcurrentLinkedDeque.size(), which is O(n) and only weakly
     * consistent under concurrent modification.
     */
    static int poolApproxSize() {
        return KEYS.size();
    }

static void setTicker(Ticker ticker) {
TICKER = ticker != null ? ticker : Ticker.systemTicker();
}

NTopicFilterTrieNode<V> init(TopicFilterTrieNode<V> parent, String levelName,
Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
assert levelName != null;
assert siblingTopicTrieNodes != null;
assert siblingTopicTrieNodes.stream().allMatch(node -> node.levelName().equals(levelName));
this.subTopicTrieNodes = new TreeMap<>();
this.subLevelNames = new TreeSet<>();
this.subWildcardMatchableTopicTrieNodes = new HashSet<>();
this.parent = parent;
this.levelName = levelName;
this.backingTopics = new HashSet<>();
subLevelName = null;
subLevelNames.clear();
subTopicTrieNodes.clear();
subWildcardMatchableTopicTrieNodes.clear();
backingTopics.clear();
for (TopicTrieNode<V> sibling : siblingTopicTrieNodes) {
if (sibling.isUserTopic()) {
backingTopics.add(sibling);
Expand All @@ -77,6 +149,7 @@ final class NTopicFilterTrieNode<V> extends TopicFilterTrieNode<V> {
subLevelNames.add(SINGLE_WILDCARD);
}
seekChild("");
return this;
}

@Override
Expand Down Expand Up @@ -142,9 +215,19 @@ TopicFilterTrieNode<V> childNode() {
throw new NoSuchElementException();
}
return switch (subLevelName) {
case MULTI_WILDCARD -> new MTopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
case SINGLE_WILDCARD -> new STopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
default -> new NTopicFilterTrieNode<>(this, subLevelName, subTopicTrieNodes.get(subLevelName));
case MULTI_WILDCARD -> MTopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
case SINGLE_WILDCARD -> STopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
default -> NTopicFilterTrieNode.borrow(this, subLevelName, subTopicTrieNodes.get(subLevelName));
};
}

private void recycle() {
parent = null;
levelName = null;
subLevelName = null;
subLevelNames.clear();
subTopicTrieNodes.clear();
subWildcardMatchableTopicTrieNodes.clear();
backingTopics.clear();
}
}
Loading
Loading