Skip to content

feat:support dynamic discovery. #607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ public static RegisterState putRegisterState(SDKContext sdkContext,
});
}

/**
* Get instance register state from cache
*
* @param sdkContext sdk context
* @param instanceRegisterRequest instance register request
* @return Return instance register state object if it is cached, otherwise null
*/
public static RegisterState getRegisterState(SDKContext sdkContext, InstanceRegisterRequest instanceRegisterRequest) {
String registerStateKey = buildRegisterStateKey(instanceRegisterRequest);
Map<String, RegisterState> sdkRegisterStates = REGISTER_STATES.computeIfAbsent(
sdkContext.getValueContext().getClientId(), clientId -> new ConcurrentHashMap<>());
return sdkRegisterStates.get(registerStateKey);
}

/**
* Remove the instance heartbeat task and cancel the task
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,31 @@ default ServiceRuleByProto getServiceContract(CommonServiceContractRequest req)
*/
boolean isRegisterEnable();

/**
* Set if register enabled.
*
* @param registerEnable
*/
default void setRegisterEnable(boolean registerEnable) {

}

/**
* Check if discovery enabled.
*
* @return boolean
*/
boolean isDiscoveryEnable();

/**
* Set if discovery enabled.
*
* @param discoveryEnable
*/
default void setDiscoveryEnable(boolean discoveryEnable) {

}

/**
* Check if service contract reporting enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.tencent.polaris.api.plugin.server.*;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pojo.ServiceRuleByProto;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.factory.config.global.ServerConnectorConfigImpl;
Expand Down Expand Up @@ -306,4 +307,13 @@ public boolean isZeroProtectionEnabled() {
public boolean isNeedTestConnectivity() {
return zeroProtectionConfig.isNeedTestConnectivity();
}

public DestroyableServerConnector getServerConnectorByType(String type) {
for (DestroyableServerConnector sc : serverConnectors) {
if (StringUtils.equals(sc.getName(), type)) {
return sc;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public String getRevision(String name) {
return "";
}

public void removeRevision(String name) {
if (ORDER_LIST.contains(name)) {
content[ORDER_LIST.indexOf(name)] = "";
}
}

/**
* Generate composite revision string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,25 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
CompositeRevision compositeRevision = new CompositeRevision();
Object value = getEventHandler().getValue();
Map<String, List<Instance>> instancesMap = new HashMap<>();
boolean isDiscoveryChanged = false;
if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServiceInstancesByProto) {
ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value;
compositeRevision = CompositeRevision.of(cacheValue.getRevision());
List<Instance> oldInstancesList = cacheValue.getOriginInstancesList();
for (Instance oldInstance : oldInstancesList) {
String serverConnectorType = oldInstance.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC);
if (!instancesMap.containsKey(serverConnectorType)) {
instancesMap.put(serverConnectorType, new ArrayList<>());
DestroyableServerConnector serverConnector = connector.getServerConnectorByType(serverConnectorType);
if (serverConnector != null && serverConnector.isDiscoveryEnable()) {
if (!instancesMap.containsKey(serverConnectorType)) {
instancesMap.put(serverConnectorType, new ArrayList<>());
}
instancesMap.get(serverConnectorType).add(oldInstance);
} else if (serverConnector != null && !serverConnector.isDiscoveryEnable()) {
compositeRevision.removeRevision(serverConnectorType);
isDiscoveryChanged = true;
LOG.info("server connector {} is not enabled for discovery instance {}:{}",
serverConnectorType, oldInstance.getHost().getValue(), oldInstance.getPort().getValue());
}
instancesMap.get(serverConnectorType).add(oldInstance);
}
}

Expand All @@ -200,6 +209,8 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
// 按照事件来源更新对应的列表
serverEventInstancesList.clear();
serverEventInstancesList.addAll(discoverResponse.getInstancesList());
} else if (newDiscoverResponseBuilder.getCode().getValue() == ServerCodes.DATA_NO_CHANGE && isDiscoveryChanged) {
newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.EXECUTE_SUCCESS));
}
if (LOG.isDebugEnabled()) {
String newInstancesMapStr = convertToString(instancesMap);
Expand Down Expand Up @@ -336,26 +347,36 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
CompositeRevision compositeRevision = new CompositeRevision();
Object value = getEventHandler().getValue();
Map<String, List<Service>> servicesMap = new HashMap<>();
boolean isDiscoveryChanged = false;
if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServicesByProto) {
ServicesByProto cacheValue = (ServicesByProto) value;
compositeRevision = CompositeRevision.of(cacheValue.getRevision());
List<Service> oldInstancesList = cacheValue.getOriginServicesList();
for (Service oldService : oldInstancesList) {
List<Service> oldServiceList = cacheValue.getOriginServicesList();
for (Service oldService : oldServiceList) {
String serverConnectorType = oldService.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC);
if (!servicesMap.containsKey(serverConnectorType)) {
servicesMap.put(serverConnectorType, new ArrayList<>());
DestroyableServerConnector serverConnector = connector.getServerConnectorByType(serverConnectorType);
if (serverConnector != null && serverConnector.isDiscoveryEnable()) {
if (!servicesMap.containsKey(serverConnectorType)) {
servicesMap.put(serverConnectorType, new ArrayList<>());
}
servicesMap.get(serverConnectorType).add(oldService);
} else if (serverConnector != null && !serverConnector.isDiscoveryEnable()) {
compositeRevision.removeRevision(serverConnectorType);
isDiscoveryChanged = true;
LOG.info("server connector {} is not enabled for discovery service {}", serverConnectorType, oldService);
}
servicesMap.get(serverConnectorType).add(oldService);
}
}

// 按照事件来源更新对应的revision
compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue());
// 按照事件来源更新对应的列表
List<Service> serverEventServicesList = servicesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>());
if (discoverResponse.getCode().getValue() != ServerCodes.DATA_NO_CHANGE) {
if (newDiscoverResponseBuilder.getCode().getValue() != ServerCodes.DATA_NO_CHANGE) {
serverEventServicesList.clear();
serverEventServicesList.addAll(discoverResponse.getServicesList());
} else if (newDiscoverResponseBuilder.getCode().getValue() == ServerCodes.DATA_NO_CHANGE && isDiscoveryChanged) {
newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.EXECUTE_SUCCESS));
}
// 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份
if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,22 @@ public boolean isRegisterEnable() {
return isRegisterEnable;
}

public void setRegisterEnable(boolean registerEnable) {
isRegisterEnable = registerEnable;
}

@Override
public boolean isDiscoveryEnable() {
return isDiscoveryEnable;
}

public void setDiscoveryEnable(boolean discoveryEnable) {
isDiscoveryEnable = discoveryEnable;
for (ConsulService consulService : consulServiceMap.values()) {
consulService.setEnable(discoveryEnable);
}
}

@Override
public boolean isReportServiceContractEnable() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ protected void handle(Throwable throwable) {
public boolean notifyServerEvent(ServerEvent serverEvent) {
return false;
}

@Override
public boolean needUpdate() {
if (serverConnector.isDiscoveryEnable()) {
return super.needUpdate();
} else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public abstract class ConsulService extends Destroyable {

protected final ExecutorService refreshExecutor;

protected boolean enable = true;

protected boolean isReset = false;

public ConsulService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext,
String threadName, ObjectMapper mapper) {
this.consulClient = consulClient;
Expand All @@ -74,4 +78,15 @@ public void asyncSendRequest(ServiceUpdateTask serviceUpdateTask) {
protected void doDestroy() {
ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{refreshExecutor});
}

public boolean isEnable() {
return enable;
}

public void setEnable(boolean enable) {
if (!this.enable && enable) {
this.isReset = true;
}
this.enable = enable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,17 @@ private Long getServersConsulIndex(String serviceId) {
}

private void setServersConsulIndex(String serviceId, Long lastIndex, Long newIndex) {
LOG.debug("serviceId: {}; lastIndex: {}; newIndex: {}", serviceId, lastIndex, newIndex);
serviceConsulIndexMap.put(serviceId, newIndex);
if (isEnable() && isReset) {
// 长轮询之后出现关闭又开启的情况,需要清空index,否则后续拉不到
LOG.info("serviceId: {} is reset.", serviceId);
serviceConsulIndexMap.remove(serviceId);
isReset = false;
} else if (isEnable()) {
LOG.debug("serviceId: {}; lastIndex: {}; newIndex: {}", serviceId, lastIndex, newIndex);
serviceConsulIndexMap.put(serviceId, newIndex);
} else {
LOG.info("serviceId: {} is disabled.", serviceId);
serviceConsulIndexMap.remove(serviceId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,17 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL);
boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent);
if (consulIndex != null) {
catalogConsulIndex.set(consulIndex);
if (isEnable() && isReset) {
LOG.info("service is reset.");
catalogConsulIndex.set(-1L);
isReset = false;
} else if (isEnable()) {
LOG.debug("lastIndex: {}; newIndex: {}", index, consulIndex);
catalogConsulIndex.set(consulIndex);
} else {
LOG.info("service is disabled.");
catalogConsulIndex.set(-1L);
}
}
if (!svcDeleted) {
serviceUpdateTask.addUpdateTaskSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,17 @@ private Long getAuthorityConsulIndex(AuthorityKey authorityKey) {
}

private void setAuthorityConsulIndex(AuthorityKey authorityKey, Long lastIndex, Long newIndex) {
LOG.debug("AuthorityKey: {}; lastIndex: {}; newIndex: {}", authorityKey, lastIndex, newIndex);
authorityConsulIndexMap.put(authorityKey, newIndex);
if (isEnable() && isReset) {
LOG.info("AuthorityKey: {} is reset.", authorityKey);
authorityConsulIndexMap.remove(authorityKey);
isReset = false;
} else if (isEnable()) {
LOG.debug("AuthorityKey: {}; lastIndex: {}; newIndex: {}", authorityKey, lastIndex, newIndex);
authorityConsulIndexMap.put(authorityKey, newIndex);
} else {
LOG.info("AuthorityKey: {} is disabled.", authorityKey);
authorityConsulIndexMap.remove(authorityKey);
}
}

static class AuthorityKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,17 @@ private CircuitBreakingValue getCircuitBreakingValue(CircuitBreakingKey circuitB
}

private void setCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey, Long lastIndex, Long newIndex, Boolean is404) {
LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}, is404: {}", circuitBreakingKey, lastIndex, newIndex, is404);
circuitBreakingConsulIndexMap.put(circuitBreakingKey, new CircuitBreakingValue(newIndex, is404));
if (isEnable() && isReset) {
LOG.info("CircuitBreakingKey: {} is reset.", circuitBreakingKey);
circuitBreakingConsulIndexMap.remove(circuitBreakingKey);
isReset = false;
} else if (isEnable()) {
LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}, is404: {}", circuitBreakingKey, lastIndex, newIndex, is404);
circuitBreakingConsulIndexMap.put(circuitBreakingKey, new CircuitBreakingValue(newIndex, is404));
} else {
LOG.info("CircuitBreakingKey: {} is disabled.", circuitBreakingKey);
circuitBreakingConsulIndexMap.remove(circuitBreakingKey);
}
}

static class CircuitBreakingKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,32 +442,50 @@ private List<LaneRule> parseLaneRuleResponse(final HttpResponse response, Object
return laneRuleList;
}

private Long getLaneInfoConsulIndex(LaneRuleKey routerRuleKey) {
Long index = laneInfoConsulIndexMap.get(routerRuleKey);
private Long getLaneInfoConsulIndex(LaneRuleKey laneInfoKey) {
Long index = laneInfoConsulIndexMap.get(laneInfoKey);
if (index != null) {
return index;
}
setLaneInfoConsulIndex(routerRuleKey, null, -1L);
setLaneInfoConsulIndex(laneInfoKey, null, -1L);
return -1L;
}

private void setLaneInfoConsulIndex(LaneRuleKey routerRuleKey, Long lastIndex, Long newIndex) {
LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex);
laneInfoConsulIndexMap.put(routerRuleKey, newIndex);
private void setLaneInfoConsulIndex(LaneRuleKey laneInfoKey, Long lastIndex, Long newIndex) {
if (isEnable() && isReset) {
LOG.info("LaneInfoKey: {} is reset.", laneInfoKey);
laneInfoConsulIndexMap.remove(laneInfoKey);
isReset = false;
} else if (isEnable()) {
LOG.debug("LaneInfoKey: {}; lastIndex: {}; newIndex: {}", laneInfoKey, lastIndex, newIndex);
laneInfoConsulIndexMap.put(laneInfoKey, newIndex);
} else {
LOG.info("LaneInfoKey: {} is disabled.", laneInfoKey);
laneInfoConsulIndexMap.remove(laneInfoKey);
}
}

private Long getLaneRuleConsulIndex(LaneRuleKey routerRuleKey) {
Long index = laneRuleConsulIndexMap.get(routerRuleKey);
private Long getLaneRuleConsulIndex(LaneRuleKey laneRuleKey) {
Long index = laneRuleConsulIndexMap.get(laneRuleKey);
if (index != null) {
return index;
}
setLaneRuleConsulIndex(routerRuleKey, null, -1L);
setLaneRuleConsulIndex(laneRuleKey, null, -1L);
return -1L;
}

private void setLaneRuleConsulIndex(LaneRuleKey routerRuleKey, Long lastIndex, Long newIndex) {
LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex);
laneRuleConsulIndexMap.put(routerRuleKey, newIndex);
private void setLaneRuleConsulIndex(LaneRuleKey laneRuleKey, Long lastIndex, Long newIndex) {
if (isEnable() && isReset) {
LOG.info("LaneRuleKey: {} is reset.", laneRuleKey);
laneRuleConsulIndexMap.remove(laneRuleKey);
isReset = false;
} else if (isEnable()) {
LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", laneRuleKey, lastIndex, newIndex);
laneRuleConsulIndexMap.put(laneRuleKey, newIndex);
} else {
LOG.info("LaneRuleKey: {} is disabled.", laneRuleKey);
laneRuleConsulIndexMap.remove(laneRuleKey);
}
}

static class LaneRuleKey {
Expand Down
Loading