Skip to content

Commit d0b7f0e

Browse files
feat:optimize heartbeat and re-register. (#596)
* feat:optimize heartbeat and re-register. * feat:optimize heartbeat and re-register.
1 parent acc74a1 commit d0b7f0e

File tree

3 files changed

+294
-28
lines changed

3 files changed

+294
-28
lines changed

polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,23 @@
1818
package com.tencent.polaris.discovery.client.flow;
1919

2020
import com.tencent.polaris.api.exception.PolarisException;
21+
import com.tencent.polaris.api.exception.ServerCodes;
2122
import com.tencent.polaris.api.rpc.InstanceHeartbeatRequest;
2223
import com.tencent.polaris.api.rpc.InstanceRegisterRequest;
2324
import com.tencent.polaris.api.rpc.InstanceRegisterResponse;
2425
import com.tencent.polaris.client.api.SDKContext;
2526
import com.tencent.polaris.client.util.NamedThreadFactory;
2627
import com.tencent.polaris.discovery.client.flow.RegisterStateManager.RegisterState;
2728
import com.tencent.polaris.logging.LoggerFactory;
29+
import com.tencent.polaris.logging.LoggingConsts;
30+
import org.slf4j.Logger;
31+
2832
import java.util.HashMap;
2933
import java.util.Map;
34+
import java.util.Random;
3035
import java.util.concurrent.ScheduledThreadPoolExecutor;
3136
import java.util.concurrent.TimeUnit;
3237

33-
import com.tencent.polaris.logging.LoggingConsts;
34-
import org.slf4j.Logger;
35-
3638
/**
3739
* 异步注册流
3840
*
@@ -75,23 +77,32 @@ public InstanceRegisterResponse registerInstance(InstanceRegisterRequest request
7577
return instanceRegisterResponse;
7678
}
7779

78-
private void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction,
80+
void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction,
7981
HeartbeatFunction heartbeatFunction) {
8082
InstanceRegisterRequest registerRequest = registerState.getInstanceRegisterRequest();
8183
LOG.info("[AsyncHeartbeat]Instance heartbeat task started, namespace:{}, service:{}, host:{}, port:{}",
8284
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
8385
registerRequest.getPort());
8486
try {
8587
heartbeatFunction.doHeartbeat(buildHeartbeatRequest(registerRequest));
86-
LOG.info("[AsyncHeartbeat]Instance heartbeat success, namespace:{}, service:{}, host:{}, port:{}",
88+
registerState.resetFailCount();
89+
LOG.info("[AsyncHeartbeat]Instance heartbeat success. Reset fail count. namespace:{}, service:{}, host:{}, port:{}",
8790
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
8891
registerRequest.getPort());
8992
} catch (PolarisException e) {
90-
registerState.incrementFailCount();
91-
LOG.error(
92-
"[AsyncHeartbeat]Instance heartbeat failed, namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
93-
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
94-
registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e);
93+
if (e.getServerErrCode() == ServerCodes.NOT_FOUND_RESOURCE) {
94+
registerState.incrementFailCount();
95+
LOG.error("[AsyncHeartbeat]Instance heartbeat failed because of NOT_FOUND_RESOURCE. Increase fail count. " +
96+
"namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
97+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
98+
registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e);
99+
} else {
100+
registerState.resetFailCount();
101+
LOG.error("[AsyncHeartbeat]Instance heartbeat failed not because of NOT_FOUND_RESOURCE. Reset fail count. " +
102+
"namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
103+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
104+
registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e);
105+
}
95106
}
96107

97108
long minRegisterInterval = sdkContext.getConfig().getProvider().getMinRegisterInterval();
@@ -100,17 +111,35 @@ private void doRunHeartbeat(RegisterState registerState, RegisterFunction regist
100111
|| registerState.getHeartbeatFailCounter() < HEARTBEAT_FAIL_COUNT_THRESHOLD) {
101112
return;
102113
}
103-
try {
104-
registerFunction.doRegister(registerRequest, createRegisterV2Header());
105-
LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}",
106-
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
107-
registerRequest.getPort());
108-
registerState.resetFailCount();
109-
} catch (PolarisException e) {
110-
LOG.error(
111-
"[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, err:{}",
112-
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
113-
registerRequest.getPort(), e);
114+
115+
synchronized (registerState) {
116+
if (registerState.getReRegisterFuture() == null
117+
|| registerState.getReRegisterFuture().isDone()
118+
|| registerState.getReRegisterFuture().isCancelled()) {
119+
int reRegisterCounter = registerState.getReRegisterCounter();
120+
double base = reRegisterCounter == 0 ? 0 : registerRequest.getTtl() * Math.pow(2, reRegisterCounter - 1);
121+
int offset = reRegisterCounter == 0 ? 0 : new Random().nextInt(registerRequest.getTtl());
122+
long delay = (long) Math.min(base + offset, 60);
123+
LOG.info("[AsyncHeartbeat]Re-register instance, namespace:{}, service:{}, host:{}, port:{}, count:{}, delay:{}s",
124+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
125+
registerRequest.getPort(), reRegisterCounter, delay);
126+
registerState.setReRegisterFuture(registerState.getReRegisterExecutor().schedule(() -> {
127+
try {
128+
registerFunction.doRegister(registerRequest, createRegisterV2Header());
129+
LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}",
130+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
131+
registerRequest.getPort());
132+
registerState.resetFailCount();
133+
registerState.resetReRegisterCounter();
134+
} catch (PolarisException e) {
135+
LOG.error(
136+
"[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, re-register count:{}",
137+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
138+
registerRequest.getPort(), reRegisterCounter, e);
139+
}
140+
}, delay, TimeUnit.SECONDS));
141+
registerState.incrementReRegisterCounter();
142+
}
114143
}
115144
}
116145

polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@
2121
import com.tencent.polaris.api.rpc.InstanceDeregisterRequest;
2222
import com.tencent.polaris.api.rpc.InstanceRegisterRequest;
2323
import com.tencent.polaris.client.api.SDKContext;
24+
import com.tencent.polaris.logging.LoggerFactory;
25+
import org.slf4j.Logger;
26+
2427
import java.util.Map;
2528
import java.util.Optional;
2629
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
2732
import java.util.concurrent.ScheduledFuture;
33+
import java.util.concurrent.atomic.AtomicInteger;
2834

2935
/**
3036
* 注册状态管理器
@@ -33,6 +39,8 @@
3339
*/
3440
public class RegisterStateManager {
3541

42+
private static final Logger LOG = LoggerFactory.getLogger(RegisterStateManager.class);
43+
3644
private final static Map<String, Map<String, RegisterState>> REGISTER_STATES = new ConcurrentHashMap<>();
3745

3846
/**
@@ -69,15 +77,15 @@ public static void removeRegisterState(SDKContext sdkContext, InstanceDeregister
6977
.ifPresent(sdkRegisterStates -> {
7078
String registerStateKey = buildRegisterStateKey(instanceDeregisterRequest);
7179
Optional.ofNullable(sdkRegisterStates.remove(registerStateKey))
72-
.ifPresent(registerState -> registerState.getTaskFuture().cancel(false));
80+
.ifPresent(RegisterState::destroy);
7381
});
7482
}
7583

7684
public static void destroy(SDKContext sdkContext) {
7785
Optional.ofNullable(REGISTER_STATES.remove(sdkContext.getValueContext().getClientId()))
7886
.ifPresent(sdkRegisterStates -> {
7987
for (RegisterState registerState : sdkRegisterStates.values()) {
80-
registerState.getTaskFuture().cancel(false);
88+
registerState.destroy();
8189
}
8290
sdkRegisterStates.clear();
8391
});
@@ -88,26 +96,33 @@ private static String buildRegisterStateKey(CommonProviderBaseEntity baseEntity)
8896
baseEntity.getPort());
8997
}
9098

91-
public static final class RegisterState {
99+
public static class RegisterState {
92100

93101
private InstanceRegisterRequest instanceRegisterRequest;
94102
private long firstRegisterTime;
95103
private ScheduledFuture<?> taskFuture;
96-
private int heartbeatFailCounter = 0;
104+
private final AtomicInteger heartbeatFailCounter = new AtomicInteger(0);
105+
private ScheduledFuture<?> reRegisterFuture;
106+
private final ScheduledExecutorService reRegisterExecutor;
107+
private final AtomicInteger reRegisterCounter = new AtomicInteger(0);
108+
109+
public RegisterState() {
110+
this.reRegisterExecutor = Executors.newSingleThreadScheduledExecutor();
111+
}
97112

98113
/**
99114
* Increment fail count by one
100115
*/
101116
public void incrementFailCount() {
102-
heartbeatFailCounter += 1;
117+
heartbeatFailCounter.incrementAndGet();
103118
}
104119

105120
public int getHeartbeatFailCounter() {
106-
return heartbeatFailCounter;
121+
return heartbeatFailCounter.get();
107122
}
108123

109124
public void resetFailCount() {
110-
heartbeatFailCounter = 0;
125+
heartbeatFailCounter.set(0);
111126
}
112127

113128
public InstanceRegisterRequest getInstanceRegisterRequest() {
@@ -133,5 +148,41 @@ public ScheduledFuture<?> getTaskFuture() {
133148
public void setTaskFuture(ScheduledFuture<?> taskFuture) {
134149
this.taskFuture = taskFuture;
135150
}
151+
152+
public ScheduledFuture<?> getReRegisterFuture() {
153+
return reRegisterFuture;
154+
}
155+
156+
public void setReRegisterFuture(ScheduledFuture<?> reRegisterFuture) {
157+
this.reRegisterFuture = reRegisterFuture;
158+
}
159+
160+
public ScheduledExecutorService getReRegisterExecutor() {
161+
return reRegisterExecutor;
162+
}
163+
164+
public int getReRegisterCounter() {
165+
return reRegisterCounter.get();
166+
}
167+
168+
public void incrementReRegisterCounter() {
169+
reRegisterCounter.incrementAndGet();
170+
}
171+
172+
public void resetReRegisterCounter() {
173+
reRegisterCounter.set(0);
174+
}
175+
176+
public void destroy() {
177+
try {
178+
getTaskFuture().cancel(false);
179+
getReRegisterFuture().cancel(false);
180+
getReRegisterExecutor().shutdownNow();
181+
} catch (Throwable throwable) {
182+
LOG.warn("[RegisterState] destroy error. namespace:{}, service:{}, host:{}, port:{}.",
183+
getInstanceRegisterRequest().getNamespace(), getInstanceRegisterRequest().getService(),
184+
getInstanceRegisterRequest().getHost(), getInstanceRegisterRequest().getPort(), throwable);
185+
}
186+
}
136187
}
137188
}

0 commit comments

Comments
 (0)