Skip to content

fix: 修复通配API在探测关闭后无法停止探测任务的问题 #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,18 @@ public void testHttpServerRandomPort() throws IOException {
statReporterConfig.setPluginConfig(StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS, prometheusHandlerConfig);
try (SDKContext sdkContext = SDKContext.initContextByConfig(configuration)) {
sdkContext.init();
URL metricsUrl = new URL("http://127.0.0.1:28080/metrics");
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
metricsConn.connect();
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
for (int i = 0; i < 3; i++) {
URL metricsUrl = new URL(String.format("http://127.0.0.1:%d/metrics", i));
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
try {
metricsConn.connect();
} catch (IOException e) {
continue;
}
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
}
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@

package com.tencent.polaris.circuitbreaker.factory.test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.google.protobuf.util.JsonFormat;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.plugin.DefaultPlugins;
Expand Down Expand Up @@ -56,6 +46,16 @@
import org.junit.Test;
import org.slf4j.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.tencent.polaris.test.common.Consts.NAMESPACE_TEST;
import static com.tencent.polaris.test.common.Consts.SERVICE_CIRCUIT_BREAKER;
import static com.tencent.polaris.test.common.TestUtils.SERVER_ADDRESS_ENV;
Expand Down Expand Up @@ -154,6 +154,9 @@ public void testCircuitBreakByErrorCount() {
Instance instanceToLimit = instances.get(1);
//report 60 fail in 500ms
for (int i = 0; i < 60; ++i) {
if (i == 1) {
Utils.sleepUninterrupted(5 * 1000);
}
ServiceCallResult result = instanceToResult(instanceToLimit);
result.setRetCode(-1);
result.setDelay(1000L);
Expand Down Expand Up @@ -216,6 +219,9 @@ public void testCircuitBreakByErrorRate() {
Instance instanceToLimit = instances.get(1);
//report 60 fail in 500ms
for (int i = 0; i < 60; ++i) {
if (i == 1) {
Utils.sleepUninterrupted(5 * 1000);
}
ServiceCallResult result = instanceToResult(instanceToLimit);
result.setDelay(1000L);
if (i % 2 == 0) {
Expand Down Expand Up @@ -268,8 +274,7 @@ public void testFunctionalDecorator() {
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
if (num % 2 == 0) {
throw new IllegalArgumentException("invoke failed");
}
else {
} else {
System.out.println("invoke success");
}
});
Expand All @@ -278,8 +283,7 @@ public void testFunctionalDecorator() {
Utils.sleepUninterrupted(1000);
integerConsumer.accept(2);
Utils.sleepUninterrupted(1000);
}
catch (Exception e) {
} catch (Exception e) {
if (!(e instanceof IllegalArgumentException)) {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package com.tencent.polaris.circuitbreaker.factory.test;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;

import com.google.protobuf.StringValue;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.plugin.circuitbreaker.CircuitBreaker;
Expand All @@ -45,116 +40,121 @@
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;

import static com.tencent.polaris.test.common.TestUtils.SERVER_ADDRESS_ENV;

public class FaultDetectorTest {

private NamingServer namingServer;

private final ServiceKey matchMethodService = new ServiceKey("Test", "SvcCbMethod");

private final ServiceKey matchMethodDetectService = new ServiceKey("Test", "SvcCbMethodDetect");

@Before
public void before() throws IOException {
try {
namingServer = NamingServer.startNamingServer(-1);
System.setProperty(SERVER_ADDRESS_ENV, String.format("127.0.0.1:%d", namingServer.getPort()));
}
catch (IOException e) {
Assert.fail(e.getMessage());
}

CircuitBreakerProto.CircuitBreakerRule cbRule1 = CbTestUtils.loadCbRule("circuitBreakerMethodRuleNoDetect.json");
CircuitBreakerProto.CircuitBreaker circuitBreaker = CircuitBreakerProto.CircuitBreaker.newBuilder()
.addRules(cbRule1).setRevision(StringValue.newBuilder().setValue("0000").build()).build();
namingServer.getNamingService().setCircuitBreaker(matchMethodService, circuitBreaker);


CircuitBreakerProto.CircuitBreakerRule cbRule3 = CbTestUtils.loadCbRule("circuitBreakerMethodRule.json");
CircuitBreakerProto.CircuitBreakerRule cbRule4 = CbTestUtils.loadCbRule("circuitBreakerRule.json");
circuitBreaker = CircuitBreakerProto.CircuitBreaker.newBuilder()
.addRules(cbRule3).addRules(cbRule4).setRevision(StringValue.newBuilder().setValue("1111").build()).build();
namingServer.getNamingService().setCircuitBreaker(matchMethodDetectService, circuitBreaker);
FaultDetectorProto.FaultDetectRule rule1 = CbTestUtils.loadFdRule("faultDetectRule.json");
FaultDetectorProto.FaultDetectRule rule2 = CbTestUtils.loadFdRule("faultDetectMethodRule.json");
FaultDetectorProto.FaultDetector faultDetector = FaultDetectorProto.FaultDetector.newBuilder()
.addRules(rule1).addRules(rule2).setRevision("2222").build();
namingServer.getNamingService().setFaultDetector(matchMethodDetectService, faultDetector);
}

@Test
public void testFaultDetectRuleChanged() throws IOException {
Configuration configuration = TestUtils.configWithEnvAddress();
ConfigurationImpl configurationImpl = (ConfigurationImpl) configuration;
try (CircuitBreakAPI circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByConfig(configurationImpl)) {
for (int i = 0; i < 10; i++) {
String method = "";
if (i < 9) {
method = "/test1/path/" + i;
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
if (num % 2 == 0) {
throw new IllegalArgumentException("invoke failed" + finalI);
}
else {
System.out.println("invoke success" + finalI);
}
});
integerConsumer.accept(1);
}
BaseEngine baseEngine = (BaseEngine) circuitBreakAPI;
CircuitBreaker resourceBreaker = baseEngine.getSDKContext().getExtensions().getResourceBreaker();
PolarisCircuitBreaker polarisCircuitBreaker = (PolarisCircuitBreaker) resourceBreaker;
Map<ServiceKey, HealthCheckContainer> healthCheckCache = polarisCircuitBreaker.getHealthCheckCache();
Assert.assertEquals(1, healthCheckCache.size());
HealthCheckContainer healthCheckContainer = healthCheckCache.get(matchMethodDetectService);
Assert.assertNotNull(healthCheckContainer);
Collection<ResourceHealthChecker> healthCheckerValues = healthCheckContainer.getHealthCheckerValues();
Assert.assertEquals(2, healthCheckerValues.size());

FaultDetectorProto.FaultDetectRule rule1 = CbTestUtils.loadFdRule("faultDetectMethodRuleChanged.json");
FaultDetectorProto.FaultDetector faultDetector = FaultDetectorProto.FaultDetector.newBuilder()
.addRules(rule1).setRevision("33333").build();
namingServer.getNamingService().setFaultDetector(matchMethodDetectService, faultDetector);

Utils.sleepUninterrupted(20 * 1000);
healthCheckContainer = healthCheckCache.get(matchMethodDetectService);
Assert.assertNull(healthCheckContainer);
for (int i = 0; i < 3; i++) {
String method = "";
if (i > 0) {
method = "/test1/path/" + i;
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
if (num % 2 == 0) {
throw new IllegalArgumentException("invoke failed" + finalI);
}
else {
System.out.println("invoke success" + finalI);
}
});
integerConsumer.accept(1);
}
healthCheckContainer = healthCheckCache.get(matchMethodDetectService);
Assert.assertNotNull(healthCheckContainer);
healthCheckerValues = healthCheckContainer.getHealthCheckerValues();
Assert.assertEquals(1, healthCheckerValues.size());
}
}

@After
public void after() {
if (null != namingServer) {
namingServer.terminate();
}
}
private NamingServer namingServer;

private final ServiceKey matchMethodService = new ServiceKey("Test", "SvcCbMethod");

private final ServiceKey matchMethodDetectService = new ServiceKey("Test", "SvcCbMethodDetect");

@Before
public void before() throws IOException {
try {
namingServer = NamingServer.startNamingServer(-1);
System.setProperty(SERVER_ADDRESS_ENV, String.format("127.0.0.1:%d", namingServer.getPort()));
} catch (IOException e) {
Assert.fail(e.getMessage());
}

CircuitBreakerProto.CircuitBreakerRule cbRule1 = CbTestUtils.loadCbRule("circuitBreakerMethodRuleNoDetect.json");
CircuitBreakerProto.CircuitBreaker circuitBreaker = CircuitBreakerProto.CircuitBreaker.newBuilder()
.addRules(cbRule1).setRevision(StringValue.newBuilder().setValue("0000").build()).build();
namingServer.getNamingService().setCircuitBreaker(matchMethodService, circuitBreaker);


CircuitBreakerProto.CircuitBreakerRule cbRule3 = CbTestUtils.loadCbRule("circuitBreakerMethodRule.json");
CircuitBreakerProto.CircuitBreakerRule cbRule4 = CbTestUtils.loadCbRule("circuitBreakerRule.json");
circuitBreaker = CircuitBreakerProto.CircuitBreaker.newBuilder()
.addRules(cbRule3).addRules(cbRule4).setRevision(StringValue.newBuilder().setValue("1111").build()).build();
namingServer.getNamingService().setCircuitBreaker(matchMethodDetectService, circuitBreaker);
FaultDetectorProto.FaultDetectRule rule1 = CbTestUtils.loadFdRule("faultDetectRule.json");
FaultDetectorProto.FaultDetectRule rule2 = CbTestUtils.loadFdRule("faultDetectMethodRule.json");
FaultDetectorProto.FaultDetector faultDetector = FaultDetectorProto.FaultDetector.newBuilder()
.addRules(rule1).addRules(rule2).setRevision("2222").build();
namingServer.getNamingService().setFaultDetector(matchMethodDetectService, faultDetector);
}

@Test
public void testFaultDetectRuleChanged() throws IOException {
Configuration configuration = TestUtils.configWithEnvAddress();
ConfigurationImpl configurationImpl = (ConfigurationImpl) configuration;
try (CircuitBreakAPI circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByConfig(configurationImpl)) {
for (int i = 0; i < 10; i++) {
if (i == 1) {
Utils.sleepUninterrupted(5 * 1000);
}
String method = "";
if (i < 9) {
method = "/test1/path/" + i;
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
if (num % 2 == 0) {
throw new IllegalArgumentException("invoke failed" + finalI);
} else {
System.out.println("invoke success" + finalI);
}
});
integerConsumer.accept(1);
}
BaseEngine baseEngine = (BaseEngine) circuitBreakAPI;
CircuitBreaker resourceBreaker = baseEngine.getSDKContext().getExtensions().getResourceBreaker();
PolarisCircuitBreaker polarisCircuitBreaker = (PolarisCircuitBreaker) resourceBreaker;
Map<ServiceKey, HealthCheckContainer> healthCheckCache = polarisCircuitBreaker.getHealthCheckCache();
Assert.assertEquals(1, healthCheckCache.size());
HealthCheckContainer healthCheckContainer = healthCheckCache.get(matchMethodDetectService);
Assert.assertNotNull(healthCheckContainer);
Collection<ResourceHealthChecker> healthCheckerValues = healthCheckContainer.getHealthCheckerValues();
Assert.assertEquals(2, healthCheckerValues.size());

FaultDetectorProto.FaultDetectRule rule1 = CbTestUtils.loadFdRule("faultDetectMethodRuleChanged.json");
FaultDetectorProto.FaultDetector faultDetector = FaultDetectorProto.FaultDetector.newBuilder()
.addRules(rule1).setRevision("33333").build();
namingServer.getNamingService().setFaultDetector(matchMethodDetectService, faultDetector);

Utils.sleepUninterrupted(20 * 1000);
healthCheckContainer = healthCheckCache.get(matchMethodDetectService);
Assert.assertNull(healthCheckContainer);
for (int i = 0; i < 3; i++) {
String method = "";
if (i > 0) {
method = "/test1/path/" + i;
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
if (num % 2 == 0) {
throw new IllegalArgumentException("invoke failed" + finalI);
} else {
System.out.println("invoke success" + finalI);
}
});
integerConsumer.accept(1);
}
healthCheckContainer = healthCheckCache.get(matchMethodDetectService);
Assert.assertNotNull(healthCheckContainer);
healthCheckerValues = healthCheckContainer.getHealthCheckerValues();
Assert.assertEquals(1, healthCheckerValues.size());
}
}

@After
public void after() {
if (null != namingServer) {
namingServer.terminate();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,18 @@ public void testHttpServerRandomPort() throws IOException {
statReporterConfig.setPluginConfig(StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS, prometheusHandlerConfig);
try (SDKContext sdkContext = SDKContext.initContextByConfig(configuration)) {
sdkContext.init();
URL metricsUrl = new URL("http://127.0.0.1:28080/metrics");
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
metricsConn.connect();
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
for (int i = 0; i < 3; i++) {
URL metricsUrl = new URL(String.format("http://127.0.0.1:%d/metrics", 28080 + i));
HttpURLConnection metricsConn = (HttpURLConnection) metricsUrl.openConnection();
metricsConn.setRequestMethod("GET");
try {
metricsConn.connect();
} catch (IOException e) {
continue;
}
assertThat(metricsConn.getResponseCode()).isEqualTo(200);
metricsConn.disconnect();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pojo.Node;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.client.util.Utils;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.specification.api.v1.model.ModelProto;
import com.tencent.polaris.specification.api.v1.traffic.manage.RoutingProto;
Expand Down Expand Up @@ -573,6 +574,7 @@ protected void doDestroy() {
HttpServer httpServer = entry.getValue();
httpServer.stop(0);
((ExecutorService) httpServer.getExecutor()).shutdownNow();
Utils.sleepUninterrupted(1000);
}
}
}
Expand Down
Loading
Loading