From 92b0ae6130c9b08d2a3ac623c94dcef2ffacc87d Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Tue, 27 May 2025 09:51:07 +0800 Subject: [PATCH] feat:support dynamic discovery. --- .../client/flow/RegisterStateManager.java | 14 +++++++ .../api/plugin/server/ServerConnector.java | 18 ++++++++ .../composite/CompositeConnector.java | 10 +++++ .../composite/CompositeRevision.java | 6 +++ .../composite/CompositeServiceUpdateTask.java | 39 +++++++++++++---- .../connector/consul/ConsulAPIConnector.java | 11 +++++ .../consul/ConsulServiceUpdateTask.java | 9 ++++ .../consul/service/ConsulService.java | 15 +++++++ .../consul/service/InstanceService.java | 14 ++++++- .../consul/service/ServiceService.java | 12 +++++- .../service/authority/AuthorityService.java | 13 +++++- .../CircuitBreakingService.java | 13 +++++- .../consul/service/lane/LaneService.java | 42 +++++++++++++------ .../service/lossless/LosslessService.java | 23 ++++++---- .../ratelimiting/RateLimitingService.java | 13 +++++- .../router/NearByRouteRuleService.java | 13 +++++- .../consul/service/router/RoutingService.java | 13 +++++- 17 files changed, 235 insertions(+), 43 deletions(-) diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java index 16d1ae5de..046926fee 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java @@ -66,6 +66,20 @@ public static RegisterState putRegisterState(SDKContext sdkContext, }); } + /** + * Get instance register state from cache + * + * @param sdkContext sdk context + * @param instanceRegisterRequest instance register request + * @return Return instance register state object if it is cached, otherwise null + */ + public static RegisterState getRegisterState(SDKContext sdkContext, InstanceRegisterRequest instanceRegisterRequest) { + String registerStateKey = buildRegisterStateKey(instanceRegisterRequest); + Map sdkRegisterStates = REGISTER_STATES.computeIfAbsent( + sdkContext.getValueContext().getClientId(), clientId -> new ConcurrentHashMap<>()); + return sdkRegisterStates.get(registerStateKey); + } + /** * Remove the instance heartbeat task and cancel the task * diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerConnector.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerConnector.java index dd95fc766..045574f09 100644 --- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerConnector.java +++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerConnector.java @@ -137,6 +137,15 @@ default ServiceRuleByProto getServiceContract(CommonServiceContractRequest req) */ boolean isRegisterEnable(); + /** + * Set if register enabled. + * + * @param registerEnable + */ + default void setRegisterEnable(boolean registerEnable) { + + } + /** * Check if discovery enabled. * @@ -144,6 +153,15 @@ default ServiceRuleByProto getServiceContract(CommonServiceContractRequest req) */ boolean isDiscoveryEnable(); + /** + * Set if discovery enabled. + * + * @param discoveryEnable + */ + default void setDiscoveryEnable(boolean discoveryEnable) { + + } + /** * Check if service contract reporting enabled. * diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java index 724b5d49d..7a07fbe4c 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java @@ -28,6 +28,7 @@ import com.tencent.polaris.api.plugin.server.*; import com.tencent.polaris.api.pojo.ServiceEventKey; import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.pojo.ServiceRuleByProto; import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.factory.config.global.ServerConnectorConfigImpl; @@ -306,4 +307,13 @@ public boolean isZeroProtectionEnabled() { public boolean isNeedTestConnectivity() { return zeroProtectionConfig.isNeedTestConnectivity(); } + + public DestroyableServerConnector getServerConnectorByType(String type) { + for (DestroyableServerConnector sc : serverConnectors) { + if (StringUtils.equals(sc.getName(), type)) { + return sc; + } + } + return null; + } } diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java index 6d3887f24..7ed2908d9 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java @@ -53,6 +53,12 @@ public String getRevision(String name) { return ""; } + public void removeRevision(String name) { + if (ORDER_LIST.contains(name)) { + content[ORDER_LIST.indexOf(name)] = ""; + } + } + /** * Generate composite revision string. * diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java index 2d9dfcfa8..bc3f402b4 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java @@ -176,16 +176,25 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { CompositeRevision compositeRevision = new CompositeRevision(); Object value = getEventHandler().getValue(); Map> instancesMap = new HashMap<>(); + boolean isDiscoveryChanged = false; if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServiceInstancesByProto) { ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; compositeRevision = CompositeRevision.of(cacheValue.getRevision()); List oldInstancesList = cacheValue.getOriginInstancesList(); for (Instance oldInstance : oldInstancesList) { String serverConnectorType = oldInstance.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); - if (!instancesMap.containsKey(serverConnectorType)) { - instancesMap.put(serverConnectorType, new ArrayList<>()); + DestroyableServerConnector serverConnector = connector.getServerConnectorByType(serverConnectorType); + if (serverConnector != null && serverConnector.isDiscoveryEnable()) { + if (!instancesMap.containsKey(serverConnectorType)) { + instancesMap.put(serverConnectorType, new ArrayList<>()); + } + instancesMap.get(serverConnectorType).add(oldInstance); + } else if (serverConnector != null && !serverConnector.isDiscoveryEnable()) { + compositeRevision.removeRevision(serverConnectorType); + isDiscoveryChanged = true; + LOG.info("server connector {} is not enabled for discovery instance {}:{}", + serverConnectorType, oldInstance.getHost().getValue(), oldInstance.getPort().getValue()); } - instancesMap.get(serverConnectorType).add(oldInstance); } } @@ -200,6 +209,8 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { // 按照事件来源更新对应的列表 serverEventInstancesList.clear(); serverEventInstancesList.addAll(discoverResponse.getInstancesList()); + } else if (newDiscoverResponseBuilder.getCode().getValue() == ServerCodes.DATA_NO_CHANGE && isDiscoveryChanged) { + newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.EXECUTE_SUCCESS)); } if (LOG.isDebugEnabled()) { String newInstancesMapStr = convertToString(instancesMap); @@ -336,16 +347,24 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { CompositeRevision compositeRevision = new CompositeRevision(); Object value = getEventHandler().getValue(); Map> servicesMap = new HashMap<>(); + boolean isDiscoveryChanged = false; if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServicesByProto) { ServicesByProto cacheValue = (ServicesByProto) value; compositeRevision = CompositeRevision.of(cacheValue.getRevision()); - List oldInstancesList = cacheValue.getOriginServicesList(); - for (Service oldService : oldInstancesList) { + List oldServiceList = cacheValue.getOriginServicesList(); + for (Service oldService : oldServiceList) { String serverConnectorType = oldService.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); - if (!servicesMap.containsKey(serverConnectorType)) { - servicesMap.put(serverConnectorType, new ArrayList<>()); + DestroyableServerConnector serverConnector = connector.getServerConnectorByType(serverConnectorType); + if (serverConnector != null && serverConnector.isDiscoveryEnable()) { + if (!servicesMap.containsKey(serverConnectorType)) { + servicesMap.put(serverConnectorType, new ArrayList<>()); + } + servicesMap.get(serverConnectorType).add(oldService); + } else if (serverConnector != null && !serverConnector.isDiscoveryEnable()) { + compositeRevision.removeRevision(serverConnectorType); + isDiscoveryChanged = true; + LOG.info("server connector {} is not enabled for discovery service {}", serverConnectorType, oldService); } - servicesMap.get(serverConnectorType).add(oldService); } } @@ -353,9 +372,11 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue()); // 按照事件来源更新对应的列表 List serverEventServicesList = servicesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>()); - if (discoverResponse.getCode().getValue() != ServerCodes.DATA_NO_CHANGE) { + if (newDiscoverResponseBuilder.getCode().getValue() != ServerCodes.DATA_NO_CHANGE) { serverEventServicesList.clear(); serverEventServicesList.addAll(discoverResponse.getServicesList()); + } else if (newDiscoverResponseBuilder.getCode().getValue() == ServerCodes.DATA_NO_CHANGE && isDiscoveryChanged) { + newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.EXECUTE_SUCCESS)); } // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java index d02ccc087..810ef0587 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java @@ -115,11 +115,22 @@ public boolean isRegisterEnable() { return isRegisterEnable; } + public void setRegisterEnable(boolean registerEnable) { + isRegisterEnable = registerEnable; + } + @Override public boolean isDiscoveryEnable() { return isDiscoveryEnable; } + public void setDiscoveryEnable(boolean discoveryEnable) { + isDiscoveryEnable = discoveryEnable; + for (ConsulService consulService : consulServiceMap.values()) { + consulService.setEnable(discoveryEnable); + } + } + @Override public boolean isReportServiceContractEnable() { return false; diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java index e5540201f..41a68286b 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java @@ -70,4 +70,13 @@ protected void handle(Throwable throwable) { public boolean notifyServerEvent(ServerEvent serverEvent) { return false; } + + @Override + public boolean needUpdate() { + if (serverConnector.isDiscoveryEnable()) { + return super.needUpdate(); + } else { + return false; + } + } } diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java index 5401fc997..564ba8eea 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java @@ -48,6 +48,10 @@ public abstract class ConsulService extends Destroyable { protected final ExecutorService refreshExecutor; + protected boolean enable = true; + + protected boolean isReset = false; + public ConsulService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext, String threadName, ObjectMapper mapper) { this.consulClient = consulClient; @@ -74,4 +78,15 @@ public void asyncSendRequest(ServiceUpdateTask serviceUpdateTask) { protected void doDestroy() { ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{refreshExecutor}); } + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + if (!this.enable && enable) { + this.isReset = true; + } + this.enable = enable; + } } diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java index 2feaf1b17..214fed33d 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java @@ -209,7 +209,17 @@ private Long getServersConsulIndex(String serviceId) { } private void setServersConsulIndex(String serviceId, Long lastIndex, Long newIndex) { - LOG.debug("serviceId: {}; lastIndex: {}; newIndex: {}", serviceId, lastIndex, newIndex); - serviceConsulIndexMap.put(serviceId, newIndex); + if (isEnable() && isReset) { + // 长轮询之后出现关闭又开启的情况,需要清空index,否则后续拉不到 + LOG.info("serviceId: {} is reset.", serviceId); + serviceConsulIndexMap.remove(serviceId); + isReset = false; + } else if (isEnable()) { + LOG.debug("serviceId: {}; lastIndex: {}; newIndex: {}", serviceId, lastIndex, newIndex); + serviceConsulIndexMap.put(serviceId, newIndex); + } else { + LOG.info("serviceId: {} is disabled.", serviceId); + serviceConsulIndexMap.remove(serviceId); + } } } diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java index 8f3a272ba..45dde3004 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java @@ -104,7 +104,17 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) { ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL); boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent); if (consulIndex != null) { - catalogConsulIndex.set(consulIndex); + if (isEnable() && isReset) { + LOG.info("service is reset."); + catalogConsulIndex.set(-1L); + isReset = false; + } else if (isEnable()) { + LOG.debug("lastIndex: {}; newIndex: {}", index, consulIndex); + catalogConsulIndex.set(consulIndex); + } else { + LOG.info("service is disabled."); + catalogConsulIndex.set(-1L); + } } if (!svcDeleted) { serviceUpdateTask.addUpdateTaskSet(); diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java index 1859a01f0..3b6fcc9f7 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java @@ -293,8 +293,17 @@ private Long getAuthorityConsulIndex(AuthorityKey authorityKey) { } private void setAuthorityConsulIndex(AuthorityKey authorityKey, Long lastIndex, Long newIndex) { - LOG.debug("AuthorityKey: {}; lastIndex: {}; newIndex: {}", authorityKey, lastIndex, newIndex); - authorityConsulIndexMap.put(authorityKey, newIndex); + if (isEnable() && isReset) { + LOG.info("AuthorityKey: {} is reset.", authorityKey); + authorityConsulIndexMap.remove(authorityKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("AuthorityKey: {}; lastIndex: {}; newIndex: {}", authorityKey, lastIndex, newIndex); + authorityConsulIndexMap.put(authorityKey, newIndex); + } else { + LOG.info("AuthorityKey: {} is disabled.", authorityKey); + authorityConsulIndexMap.remove(authorityKey); + } } static class AuthorityKey { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java index be1d7b9a4..8f272f97c 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java @@ -266,8 +266,17 @@ private CircuitBreakingValue getCircuitBreakingValue(CircuitBreakingKey circuitB } private void setCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey, Long lastIndex, Long newIndex, Boolean is404) { - LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}, is404: {}", circuitBreakingKey, lastIndex, newIndex, is404); - circuitBreakingConsulIndexMap.put(circuitBreakingKey, new CircuitBreakingValue(newIndex, is404)); + if (isEnable() && isReset) { + LOG.info("CircuitBreakingKey: {} is reset.", circuitBreakingKey); + circuitBreakingConsulIndexMap.remove(circuitBreakingKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}, is404: {}", circuitBreakingKey, lastIndex, newIndex, is404); + circuitBreakingConsulIndexMap.put(circuitBreakingKey, new CircuitBreakingValue(newIndex, is404)); + } else { + LOG.info("CircuitBreakingKey: {} is disabled.", circuitBreakingKey); + circuitBreakingConsulIndexMap.remove(circuitBreakingKey); + } } static class CircuitBreakingKey { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java index afd1969ae..946d26e6c 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java @@ -442,32 +442,50 @@ private List parseLaneRuleResponse(final HttpResponse response, Object return laneRuleList; } - private Long getLaneInfoConsulIndex(LaneRuleKey routerRuleKey) { - Long index = laneInfoConsulIndexMap.get(routerRuleKey); + private Long getLaneInfoConsulIndex(LaneRuleKey laneInfoKey) { + Long index = laneInfoConsulIndexMap.get(laneInfoKey); if (index != null) { return index; } - setLaneInfoConsulIndex(routerRuleKey, null, -1L); + setLaneInfoConsulIndex(laneInfoKey, null, -1L); return -1L; } - private void setLaneInfoConsulIndex(LaneRuleKey routerRuleKey, Long lastIndex, Long newIndex) { - LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex); - laneInfoConsulIndexMap.put(routerRuleKey, newIndex); + private void setLaneInfoConsulIndex(LaneRuleKey laneInfoKey, Long lastIndex, Long newIndex) { + if (isEnable() && isReset) { + LOG.info("LaneInfoKey: {} is reset.", laneInfoKey); + laneInfoConsulIndexMap.remove(laneInfoKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("LaneInfoKey: {}; lastIndex: {}; newIndex: {}", laneInfoKey, lastIndex, newIndex); + laneInfoConsulIndexMap.put(laneInfoKey, newIndex); + } else { + LOG.info("LaneInfoKey: {} is disabled.", laneInfoKey); + laneInfoConsulIndexMap.remove(laneInfoKey); + } } - private Long getLaneRuleConsulIndex(LaneRuleKey routerRuleKey) { - Long index = laneRuleConsulIndexMap.get(routerRuleKey); + private Long getLaneRuleConsulIndex(LaneRuleKey laneRuleKey) { + Long index = laneRuleConsulIndexMap.get(laneRuleKey); if (index != null) { return index; } - setLaneRuleConsulIndex(routerRuleKey, null, -1L); + setLaneRuleConsulIndex(laneRuleKey, null, -1L); return -1L; } - private void setLaneRuleConsulIndex(LaneRuleKey routerRuleKey, Long lastIndex, Long newIndex) { - LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex); - laneRuleConsulIndexMap.put(routerRuleKey, newIndex); + private void setLaneRuleConsulIndex(LaneRuleKey laneRuleKey, Long lastIndex, Long newIndex) { + if (isEnable() && isReset) { + LOG.info("LaneRuleKey: {} is reset.", laneRuleKey); + laneRuleConsulIndexMap.remove(laneRuleKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", laneRuleKey, lastIndex, newIndex); + laneRuleConsulIndexMap.put(laneRuleKey, newIndex); + } else { + LOG.info("LaneRuleKey: {} is disabled.", laneRuleKey); + laneRuleConsulIndexMap.remove(laneRuleKey); + } } static class LaneRuleKey { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lossless/LosslessService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lossless/LosslessService.java index e192be6a7..39d4ff806 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lossless/LosslessService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lossless/LosslessService.java @@ -17,13 +17,6 @@ package com.tencent.polaris.plugins.connector.consul.service.lossless; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - import com.ecwid.consul.SingleUrlParameters; import com.ecwid.consul.UrlParameters; import com.ecwid.consul.json.GsonFactory; @@ -53,6 +46,9 @@ import org.slf4j.Logger; import org.yaml.snakeyaml.Yaml; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; /** @@ -231,8 +227,17 @@ private Long getWarmupRuleConsulIndex(WarmupRuleKey warmupRuleKey) { } private void setWarmupRuleConsulIndex(WarmupRuleKey warmupRuleKey, Long lastIndex, Long newIndex) { - LOG.debug("WarmupRuleKey: {}; lastIndex: {}; newIndex: {}", warmupRuleKey, lastIndex, newIndex); - warmupRuleConsulIndexMap.put(warmupRuleKey, newIndex); + if (isEnable() && isReset) { + LOG.info("WarmupRuleKey: {} is reset.", warmupRuleKey); + warmupRuleConsulIndexMap.remove(warmupRuleKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("WarmupRuleKey: {}; lastIndex: {}; newIndex: {}", warmupRuleKey, lastIndex, newIndex); + warmupRuleConsulIndexMap.put(warmupRuleKey, newIndex); + } else { + LOG.info("WarmupRuleKey: {} is disabled.", warmupRuleKey); + warmupRuleConsulIndexMap.remove(warmupRuleKey); + } } static class WarmupRuleKey { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ratelimiting/RateLimitingService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ratelimiting/RateLimitingService.java index 7049ada4e..54f690ddf 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ratelimiting/RateLimitingService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ratelimiting/RateLimitingService.java @@ -368,8 +368,17 @@ private Long getRateLimitingConsulIndex(RateLimitingKey rateLimitingKey) { } private void setRateLimitingConsulIndex(RateLimitingKey rateLimitingKey, Long lastIndex, Long newIndex) { - LOG.debug("RateLimitingKey: {}; lastIndex: {}; newIndex: {}", rateLimitingKey, lastIndex, newIndex); - rateLimitingConsulIndexMap.put(rateLimitingKey, newIndex); + if (isEnable() && isReset) { + LOG.info("RateLimitingKey: {} is reset.", rateLimitingKey); + rateLimitingConsulIndexMap.remove(rateLimitingKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("RateLimitingKey: {}; lastIndex: {}; newIndex: {}", rateLimitingKey, lastIndex, newIndex); + rateLimitingConsulIndexMap.put(rateLimitingKey, newIndex); + } else { + LOG.info("RateLimitingKey: {} is disabled.", rateLimitingKey); + rateLimitingConsulIndexMap.remove(rateLimitingKey); + } } static class RateLimitingKey { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/NearByRouteRuleService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/NearByRouteRuleService.java index 1eda27adf..d63b41757 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/NearByRouteRuleService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/NearByRouteRuleService.java @@ -210,8 +210,17 @@ private Long getRouterRuleConsulIndex(NearByRouteRuleKey nearByRouteRuleKey) { } private void setRouterRuleConsulIndex(NearByRouteRuleKey nearByRouteRuleKey, Long lastIndex, Long newIndex) { - LOG.debug("NearByRouteRuleKey: {}; lastIndex: {}; newIndex: {}", nearByRouteRuleKey, lastIndex, newIndex); - affinityConsulIndexMap.put(nearByRouteRuleKey, newIndex); + if (isEnable() && isReset) { + LOG.info("NearByRouteRuleKey: {} is reset.", nearByRouteRuleKey); + affinityConsulIndexMap.remove(nearByRouteRuleKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("NearByRouteRuleKey: {}; lastIndex: {}; newIndex: {}", nearByRouteRuleKey, lastIndex, newIndex); + affinityConsulIndexMap.put(nearByRouteRuleKey, newIndex); + } else { + LOG.info("NearByRouteRuleKey: {} is disabled.", nearByRouteRuleKey); + affinityConsulIndexMap.remove(nearByRouteRuleKey); + } } static class NearByRouteRuleKey { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/RoutingService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/RoutingService.java index 043a0502a..f8be3064a 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/RoutingService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/router/RoutingService.java @@ -308,8 +308,17 @@ private Long getRouterRuleConsulIndex(RouterRuleKey routerRuleKey) { } private void setRouterRuleConsulIndex(RouterRuleKey routerRuleKey, Long lastIndex, Long newIndex) { - LOG.debug("RouterRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex); - routerRuleConsulIndexMap.put(routerRuleKey, newIndex); + if (isEnable() && isReset) { + LOG.info("RouterRuleKey: {} is reset.", routerRuleKey); + routerRuleConsulIndexMap.remove(routerRuleKey); + isReset = false; + } else if (isEnable()) { + LOG.debug("RouterRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex); + routerRuleConsulIndexMap.put(routerRuleKey, newIndex); + } else { + LOG.info("RouterRuleKey: {} is disabled.", routerRuleKey); + routerRuleConsulIndexMap.remove(routerRuleKey); + } } static class RouterRuleKey {