Skip to content

fix:fix circuit breaker rule wrong update when using tsf consul. #586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ private ResourceCounters getOrInitResourceCounters(Resource resource) throws Exe
Resource ruleResource = getActualResource(resource, false);
Optional<ResourceCounters> resourceCounters = getResourceCounters(ruleResource);
boolean reloadFaultDetect = false;
/*
这里不判断!resourceCounters.isPresent(),因为在初始化的时候,
如果没有对应的资源,就会生成Optional.empty进行资源占位,防止每次请求都去后端获取熔断规则。
如果有对应的资源,但是没有熔断规则,也会生成Optional.empty进行资源占位。如果后续新增熔断规则,会等到资源过期清理后,再创建熔断器。
如果有对应的资源,且有熔断规则,就会正常获取到熔断器。
*/
if (null == resourceCounters) {
synchronized (countersCache) {
resourceCounters = getResourceCounters(ruleResource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.protobuf.StringValue;
import com.google.protobuf.UInt32Value;
import com.tencent.polaris.api.exception.ErrorCode;
Expand Down Expand Up @@ -70,8 +68,6 @@ public class AuthorityService extends ConsulService {

private static final Logger LOG = LoggerFactory.getLogger(AuthorityService.class);

private final Gson gson = new GsonBuilder().disableHtmlEscaping().create();

private final Map<AuthorityKey, Long> authorityConsulIndexMap = new ConcurrentHashMap<>();

public AuthorityService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class CircuitBreakingService extends ConsulService {

private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakingService.class);

private final Map<CircuitBreakingKey, Long> circuitBreakingConsulIndexMap = new ConcurrentHashMap<>();
private final Map<CircuitBreakingKey, CircuitBreakingValue> circuitBreakingConsulIndexMap = new ConcurrentHashMap<>();

public CircuitBreakingService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext,
String threadName, ObjectMapper mapper) {
Expand All @@ -83,6 +83,7 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
circuitBreakingKey.setNamespace(namespace);
circuitBreakingKey.setService(service);
Long currentIndex = getCircuitBreakingConsulIndex(circuitBreakingKey);
CircuitBreakingValue currentCircuitBreakingValue = getCircuitBreakingValue(circuitBreakingKey);
QueryParams queryParams = new QueryParams(consulContext.getWaitTime(), currentIndex);
int code = ServerCodes.DATA_NO_CHANGE;
try {
Expand All @@ -100,6 +101,7 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
}

Long newIndex = response.getConsulIndex();
boolean is404 = false;
// create service.
ServiceProto.Service.Builder newServiceBuilder = ServiceProto.Service.newBuilder();
newServiceBuilder.setNamespace(StringValue.of(namespace));
Expand Down Expand Up @@ -130,9 +132,16 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
}
}
} else {
if (currentCircuitBreakingValue != null && currentCircuitBreakingValue.isLast404) {
code = ServerCodes.DATA_NO_CHANGE;
}
is404 = true;
LOG.info("empty circuit breaker rule: {}", response);
}
} else {
if (currentCircuitBreakingValue != null && currentCircuitBreakingValue.isLast404) {
is404 = true;
}
LOG.debug("[TSF CIRCUIT BREAKER] Consul data is not changed");
}
} else {
Expand All @@ -146,7 +155,7 @@ public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL);
boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent);
if (newIndex != null) {
setCircuitBreakingConsulIndex(circuitBreakingKey, currentIndex, newIndex);
setCircuitBreakingConsulIndex(circuitBreakingKey, currentIndex, newIndex, is404);
}
if (!svcDeleted) {
serviceUpdateTask.addUpdateTaskSet();
Expand Down Expand Up @@ -244,17 +253,21 @@ private List<CircuitBreakerProto.CircuitBreakerRule> parseResponse(String decode
}

private Long getCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey) {
Long index = circuitBreakingConsulIndexMap.get(circuitBreakingKey);
if (index != null) {
return index;
CircuitBreakingValue circuitBreakingValue = circuitBreakingConsulIndexMap.get(circuitBreakingKey);
if (circuitBreakingValue != null && circuitBreakingValue.getIndex() != null) {
return circuitBreakingValue.getIndex();
}
setCircuitBreakingConsulIndex(circuitBreakingKey, null, -1L);
setCircuitBreakingConsulIndex(circuitBreakingKey, null, -1L, false);
return -1L;
}

private void setCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey, Long lastIndex, Long newIndex) {
LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}", circuitBreakingKey, lastIndex, newIndex);
circuitBreakingConsulIndexMap.put(circuitBreakingKey, newIndex);
private CircuitBreakingValue getCircuitBreakingValue(CircuitBreakingKey circuitBreakingKey) {
return circuitBreakingConsulIndexMap.get(circuitBreakingKey);
}

private void setCircuitBreakingConsulIndex(CircuitBreakingKey circuitBreakingKey, Long lastIndex, Long newIndex, Boolean is404) {
LOG.debug("CircuitBreakingKey: {}; lastIndex: {}; newIndex: {}, is404: {}", circuitBreakingKey, lastIndex, newIndex, is404);
circuitBreakingConsulIndexMap.put(circuitBreakingKey, new CircuitBreakingValue(newIndex, is404));
}

static class CircuitBreakingKey {
Expand Down Expand Up @@ -309,6 +322,40 @@ public String toString() {
}
}

static class CircuitBreakingValue {
private Long index;
private Boolean isLast404 = false;

public CircuitBreakingValue(Long index, Boolean isLast404) {
this.index = index;
this.isLast404 = isLast404;
}

public Long getIndex() {
return index;
}

public void setIndex(Long index) {
this.index = index;
}

public Boolean getLast404() {
return isLast404;
}

public void setLast404(Boolean last404) {
isLast404 = last404;
}

@Override
public String toString() {
return "CircuitBreakingValue{" +
"index=" + index +
", isLast404=" + isLast404 +
'}';
}
}

private CircuitBreakerProto.Level parseLevel(TsfCircuitBreakerIsolationLevelEnum isolationLevel) {
switch (isolationLevel) {
case INSTANCE:
Expand Down