Skip to content

Commit 99e6afb

Browse files
authored
feat: support least connection load balance. (#620)
1 parent 9ad54d0 commit 99e6afb

File tree

12 files changed

+286
-23
lines changed

12 files changed

+286
-23
lines changed

polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/consumer/LoadBalanceConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public interface LoadBalanceConfig extends PluginConfig, Verifier {
4444
String LOAD_BALANCE_WEIGHTED_ROUND_ROBIN = "weightedRoundRobin";
4545

4646
String LOAD_BALANCE_SHORTEST_RESPONSE_TIME = "shortestResponseTime";
47+
48+
String LOAD_BALANCE_LEAST_CONNECTION = "leastConnection";
4749
/**
4850
* 轮询负载均衡插件名
4951
*/

polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/InstanceStatistic.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919

2020
import java.util.concurrent.atomic.AtomicLong;
2121

22+
/**
23+
* Instance Invocation Statistic
24+
*
25+
* @author Yuwei Fu
26+
*/
27+
2228
public class InstanceStatistic {
2329

2430
/**
@@ -53,6 +59,10 @@ public class InstanceStatistic {
5359
* 成功调用最大耗时
5460
*/
5561
private final AtomicLong succeededMaxElapsed;
62+
/**
63+
* 当前实例的连接数
64+
*/
65+
private final AtomicLong active = new AtomicLong(0);
5666

5767
public InstanceStatistic() {
5868
this(0, 0, 0, 0, 0, 0, 0, 0);
@@ -113,6 +123,20 @@ public long getSucceededMaxElapsed() {
113123
return succeededMaxElapsed.get();
114124
}
115125

126+
public long getActive() {
127+
return active.get();
128+
}
129+
130+
public long incrementAndGetActive() {
131+
return active.incrementAndGet();
132+
}
133+
134+
public long decrementAndGetActive() {
135+
return active.decrementAndGet();
136+
}
137+
138+
139+
116140
@Override
117141
public String toString() {
118142
return "InstanceStatistic{" +

polaris-dependencies/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,21 @@
302302
<artifactId>loadbalancer-ringhash</artifactId>
303303
<version>${project.version}</version>
304304
</dependency>
305+
<dependency>
306+
<groupId>com.tencent.polaris</groupId>
307+
<artifactId>loadbalancer-roundrobin</artifactId>
308+
<version>${project.version}</version>
309+
</dependency>
310+
<dependency>
311+
<groupId>com.tencent.polaris</groupId>
312+
<artifactId>loadbalancer-shortest-response-time</artifactId>
313+
<version>${project.version}</version>
314+
</dependency>
315+
<dependency>
316+
<groupId>com.tencent.polaris</groupId>
317+
<artifactId>loadbalancer-least-connection</artifactId>
318+
<version>${project.version}</version>
319+
</dependency>
305320

306321
<!-- polaris-plugins-observability-->
307322
<dependency>

polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/InstancesStatisticUpdater.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@
1717

1818
package com.tencent.polaris.discovery.client.stat;
1919

20-
import static com.tencent.polaris.api.exception.ErrorCode.INSTANCE_NOT_FOUND;
21-
import static com.tencent.polaris.api.exception.ErrorCode.SERVICE_NOT_FOUND;
22-
import static com.tencent.polaris.api.plugin.registry.InstanceProperty.PROPERTY_INSTANCE_STATISTIC;
2320
import static com.tencent.polaris.api.pojo.RetStatus.RetSuccess;
2421

25-
import com.tencent.polaris.api.exception.PolarisException;
2622
import com.tencent.polaris.api.plugin.registry.LocalRegistry;
2723
import com.tencent.polaris.api.plugin.registry.ResourceFilter;
2824
import com.tencent.polaris.api.pojo.Instance;
@@ -38,6 +34,11 @@
3834
import java.util.List;
3935
import org.slf4j.Logger;
4036

37+
/**
38+
* InstancesStatisticUpdater
39+
*
40+
* @author Yuwei Fu
41+
*/
4142
public class InstancesStatisticUpdater {
4243

4344
private static final Logger LOG = LoggerFactory.getLogger(InstancesDetectTask.class);
@@ -50,7 +51,7 @@ public InstancesStatisticUpdater(LocalRegistry localRegistry) {
5051
this.localRegistry = localRegistry;
5152
}
5253

53-
public void updateInstanceStatistic(InstanceGauge result) throws PolarisException {
54+
public void updateInstanceStatistic(InstanceGauge result) {
5455
ServiceKey serviceKey = new ServiceKey(result.getNamespace(), result.getService());
5556
ServiceEventKey serviceEventKey = new ServiceEventKey(serviceKey, EventType.INSTANCE);
5657
ServiceInstances serviceInstances = localRegistry.getInstances(new ResourceFilter(serviceEventKey, true, true));
@@ -80,6 +81,7 @@ public void updateInstanceStatistic(InstanceGauge result) throws PolarisExceptio
8081
if (targetInstance != null) {
8182
InstanceStatistic instanceStatistic = targetInstance.getInstanceLocalValue().getInstanceStatistic();
8283
instanceStatistic.count(result.getDelay(), RetSuccess.equals(result.getRetStatus()));
84+
instanceStatistic.decrementAndGetActive();
8385
LOG.debug(
8486
"[InstanceStatisticUpdater]: " + targetInstance.getHost() + ":" + targetInstance.getPort() + ":"
8587
+ result.getPort() + ": Delay: " + result.getDelay() + "TotalCount"

polaris-discovery/polaris-discovery-factory/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@
105105
<artifactId>loadbalancer-shortest-response-time</artifactId>
106106
<version>${project.version}</version>
107107
</dependency>
108+
<dependency>
109+
<groupId>com.tencent.polaris</groupId>
110+
<artifactId>loadbalancer-least-connection</artifactId>
111+
<version>${project.version}</version>
112+
</dependency>
108113
<!-- 依赖节点级熔断插件-->
109114
<dependency>
110115
<groupId>com.tencent.polaris</groupId>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>com.tencent.polaris</groupId>
8+
<artifactId>polaris-plugins-loadbalancer</artifactId>
9+
<version>${revision}</version>
10+
<relativePath>../pom.xml</relativePath>
11+
</parent>
12+
13+
<artifactId>loadbalancer-least-connection</artifactId>
14+
<name>Polaris Plugins Loadbalancer Least Connection</name>
15+
<description>Polaris Plugins Loadbalancer Least Connection </description>
16+
<dependencies>
17+
<dependency>
18+
<groupId>com.tencent.polaris</groupId>
19+
<artifactId>polaris-client</artifactId>
20+
<version>${revision}</version>
21+
</dependency>
22+
</dependencies>
23+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making polaris-java available.
3+
*
4+
* Copyright (C) 2021 Tencent. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.plugins.loadbalancer.leastconnection;
19+
20+
import com.tencent.polaris.api.config.consumer.LoadBalanceConfig;
21+
import com.tencent.polaris.api.control.Destroyable;
22+
import com.tencent.polaris.api.exception.ErrorCode;
23+
import com.tencent.polaris.api.exception.PolarisException;
24+
import com.tencent.polaris.api.plugin.PluginType;
25+
import com.tencent.polaris.api.plugin.common.InitContext;
26+
import com.tencent.polaris.api.plugin.common.PluginTypes;
27+
import com.tencent.polaris.api.plugin.compose.Extensions;
28+
import com.tencent.polaris.api.plugin.loadbalance.LoadBalancer;
29+
import com.tencent.polaris.api.plugin.registry.LocalRegistry;
30+
import com.tencent.polaris.api.pojo.DefaultServiceEventKeysProvider;
31+
import com.tencent.polaris.api.pojo.Instance;
32+
import com.tencent.polaris.api.pojo.ServiceEventKey;
33+
import com.tencent.polaris.api.pojo.ServiceEventKey.EventType;
34+
import com.tencent.polaris.api.pojo.ServiceInstances;
35+
import com.tencent.polaris.api.pojo.ServiceKey;
36+
import com.tencent.polaris.api.rpc.Criteria;
37+
import com.tencent.polaris.api.utils.CollectionUtils;
38+
import com.tencent.polaris.client.flow.BaseFlow;
39+
import com.tencent.polaris.client.flow.DefaultFlowControlParam;
40+
import com.tencent.polaris.client.flow.ResourcesResponse;
41+
import com.tencent.polaris.client.pojo.InstanceByProto;
42+
import com.tencent.polaris.logging.LoggerFactory;
43+
import java.util.HashMap;
44+
import java.util.HashSet;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.Set;
48+
import java.util.stream.Collectors;
49+
import org.slf4j.Logger;
50+
51+
/**
52+
* Least Connections Load Balancer
53+
*
54+
* @author Yuwei Fu
55+
*/
56+
public class LeastConnectionLoadBalance extends Destroyable implements LoadBalancer {
57+
58+
private LocalRegistry localRegistry;
59+
60+
private Extensions extensions;
61+
62+
private static final Logger LOG = LoggerFactory.getLogger(LeastConnectionLoadBalance.class);
63+
64+
@Override
65+
public Instance chooseInstance(Criteria criteria, ServiceInstances instances) throws PolarisException {
66+
if (instances == null || CollectionUtils.isEmpty(instances.getInstances())) {
67+
throw new PolarisException(ErrorCode.INSTANCE_NOT_FOUND, "request instances is empty");
68+
}
69+
ServiceKey serviceKey = instances.getServiceKey();
70+
List<Instance> requestInstanceList = instances.getInstances();
71+
Map<String, Instance> requestInstanceMap = new HashMap<>(requestInstanceList.size());
72+
for (Instance instance : requestInstanceList) {
73+
if (instance.getWeight() != 0) {
74+
requestInstanceMap.put(instance.getHost() + ":" + instance.getPort(), instance);
75+
}
76+
}
77+
ServiceEventKey serviceEventKey = new ServiceEventKey(serviceKey, EventType.INSTANCE);
78+
Set<ServiceEventKey> serviceEventKeySet = new HashSet<>();
79+
serviceEventKeySet.add(serviceEventKey);
80+
DefaultServiceEventKeysProvider svcKeysProvider = new DefaultServiceEventKeysProvider();
81+
svcKeysProvider.setSvcEventKeys(serviceEventKeySet);
82+
DefaultFlowControlParam engineFlowControlParam = new DefaultFlowControlParam();
83+
ResourcesResponse resourcesResponse = BaseFlow.syncGetResources(extensions, false, svcKeysProvider,
84+
engineFlowControlParam);
85+
ServiceInstances serviceInstances = resourcesResponse.getServiceInstances(serviceEventKey);
86+
87+
List<Instance> localInstanceList = serviceInstances.getInstances();
88+
if (CollectionUtils.isEmpty(localInstanceList)) {
89+
throw new PolarisException(ErrorCode.INSTANCE_NOT_FOUND, "local instances is empty");
90+
}
91+
// intersection lookup
92+
List<Instance> instanceList = localInstanceList.stream()
93+
.filter(instance -> requestInstanceMap.containsKey(instance.getHost() + ":" + instance.getPort()))
94+
.collect(Collectors.toList());
95+
if (instanceList.isEmpty()) {
96+
throw new PolarisException(ErrorCode.INSTANCE_NOT_FOUND,
97+
"No instance found. serviceKey=" + serviceKey.toString());
98+
}
99+
100+
int[] candidateIndexes = new int[instanceList.size()];
101+
int sameActiveCount = 0;
102+
long leastActive = Long.MAX_VALUE;
103+
long[] instanceActive = new long[instanceList.size()];
104+
for (int i = 0; i < instanceList.size(); i++) {
105+
InstanceByProto instance = (InstanceByProto) instanceList.get(i);
106+
long curActive = instance.getInstanceLocalValue().getInstanceStatistic().getActive();
107+
instanceActive[i] = curActive;
108+
if (curActive < leastActive) {
109+
leastActive = curActive;
110+
sameActiveCount = 0;
111+
candidateIndexes[sameActiveCount++] = i;
112+
} else if (curActive == leastActive) {
113+
candidateIndexes[sameActiveCount++] = i;
114+
}
115+
}
116+
// If there is only one instance with the same least active count,
117+
// return it directly and increase its active count.
118+
if (sameActiveCount == 1) {
119+
InstanceByProto targetInstance = (InstanceByProto) instanceList.get(candidateIndexes[0]);
120+
targetInstance.getInstanceLocalValue().getInstanceStatistic().incrementAndGetActive();
121+
LOG.debug("[LeastConnectionLoadBalance] instances active count: {}, choose instance: {}:{}", instanceActive,
122+
targetInstance.getHost(), targetInstance.getPort());
123+
return requestInstanceMap.get(targetInstance.getHost() + ":" + targetInstance.getPort());
124+
}
125+
// If there are multiple instances with the same active connections,
126+
// randomly select one by weight and increase its active count.
127+
int[] candidatesWeights = new int[sameActiveCount];
128+
int totalWeight = 0;
129+
for (int i = 0; i < sameActiveCount; i++) {
130+
candidatesWeights[i] = (instanceList.get(candidateIndexes[i])).getWeight();
131+
totalWeight += candidatesWeights[i];
132+
}
133+
int randomWeight = (int) (Math.random() * totalWeight);
134+
for (int i = 0; i < sameActiveCount; i++) {
135+
randomWeight -= candidatesWeights[i];
136+
if (randomWeight < 0) {
137+
InstanceByProto targetInstance = (InstanceByProto) instanceList.get(candidateIndexes[i]);
138+
targetInstance.getInstanceLocalValue().getInstanceStatistic().incrementAndGetActive();
139+
LOG.debug("[LeastConnectionLoadBalance] instances active count: {}, choose instance: {}:{}",
140+
instanceActive, targetInstance.getHost(), targetInstance.getPort());
141+
return requestInstanceMap.get(targetInstance.getHost() + ":" + targetInstance.getPort());
142+
}
143+
}
144+
throw new PolarisException(ErrorCode.INSTANCE_NOT_FOUND,
145+
"No instance selected. serviceKey=" + serviceKey.toString());
146+
}
147+
148+
149+
@Override
150+
public String getName() {
151+
return LoadBalanceConfig.LOAD_BALANCE_LEAST_CONNECTION;
152+
}
153+
154+
@Override
155+
public PluginType getType() {
156+
return PluginTypes.LOAD_BALANCER.getBaseType();
157+
}
158+
159+
@Override
160+
public void init(InitContext ctx) {
161+
}
162+
163+
@Override
164+
public void postContextInit(Extensions extensions) throws PolarisException {
165+
this.extensions = extensions;
166+
localRegistry = extensions.getLocalRegistry();
167+
}
168+
169+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
com.tencent.polaris.plugins.loadbalancer.leastconnection.LeastConnectionLoadBalance

polaris-plugins/polaris-plugins-loadbalancer/loadbalancer-shortest-response-time/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
</parent>
1212

1313
<artifactId>loadbalancer-shortest-response-time</artifactId>
14+
<name>Polaris Plugins Loadbalancer Shortest Response Time</name>
15+
<description>Polaris Plugins Loadbalancer Shortest Response Time</description>
1416
<dependencies>
1517
<dependency>
1618
<groupId>com.tencent.polaris</groupId>

0 commit comments

Comments
 (0)