From e17996d3e51fa2b4021c2bb1c6144531dd09dd5b Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Fri, 28 Feb 2025 11:10:59 +0800 Subject: [PATCH] fix:fix circuit breaker rule wrong update when using tsf consul. --- .../composite/PolarisCircuitBreaker.java | 6 ++ .../service/authority/AuthorityService.java | 4 -- .../CircuitBreakingService.java | 65 ++++++++++++++++--- 3 files changed, 62 insertions(+), 13 deletions(-) diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java index e8e4e3011..7c4965c8f 100644 --- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java +++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java @@ -149,6 +149,12 @@ private ResourceCounters getOrInitResourceCounters(Resource resource) throws Exe Resource ruleResource = getActualResource(resource, false); Optional resourceCounters = getResourceCounters(ruleResource); boolean reloadFaultDetect = false; + /* + 这里不判断!resourceCounters.isPresent(),因为在初始化的时候, + 如果没有对应的资源,就会生成Optional.empty进行资源占位,防止每次请求都去后端获取熔断规则。 + 如果有对应的资源,但是没有熔断规则,也会生成Optional.empty进行资源占位。如果后续新增熔断规则,会等到资源过期清理后,再创建熔断器。 + 如果有对应的资源,且有熔断规则,就会正常获取到熔断器。 + */ if (null == resourceCounters) { synchronized (countersCache) { resourceCounters = getResourceCounters(ruleResource); diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java index 83c58cdac..1859a01f0 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/authority/AuthorityService.java @@ -25,8 +25,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import com.google.protobuf.StringValue; import com.google.protobuf.UInt32Value; import com.tencent.polaris.api.exception.ErrorCode; @@ -70,8 +68,6 @@ public class AuthorityService extends ConsulService { private static final Logger LOG = LoggerFactory.getLogger(AuthorityService.class); - private final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); - private final Map authorityConsulIndexMap = new ConcurrentHashMap<>(); public AuthorityService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext, diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java index 32aebf0af..be1d7b9a4 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/circuitbreaker/CircuitBreakingService.java @@ -66,7 +66,7 @@ public class CircuitBreakingService extends ConsulService { private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakingService.class); - private final Map circuitBreakingConsulIndexMap = new ConcurrentHashMap<>(); + private final Map circuitBreakingConsulIndexMap = new ConcurrentHashMap<>(); public CircuitBreakingService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext, String threadName, ObjectMapper mapper) { @@ -83,6 +83,7 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) { circuitBreakingKey.setNamespace(namespace); circuitBreakingKey.setService(service); Long currentIndex = getCircuitBreakingConsulIndex(circuitBreakingKey); + CircuitBreakingValue currentCircuitBreakingValue = getCircuitBreakingValue(circuitBreakingKey); QueryParams queryParams = new QueryParams(consulContext.getWaitTime(), currentIndex); int code = ServerCodes.DATA_NO_CHANGE; try { @@ -100,6 +101,7 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) { } Long newIndex = response.getConsulIndex(); + boolean is404 = false; // create service. ServiceProto.Service.Builder newServiceBuilder = ServiceProto.Service.newBuilder(); newServiceBuilder.setNamespace(StringValue.of(namespace)); @@ -130,9 +132,16 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) { } } } else { + if (currentCircuitBreakingValue != null && currentCircuitBreakingValue.isLast404) { + code = ServerCodes.DATA_NO_CHANGE; + } + is404 = true; LOG.info("empty circuit breaker rule: {}", response); } } else { + if (currentCircuitBreakingValue != null && currentCircuitBreakingValue.isLast404) { + is404 = true; + } LOG.debug("[TSF CIRCUIT BREAKER] Consul data is not changed"); } } else { @@ -146,7 +155,7 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) { ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL); boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent); if (newIndex != null) { - setCircuitBreakingConsulIndex(circuitBreakingKey, currentIndex, newIndex); + setCircuitBreakingConsulIndex(circuitBreakingKey, currentIndex, newIndex, is404); } if (!svcDeleted) { serviceUpdateTask.addUpdateTaskSet(); @@ -244,17 +253,21 @@ private List parseResponse(String decode } private Long getCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey) { - Long index = circuitBreakingConsulIndexMap.get(circuitBreakingKey); - if (index != null) { - return index; + CircuitBreakingValue circuitBreakingValue = circuitBreakingConsulIndexMap.get(circuitBreakingKey); + if (circuitBreakingValue != null && circuitBreakingValue.getIndex() != null) { + return circuitBreakingValue.getIndex(); } - setCircuitBreakingConsulIndex(circuitBreakingKey, null, -1L); + setCircuitBreakingConsulIndex(circuitBreakingKey, null, -1L, false); return -1L; } - private void setCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey, Long lastIndex, Long newIndex) { - LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}", circuitBreakingKey, lastIndex, newIndex); - circuitBreakingConsulIndexMap.put(circuitBreakingKey, newIndex); + private CircuitBreakingValue getCircuitBreakingValue(CircuitBreakingKey circuitBreakingKey) { + return circuitBreakingConsulIndexMap.get(circuitBreakingKey); + } + + private void setCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey, Long lastIndex, Long newIndex, Boolean is404) { + LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}, is404: {}", circuitBreakingKey, lastIndex, newIndex, is404); + circuitBreakingConsulIndexMap.put(circuitBreakingKey, new CircuitBreakingValue(newIndex, is404)); } static class CircuitBreakingKey { @@ -309,6 +322,40 @@ public String toString() { } } + static class CircuitBreakingValue { + private Long index; + private Boolean isLast404 = false; + + public CircuitBreakingValue(Long index, Boolean isLast404) { + this.index = index; + this.isLast404 = isLast404; + } + + public Long getIndex() { + return index; + } + + public void setIndex(Long index) { + this.index = index; + } + + public Boolean getLast404() { + return isLast404; + } + + public void setLast404(Boolean last404) { + isLast404 = last404; + } + + @Override + public String toString() { + return "CircuitBreakingValue{" + + "index=" + index + + ", isLast404=" + isLast404 + + '}'; + } + } + private CircuitBreakerProto.Level parseLevel(TsfCircuitBreakerIsolationLevelEnum isolationLevel) { switch (isolationLevel) { case INSTANCE: