Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bifromq-dist/bifromq-dist-worker-schema/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@

<artifactId>bifromq-dist-worker-schema</artifactId>
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bifromq</groupId>
<artifactId>bifromq-util</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Represent a group matching route.
*/
@EqualsAndHashCode(callSuper = true)
@EqualsAndHashCode(callSuper = true, cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
@ToString
public final class GroupMatching extends Matching {
@EqualsAndHashCode.Exclude
Expand All @@ -41,8 +41,9 @@ public final class GroupMatching extends Matching {
GroupMatching(String tenantId, RouteMatcher matcher, Map<String, Long> members) {
super(tenantId, matcher);
assert matcher.getType() != RouteMatcher.Type.Normal;
this.ordered = matcher.getType() == RouteMatcher.Type.OrderedShare;
this.receivers = members;

this.ordered = matcher.getType() == RouteMatcher.Type.OrderedShare;
this.receiverList = members.entrySet().stream()
.map(e -> new NormalMatching(tenantId, matcher, e.getKey(), e.getValue()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.dist.worker.schema;

import static com.google.protobuf.ByteString.copyFromUtf8;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static org.apache.bifromq.util.BSUtil.toByteString;
import static org.apache.bifromq.util.BSUtil.toShort;
import static org.apache.bifromq.util.TopicConst.DELIMITER;
Expand All @@ -27,15 +29,14 @@
import static org.apache.bifromq.util.TopicConst.UNORDERED_SHARE;
import static org.apache.bifromq.util.TopicUtil.parse;
import static org.apache.bifromq.util.TopicUtil.unescape;
import static com.google.protobuf.ByteString.copyFromUtf8;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;

import com.github.benmanes.caffeine.cache.Interner;
import com.google.protobuf.ByteString;
import java.util.List;
import org.apache.bifromq.dist.rpc.proto.MatchRoute;
import org.apache.bifromq.dist.rpc.proto.RouteGroup;
import org.apache.bifromq.type.RouteMatcher;
import org.apache.bifromq.util.BSUtil;
import com.google.protobuf.ByteString;
import java.util.List;

/**
* Utility for working with the data stored in dist worker.
Expand All @@ -50,6 +51,8 @@ public class KVSchemaUtil {
private static final ByteString FLAG_NORMAL_VAL = ByteString.copyFrom(new byte[] {FLAG_NORMAL});
private static final ByteString FLAG_UNORDERED_VAL = ByteString.copyFrom(new byte[] {FLAG_UNORDERED});
private static final ByteString FLAG_ORDERED_VAL = ByteString.copyFrom(new byte[] {FLAG_ORDERED});
private static final Interner<Matching> MATCHING_INTERNER = Interner.newWeakInterner();
private static final Interner<Receiver> RECEIVER_INTERNER = Interner.newWeakInterner();

public static String toReceiverUrl(MatchRoute route) {
return toReceiverUrl(route.getBrokerId(), route.getReceiverId(), route.getDelivererKey());
Expand All @@ -61,26 +64,35 @@ public static String toReceiverUrl(int subBrokerId, String receiverId, String de

public static Receiver parseReceiver(String receiverUrl) {
String[] parts = receiverUrl.split(NUL);
return new Receiver(Integer.parseInt(parts[0]), parts[1], parts[2]);
return RECEIVER_INTERNER.intern(new Receiver(Integer.parseInt(parts[0]), parts[1], parts[2]));
}

public static Matching buildMatchRoute(ByteString routeKey, ByteString routeValue) {
RouteDetail routeDetail = parseRouteDetail(routeKey);
try {
if (routeDetail.matcher().getType() == RouteMatcher.Type.Normal) {
return new NormalMatching(routeDetail.tenantId(),
routeDetail.matcher(),
routeDetail.receiverUrl(),
BSUtil.toLong(routeValue));
return buildNormalMatchRoute(routeDetail, BSUtil.toLong(routeValue));
}
return new GroupMatching(routeDetail.tenantId(),
routeDetail.matcher(),
RouteGroup.parseFrom(routeValue).getMembersMap());
return buildGroupMatchRoute(routeDetail, RouteGroup.parseFrom(routeValue));
} catch (Exception e) {
throw new IllegalStateException("Unable to parse matching record", e);
}
}

public static Matching buildNormalMatchRoute(RouteDetail routeDetail, long incarnation) {
assert routeDetail.matcher().getType() == RouteMatcher.Type.Normal;
return MATCHING_INTERNER.intern(new NormalMatching(routeDetail.tenantId(),
routeDetail.matcher(),
routeDetail.receiverUrl(),
incarnation));
}

public static Matching buildGroupMatchRoute(RouteDetail routeDetail, RouteGroup group) {
assert routeDetail.matcher().getType() != RouteMatcher.Type.Normal;
return MATCHING_INTERNER.intern(new GroupMatching(routeDetail.tenantId(), routeDetail.matcher(),
group.getMembersMap()));
}

public static ByteString tenantBeginKey(String tenantId) {
ByteString tenantIdBytes = copyFromUtf8(tenantId);
return SCHEMA_VER.concat(toByteString((short) tenantIdBytes.size()).concat(tenantIdBytes));
Expand All @@ -107,8 +119,7 @@ public static ByteString toNormalRouteKey(String tenantId, RouteMatcher routeMat

public static ByteString toGroupRouteKey(String tenantId, RouteMatcher routeMatcher) {
assert routeMatcher.getType() != RouteMatcher.Type.Normal;
return tenantRouteBucketStartKey(tenantId, routeMatcher.getFilterLevelList(),
bucket(routeMatcher.getGroup()))
return tenantRouteBucketStartKey(tenantId, routeMatcher.getFilterLevelList(), bucket(routeMatcher.getGroup()))
.concat(routeMatcher.getType() == RouteMatcher.Type.OrderedShare ? FLAG_ORDERED_VAL : FLAG_UNORDERED_VAL)
.concat(toReceiverBytes(routeMatcher.getGroup()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* The abstract class of matching route.
*/
@EqualsAndHashCode
@EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
@ToString
public abstract class Matching {
@EqualsAndHashCode.Exclude
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* Represent a normal matching route.
*/
@EqualsAndHashCode(callSuper = true)
@EqualsAndHashCode(callSuper = true, cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
@ToString
public class NormalMatching extends Matching {
private final String receiverUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.dist.worker.schema;
Expand All @@ -24,22 +24,63 @@
import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toNormalRouteKey;
import static org.apache.bifromq.dist.worker.schema.KVSchemaUtil.toReceiverUrl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;

import com.google.protobuf.ByteString;
import org.apache.bifromq.dist.rpc.proto.MatchRoute;
import org.apache.bifromq.dist.rpc.proto.RouteGroup;
import org.apache.bifromq.type.MatchInfo;
import org.apache.bifromq.type.RouteMatcher;
import org.apache.bifromq.util.BSUtil;
import org.apache.bifromq.util.TopicUtil;
import com.google.protobuf.ByteString;
import org.testng.annotations.Test;

public class KVSchemaUtilTest {
private static final int MqttBroker = 0;
private static final int InboxService = 1;
private static final int RuleEngine = 2;

@Test
public void testNormalMatchingIntern() {
String tenantId = "tenantId";
String topicFilter = "/a/b/c";
RouteMatcher matcher = TopicUtil.from(topicFilter);
MatchRoute route = MatchRoute.newBuilder()
.setMatcher(matcher)
.setBrokerId(MqttBroker)
.setReceiverId("inbox1")
.setDelivererKey("delivererKey1")
.setIncarnation(1L)
.build();
ByteString key = toNormalRouteKey(tenantId, matcher, toReceiverUrl(route));
Matching matching1 = buildMatchRoute(key, BSUtil.toByteString(route.getIncarnation()));
Matching matching2 = buildMatchRoute(key, BSUtil.toByteString(route.getIncarnation()));
assertSame(matching1, matching2);
}

@Test
public void testGroupMatchingIntern() {
String origTopicFilter = "$share/group//a/b/c";
RouteMatcher matcher = TopicUtil.from(origTopicFilter);
MatchRoute route = MatchRoute.newBuilder()
.setMatcher(matcher)
.setBrokerId(MqttBroker)
.setReceiverId("inbox1")
.setDelivererKey("server1")
.setIncarnation(1L)
.build();

String scopedReceiverId = toReceiverUrl(MqttBroker, "inbox1", "server1");
ByteString key = toGroupRouteKey("tenantId", matcher);
RouteGroup groupMembers = RouteGroup.newBuilder()
.putMembers(scopedReceiverId, route.getIncarnation())
.build();
Matching matching1 = buildMatchRoute(key, groupMembers.toByteString());
Matching matching2 = buildMatchRoute(key, groupMembers.toByteString());
assertSame(matching1, matching2);
}

@Test
public void testParseNormalMatchRecord() {
String tenantId = "tenantId";
Expand Down
Loading
Loading