Skip to content

Commit e8101f8

Browse files
feat:support dynamic discovery. (#607)
1 parent 2907e5b commit e8101f8

File tree

17 files changed

+235
-43
lines changed

17 files changed

+235
-43
lines changed

polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,20 @@ public static RegisterState putRegisterState(SDKContext sdkContext,
6666
});
6767
}
6868

69+
/**
70+
* Get instance register state from cache
71+
*
72+
* @param sdkContext sdk context
73+
* @param instanceRegisterRequest instance register request
74+
* @return Return instance register state object if it is cached, otherwise null
75+
*/
76+
public static RegisterState getRegisterState(SDKContext sdkContext, InstanceRegisterRequest instanceRegisterRequest) {
77+
String registerStateKey = buildRegisterStateKey(instanceRegisterRequest);
78+
Map<String, RegisterState> sdkRegisterStates = REGISTER_STATES.computeIfAbsent(
79+
sdkContext.getValueContext().getClientId(), clientId -> new ConcurrentHashMap<>());
80+
return sdkRegisterStates.get(registerStateKey);
81+
}
82+
6983
/**
7084
* Remove the instance heartbeat task and cancel the task
7185
*

polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerConnector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,31 @@ default ServiceRuleByProto getServiceContract(CommonServiceContractRequest req)
137137
*/
138138
boolean isRegisterEnable();
139139

140+
/**
141+
* Set if register enabled.
142+
*
143+
* @param registerEnable
144+
*/
145+
default void setRegisterEnable(boolean registerEnable) {
146+
147+
}
148+
140149
/**
141150
* Check if discovery enabled.
142151
*
143152
* @return boolean
144153
*/
145154
boolean isDiscoveryEnable();
146155

156+
/**
157+
* Set if discovery enabled.
158+
*
159+
* @param discoveryEnable
160+
*/
161+
default void setDiscoveryEnable(boolean discoveryEnable) {
162+
163+
}
164+
147165
/**
148166
* Check if service contract reporting enabled.
149167
*

polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.tencent.polaris.api.plugin.server.*;
2929
import com.tencent.polaris.api.pojo.ServiceEventKey;
3030
import com.tencent.polaris.api.utils.CollectionUtils;
31+
import com.tencent.polaris.api.utils.StringUtils;
3132
import com.tencent.polaris.client.pojo.ServiceRuleByProto;
3233
import com.tencent.polaris.client.util.NamedThreadFactory;
3334
import com.tencent.polaris.factory.config.global.ServerConnectorConfigImpl;
@@ -306,4 +307,13 @@ public boolean isZeroProtectionEnabled() {
306307
public boolean isNeedTestConnectivity() {
307308
return zeroProtectionConfig.isNeedTestConnectivity();
308309
}
310+
311+
public DestroyableServerConnector getServerConnectorByType(String type) {
312+
for (DestroyableServerConnector sc : serverConnectors) {
313+
if (StringUtils.equals(sc.getName(), type)) {
314+
return sc;
315+
}
316+
}
317+
return null;
318+
}
309319
}

polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ public String getRevision(String name) {
5353
return "";
5454
}
5555

56+
public void removeRevision(String name) {
57+
if (ORDER_LIST.contains(name)) {
58+
content[ORDER_LIST.indexOf(name)] = "";
59+
}
60+
}
61+
5662
/**
5763
* Generate composite revision string.
5864
*

polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,16 +176,25 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
176176
CompositeRevision compositeRevision = new CompositeRevision();
177177
Object value = getEventHandler().getValue();
178178
Map<String, List<Instance>> instancesMap = new HashMap<>();
179+
boolean isDiscoveryChanged = false;
179180
if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServiceInstancesByProto) {
180181
ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value;
181182
compositeRevision = CompositeRevision.of(cacheValue.getRevision());
182183
List<Instance> oldInstancesList = cacheValue.getOriginInstancesList();
183184
for (Instance oldInstance : oldInstancesList) {
184185
String serverConnectorType = oldInstance.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC);
185-
if (!instancesMap.containsKey(serverConnectorType)) {
186-
instancesMap.put(serverConnectorType, new ArrayList<>());
186+
DestroyableServerConnector serverConnector = connector.getServerConnectorByType(serverConnectorType);
187+
if (serverConnector != null && serverConnector.isDiscoveryEnable()) {
188+
if (!instancesMap.containsKey(serverConnectorType)) {
189+
instancesMap.put(serverConnectorType, new ArrayList<>());
190+
}
191+
instancesMap.get(serverConnectorType).add(oldInstance);
192+
} else if (serverConnector != null && !serverConnector.isDiscoveryEnable()) {
193+
compositeRevision.removeRevision(serverConnectorType);
194+
isDiscoveryChanged = true;
195+
LOG.info("server connector {} is not enabled for discovery instance {}:{}",
196+
serverConnectorType, oldInstance.getHost().getValue(), oldInstance.getPort().getValue());
187197
}
188-
instancesMap.get(serverConnectorType).add(oldInstance);
189198
}
190199
}
191200

@@ -200,6 +209,8 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
200209
// 按照事件来源更新对应的列表
201210
serverEventInstancesList.clear();
202211
serverEventInstancesList.addAll(discoverResponse.getInstancesList());
212+
} else if (newDiscoverResponseBuilder.getCode().getValue() == ServerCodes.DATA_NO_CHANGE && isDiscoveryChanged) {
213+
newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.EXECUTE_SUCCESS));
203214
}
204215
if (LOG.isDebugEnabled()) {
205216
String newInstancesMapStr = convertToString(instancesMap);
@@ -336,26 +347,36 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
336347
CompositeRevision compositeRevision = new CompositeRevision();
337348
Object value = getEventHandler().getValue();
338349
Map<String, List<Service>> servicesMap = new HashMap<>();
350+
boolean isDiscoveryChanged = false;
339351
if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServicesByProto) {
340352
ServicesByProto cacheValue = (ServicesByProto) value;
341353
compositeRevision = CompositeRevision.of(cacheValue.getRevision());
342-
List<Service> oldInstancesList = cacheValue.getOriginServicesList();
343-
for (Service oldService : oldInstancesList) {
354+
List<Service> oldServiceList = cacheValue.getOriginServicesList();
355+
for (Service oldService : oldServiceList) {
344356
String serverConnectorType = oldService.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC);
345-
if (!servicesMap.containsKey(serverConnectorType)) {
346-
servicesMap.put(serverConnectorType, new ArrayList<>());
357+
DestroyableServerConnector serverConnector = connector.getServerConnectorByType(serverConnectorType);
358+
if (serverConnector != null && serverConnector.isDiscoveryEnable()) {
359+
if (!servicesMap.containsKey(serverConnectorType)) {
360+
servicesMap.put(serverConnectorType, new ArrayList<>());
361+
}
362+
servicesMap.get(serverConnectorType).add(oldService);
363+
} else if (serverConnector != null && !serverConnector.isDiscoveryEnable()) {
364+
compositeRevision.removeRevision(serverConnectorType);
365+
isDiscoveryChanged = true;
366+
LOG.info("server connector {} is not enabled for discovery service {}", serverConnectorType, oldService);
347367
}
348-
servicesMap.get(serverConnectorType).add(oldService);
349368
}
350369
}
351370

352371
// 按照事件来源更新对应的revision
353372
compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue());
354373
// 按照事件来源更新对应的列表
355374
List<Service> serverEventServicesList = servicesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>());
356-
if (discoverResponse.getCode().getValue() != ServerCodes.DATA_NO_CHANGE) {
375+
if (newDiscoverResponseBuilder.getCode().getValue() != ServerCodes.DATA_NO_CHANGE) {
357376
serverEventServicesList.clear();
358377
serverEventServicesList.addAll(discoverResponse.getServicesList());
378+
} else if (newDiscoverResponseBuilder.getCode().getValue() == ServerCodes.DATA_NO_CHANGE && isDiscoveryChanged) {
379+
newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.EXECUTE_SUCCESS));
359380
}
360381
// 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份
361382
if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) {

polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,22 @@ public boolean isRegisterEnable() {
115115
return isRegisterEnable;
116116
}
117117

118+
public void setRegisterEnable(boolean registerEnable) {
119+
isRegisterEnable = registerEnable;
120+
}
121+
118122
@Override
119123
public boolean isDiscoveryEnable() {
120124
return isDiscoveryEnable;
121125
}
122126

127+
public void setDiscoveryEnable(boolean discoveryEnable) {
128+
isDiscoveryEnable = discoveryEnable;
129+
for (ConsulService consulService : consulServiceMap.values()) {
130+
consulService.setEnable(discoveryEnable);
131+
}
132+
}
133+
123134
@Override
124135
public boolean isReportServiceContractEnable() {
125136
return false;

polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,13 @@ protected void handle(Throwable throwable) {
7070
public boolean notifyServerEvent(ServerEvent serverEvent) {
7171
return false;
7272
}
73+
74+
@Override
75+
public boolean needUpdate() {
76+
if (serverConnector.isDiscoveryEnable()) {
77+
return super.needUpdate();
78+
} else {
79+
return false;
80+
}
81+
}
7382
}

polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public abstract class ConsulService extends Destroyable {
4848

4949
protected final ExecutorService refreshExecutor;
5050

51+
protected boolean enable = true;
52+
53+
protected boolean isReset = false;
54+
5155
public ConsulService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext,
5256
String threadName, ObjectMapper mapper) {
5357
this.consulClient = consulClient;
@@ -74,4 +78,15 @@ public void asyncSendRequest(ServiceUpdateTask serviceUpdateTask) {
7478
protected void doDestroy() {
7579
ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{refreshExecutor});
7680
}
81+
82+
public boolean isEnable() {
83+
return enable;
84+
}
85+
86+
public void setEnable(boolean enable) {
87+
if (!this.enable && enable) {
88+
this.isReset = true;
89+
}
90+
this.enable = enable;
91+
}
7792
}

polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,17 @@ private Long getServersConsulIndex(String serviceId) {
209209
}
210210

211211
private void setServersConsulIndex(String serviceId, Long lastIndex, Long newIndex) {
212-
LOG.debug("serviceId: {}; lastIndex: {}; newIndex: {}", serviceId, lastIndex, newIndex);
213-
serviceConsulIndexMap.put(serviceId, newIndex);
212+
if (isEnable() && isReset) {
213+
// 长轮询之后出现关闭又开启的情况,需要清空index,否则后续拉不到
214+
LOG.info("serviceId: {} is reset.", serviceId);
215+
serviceConsulIndexMap.remove(serviceId);
216+
isReset = false;
217+
} else if (isEnable()) {
218+
LOG.debug("serviceId: {}; lastIndex: {}; newIndex: {}", serviceId, lastIndex, newIndex);
219+
serviceConsulIndexMap.put(serviceId, newIndex);
220+
} else {
221+
LOG.info("serviceId: {} is disabled.", serviceId);
222+
serviceConsulIndexMap.remove(serviceId);
223+
}
214224
}
215225
}

polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,17 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
104104
ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL);
105105
boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent);
106106
if (consulIndex != null) {
107-
catalogConsulIndex.set(consulIndex);
107+
if (isEnable() && isReset) {
108+
LOG.info("service is reset.");
109+
catalogConsulIndex.set(-1L);
110+
isReset = false;
111+
} else if (isEnable()) {
112+
LOG.debug("lastIndex: {}; newIndex: {}", index, consulIndex);
113+
catalogConsulIndex.set(consulIndex);
114+
} else {
115+
LOG.info("service is disabled.");
116+
catalogConsulIndex.set(-1L);
117+
}
108118
}
109119
if (!svcDeleted) {
110120
serviceUpdateTask.addUpdateTaskSet();

0 commit comments

Comments
 (0)