From 8f7d6f44c8ffe58b966e5cf2bec94188fbc2accb Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Tue, 10 Jun 2025 21:17:15 +0800 Subject: [PATCH 1/2] feat:support instance detect. --- .../client/api/ServiceCallResultChecker.java | 159 --------- .../task/InstancesCircuitBreakTask.java | 190 ----------- .../client/task/PriorityTaskScheduler.java | 48 --- ...laris.client.api.ServiceCallResultListener | 1 - .../main/resources/conf/default-config.yml | 2 +- .../consumer/OutlierDetectionConfig.java | 3 +- .../polaris/api/pojo/DefaultInstance.java | 5 + .../tencent/polaris/api/pojo/Instance.java | 7 + .../polaris/api/pojo/InstanceWrap.java | 5 + .../polaris/client/pojo/InstanceByProto.java | 16 +- .../tencent/polaris/client/util/Utils.java | 17 +- .../client/stat}/InstancesDetectTask.java | 313 +++++++++--------- .../stat/ServiceCallResultCollector.java | 85 +++++ ...laris.client.api.ServiceCallResultListener | 1 + .../registry/memory/InMemoryRegistry.java | 4 - 15 files changed, 278 insertions(+), 578 deletions(-) delete mode 100644 polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/api/ServiceCallResultChecker.java delete mode 100644 polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/InstancesCircuitBreakTask.java delete mode 100644 polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/PriorityTaskScheduler.java delete mode 100644 polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener rename {polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task => polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat}/InstancesDetectTask.java (83%) create mode 100644 polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/ServiceCallResultCollector.java create mode 100644 polaris-discovery/polaris-discovery-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/api/ServiceCallResultChecker.java b/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/api/ServiceCallResultChecker.java deleted file mode 100644 index 546810c27..000000000 --- a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/api/ServiceCallResultChecker.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making polaris-java available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * Licensed under the BSD 3-Clause License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package com.tencent.polaris.circuitbreak.client.api; - -import com.tencent.polaris.api.config.consumer.CircuitBreakerConfig; -import com.tencent.polaris.api.config.consumer.OutlierDetectionConfig; -import com.tencent.polaris.api.config.consumer.OutlierDetectionConfig.When; -import com.tencent.polaris.api.plugin.circuitbreaker.InstanceCircuitBreaker; -import com.tencent.polaris.api.plugin.compose.Extensions; -import com.tencent.polaris.api.plugin.registry.ResourceFilter; -import com.tencent.polaris.api.pojo.InstanceGauge; -import com.tencent.polaris.api.pojo.ServiceEventKey; -import com.tencent.polaris.api.pojo.ServiceEventKey.EventType; -import com.tencent.polaris.api.pojo.ServiceInstances; -import com.tencent.polaris.api.pojo.ServiceKey; -import com.tencent.polaris.api.utils.CollectionUtils; -import com.tencent.polaris.api.utils.StringUtils; -import com.tencent.polaris.circuitbreak.client.task.InstancesCircuitBreakTask; -import com.tencent.polaris.circuitbreak.client.task.InstancesDetectTask; -import com.tencent.polaris.circuitbreak.client.task.PriorityTaskScheduler; -import com.tencent.polaris.client.api.SDKContext; -import com.tencent.polaris.client.api.ServiceCallResultListener; - -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * 数据上报时用于检查下是否已经熔断 - */ -public class ServiceCallResultChecker implements ServiceCallResultListener { - - private PriorityTaskScheduler priorityTaskScheduler; - - private ScheduledExecutorService cbTaskExecutors; - - private ScheduledExecutorService detectTaskExecutors; - - private Extensions extensions; - - private InstancesDetectTask detectTask; - - private final AtomicInteger state = new AtomicInteger(0); - - @Override - public synchronized void init(SDKContext sdkContext) { - if (!state.compareAndSet(0, 1)) { - return; - } - extensions = sdkContext.getExtensions(); - CircuitBreakerConfig cbConfig = sdkContext.getConfig().getConsumer().getCircuitBreaker(); - if (cbConfig.isEnable()) { - priorityTaskScheduler = new PriorityTaskScheduler(); - cbTaskExecutors = Executors.newSingleThreadScheduledExecutor(); - CheckServicesCircuitBreak checker = new CheckServicesCircuitBreak( - sdkContext.getExtensions(), priorityTaskScheduler); - long checkPeriodMs = cbConfig.getCheckPeriod(); - cbTaskExecutors.scheduleAtFixedRate( - checker, checkPeriodMs, checkPeriodMs / 2, TimeUnit.MILLISECONDS); - } - OutlierDetectionConfig outlierDetection = sdkContext.getConfig().getConsumer().getOutlierDetection(); - if (outlierDetection.getWhen() != When.never) { - detectTaskExecutors = Executors.newSingleThreadScheduledExecutor(); - long checkPeriodMs = outlierDetection.getCheckPeriod(); - detectTask = new InstancesDetectTask(extensions, outlierDetection.getWhen()); - detectTaskExecutors.scheduleAtFixedRate(detectTask, checkPeriodMs, checkPeriodMs, TimeUnit.MILLISECONDS); - } - } - - @Override - public void onServiceCallResult(InstanceGauge result) { - if (null == priorityTaskScheduler) { - return; - } - if (CollectionUtils.isEmpty(extensions.getInstanceCircuitBreakers())) { - return; - } - InstancesCircuitBreakTask rtTask = null; - for (InstanceCircuitBreaker circuitBreaker : extensions.getInstanceCircuitBreakers()) { - String cbName = circuitBreaker.getName(); - boolean rtLimit = circuitBreaker.stat(result); - String instId = result.getInstanceId(); - if (rtLimit && StringUtils.isNotEmpty(instId)) { - ServiceKey svcKey = new ServiceKey(result.getNamespace(), result.getService()); - rtTask = new InstancesCircuitBreakTask( - svcKey, cbName, null, instId, extensions, InstancesCircuitBreakTask.TaskPriority.HIGH); - break; - } - } - if (null == rtTask) { - return; - } - priorityTaskScheduler.addCircuitBreakTask(rtTask); - } - - @Override - public synchronized void destroy() { - if (!state.compareAndSet(1, 0)) { - return; - } - if (null != priorityTaskScheduler) { - priorityTaskScheduler.destroy(); - } - if (null != cbTaskExecutors) { - cbTaskExecutors.shutdown(); - } - if (null != detectTask) { - detectTask.destroy(); - } - if (null != detectTaskExecutors) { - detectTaskExecutors.shutdown(); - } - } - - private static class CheckServicesCircuitBreak implements Runnable { - - private final Extensions extensions; - - private final PriorityTaskScheduler priorityTaskScheduler; - - public CheckServicesCircuitBreak(Extensions extensions, PriorityTaskScheduler priorityTaskScheduler) { - this.extensions = extensions; - this.priorityTaskScheduler = priorityTaskScheduler; - } - - @Override - public void run() { - Set services = extensions.getLocalRegistry().getServices(); - for (ServiceKey service : services) { - ServiceEventKey svcEventKey = new ServiceEventKey(service, EventType.INSTANCE); - ServiceInstances svcInstances = extensions.getLocalRegistry() - .getInstances(new ResourceFilter(svcEventKey, true, true)); - if (!svcInstances.isInitialized() || svcInstances.getInstances().isEmpty()) { - continue; - } - priorityTaskScheduler.addCircuitBreakTask( - new InstancesCircuitBreakTask(service, "", svcInstances.getInstances(), "", - extensions, InstancesCircuitBreakTask.TaskPriority.LOW)); - } - } - } -} diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/InstancesCircuitBreakTask.java b/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/InstancesCircuitBreakTask.java deleted file mode 100644 index 1c4d564a7..000000000 --- a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/InstancesCircuitBreakTask.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making polaris-java available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * Licensed under the BSD 3-Clause License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package com.tencent.polaris.circuitbreak.client.task; - -import static com.tencent.polaris.api.plugin.registry.InstanceProperty.PROPERTY_CIRCUIT_BREAKER_STATUS; - -import com.tencent.polaris.api.plugin.circuitbreaker.CircuitBreakResult; -import com.tencent.polaris.api.plugin.circuitbreaker.CircuitBreakResult.ResultKey; -import com.tencent.polaris.api.plugin.circuitbreaker.InstanceCircuitBreaker; -import com.tencent.polaris.api.plugin.compose.Extensions; -import com.tencent.polaris.api.plugin.registry.InstanceProperty; -import com.tencent.polaris.api.plugin.registry.ResourceFilter; -import com.tencent.polaris.api.plugin.registry.ServiceUpdateRequest; -import com.tencent.polaris.api.pojo.CircuitBreakerStatus; -import com.tencent.polaris.api.pojo.Instance; -import com.tencent.polaris.api.pojo.ServiceEventKey; -import com.tencent.polaris.api.pojo.ServiceEventKey.EventType; -import com.tencent.polaris.api.pojo.ServiceKey; -import com.tencent.polaris.api.pojo.StatusDimension; -import com.tencent.polaris.api.utils.CollectionUtils; -import com.tencent.polaris.api.utils.MapUtils; -import com.tencent.polaris.api.utils.StringUtils; -import com.tencent.polaris.client.pojo.ServiceInstancesByProto; -import com.tencent.polaris.logging.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; - -/** - * 熔断单个实例的任务 - */ -public class InstancesCircuitBreakTask implements Runnable, Comparable { - - @Override - public int compareTo(InstancesCircuitBreakTask o) { - return this.priority.ordinal() - o.priority.ordinal(); - } - - /** - * 任务优先级 - */ - public enum TaskPriority { - HIGH, - LOW, - } - - private static final Logger LOG = LoggerFactory.getLogger(InstancesCircuitBreakTask.class); - - private final ServiceKey serviceKey; - - private final String cbName; - - private final Collection instances; - - private final String instId; - - private final Extensions extensions; - - private final TaskPriority priority; - - public InstancesCircuitBreakTask(ServiceKey serviceKey, String cbName, Collection instances, - String instId, Extensions extensions, TaskPriority priority) { - this.serviceKey = serviceKey; - this.cbName = cbName; - this.instId = instId; - this.instances = instances; - this.extensions = extensions; - this.priority = priority; - } - - private Instance getInstance() { - ServiceEventKey serviceEventKey = new ServiceEventKey(serviceKey, EventType.INSTANCE); - ResourceFilter resourceFilter = new ResourceFilter(serviceEventKey, true, true); - ServiceInstancesByProto instances = (ServiceInstancesByProto) extensions.getLocalRegistry() - .getInstances(resourceFilter); - if (!instances.isInitialized()) { - return null; - } - return instances.getInstance(instId); - } - - @Override - public void run() { - Map allResults = new HashMap<>(); - Set statusChangedInstances = new HashSet<>(); - Collection targetInstances = instances; - if (StringUtils.isNotEmpty(instId)) { - Instance instance = getInstance(); - if (null != instance) { - targetInstances = new ArrayList<>(); - targetInstances.add(instance); - } - } - if (CollectionUtils.isEmpty(targetInstances)) { - return; - } - for (InstanceCircuitBreaker circuitBreaker : extensions.getInstanceCircuitBreakers()) { - if (StringUtils.isNotBlank(cbName) && !cbName.equals(circuitBreaker.getName())) { - continue; - } - CircuitBreakResult circuitBreakResult = circuitBreaker.checkInstance(targetInstances); - if (null == circuitBreakResult || circuitBreakResult.isEmptyResult()) { - continue; - } - cleanInstanceSet(circuitBreakResult.getInstancesToOpen(), statusChangedInstances); - cleanInstanceSet(circuitBreakResult.getInstancesToHalfOpen(), statusChangedInstances); - cleanInstanceSet(circuitBreakResult.getInstancesToClose(), statusChangedInstances); - allResults.put(circuitBreaker.getName(), circuitBreakResult); - } - ServiceUpdateRequest updateRequest = buildServiceUpdateRequest(serviceKey, allResults); - if (CollectionUtils.isEmpty(updateRequest.getProperties())) { - return; - } - LOG.info("update cache for circuitbreaker, value is {}", updateRequest); - extensions.getLocalRegistry().updateInstances(updateRequest); - } - - private void cleanInstanceSet(Map instanceSet, Set allInstances) { - Set instIdsToRemove = new HashSet<>(); - for (Map.Entry entry : instanceSet.entrySet()) { - ResultKey resultKey = entry.getKey(); - if (allInstances.contains(resultKey)) { - instIdsToRemove.add(resultKey); - } else { - allInstances.add(resultKey); - } - } - instIdsToRemove.forEach(instanceSet::remove); - } - - @SuppressWarnings("unchecked") - private void buildInstanceProperty(long now, Map results, int maxRequestAfterHalfOpen, - Map instanceProperties, String cbName, CircuitBreakerStatus.Status status) { - if (MapUtils.isEmpty(results)) { - return; - } - for (Map.Entry entry : results.entrySet()) { - ResultKey resultKey = entry.getKey(); - Instance instance = entry.getValue(); - String instId = resultKey.getInstId(); - InstanceProperty instanceProperty = instanceProperties.get(instId); - if (null == instanceProperty) { - Map properties = new HashMap<>(); - properties.put(PROPERTY_CIRCUIT_BREAKER_STATUS, new HashMap()); - instanceProperty = new InstanceProperty(instance, properties); - instanceProperties.put(instId, instanceProperty); - } - Map statusMap = (Map) - instanceProperty.getProperties().get(PROPERTY_CIRCUIT_BREAKER_STATUS); - statusMap.put(resultKey.getStatusDimension(), - new CircuitBreakerStatus(cbName, status, now)); - } - } - - private ServiceUpdateRequest buildServiceUpdateRequest( - ServiceKey serviceKey, Map allResults) { - Map properties = new HashMap<>(); - allResults.forEach((cbName, result) -> { - buildInstanceProperty(result.getCreateTimeMs(), result.getInstancesToOpen(), - result.getMaxRequestCountAfterHalfOpen(), properties, cbName, CircuitBreakerStatus.Status.OPEN); - buildInstanceProperty(result.getCreateTimeMs(), result.getInstancesToHalfOpen(), - result.getMaxRequestCountAfterHalfOpen(), properties, cbName, - CircuitBreakerStatus.Status.HALF_OPEN); - buildInstanceProperty(result.getCreateTimeMs(), result.getInstancesToClose(), - result.getMaxRequestCountAfterHalfOpen(), properties, cbName, CircuitBreakerStatus.Status.CLOSE); - }); - return new ServiceUpdateRequest(serviceKey, properties.values()); - } -} diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/PriorityTaskScheduler.java b/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/PriorityTaskScheduler.java deleted file mode 100644 index 4c248f793..000000000 --- a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/PriorityTaskScheduler.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making polaris-java available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * Licensed under the BSD 3-Clause License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package com.tencent.polaris.circuitbreak.client.task; - -import com.tencent.polaris.api.utils.ThreadPoolUtils; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * 优先级队列调度器 - */ -public class PriorityTaskScheduler { - - private final ExecutorService priorityJobScheduler; - - public PriorityTaskScheduler() { - priorityJobScheduler = new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new PriorityBlockingQueue<>()); - } - - public void addCircuitBreakTask(InstancesCircuitBreakTask task) { - priorityJobScheduler.execute(task); - } - - public void destroy() { - ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{priorityJobScheduler}); - } - -} diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener b/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener deleted file mode 100644 index ba410d5b0..000000000 --- a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener +++ /dev/null @@ -1 +0,0 @@ -com.tencent.polaris.circuitbreak.client.api.ServiceCallResultChecker diff --git a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml index 60281abea..b5c292276 100644 --- a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml +++ b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml @@ -284,7 +284,7 @@ consumer: requestVolumeThreshold: 10 #描述:主动探测相关配置 outlierDetection: - #描述:何时开启主动探测。never(永不开启),on_recover(恢复时才开启主动探测),always(一直开启主动探测) + #描述:何时开启主动探测。never(永不开启),on_recover(废弃,恢复时才开启主动探测),after_call(调用后开启探测),always(一直开启主动探测) when: never #描述:主动探测周期 checkPeriod: 30s diff --git a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/consumer/OutlierDetectionConfig.java b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/consumer/OutlierDetectionConfig.java index feba9e34b..caf4740c6 100644 --- a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/consumer/OutlierDetectionConfig.java +++ b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/consumer/OutlierDetectionConfig.java @@ -19,6 +19,7 @@ import com.tencent.polaris.api.config.plugin.PluginConfig; import com.tencent.polaris.api.config.verify.Verifier; + import java.util.List; /** @@ -30,7 +31,7 @@ public interface OutlierDetectionConfig extends PluginConfig, Verifier { enum When { - never, on_recover, always + never, on_recover, after_call, always } /** diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/DefaultInstance.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/DefaultInstance.java index 5b033b1e4..e752e169b 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/DefaultInstance.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/DefaultInstance.java @@ -226,6 +226,11 @@ public CircuitBreakerStatus getCircuitBreakerStatus(StatusDimension statusDimens return circuitBreakerStatuses.get(statusDimension); } + @Override + public RetStatus getDetectStatus() { + return null; + } + @Override public int compareTo(Instance instance) { String curHost = StringUtils.defaultString(this.getHost()); diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/Instance.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/Instance.java index 73e096a46..23a1184c1 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/Instance.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/Instance.java @@ -57,6 +57,13 @@ public interface Instance extends BaseInstance, Comparable { */ CircuitBreakerStatus getCircuitBreakerStatus(StatusDimension statusDimension); + /** + * 获取探测状态 + * + * @return 探测状态 + */ + RetStatus getDetectStatus(); + boolean isHealthy(); boolean isIsolated(); diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/InstanceWrap.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/InstanceWrap.java index 8bc78986e..3f741f7db 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/InstanceWrap.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/pojo/InstanceWrap.java @@ -69,6 +69,11 @@ public CircuitBreakerStatus getCircuitBreakerStatus(StatusDimension statusDimens return originalInstance.getCircuitBreakerStatus(statusDimension); } + @Override + public RetStatus getDetectStatus() { + return originalInstance.getDetectStatus(); + } + @Override public boolean isHealthy() { return originalInstance.isHealthy(); diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/InstanceByProto.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/InstanceByProto.java index 24fbec117..5f92e07ef 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/InstanceByProto.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/InstanceByProto.java @@ -18,14 +18,10 @@ package com.tencent.polaris.client.pojo; import com.google.protobuf.UInt32Value; -import com.tencent.polaris.api.pojo.CircuitBreakerStatus; -import com.tencent.polaris.api.pojo.DetectResult; -import com.tencent.polaris.api.pojo.Instance; -import com.tencent.polaris.api.pojo.InstanceLocalValue; -import com.tencent.polaris.api.pojo.ServiceKey; -import com.tencent.polaris.api.pojo.StatusDimension; +import com.tencent.polaris.api.pojo.*; import com.tencent.polaris.api.utils.TimeUtils; import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto; + import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -115,6 +111,14 @@ public CircuitBreakerStatus getCircuitBreakerStatus(StatusDimension statusDimens return instanceLocalValue.getCircuitBreakerStatus(statusDimension); } + @Override + public RetStatus getDetectStatus() { + if (null == instanceLocalValue || null == instanceLocalValue.getDetectResult()) { + return null; + } + return instanceLocalValue.getDetectResult().getRetStatus(); + } + public DetectResult getDetectResult() { if (null == instanceLocalValue) { return null; diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/util/Utils.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/util/Utils.java index 801b5f910..f93cc449f 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/util/Utils.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/util/Utils.java @@ -18,20 +18,17 @@ package com.tencent.polaris.client.util; import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.RetStatus; import com.tencent.polaris.api.pojo.ServiceChangeEvent; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.pojo.ServiceInstancesByProto; import com.tencent.polaris.logging.LoggerFactory; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import org.slf4j.Logger; + +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.slf4j.Logger; /** * Common util class. @@ -88,10 +85,10 @@ public static boolean regMatch(String regex, String input) { public static boolean isHealthyInstance(Instance instance) { - if (!instance.isHealthy()) { - return false; + if (instance.getDetectStatus() != null) { + return instance.getDetectStatus() == RetStatus.RetSuccess; } - return true; + return instance.isHealthy(); } diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/InstancesDetectTask.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/InstancesDetectTask.java similarity index 83% rename from polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/InstancesDetectTask.java rename to polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/InstancesDetectTask.java index ad4e525ca..369beaa77 100644 --- a/polaris-circuitbreaker/polaris-circuitbreaker-client/src/main/java/com/tencent/polaris/circuitbreak/client/task/InstancesDetectTask.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/InstancesDetectTask.java @@ -1,158 +1,155 @@ -/* - * Tencent is pleased to support the open source community by making polaris-java available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * Licensed under the BSD 3-Clause License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package com.tencent.polaris.circuitbreak.client.task; - -import static com.tencent.polaris.api.plugin.registry.InstanceProperty.PROPERTY_DETECT_RESULT; - -import com.tencent.polaris.api.config.consumer.OutlierDetectionConfig.When; -import com.tencent.polaris.api.exception.PolarisException; -import com.tencent.polaris.api.plugin.compose.Extensions; -import com.tencent.polaris.api.plugin.detect.HealthChecker; -import com.tencent.polaris.api.plugin.registry.InstanceProperty; -import com.tencent.polaris.api.plugin.registry.ResourceFilter; -import com.tencent.polaris.api.plugin.registry.ServiceUpdateRequest; -import com.tencent.polaris.api.pojo.DetectResult; -import com.tencent.polaris.api.pojo.Instance; -import com.tencent.polaris.api.pojo.RetStatus; -import com.tencent.polaris.api.pojo.ServiceEventKey; -import com.tencent.polaris.api.pojo.ServiceEventKey.EventType; -import com.tencent.polaris.api.pojo.ServiceInstances; -import com.tencent.polaris.api.pojo.ServiceKey; -import com.tencent.polaris.api.utils.MapUtils; -import com.tencent.polaris.api.pojo.CircuitBreakerStatus; -import com.tencent.polaris.client.pojo.InstanceByProto; -import com.tencent.polaris.logging.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; - -public class InstancesDetectTask implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(InstancesDetectTask.class); - - private final Extensions extensions; - private final When when; - - private final AtomicBoolean destroy = new AtomicBoolean(false); - - public InstancesDetectTask(Extensions extensions, When when) { - this.extensions = extensions; - this.when = when; - } - - @Override - public void run() { - Set services = extensions.getLocalRegistry().getServices(); - for (ServiceKey serviceKey : services) { - if (Objects.equals(serviceKey.getNamespace(), "Polaris")) { - continue; // 北极星内部服务器不进行网络探测 - } - try { - doInstanceDetectForService(serviceKey); - } catch (PolarisException e) { - LOG.error("fail to do instance detect for {}, e:{}", serviceKey, e); - } - } - } - - /** - * 注销任务 - */ - public void destroy() { - destroy.set(true); - } - - private void doInstanceDetectForService(ServiceKey serviceKey) throws PolarisException { - ServiceEventKey svcEventKey = new ServiceEventKey(serviceKey, EventType.INSTANCE); - ServiceInstances instances = extensions.getLocalRegistry().getInstances( - new ResourceFilter(svcEventKey, true, true)); - if (!instances.isInitialized() || instances.getInstances().size() == 0) { - return; - } - - Map aliveResults = new HashMap<>(); - for (Instance instance : instances.getInstances()) { - if (destroy.get()) { - // 如果要停止定时任务,则剩下的实例探测也没必要进行下去了 - break; - } - if (when == When.on_recover - && instance.getCircuitBreakerStatus().getStatus() != CircuitBreakerStatus.Status.OPEN) { - continue; // 只探测熔断实例 - } - DetectResult result = detectInstance(instance); - if (result == null) { - continue; - } - aliveResults.put(instance, result); - - } - if (MapUtils.isNotEmpty(aliveResults)) { - ServiceUpdateRequest updateRequest = buildInstanceUpdateResult(serviceKey, aliveResults); - LOG.debug("update cache for outlier detect, value is {}", updateRequest); - extensions.getLocalRegistry().updateInstances(updateRequest); - } - } - - private ServiceUpdateRequest buildInstanceUpdateResult(ServiceKey serviceKey, - Map aliveResults) { - List instances = new ArrayList<>(); - int notChange = 0; - for (Map.Entry entry : aliveResults.entrySet()) { - InstanceByProto instance = (InstanceByProto) entry.getKey(); - DetectResult detectResult = instance.getDetectResult(); - if (detectResult != null && detectResult.getRetStatus() == entry.getValue().getRetStatus()) { - notChange++; - continue; - } - Map properties = new HashMap<>(); - properties.put(PROPERTY_DETECT_RESULT, entry.getValue()); - InstanceProperty instanceProperty = new InstanceProperty(entry.getKey(), properties); - instances.add(instanceProperty); - } - LOG.info("{} detect {} instances, update {}", serviceKey, aliveResults.size(), - aliveResults.size() - notChange); - return new ServiceUpdateRequest(serviceKey, instances); - } - - private DetectResult detectInstance(Instance instance) throws PolarisException { - DetectResult result = null; - for (HealthChecker detector : extensions.getHealthCheckers()) { - DetectResult pluginResult = detector.detectInstance(instance, null); - if (pluginResult == null) { - continue; - } - result = pluginResult; - if (result.getRetStatus() == RetStatus.RetSuccess) { - result.setDetectType(detector.getName()); - return result; - } - } - if (result != null) { - result.setDetectType("all"); - } - return result; // 如果没有探测则返回null, 全部失败返回失败 - } -} +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.discovery.client.stat; + +import com.tencent.polaris.api.config.consumer.OutlierDetectionConfig.When; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.plugin.detect.HealthChecker; +import com.tencent.polaris.api.plugin.registry.InstanceProperty; +import com.tencent.polaris.api.plugin.registry.ResourceFilter; +import com.tencent.polaris.api.plugin.registry.ServiceUpdateRequest; +import com.tencent.polaris.api.pojo.*; +import com.tencent.polaris.api.pojo.ServiceEventKey.EventType; +import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.MapUtils; +import com.tencent.polaris.client.pojo.InstanceByProto; +import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.tencent.polaris.api.plugin.registry.InstanceProperty.PROPERTY_DETECT_RESULT; + +public class InstancesDetectTask implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(InstancesDetectTask.class); + + private final Extensions extensions; + private final When when; + + private final AtomicBoolean destroy = new AtomicBoolean(false); + + private final Set serviceKeySet; + + public InstancesDetectTask(Extensions extensions, When when) { + this.extensions = extensions; + this.when = when; + this.serviceKeySet = new HashSet<>(); + } + + @Override + public void run() { + Set services; + if (when == When.after_call) { + services = serviceKeySet; + } else { + services = extensions.getLocalRegistry().getServices(); + } + for (ServiceKey serviceKey : services) { + if (Objects.equals(serviceKey.getNamespace(), "Polaris")) { + continue; // 北极星内部服务器不进行网络探测 + } + try { + doInstanceDetectForService(serviceKey); + } catch (PolarisException e) { + LOG.error("fail to do instance detect for {}, e:{}", serviceKey, e); + } + } + } + + /** + * 注销任务 + */ + public void destroy() { + destroy.set(true); + } + + private void doInstanceDetectForService(ServiceKey serviceKey) throws PolarisException { + ServiceEventKey svcEventKey = new ServiceEventKey(serviceKey, EventType.INSTANCE); + ServiceInstances instances = extensions.getLocalRegistry().getInstances( + new ResourceFilter(svcEventKey, true, true)); + if (!instances.isInitialized() || CollectionUtils.isEmpty(instances.getInstances())) { + return; + } + + Map aliveResults = new HashMap<>(); + for (Instance instance : instances.getInstances()) { + if (destroy.get()) { + // 如果要停止定时任务,则剩下的实例探测也没必要进行下去了 + break; + } + DetectResult result = detectInstance(instance); + if (result == null) { + continue; + } + aliveResults.put(instance, result); + + } + if (MapUtils.isNotEmpty(aliveResults)) { + ServiceUpdateRequest updateRequest = buildInstanceUpdateResult(serviceKey, aliveResults); + LOG.debug("update cache for outlier detect, value is {}", updateRequest); + extensions.getLocalRegistry().updateInstances(updateRequest); + } + } + + private ServiceUpdateRequest buildInstanceUpdateResult(ServiceKey serviceKey, + Map aliveResults) { + List instances = new ArrayList<>(); + int notChange = 0; + for (Map.Entry entry : aliveResults.entrySet()) { + InstanceByProto instance = (InstanceByProto) entry.getKey(); + DetectResult detectResult = instance.getDetectResult(); + if (detectResult != null && detectResult.getRetStatus() == entry.getValue().getRetStatus()) { + notChange++; + continue; + } + Map properties = new HashMap<>(); + properties.put(PROPERTY_DETECT_RESULT, entry.getValue()); + InstanceProperty instanceProperty = new InstanceProperty(entry.getKey(), properties); + instances.add(instanceProperty); + } + LOG.info("{} detect {} instances, update {}", serviceKey, aliveResults.size(), + aliveResults.size() - notChange); + return new ServiceUpdateRequest(serviceKey, instances); + } + + private DetectResult detectInstance(Instance instance) throws PolarisException { + DetectResult result = null; + for (HealthChecker detector : extensions.getHealthCheckers()) { + DetectResult pluginResult = detector.detectInstance(instance, null); + if (pluginResult == null) { + continue; + } + result = pluginResult; + if (result.getRetStatus() == RetStatus.RetSuccess) { + result.setDetectType(detector.getName()); + return result; + } + } + if (result != null) { + result.setDetectType("all"); + } + return result; // 如果没有探测则返回null, 全部失败返回失败 + } + + public Set getServiceKeySet() { + return serviceKeySet; + } +} diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/ServiceCallResultCollector.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/ServiceCallResultCollector.java new file mode 100644 index 000000000..29b905039 --- /dev/null +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/stat/ServiceCallResultCollector.java @@ -0,0 +1,85 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.discovery.client.stat; + +import com.tencent.polaris.api.config.consumer.OutlierDetectionConfig; +import com.tencent.polaris.api.config.consumer.OutlierDetectionConfig.When; +import com.tencent.polaris.api.pojo.InstanceGauge; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.client.api.ServiceCallResultListener; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 调用结果记录。 + * + * @author Haotian Zhang + */ +public class ServiceCallResultCollector implements ServiceCallResultListener { + + private ScheduledExecutorService detectTaskExecutors; + + private OutlierDetectionConfig outlierDetectionConfig; + + private InstancesDetectTask detectTask; + + private final AtomicInteger state = new AtomicInteger(0); + + private Set calledServiceSet = new HashSet<>(); + + @Override + public synchronized void init(SDKContext sdkContext) { + if (!state.compareAndSet(0, 1)) { + return; + } + outlierDetectionConfig = sdkContext.getConfig().getConsumer().getOutlierDetection(); + if (outlierDetectionConfig.getWhen() != When.never) { + detectTaskExecutors = Executors.newSingleThreadScheduledExecutor(); + long checkPeriodMs = outlierDetectionConfig.getCheckPeriod(); + detectTask = new InstancesDetectTask(sdkContext.getExtensions(), outlierDetectionConfig.getWhen()); + calledServiceSet = detectTask.getServiceKeySet(); + detectTaskExecutors.scheduleAtFixedRate(detectTask, checkPeriodMs, checkPeriodMs, TimeUnit.MILLISECONDS); + } + } + + @Override + public void onServiceCallResult(InstanceGauge result) { + if (outlierDetectionConfig.getWhen() == When.after_call) { + calledServiceSet.add(new ServiceKey(result.getNamespace(), result.getService())); + } + } + + @Override + public synchronized void destroy() { + if (!state.compareAndSet(1, 0)) { + return; + } + if (null != detectTask) { + detectTask.destroy(); + } + if (null != detectTaskExecutors) { + detectTaskExecutors.shutdown(); + } + } +} diff --git a/polaris-discovery/polaris-discovery-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener b/polaris-discovery/polaris-discovery-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener new file mode 100644 index 000000000..ac63c2779 --- /dev/null +++ b/polaris-discovery/polaris-discovery-client/src/main/resources/META-INF/services/com.tencent.polaris.client.api.ServiceCallResultListener @@ -0,0 +1 @@ +com.tencent.polaris.discovery.client.stat.ServiceCallResultCollector diff --git a/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/InMemoryRegistry.java b/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/InMemoryRegistry.java index 9a781c855..f950b88e9 100644 --- a/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/InMemoryRegistry.java +++ b/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/InMemoryRegistry.java @@ -369,10 +369,6 @@ public void updateInstances(ServiceUpdateRequest request) { instance.getHost(), instance.getPort(), properties); for (Map.Entry entry : properties.entrySet()) { switch (entry.getKey()) { - case InstanceProperty.PROPERTY_CIRCUIT_BREAKER_STATUS: - onCircuitBreakStatus(entry.getValue(), instanceLocalValue, instance); - changed = true; - break; case InstanceProperty.PROPERTY_DETECT_RESULT: instanceLocalValue.setDetectResult((DetectResult) entry.getValue()); break; From 330bff3a597567a24ce40a41d075812f13d16589 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Tue, 10 Jun 2025 21:46:46 +0800 Subject: [PATCH 2/2] feat:support instance detect. --- .../factory/test/CircuitBreakerTest.java | 130 ------------------ 1 file changed, 130 deletions(-) diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-factory/src/test/java/com/tencent/polaris/circuitbreaker/factory/test/CircuitBreakerTest.java b/polaris-circuitbreaker/polaris-circuitbreaker-factory/src/test/java/com/tencent/polaris/circuitbreaker/factory/test/CircuitBreakerTest.java index 492df5a65..5c4a9859f 100644 --- a/polaris-circuitbreaker/polaris-circuitbreaker-factory/src/test/java/com/tencent/polaris/circuitbreaker/factory/test/CircuitBreakerTest.java +++ b/polaris-circuitbreaker/polaris-circuitbreaker-factory/src/test/java/com/tencent/polaris/circuitbreaker/factory/test/CircuitBreakerTest.java @@ -19,8 +19,6 @@ import com.google.protobuf.util.JsonFormat; import com.tencent.polaris.api.config.Configuration; -import com.tencent.polaris.api.config.plugin.DefaultPlugins; -import com.tencent.polaris.api.pojo.CircuitBreakerStatus; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.RetStatus; import com.tencent.polaris.api.pojo.ServiceKey; @@ -34,7 +32,6 @@ import com.tencent.polaris.circuitbreak.client.exception.CallAbortedException; import com.tencent.polaris.circuitbreak.factory.CircuitBreakAPIFactory; import com.tencent.polaris.client.util.Utils; -import com.tencent.polaris.factory.config.ConfigurationImpl; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto; import com.tencent.polaris.test.common.TestUtils; @@ -51,7 +48,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.Collections; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -138,132 +134,6 @@ public void testUpdateServiceCallResult() { } } - @Test - public void testCircuitBreakByErrorCount() { - Configuration configuration = TestUtils.configWithEnvAddress(); - ((ConfigurationImpl) configuration).getConsumer().getCircuitBreaker().setChain( - Collections.singletonList(DefaultPlugins.CIRCUIT_BREAKER_ERROR_COUNT)); - try (AssemblyAPI assemblyAPI = AssemblyAPIFactory.createAssemblyAPIByConfig(configuration)) { - Utils.sleepUninterrupted(10000); - Assert.assertNotNull(assemblyAPI); - GetReachableInstancesRequest req = new GetReachableInstancesRequest(); - req.setNamespace(NAMESPACE_TEST); - req.setService(SERVICE_CIRCUIT_BREAKER); - List instances = assemblyAPI.getReachableInstances(req); - Assert.assertEquals(MAX_COUNT, instances.size()); - Instance instanceToLimit = instances.get(1); - //report 60 fail in 500ms - for (int i = 0; i < 60; ++i) { - if (i == 1) { - Utils.sleepUninterrupted(5 * 1000); - } - ServiceCallResult result = instanceToResult(instanceToLimit); - result.setRetCode(-1); - result.setDelay(1000L); - result.setRetStatus(RetStatus.RetFail); - assemblyAPI.updateServiceCallResult(result); - if (i % 10 == 0) { - Utils.sleepUninterrupted(1); - } - } - Utils.sleepUninterrupted(3000); - instances = assemblyAPI.getReachableInstances(req); - Assert.assertEquals(MAX_COUNT - 1, instances.size()); - boolean exists = false; - for (Instance instance : instances) { - if (instance.getId().equals(instanceToLimit.getId())) { - exists = true; - } - } - Assert.assertFalse(exists); - LOG.info("start to test half open by error rate"); - Utils.sleepUninterrupted(10000); - instances = assemblyAPI.getReachableInstances(req); - Assert.assertEquals(MAX_COUNT, instances.size()); - for (Instance instance : instances) { - CircuitBreakerStatus circuitBreakerStatus = instance.getCircuitBreakerStatus(); - if (null != circuitBreakerStatus - && circuitBreakerStatus.getStatus() == CircuitBreakerStatus.Status.HALF_OPEN) { - LOG.info("half open instance is {}", instance); - } - } - //default halfopen pass 3 success - int requestCountAfterHalfOpen = configuration.getConsumer().getCircuitBreaker() - .getRequestCountAfterHalfOpen(); - for (int i = 0; i < requestCountAfterHalfOpen; i++) { - ServiceCallResult result = instanceToResult(instanceToLimit); - result.setRetCode(-1); - result.setRetStatus(RetStatus.RetSuccess); - assemblyAPI.updateServiceCallResult(result); - Utils.sleepUninterrupted(200); - assemblyAPI.updateServiceCallResult(result); - } - LOG.info("start to test half open to close"); - Utils.sleepUninterrupted(1000); - instances = assemblyAPI.getReachableInstances(req); - Assert.assertEquals(MAX_COUNT, instances.size()); - } - } - - @Test - public void testCircuitBreakByErrorRate() { - Configuration configuration = TestUtils.configWithEnvAddress(); - ((ConfigurationImpl) configuration).getConsumer().getCircuitBreaker().setChain( - Collections.singletonList(DefaultPlugins.CIRCUIT_BREAKER_ERROR_RATE)); - try (AssemblyAPI assemblyAPI = AssemblyAPIFactory.createAssemblyAPIByConfig(configuration)) { - GetReachableInstancesRequest req = new GetReachableInstancesRequest(); - req.setNamespace(NAMESPACE_TEST); - req.setService(SERVICE_CIRCUIT_BREAKER); - List instances = assemblyAPI.getReachableInstances(req); - Assert.assertEquals(MAX_COUNT, instances.size()); - Instance instanceToLimit = instances.get(1); - //report 60 fail in 500ms - for (int i = 0; i < 60; ++i) { - if (i == 1) { - Utils.sleepUninterrupted(5 * 1000); - } - ServiceCallResult result = instanceToResult(instanceToLimit); - result.setDelay(1000L); - if (i % 2 == 0) { - result.setRetCode(0); - result.setRetStatus(RetStatus.RetSuccess); - Utils.sleepUninterrupted(1); - } else { - result.setRetCode(-1); - result.setRetStatus(RetStatus.RetFail); - } - assemblyAPI.updateServiceCallResult(result); - Utils.sleepUninterrupted(1); - } - Utils.sleepUninterrupted(1000); - instances = assemblyAPI.getReachableInstances(req); - Assert.assertEquals(MAX_COUNT - 1, instances.size()); - boolean exists = false; - for (Instance instance : instances) { - if (instance.getId().equals(instanceToLimit.getId())) { - exists = true; - } - } - Assert.assertFalse(exists); - Utils.sleepUninterrupted(10000); - //default halfopen pass 3 success - int requestCountAfterHalfOpen = configuration.getConsumer().getCircuitBreaker() - .getRequestCountAfterHalfOpen(); - for (int i = 0; i < requestCountAfterHalfOpen; i++) { - ServiceCallResult result = instanceToResult(instanceToLimit); - result.setRetCode(-1); - result.setRetStatus(RetStatus.RetSuccess); - assemblyAPI.updateServiceCallResult(result); - Utils.sleepUninterrupted(200); - assemblyAPI.updateServiceCallResult(result); - } - LOG.info("start to test half open to close"); - Utils.sleepUninterrupted(1000); - instances = assemblyAPI.getReachableInstances(req); - Assert.assertEquals(MAX_COUNT, instances.size()); - } - } - @Test public void testFunctionalDecorator() { Configuration configuration = TestUtils.configWithEnvAddress();