Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

package org.apache.bifromq.baserpc.client;

import java.util.Optional;
import org.apache.bifromq.baserpc.client.loadbalancer.IServerGroupRouter;
import org.apache.bifromq.baserpc.client.loadbalancer.IServerSelector;
import java.util.Optional;

class DummyServerSelector implements IServerSelector {
public static final IServerSelector INSTANCE = new DummyServerSelector();
Expand Down Expand Up @@ -58,6 +58,11 @@ public Optional<String> tryRoundRobin() {
public Optional<String> hashing(String key) {
return Optional.empty();
}

@Override
public Optional<String> stickyHashing(String key) {
return Optional.empty();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.baserpc.client;
Expand Down Expand Up @@ -52,6 +52,7 @@ abstract class ManagedBiDiStream<InT, OutT> {
private final CompositeDisposable disposables = new CompositeDisposable();
private final String tenantId;
private final String wchKey;
private final boolean sticky;
private final String targetServerId;
private final Supplier<Map<String, String>> metadataSupplier;
private final Channel channel;
Expand All @@ -66,18 +67,20 @@ abstract class ManagedBiDiStream<InT, OutT> {
ManagedBiDiStream(String tenantId,
String wchKey,
String targetServerId,
BluePrint.BalanceMode balanceMode,
BluePrint.MethodSemantic methodSemantic,
Supplier<Map<String, String>> metadataSupplier,
Channel channel,
CallOptions callOptions,
MethodDescriptor<InT, OutT> methodDescriptor) {
checkArgument(balanceMode != BluePrint.BalanceMode.DDBalanced || targetServerId != null,
checkArgument(methodSemantic.mode() != BluePrint.BalanceMode.DDBalanced || targetServerId != null,
"targetServerId is required");
checkArgument(balanceMode != BluePrint.BalanceMode.WCHBalanced | wchKey != null, "wchKey is required");
checkArgument(methodSemantic.mode() != BluePrint.BalanceMode.WCHBalanced || wchKey != null,
"wchKey is required");
this.tenantId = tenantId;
this.wchKey = wchKey;
this.targetServerId = targetServerId;
this.balanceMode = balanceMode;
this.balanceMode = methodSemantic.mode();
this.sticky = methodSemantic instanceof BluePrint.HRWPipelineUnaryMethod;
this.metadataSupplier = metadataSupplier;
this.channel = channel;
this.callOptions = callOptions;
Expand Down Expand Up @@ -149,11 +152,11 @@ private void onServerSelectorChanged(IServerSelector newServerSelector) {
return;
}
Optional<String> currentServer = prevRouter.hashing(wchKey);
Optional<String> newServer = router.hashing(wchKey);
Optional<String> newServer = sticky ? router.stickyHashing(wchKey) : router.hashing(wchKey);
if (newServer.isEmpty()) {
// cancel current bidi-stream
synchronized (this) {
bidiStream.get().bidiStream().cancel("no server available");
bidiStream.get().bidiStream().cancel("No server available");
}
} else if (!newServer.equals(currentServer)) {
switch (state.get()) {
Expand All @@ -179,7 +182,7 @@ private void onServerSelectorChanged(IServerSelector newServerSelector) {
if (newServer.isEmpty()) {
// cancel current bidi-stream
synchronized (this) {
bidiStream.get().bidiStream().cancel("no server available");
bidiStream.get().bidiStream().cancel("No server available");
}
} else {
switch (state.get()) {
Expand Down Expand Up @@ -212,7 +215,7 @@ private void onServerSelectorChanged(IServerSelector newServerSelector) {
if (newServer.isEmpty()) {
// cancel current bidi-stream
synchronized (this) {
bidiStream.get().bidiStream().cancel("no server available");
bidiStream.get().bidiStream().cancel("No server available");
}
} else {
switch (state.get()) {
Expand Down Expand Up @@ -294,7 +297,7 @@ private void retarget(IServerSelector serverSelector) {
}
case WCHBalanced -> {
IServerGroupRouter router = serverSelector.get(tenantId);
Optional<String> selectedServer = router.hashing(wchKey);
Optional<String> selectedServer = sticky ? router.stickyHashing(wchKey) : router.hashing(wchKey);
if (selectedServer.isEmpty()) {
state.set(State.NoServerAvailable);
reportServiceUnavailable();
Expand Down Expand Up @@ -423,13 +426,13 @@ public boolean isReady() {
@Override
public void cancel(String message) {
// do nothing
managedBiDiStream.onStreamError(new IllegalStateException("bidi-stream is not ready"));
managedBiDiStream.onStreamError(new IllegalStateException("Stream is not ready"));
}

@Override
public void send(InT in) {
// do nothing
managedBiDiStream.onStreamError(new IllegalStateException("bidi-stream is not ready"));
managedBiDiStream.onStreamError(new IllegalStateException("Stream is not ready"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.baserpc.client;
Expand Down Expand Up @@ -55,7 +55,7 @@ class ManagedMessageStream<MsgT, AckT> extends ManagedBiDiStream<AckT, MsgT>
super(tenantId,
wchKey,
targetServerId,
bluePrint.semantic(methodDescriptor.getFullMethodName()).mode(),
bluePrint.semantic(methodDescriptor.getFullMethodName()),
metadataSupplier,
channelHolder.channel(),
callOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.baserpc.client;
Expand Down Expand Up @@ -63,7 +63,7 @@ public class ManagedRequestPipeline<ReqT, RespT> extends ManagedBiDiStream<ReqT,
super(tenantId,
wchKey,
targetServerId,
bluePrint.semantic(methodDescriptor.getFullMethodName()).mode(),
bluePrint.semantic(methodDescriptor.getFullMethodName()),
metadataSupplier,
channelHolder.channel(),
callOptions,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.bifromq.baserpc.client.loadbalancer;

import com.google.common.base.Charsets;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.util.Collection;
import java.util.Objects;

/**
* Rendezvous Hashing Weighted Router (HRW).
*/
class HRWRouter<T> {
private static final long IEEE754_DOUBLE_1 = 0x3FF0000000000000L; // 1.0 in IEEE 754 double format
private final Collection<T> nodes;
private final KeyFunction<T> keyFunction;
private final WeightFunction<T> weightFunction;
private final HashFunction hashFunction;

HRWRouter(Collection<T> nodes, KeyFunction<T> keyFunction, WeightFunction<T> weightFunction) {
this(nodes, keyFunction, weightFunction, (key) -> {
HashCode code = Hashing.murmur3_128().hashString(key, Charsets.UTF_8);
return code.asLong();
});
}

HRWRouter(Collection<T> nodes,
KeyFunction<T> keyFunction,
WeightFunction<T> weightFunction,
HashFunction hashFunction) {
this.nodes = Objects.requireNonNull(nodes);
this.keyFunction = Objects.requireNonNull(keyFunction);
this.weightFunction = Objects.requireNonNull(weightFunction);
this.hashFunction = Objects.requireNonNull(hashFunction);
}

// map unsigned long to double in (0,1) uniformly
private static double hashToUnitInterval(long x) {
double u = Double.longBitsToDouble((x >>> 12) | IEEE754_DOUBLE_1) - 1.0;
final double eps = 1e-12;
if (u <= 0) {
u = eps;
}
if (u >= 1) {
u = 1 - eps;
}
return u;
}

/**
* Route to the best node based on the given object key.
*
* @param objectKey the key to route
* @return the best node, or null if no nodes are available
*/
T routeNode(String objectKey) {
if (nodes.isEmpty()) {
return null;
}
T bestNode = null;
double bestScore = Double.POSITIVE_INFINITY;

for (T n : nodes) {
String key = keyFunction.getKey(n);
int w = weightFunction.getWeight(n);
if (w <= 0) {
continue;
}
long h = hashFunction.hash64(objectKey + key);
// Rendezvous/WRH:min(-ln(U)/w)
double u = hashToUnitInterval(h);
double score = -Math.log(u) / (double) w;

if (score < bestScore) {
bestScore = score;
bestNode = n;
}
}
return bestNode;
}

interface KeyFunction<T> {
String getKey(T node);
}

interface WeightFunction<T> {
int getWeight(T node);
}

interface HashFunction {
long hash64(String key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.baserpc.client.loadbalancer;
Expand All @@ -31,4 +31,6 @@ public interface IServerGroupRouter {
Optional<String> tryRoundRobin();

Optional<String> hashing(String key);

Optional<String> stickyHashing(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.bifromq.baserpc.client.loadbalancer;

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Set;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode
class TenantAwareServerSelector implements IServerSelector {
private final Map<String, Boolean> allServers;
private final Map<String, Set<String>> serverGroupTags;
private final Map<String, Map<String, Integer>> trafficDirective;
@EqualsAndHashCode.Exclude
private final ITenantRouter tenantRouter;

public TenantAwareServerSelector(Map<String, Boolean> allServers,
Map<String, Set<String>> serverGroupTags,
Map<String, Map<String, Integer>> trafficDirective) {
this.allServers = Maps.newHashMap(allServers);
this.serverGroupTags = Maps.newHashMap(serverGroupTags);
this.trafficDirective = Maps.newHashMap(trafficDirective);
this.tenantRouter = new TenantRouter(this.allServers, this.trafficDirective, this.serverGroupTags);
}

@Override
public boolean exists(String serverId) {
return allServers.containsKey(serverId);
}

@Override
public IServerGroupRouter get(String tenantId) {
return tenantRouter.get(tenantId);
}

@Override
public String toString() {
return allServers.toString();
}
}
Loading
Loading