Skip to content

fix: 修复通配API在探测关闭后无法停止探测任务的问题 #547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,18 @@ public void testHttpServerRandomPort() throws IOException {
statReporterConfig.setPluginConfig(StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS, prometheusHandlerConfig);
try (SDKContext sdkContext = SDKContext.initContextByConfig(configuration)) {
sdkContext.init();
URL metricsUrl = new URL("http://127.0.0.1:28080/metrics");
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
metricsConn.connect();
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
for (int i = 0; i < 3; i++) {
URL metricsUrl = new URL(String.format("http://127.0.0.1:%d/metrics", i));
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
try {
metricsConn.connect();
} catch (IOException e) {
continue;
}
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,18 @@ public void testHttpServerRandomPort() throws IOException {
statReporterConfig.setPluginConfig(StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS, prometheusHandlerConfig);
try (SDKContext sdkContext = SDKContext.initContextByConfig(configuration)) {
sdkContext.init();
URL metricsUrl = new URL("http://127.0.0.1:28080/metrics");
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
metricsConn.connect();
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
for (int i = 0; i < 3; i++) {
URL metricsUrl = new URL(String.format("http://127.0.0.1:%d/metrics", 28080 + i));
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
try {
metricsConn.connect();
} catch (IOException e) {
continue;
}
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pojo.Node;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.client.util.Utils;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.specification.api.v1.model.ModelProto;
import org.slf4j.Logger;
Expand Down Expand Up @@ -574,6 +575,7 @@ protected void doDestroy() {
HttpServer httpServer = entry.getValue();
httpServer.stop(0);
((ExecutorService) httpServer.getExecutor()).shutdownNow();
Utils.sleepUninterrupted(1000);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class PolarisCircuitBreaker extends Destroyable implements CircuitBreaker

// map the wildcard resource to rule specific resource,
// eg. /path/wildcard/123 => /path/wildcard/.+
private Cache<Resource, Resource> resourceMapping;
private final Map<Resource, ResourceWrap> resourceMapping = new ConcurrentHashMap<>();

private Extensions extensions;

Expand All @@ -102,6 +102,8 @@ public class PolarisCircuitBreaker extends Destroyable implements CircuitBreaker

private long checkPeriod;

private long resourceExpireInterval;

private CircuitBreakerRuleDictionary circuitBreakerRuleDictionary;

private FaultDetectRuleDictionary faultDetectRuleDictionary;
Expand All @@ -110,15 +112,15 @@ public class PolarisCircuitBreaker extends Destroyable implements CircuitBreaker

@Override
public CircuitBreakerStatus checkResource(Resource resource) {
Resource ruleResource = getActualResource(resource);
Resource ruleResource = getActualResource(resource, false);
Optional<ResourceCounters> resourceCounters = getResourceCounters(ruleResource);
if (null == resourceCounters) {
if (resource.getLevel() == Level.METHOD && Objects.equals(ruleResource, resource)) {
// 可能是被淘汰了,需要重新计算RuleResource
CircuitBreakerProto.CircuitBreakerRule circuitBreakerRule = circuitBreakerRuleDictionary.lookupCircuitBreakerRule(resource);
ruleResource = computeResourceByRule(resource, circuitBreakerRule);
if (!Objects.equals(ruleResource, resource)) {
resourceMapping.put(resource, ruleResource);
// 这里不能放缓存,需要在report的时候统一放,否则会有探测规则无法关联resource的问题
resourceCounters = getResourceCounters(ruleResource);
}
}
Expand All @@ -139,16 +141,19 @@ public void report(ResourceStat resourceStat) {
doReport(resourceStat, true);
}

public Resource getActualResource(Resource resource) {
Resource ruleResource = resourceMapping.getIfPresent(resource);
if (null == ruleResource) {
ruleResource = resource;
public Resource getActualResource(Resource resource, boolean internal) {
ResourceWrap resourceWrap = resourceMapping.get(resource);
if (null == resourceWrap) {
return resource;
}
if (!internal) {
resourceWrap.lastAccessTimeMilli = System.currentTimeMillis();
}
return ruleResource;
return resourceWrap.resource;
}

private ResourceCounters getOrInitResourceCounters(Resource resource) throws ExecutionException {
Resource ruleResource = getActualResource(resource);
Resource ruleResource = getActualResource(resource, false);
Optional<ResourceCounters> resourceCounters = getResourceCounters(ruleResource);
boolean reloadFaultDetect = false;
if (null == resourceCounters) {
Expand Down Expand Up @@ -211,7 +216,7 @@ private void reloadFaultDetector(Resource resource, ResourceCounters resourceCou
if (null == healthCheckContainer) {
return;
}
healthCheckContainer.removeResource(getActualResource(resource));
healthCheckContainer.removeResource(resource);
}
else {
if (null == healthCheckContainer) {
Expand Down Expand Up @@ -268,10 +273,12 @@ private Optional<ResourceCounters> initResourceCounter(Resource resource) throws
Cache<Resource, Optional<ResourceCounters>> resourceOptionalCache = countersCache.get(resource.getLevel());
CircuitBreakerProto.CircuitBreakerRule finalCircuitBreakerRule = circuitBreakerRule;
Resource ruleResource = computeResourceByRule(resource, circuitBreakerRule);
if (!Objects.equals(ruleResource, resource)) {
resourceMapping.put(resource, new ResourceWrap(ruleResource, System.currentTimeMillis()));
}
return resourceOptionalCache.get(ruleResource, new Callable<Optional<ResourceCounters>>() {
@Override
public Optional<ResourceCounters> call() {
resourceMapping.put(resource, ruleResource);
if (null == finalCircuitBreakerRule) {
return Optional.empty();
}
Expand Down Expand Up @@ -303,7 +310,7 @@ private void addInstanceForFaultDetect(Resource resource) {
InstanceResource instanceResource = (InstanceResource) resource;
HealthCheckContainer healthCheckContainer = healthCheckCache
.get(instanceResource.getService());
if (null == healthCheckContainer) {
if (null == healthCheckContainer || instanceResource.getPort() == 0) {
return;
}
healthCheckContainer.addInstance(instanceResource);
Expand All @@ -314,7 +321,7 @@ public PluginType getType() {
return PluginTypes.CIRCUIT_BREAKER.getBaseType();
}

private class CounterRemoveListener implements RemovalListener<Resource, Optional<ResourceCounters>> {
private static class CounterRemoveListener implements RemovalListener<Resource, Optional<ResourceCounters>> {

@Override
public void onRemoval(RemovalNotification<Resource, Optional<ResourceCounters>> removalNotification) {
Expand All @@ -323,21 +330,12 @@ public void onRemoval(RemovalNotification<Resource, Optional<ResourceCounters>>
return;
}
value.ifPresent(resourceCounters -> resourceCounters.setDestroyed(true));
Resource resource = removalNotification.getKey();
if (null == resource) {
return;
}
HealthCheckContainer healthCheckContainer = PolarisCircuitBreaker.this.healthCheckCache.get(resource.getService());
if (null != healthCheckContainer) {
healthCheckContainer.removeResource(resource);
}
}
}

@Override
public void init(InitContext ctx) throws PolarisException {
long expireIntervalMilli = ctx.getConfig().getConsumer().getCircuitBreaker().getCountersExpireInterval();
resourceMapping = CacheBuilder.newBuilder().expireAfterAccess(expireIntervalMilli, TimeUnit.MILLISECONDS).build();
resourceExpireInterval = ctx.getConfig().getConsumer().getCircuitBreaker().getCountersExpireInterval();
countersCache.put(Level.SERVICE, CacheBuilder.newBuilder().removalListener(new CounterRemoveListener()).build());
countersCache.put(Level.METHOD, CacheBuilder.newBuilder().removalListener(new CounterRemoveListener()).build());
countersCache.put(Level.INSTANCE, CacheBuilder.newBuilder().removalListener(new CounterRemoveListener()).build());
Expand Down Expand Up @@ -366,7 +364,19 @@ public void run() {
}

public void cleanupExpiredResources() {
resourceMapping.cleanUp();
LOG.info("[CIRCUIT_BREAKER] cleanup expire resources");
for (Map.Entry<Resource, ResourceWrap> entry : resourceMapping.entrySet()) {
Resource resource = entry.getKey();
if (System.currentTimeMillis() - entry.getValue().lastAccessTimeMilli >= resourceExpireInterval) {
LOG.info("[CIRCUIT_BREAKER] resource {} expired, start to cleanup", resource);
resourceMapping.remove(resource);
HealthCheckContainer healthCheckContainer = healthCheckCache.get(resource.getService());
if (null == healthCheckContainer) {
continue;
}
healthCheckContainer.removeResource(resource);
}
}
for (Map.Entry<Level, Cache<Resource, Optional<ResourceCounters>>> entry : countersCache.entrySet()) {
Cache<Resource, Optional<ResourceCounters>> values = entry.getValue();
values.asMap().forEach(new BiConsumer<Resource, Optional<ResourceCounters>>() {
Expand Down Expand Up @@ -458,8 +468,8 @@ public void setCircuitBreakerConfig(CircuitBreakerConfig circuitBreakerConfig) {
this.circuitBreakerConfig = circuitBreakerConfig;
}

Cache<Resource, Resource> getResourceMapping() {
return resourceMapping;
int getResourceMappingSize() {
return resourceMapping.size();
}

@Override
Expand All @@ -477,8 +487,13 @@ void onCircuitBreakerRuleChanged(ServiceKey serviceKey) {
if (Objects.equals(resource.getService(), serviceKey)) {
cacheValue.invalidate(resource);
}
HealthCheckContainer healthCheckContainer = healthCheckCache.get(serviceKey);
if (null != healthCheckContainer) {
}
}
HealthCheckContainer healthCheckContainer = healthCheckCache.get(serviceKey);
if (null != healthCheckContainer) {
for (Map.Entry<Resource, ResourceWrap> entry : resourceMapping.entrySet()) {
Resource resource = entry.getKey();
if (Objects.equals(resource.getService(), serviceKey)) {
LOG.info("onCircuitBreakerRuleChanged: clear resource {} from healthCheckContainer", resource);
healthCheckContainer.removeResource(resource);
}
Expand Down Expand Up @@ -551,4 +566,16 @@ public HealthCheckContainer apply(ServiceKey serviceKey, HealthCheckContainer he
}
});
}

private static class ResourceWrap {
// target resource, not nullable
final Resource resource;
// only record the report time
long lastAccessTimeMilli;

ResourceWrap(Resource resource, long lastAccessTimeMilli) {
this.resource = resource;
this.lastAccessTimeMilli = lastAccessTimeMilli;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private boolean doCheck(Instance instance, Protocol protocol, FaultDetectRule fa
if (!matchInstanceToResource(instance, resource)) {
continue;
}
Resource actualResource = polarisCircuitBreaker.getActualResource(resource);
Resource actualResource = polarisCircuitBreaker.getActualResource(resource, true);
if (reportedResources.contains(actualResource)) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void testCircuitBreakerMethodRules() {
Assert.assertEquals(Status.OPEN, circuitBreakerStatus.getStatus());
}
Assert.assertEquals(1, polarisCircuitBreaker.getCountersCache().get(Level.METHOD).size());
Assert.assertEquals(1000, polarisCircuitBreaker.getResourceMapping().size());
Assert.assertEquals(1000, polarisCircuitBreaker.getResourceMappingSize());

//check cleanup
try {
Expand All @@ -253,6 +253,6 @@ public void testCircuitBreakerMethodRules() {
e.printStackTrace();
}
polarisCircuitBreaker.cleanupExpiredResources();
Assert.assertEquals(0, polarisCircuitBreaker.getResourceMapping().size());
Assert.assertEquals(0, polarisCircuitBreaker.getResourceMappingSize());
}
}
Loading