From 36702e693e898b8c9050aced2234980e31b4fae3 Mon Sep 17 00:00:00 2001 From: andrew shan <45474304+andrewshan@users.noreply.github.com> Date: Mon, 12 Aug 2024 14:41:16 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=80=9A=E9=85=8DAPI?= =?UTF-8?q?=E5=9C=A8=E6=8E=A2=E6=B5=8B=E5=85=B3=E9=97=AD=E5=90=8E=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E5=81=9C=E6=AD=A2=E6=8E=A2=E6=B5=8B=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../factory/PrometheusHttpServerTest.java | 18 ++-- .../test/core/PrometheusHttpServerTest.java | 18 ++-- .../api/plugin/compose/Extensions.java | 2 + .../composite/PolarisCircuitBreaker.java | 83 ++++++++++++------- .../composite/ResourceHealthChecker.java | 2 +- .../composite/PolarisCircuitBreakerTest.java | 4 +- 6 files changed, 84 insertions(+), 43 deletions(-) diff --git a/polaris-assembly/polaris-assembly-factory/src/test/java/com/tencent/polaris/assembly/factory/PrometheusHttpServerTest.java b/polaris-assembly/polaris-assembly-factory/src/test/java/com/tencent/polaris/assembly/factory/PrometheusHttpServerTest.java index 30d1cf6bb..bccf89f6c 100644 --- a/polaris-assembly/polaris-assembly-factory/src/test/java/com/tencent/polaris/assembly/factory/PrometheusHttpServerTest.java +++ b/polaris-assembly/polaris-assembly-factory/src/test/java/com/tencent/polaris/assembly/factory/PrometheusHttpServerTest.java @@ -67,12 +67,18 @@ public void testHttpServerRandomPort() throws IOException { statReporterConfig.setPluginConfig(StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS, prometheusHandlerConfig); try (SDKContext sdkContext = SDKContext.initContextByConfig(configuration)) { sdkContext.init(); - URL metricsUrl = new URL("http://127.0.0.1:28080/metrics"); - HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection(); - metricsConn.setRequestMethod("GET"); - metricsConn.connect(); - assertThat(metricsConn.getResponseCode()).isEqualTo(200); - metricsConn.disconnect(); + for (int i = 0; i < 3; i++) { + URL metricsUrl = new URL(String.format("http://127.0.0.1:%d/metrics", i)); + HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection(); + metricsConn.setRequestMethod("GET"); + try { + metricsConn.connect(); + } catch (IOException e) { + continue; + } + assertThat(metricsConn.getResponseCode()).isEqualTo(200); + metricsConn.disconnect(); + } } } diff --git a/polaris-discovery/polaris-discovery-factory/src/test/java/com/tencent/polaris/discovery/test/core/PrometheusHttpServerTest.java b/polaris-discovery/polaris-discovery-factory/src/test/java/com/tencent/polaris/discovery/test/core/PrometheusHttpServerTest.java index 5ecb42b98..b5cfffae4 100644 --- a/polaris-discovery/polaris-discovery-factory/src/test/java/com/tencent/polaris/discovery/test/core/PrometheusHttpServerTest.java +++ b/polaris-discovery/polaris-discovery-factory/src/test/java/com/tencent/polaris/discovery/test/core/PrometheusHttpServerTest.java @@ -67,12 +67,18 @@ public void testHttpServerRandomPort() throws IOException { statReporterConfig.setPluginConfig(StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS, prometheusHandlerConfig); try (SDKContext sdkContext = SDKContext.initContextByConfig(configuration)) { sdkContext.init(); - URL metricsUrl = new URL("http://127.0.0.1:28080/metrics"); - HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection(); - metricsConn.setRequestMethod("GET"); - metricsConn.connect(); - assertThat(metricsConn.getResponseCode()).isEqualTo(200); - metricsConn.disconnect(); + for (int i = 0; i < 3; i++) { + URL metricsUrl = new URL(String.format("http://127.0.0.1:%d/metrics", 28080 + i)); + HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection(); + metricsConn.setRequestMethod("GET"); + try { + metricsConn.connect(); + } catch (IOException e) { + continue; + } + assertThat(metricsConn.getResponseCode()).isEqualTo(200); + metricsConn.disconnect(); + } } } diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/compose/Extensions.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/compose/Extensions.java index cd65cdf57..12dab757a 100644 --- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/compose/Extensions.java +++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/compose/Extensions.java @@ -50,6 +50,7 @@ import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.pojo.Node; import com.tencent.polaris.client.util.NamedThreadFactory; +import com.tencent.polaris.client.util.Utils; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.specification.api.v1.model.ModelProto; import org.slf4j.Logger; @@ -574,6 +575,7 @@ protected void doDestroy() { HttpServer httpServer = entry.getValue(); httpServer.stop(0); ((ExecutorService) httpServer.getExecutor()).shutdownNow(); + Utils.sleepUninterrupted(1000); } } } diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java index 1b2ac668a..37e240f57 100644 --- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java +++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreaker.java @@ -90,7 +90,7 @@ public class PolarisCircuitBreaker extends Destroyable implements CircuitBreaker // map the wildcard resource to rule specific resource, // eg. /path/wildcard/123 => /path/wildcard/.+ - private Cache resourceMapping; + private final Map resourceMapping = new ConcurrentHashMap<>(); private Extensions extensions; @@ -102,6 +102,8 @@ public class PolarisCircuitBreaker extends Destroyable implements CircuitBreaker private long checkPeriod; + private long resourceExpireInterval; + private CircuitBreakerRuleDictionary circuitBreakerRuleDictionary; private FaultDetectRuleDictionary faultDetectRuleDictionary; @@ -110,7 +112,7 @@ public class PolarisCircuitBreaker extends Destroyable implements CircuitBreaker @Override public CircuitBreakerStatus checkResource(Resource resource) { - Resource ruleResource = getActualResource(resource); + Resource ruleResource = getActualResource(resource, false); Optional resourceCounters = getResourceCounters(ruleResource); if (null == resourceCounters) { if (resource.getLevel() == Level.METHOD && Objects.equals(ruleResource, resource)) { @@ -118,7 +120,7 @@ public CircuitBreakerStatus checkResource(Resource resource) { CircuitBreakerProto.CircuitBreakerRule circuitBreakerRule = circuitBreakerRuleDictionary.lookupCircuitBreakerRule(resource); ruleResource = computeResourceByRule(resource, circuitBreakerRule); if (!Objects.equals(ruleResource, resource)) { - resourceMapping.put(resource, ruleResource); + // 这里不能放缓存,需要在report的时候统一放,否则会有探测规则无法关联resource的问题 resourceCounters = getResourceCounters(ruleResource); } } @@ -139,16 +141,19 @@ public void report(ResourceStat resourceStat) { doReport(resourceStat, true); } - public Resource getActualResource(Resource resource) { - Resource ruleResource = resourceMapping.getIfPresent(resource); - if (null == ruleResource) { - ruleResource = resource; + public Resource getActualResource(Resource resource, boolean internal) { + ResourceWrap resourceWrap = resourceMapping.get(resource); + if (null == resourceWrap) { + return resource; + } + if (!internal) { + resourceWrap.lastAccessTimeMilli = System.currentTimeMillis(); } - return ruleResource; + return resourceWrap.resource; } private ResourceCounters getOrInitResourceCounters(Resource resource) throws ExecutionException { - Resource ruleResource = getActualResource(resource); + Resource ruleResource = getActualResource(resource, false); Optional resourceCounters = getResourceCounters(ruleResource); boolean reloadFaultDetect = false; if (null == resourceCounters) { @@ -211,7 +216,7 @@ private void reloadFaultDetector(Resource resource, ResourceCounters resourceCou if (null == healthCheckContainer) { return; } - healthCheckContainer.removeResource(getActualResource(resource)); + healthCheckContainer.removeResource(resource); } else { if (null == healthCheckContainer) { @@ -268,10 +273,12 @@ private Optional initResourceCounter(Resource resource) throws Cache> resourceOptionalCache = countersCache.get(resource.getLevel()); CircuitBreakerProto.CircuitBreakerRule finalCircuitBreakerRule = circuitBreakerRule; Resource ruleResource = computeResourceByRule(resource, circuitBreakerRule); + if (!Objects.equals(ruleResource, resource)) { + resourceMapping.put(resource, new ResourceWrap(ruleResource, System.currentTimeMillis())); + } return resourceOptionalCache.get(ruleResource, new Callable>() { @Override public Optional call() { - resourceMapping.put(resource, ruleResource); if (null == finalCircuitBreakerRule) { return Optional.empty(); } @@ -303,7 +310,7 @@ private void addInstanceForFaultDetect(Resource resource) { InstanceResource instanceResource = (InstanceResource) resource; HealthCheckContainer healthCheckContainer = healthCheckCache .get(instanceResource.getService()); - if (null == healthCheckContainer) { + if (null == healthCheckContainer || instanceResource.getPort() == 0) { return; } healthCheckContainer.addInstance(instanceResource); @@ -314,7 +321,7 @@ public PluginType getType() { return PluginTypes.CIRCUIT_BREAKER.getBaseType(); } - private class CounterRemoveListener implements RemovalListener> { + private static class CounterRemoveListener implements RemovalListener> { @Override public void onRemoval(RemovalNotification> removalNotification) { @@ -323,21 +330,12 @@ public void onRemoval(RemovalNotification> return; } value.ifPresent(resourceCounters -> resourceCounters.setDestroyed(true)); - Resource resource = removalNotification.getKey(); - if (null == resource) { - return; - } - HealthCheckContainer healthCheckContainer = PolarisCircuitBreaker.this.healthCheckCache.get(resource.getService()); - if (null != healthCheckContainer) { - healthCheckContainer.removeResource(resource); - } } } @Override public void init(InitContext ctx) throws PolarisException { - long expireIntervalMilli = ctx.getConfig().getConsumer().getCircuitBreaker().getCountersExpireInterval(); - resourceMapping = CacheBuilder.newBuilder().expireAfterAccess(expireIntervalMilli, TimeUnit.MILLISECONDS).build(); + resourceExpireInterval = ctx.getConfig().getConsumer().getCircuitBreaker().getCountersExpireInterval(); countersCache.put(Level.SERVICE, CacheBuilder.newBuilder().removalListener(new CounterRemoveListener()).build()); countersCache.put(Level.METHOD, CacheBuilder.newBuilder().removalListener(new CounterRemoveListener()).build()); countersCache.put(Level.INSTANCE, CacheBuilder.newBuilder().removalListener(new CounterRemoveListener()).build()); @@ -366,7 +364,19 @@ public void run() { } public void cleanupExpiredResources() { - resourceMapping.cleanUp(); + LOG.info("[CIRCUIT_BREAKER] cleanup expire resources"); + for (Map.Entry entry : resourceMapping.entrySet()) { + Resource resource = entry.getKey(); + if (System.currentTimeMillis() - entry.getValue().lastAccessTimeMilli >= resourceExpireInterval) { + LOG.info("[CIRCUIT_BREAKER] resource {} expired, start to cleanup", resource); + resourceMapping.remove(resource); + HealthCheckContainer healthCheckContainer = healthCheckCache.get(resource.getService()); + if (null == healthCheckContainer) { + continue; + } + healthCheckContainer.removeResource(resource); + } + } for (Map.Entry>> entry : countersCache.entrySet()) { Cache> values = entry.getValue(); values.asMap().forEach(new BiConsumer>() { @@ -458,8 +468,8 @@ public void setCircuitBreakerConfig(CircuitBreakerConfig circuitBreakerConfig) { this.circuitBreakerConfig = circuitBreakerConfig; } - Cache getResourceMapping() { - return resourceMapping; + int getResourceMappingSize() { + return resourceMapping.size(); } @Override @@ -477,8 +487,13 @@ void onCircuitBreakerRuleChanged(ServiceKey serviceKey) { if (Objects.equals(resource.getService(), serviceKey)) { cacheValue.invalidate(resource); } - HealthCheckContainer healthCheckContainer = healthCheckCache.get(serviceKey); - if (null != healthCheckContainer) { + } + } + HealthCheckContainer healthCheckContainer = healthCheckCache.get(serviceKey); + if (null != healthCheckContainer) { + for (Map.Entry entry : resourceMapping.entrySet()) { + Resource resource = entry.getKey(); + if (Objects.equals(resource.getService(), serviceKey)) { LOG.info("onCircuitBreakerRuleChanged: clear resource {} from healthCheckContainer", resource); healthCheckContainer.removeResource(resource); } @@ -551,4 +566,16 @@ public HealthCheckContainer apply(ServiceKey serviceKey, HealthCheckContainer he } }); } + + private static class ResourceWrap { + // target resource, not nullable + final Resource resource; + // only record the report time + long lastAccessTimeMilli; + + ResourceWrap(Resource resource, long lastAccessTimeMilli) { + this.resource = resource; + this.lastAccessTimeMilli = lastAccessTimeMilli; + } + } } diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceHealthChecker.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceHealthChecker.java index ab7f7fc1d..5bf1b7d62 100644 --- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceHealthChecker.java +++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceHealthChecker.java @@ -196,7 +196,7 @@ private boolean doCheck(Instance instance, Protocol protocol, FaultDetectRule fa if (!matchInstanceToResource(instance, resource)) { continue; } - Resource actualResource = polarisCircuitBreaker.getActualResource(resource); + Resource actualResource = polarisCircuitBreaker.getActualResource(resource, true); if (reportedResources.contains(actualResource)) { continue; } diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/test/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreakerTest.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/test/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreakerTest.java index f2a0a0373..16021363f 100644 --- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/test/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreakerTest.java +++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/test/java/com/tencent/polaris/plugins/circuitbreaker/composite/PolarisCircuitBreakerTest.java @@ -244,7 +244,7 @@ public void testCircuitBreakerMethodRules() { Assert.assertEquals(Status.OPEN, circuitBreakerStatus.getStatus()); } Assert.assertEquals(1, polarisCircuitBreaker.getCountersCache().get(Level.METHOD).size()); - Assert.assertEquals(1000, polarisCircuitBreaker.getResourceMapping().size()); + Assert.assertEquals(1000, polarisCircuitBreaker.getResourceMappingSize()); //check cleanup try { @@ -253,6 +253,6 @@ public void testCircuitBreakerMethodRules() { e.printStackTrace(); } polarisCircuitBreaker.cleanupExpiredResources(); - Assert.assertEquals(0, polarisCircuitBreaker.getResourceMapping().size()); + Assert.assertEquals(0, polarisCircuitBreaker.getResourceMappingSize()); } }