diff --git a/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java b/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java new file mode 100644 index 000000000..02a604dba --- /dev/null +++ b/polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java @@ -0,0 +1,162 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.client.remote; + +import com.tencent.polaris.annonation.JustForTest; +import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; +import com.tencent.polaris.api.config.verify.DefaultValues; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.flow.BaseFlow; +import com.tencent.polaris.client.pojo.Node; +import com.tencent.polaris.client.util.CommonValidator; +import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +/** + * Repository for service addresses. + * + * @author Haotian Zhang + */ +public class ServiceAddressRepository { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceAddressRepository.class); + + private final List nodes; + + private int curIndex; + + private final String clientId; + + private final Extensions extensions; + + private final ServiceKey remoteCluster; + + private final List routers; + + private final String lbPolicy; + + private final String protocol; + + public ServiceAddressRepository(List addresses, String clientId, Extensions extensions, + ServiceKey remoteCluster) { + this(addresses, clientId, extensions, remoteCluster, null, null, null); + this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_METADATA); + this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY); + } + + public ServiceAddressRepository(List addresses, String clientId, Extensions extensions, + ServiceKey remoteCluster, List routers, String lbPolicy, String protocol) { + // to ip addresses. + this.nodes = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(addresses)) { + for (String address : addresses) { + if (StringUtils.isNotBlank(address)) { + int colonIdx = address.lastIndexOf(":"); + if (colonIdx > 0 && colonIdx < address.length() - 1) { + String host = address.substring(0, colonIdx); + try { + int port = Integer.parseInt(address.substring(colonIdx + 1)); + nodes.add(new Node(host, port)); + } catch (NumberFormatException e) { + LOG.warn("Invalid port number in address: {}", address); + } + } else { + LOG.warn("Invalid address format, expected 'host:port': {}", address); + } + } + } + } + this.curIndex = 0; + + // from discovery. + this.clientId = clientId; + this.extensions = extensions; + CommonValidator.validateNamespaceService(remoteCluster.getNamespace(), remoteCluster.getService()); + this.remoteCluster = remoteCluster; + if (CollectionUtils.isEmpty(routers)) { + this.routers = new ArrayList<>(); + } else { + this.routers = routers; + } + if (StringUtils.isBlank(lbPolicy)) { + this.lbPolicy = DefaultValues.DEFAULT_LOADBALANCER; + } else { + this.lbPolicy = lbPolicy; + } + if (StringUtils.isBlank(protocol)) { + this.protocol = "http"; + } else { + this.protocol = protocol; + } + } + + public String getServiceAddress() throws PolarisException { + Node node = getServiceAddressNode(); + return node.getHostPort(); + } + + public Node getServiceAddressNode() throws PolarisException { + if (CollectionUtils.isNotEmpty(nodes)) { + Node node = nodes.get(Math.abs(curIndex % nodes.size())); + curIndex = (curIndex + 1) % Integer.MAX_VALUE; + if (LOG.isDebugEnabled()) { + LOG.debug("success to get instance, instance is {}:{}", node.getHost(), node.getPort()); + } + return node; + } + Instance instance = getDiscoverInstance(); + if (LOG.isDebugEnabled()) { + LOG.debug("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort()); + } + return new Node(instance.getHost(), instance.getPort()); + } + + private Instance getDiscoverInstance() throws PolarisException { + Instance instance = BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol, clientId); + LOG.info("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort()); + return instance; + } + + @JustForTest + List getNodes() { + return nodes; + } + + @JustForTest + List getRouters() { + return routers; + } + + @JustForTest + String getLbPolicy() { + return lbPolicy; + } + + @JustForTest + String getProtocol() { + return protocol; + } +} diff --git a/polaris-common/polaris-client/src/test/java/com/tencent/polaris/client/remote/ServiceAddressRepositoryTest.java b/polaris-common/polaris-client/src/test/java/com/tencent/polaris/client/remote/ServiceAddressRepositoryTest.java new file mode 100644 index 000000000..c125f0392 --- /dev/null +++ b/polaris-common/polaris-client/src/test/java/com/tencent/polaris/client/remote/ServiceAddressRepositoryTest.java @@ -0,0 +1,211 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.client.remote; + +import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; +import com.tencent.polaris.api.exception.ErrorCode; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.client.flow.BaseFlow; +import com.tencent.polaris.client.pojo.Node; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.when; + +/** + * Test for {@link ServiceAddressRepository}. + * + * @author Haotian Zhang + */ +@RunWith(MockitoJUnitRunner.class) +public class ServiceAddressRepositoryTest { + + private static MockedStatic mockedBaseFlow; + + @Mock + private Extensions extensions; + + @Mock + private static Instance mockInstance; + + private final ServiceKey remoteCluster = new ServiceKey("test-namespace", "test-service"); + private final String clientId = "test-client"; + + @BeforeClass + public static void beforeClass() { + mockedBaseFlow = Mockito.mockStatic(BaseFlow.class); + } + + @Before + public void setUp() { + when(mockInstance.getHost()).thenReturn("1.2.3.4"); + when(mockInstance.getPort()).thenReturn(8080); + } + + @AfterClass + public static void AfterClass() { + if (mockedBaseFlow != null) { + mockedBaseFlow.close(); + } + } + + @Test + public void testConstructorWithDefaultParams() { + List addresses = Arrays.asList("host1:8080", "host2:9090"); + ServiceAddressRepository repository = new ServiceAddressRepository( + addresses, clientId, extensions, remoteCluster); + + assertNotNull(repository); + assertEquals(2, repository.getNodes().size()); + assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_METADATA, repository.getRouters().get(0)); + assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY, repository.getRouters().get(1)); + assertEquals("http", repository.getProtocol()); + } + + @Test + public void testConstructorWithCustomParams() { + List addresses = Arrays.asList("host1:8080", "host2:9090"); + List routers = Arrays.asList("custom-router1", "custom-router2"); + String lbPolicy = "custom-lb"; + String protocol = "grpc"; + + ServiceAddressRepository repository = new ServiceAddressRepository( + addresses, clientId, extensions, remoteCluster, routers, lbPolicy, protocol); + + assertNotNull(repository); + assertEquals(2, repository.getNodes().size()); + assertEquals("custom-router1", repository.getRouters().get(0)); + assertEquals("custom-router2", repository.getRouters().get(1)); + assertEquals("custom-lb", repository.getLbPolicy()); + assertEquals("grpc", repository.getProtocol()); + } + + @Test + public void testConstructorWithEmptyAddresses() { + ServiceAddressRepository repository = new ServiceAddressRepository( + null, clientId, extensions, remoteCluster); + + assertNotNull(repository); + assertTrue(repository.getNodes().isEmpty()); + } + + @Test + public void testConstructorWithInvalidAddresses() { + List addresses = Arrays.asList("host1", "host2:", ":8080", "host3:invalid", ""); + ServiceAddressRepository repository = new ServiceAddressRepository( + addresses, clientId, extensions, remoteCluster); + + assertNotNull(repository); + assertTrue(repository.getNodes().isEmpty()); + } + + @Test + public void testGetServiceAddressNodeWithLocalNodes() throws PolarisException { + List addresses = Arrays.asList("host1:8080", "host2:9090", "host3:7070"); + ServiceAddressRepository repository = new ServiceAddressRepository( + addresses, clientId, extensions, remoteCluster); + + // First call + Node node1 = repository.getServiceAddressNode(); + assertEquals("host1", node1.getHost()); + assertEquals(8080, node1.getPort()); + + // Second call - should round robin + Node node2 = repository.getServiceAddressNode(); + assertEquals("host2", node2.getHost()); + assertEquals(9090, node2.getPort()); + + // Third call + Node node3 = repository.getServiceAddressNode(); + assertEquals("host3", node3.getHost()); + assertEquals(7070, node3.getPort()); + + // Fourth call - should wrap around + Node node4 = repository.getServiceAddressNode(); + assertEquals("host1", node4.getHost()); + assertEquals(8080, node4.getPort()); + } + + @Test + public void testGetServiceAddressNodeWithEmptyNodes() throws PolarisException { + ServiceAddressRepository repository = new ServiceAddressRepository( + Collections.emptyList(), clientId, extensions, remoteCluster); + + mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance( + any(), any(), anyList(), anyString(), anyString(), anyString())) + .thenReturn(mockInstance); + + Node node = repository.getServiceAddressNode(); + assertEquals("1.2.3.4", node.getHost()); + assertEquals(8080, node.getPort()); + } + + @Test(expected = PolarisException.class) + public void testGetServiceAddressNodeWithDiscoveryFailure() throws PolarisException { + ServiceAddressRepository repository = new ServiceAddressRepository( + Collections.emptyList(), clientId, extensions, remoteCluster); + + mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance( + any(), any(), anyList(), anyString(), anyString(), anyString())) + .thenThrow(new PolarisException(ErrorCode.INSTANCE_NOT_FOUND, "Discovery failed")); + + repository.getServiceAddressNode(); + } + + @Test + public void testGetServiceAddress() throws PolarisException { + List addresses = Arrays.asList("host1:8080", "host2:9090"); + ServiceAddressRepository repository = new ServiceAddressRepository( + addresses, clientId, extensions, remoteCluster); + + String address1 = repository.getServiceAddress(); + assertTrue(address1.equals("host1:8080") || address1.equals("host2:9090")); + + String address2 = repository.getServiceAddress(); + assertNotEquals(address1, address2); // Should be different due to round robin + } + + @Test + public void testGetServiceAddressWithDiscovery() throws PolarisException { + ServiceAddressRepository repository = new ServiceAddressRepository( + Collections.emptyList(), clientId, extensions, remoteCluster); + + mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance( + any(), any(), anyList(), anyString(), anyString(), anyString())) + .thenReturn(mockInstance); + + String address = repository.getServiceAddress(); + assertEquals("1.2.3.4:8080", address); + } +} diff --git a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml index e74eb4d93..4de1fa1a4 100644 --- a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml +++ b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml @@ -97,9 +97,13 @@ global: # 描述:PushGateway 事件上报开关 enable: false # 描述:PushGateway 事件上报队列长度 - eventQueueSize: 10000; + eventQueueSize: 1000; # 描述:PushGateway 事件上报最大批量大小 maxBatchSize: 100 + # 描述:事件服务的命名空间 + namespace: Polaris + # 描述:事件服务的服务名 + service: polaris.pushgateway # 描述:Admin相关的配置 admin: # 描述:Admin的监听的IP @@ -131,6 +135,10 @@ global: # #范围:[1s:...] # #默认值:10s # pushInterval: 10s + # 描述:监控服务的命名空间 + namespace: Polaris + # 描述:监控服务的服务名 + service: polaris.pushgateway location: providers: - type: local diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/Node.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/Node.java index 12d8c9ddd..cf93650bd 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/Node.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/Node.java @@ -17,14 +17,13 @@ package com.tencent.polaris.client.pojo; +import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Objects; -import com.tencent.polaris.api.utils.StringUtils; -import com.tencent.polaris.logging.LoggerFactory; -import org.slf4j.Logger; - public class Node { private static final Logger LOG = LoggerFactory.getLogger(Node.class); @@ -46,6 +45,10 @@ public int getPort() { return port; } + public String getHostPort() { + return host + ":" + port; + } + @Override @SuppressWarnings("checkstyle:all") public boolean equals(Object o) { diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java index 46917cd76..16d1ae5de 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java @@ -175,9 +175,15 @@ public void resetReRegisterCounter() { public void destroy() { try { - getTaskFuture().cancel(false); - getReRegisterFuture().cancel(false); - getReRegisterExecutor().shutdownNow(); + if (getTaskFuture() != null) { + getTaskFuture().cancel(false); + } + if (getReRegisterFuture() != null) { + getReRegisterFuture().cancel(false); + } + if (getReRegisterExecutor() != null) { + getReRegisterExecutor().shutdownNow(); + } } catch (Throwable throwable) { LOG.warn("[RegisterState] destroy error. namespace:{}, service:{}, host:{}, port:{}.", getInstanceRegisterRequest().getNamespace(), getInstanceRegisterRequest().getService(), diff --git a/polaris-plugins/polaris-plugins-connector/pom.xml b/polaris-plugins/polaris-plugins-connector/pom.xml index 61afdd223..b55e8fda9 100644 --- a/polaris-plugins/polaris-plugins-connector/pom.xml +++ b/polaris-plugins/polaris-plugins-connector/pom.xml @@ -39,12 +39,6 @@ polaris-config ${project.version} - - org.mockito - mockito-core - 4.3.1 - test - \ No newline at end of file diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml index 1b3fbf539..5e61f3627 100644 --- a/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml @@ -15,6 +15,11 @@ Polaris Plugins Observability Push Gateway Event Reporter JAR + + com.tencent.polaris + polaris-client + ${project.version} + org.apache.httpcomponents httpclient diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java index e1195ae10..e744acb78 100644 --- a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java @@ -32,9 +32,12 @@ import com.tencent.polaris.api.plugin.compose.Extensions; import com.tencent.polaris.api.plugin.event.BaseEvent; import com.tencent.polaris.api.plugin.event.EventReporter; +import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.api.utils.ThreadPoolUtils; +import com.tencent.polaris.client.pojo.Node; +import com.tencent.polaris.client.remote.ServiceAddressRepository; import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.logging.LoggerFactory; import org.apache.http.HttpResponse; @@ -50,7 +53,9 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.*; /** @@ -70,7 +75,9 @@ public class PushGatewayEventReporter implements EventReporter, PluginConfigProv private PushGatewayEventReporterConfig config; - private URI eventUri; + private ServiceAddressRepository serviceAddressRepository; + + private final Map eventUriMap = new ConcurrentHashMap<>(); private final ScheduledExecutorService eventExecutors = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("event-pushgateway")); @@ -82,7 +89,7 @@ public boolean isEnabled() { @Override public boolean reportEvent(BaseEvent flowEvent) { - if (eventUri == null) { + if (serviceAddressRepository == null) { LOG.warn("build event request url fail, can not sent event."); return false; } @@ -144,34 +151,48 @@ public void postContextInit(Extensions ctx) throws PolarisException { eventQueue = new LinkedBlockingQueue<>(config.getEventQueueSize()); - // 分割主机名和端口号 - if (StringUtils.isBlank(config.getAddress())) { - throw new RuntimeException("PushGateway address is empty."); - } - String[] parts = config.getAddress().split(":"); - if (parts.length != 2) { - throw new RuntimeException(String.format("PushGateway address %s format error.", config.getAddress())); - } - String host = parts[0]; - int port = Integer.parseInt(parts[1]); - eventUri = new URIBuilder() - .setHost(host) - .setPort(port) - .setScheme("http") - .setPath("/polaris/client/events") - .build(); - LOG.info("PushGateway event reporter init with uri: {}", eventUri); + serviceAddressRepository = new ServiceAddressRepository(Collections.singletonList(this.config.getAddress()), + ctx.getValueContext().getClientId(), ctx, new ServiceKey(config.getNamespace(), config.getService())); eventExecutors.scheduleWithFixedDelay(new PushGatewayEventTask(), 1000, 1000, TimeUnit.MILLISECONDS); LOG.info("PushGateway event reporter starts reporting task."); - } catch (URISyntaxException e) { - LOG.error("Build event request url fail.", e); + } catch (Throwable e) { + LOG.error("Init PushGateway event reporter fail.", e); } } } } } + private URI getEventUri() { + return getEventUriByNode(serviceAddressRepository.getServiceAddressNode()); + } + + private URI getEventUriByNode(Node node) { + if (node != null) { + // First try to get URI from cache. + URI cachedUri = eventUriMap.get(node); + if (cachedUri != null) { + return cachedUri; + } + + // If not in cache, build new URI. + try { + URI uri = new URIBuilder() + .setHost(node.getHost()) + .setPort(node.getPort()) + .setScheme("http") + .setPath("/polaris/client/events") + .build(); + URI existingUri = eventUriMap.putIfAbsent(node, uri); + return existingUri != null ? existingUri : uri; + } catch (URISyntaxException e) { + LOG.error("Build event request url with node {} fail.", node, e); + } + } + return null; + } + @Override public void destroy() { ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{eventExecutors}); @@ -201,7 +222,7 @@ private void postPushGatewayEvent(PushGatewayEventRequest request) { StringEntity postBody = null; RequestConfig config = RequestConfig.custom().setConnectTimeout(2000).setConnectionRequestTimeout(10000).setSocketTimeout(10000).build(); try (CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build()) { - HttpPost httpPost = new HttpPost(eventUri); + HttpPost httpPost = new HttpPost(getEventUri()); postBody = new StringEntity(mapper.writeValueAsString(request)); if (LOG.isDebugEnabled()) { LOG.debug("postPushGatewayEvent body:{}", postBody); diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java index 94ea9d519..105d3d856 100644 --- a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java @@ -36,10 +36,16 @@ public class PushGatewayEventReporterConfig implements Verifier { private String address; @JsonProperty - private Integer eventQueueSize = 1000; + private Integer eventQueueSize; @JsonProperty - private Integer maxBatchSize = 100; + private Integer maxBatchSize; + + @JsonProperty + private String namespace; + + @JsonProperty + private String service; @Override public void verify() { @@ -48,6 +54,10 @@ public void verify() { return; } ConfigUtils.validateString(address, "global.eventReporter.plugin.pushgateway.address"); + ConfigUtils.validatePositive(eventQueueSize, "global.eventReporter.plugin.pushgateway.eventQueueSize"); + ConfigUtils.validatePositive(maxBatchSize, "global.eventReporter.plugin.pushgateway.maxBatchSize"); + ConfigUtils.validateString(namespace, "global.eventReporter.plugin.pushgateway.namespace"); + ConfigUtils.validateString(service, "global.eventReporter.plugin.pushgateway.service"); } @Override @@ -60,6 +70,18 @@ public void setDefault(Object defaultObject) { if (StringUtils.isBlank(address)) { setAddress(pushGatewayEventReporterConfig.getAddress()); } + if (null == eventQueueSize) { + setEventQueueSize(pushGatewayEventReporterConfig.getEventQueueSize()); + } + if (null == maxBatchSize) { + setMaxBatchSize(pushGatewayEventReporterConfig.getMaxBatchSize()); + } + if (StringUtils.isBlank(namespace)) { + setNamespace(pushGatewayEventReporterConfig.getNamespace()); + } + if (StringUtils.isBlank(service)) { + setService(pushGatewayEventReporterConfig.getService()); + } } } @@ -97,4 +119,32 @@ public Integer getMaxBatchSize() { public void setMaxBatchSize(Integer maxBatchSize) { this.maxBatchSize = maxBatchSize; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + + @Override + public String toString() { + return "PushGatewayEventReporterConfig{" + + "enable=" + enable + + ", address='" + address + '\'' + + ", eventQueueSize=" + eventQueueSize + + ", maxBatchSize=" + maxBatchSize + + ", namespace='" + namespace + '\'' + + ", service='" + service + '\'' + + '}'; + } } diff --git a/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/handler/PrometheusHandlerConfig.java b/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/handler/PrometheusHandlerConfig.java index c0c617b83..e724755be 100644 --- a/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/handler/PrometheusHandlerConfig.java +++ b/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/handler/PrometheusHandlerConfig.java @@ -37,6 +37,12 @@ public class PrometheusHandlerConfig implements Verifier { @JsonProperty private String address; + @JsonProperty + private String namespace; + + @JsonProperty + private String service; + @JsonProperty @JsonDeserialize(using = TimeStrJsonDeserializer.class) private Long pushInterval = 10000L; @@ -87,6 +93,12 @@ public void setDefault(Object defaultObject) { setOpenGzip(false); } } + if (StringUtils.isBlank(namespace)) { + setNamespace(config.getNamespace()); + } + if (StringUtils.isBlank(service)) { + setService(config.getService()); + } } } @@ -130,12 +142,30 @@ public void setOpenGzip(Boolean openGzip) { this.openGzip = openGzip; } + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + @Override public String toString() { return "PrometheusHandlerConfig{" + - ", path='" + path + '\'' + + "path='" + path + '\'' + ", type='" + type + '\'' + ", address='" + address + '\'' + + ", namespace='" + namespace + '\'' + + ", service='" + service + '\'' + ", pushInterval=" + pushInterval + ", openGzip=" + openGzip + '}'; diff --git a/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporter.java b/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporter.java index 4d2bb7f8c..8498354e1 100644 --- a/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporter.java +++ b/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/main/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporter.java @@ -18,6 +18,7 @@ package com.tencent.polaris.plugins.stat.prometheus.plugin; import com.sun.net.httpserver.HttpHandler; +import com.tencent.polaris.annonation.JustForTest; import com.tencent.polaris.api.config.global.StatReporterConfig; import com.tencent.polaris.api.config.plugin.PluginConfigProvider; import com.tencent.polaris.api.config.verify.Verifier; @@ -29,9 +30,10 @@ import com.tencent.polaris.api.plugin.compose.Extensions; import com.tencent.polaris.api.plugin.stat.*; import com.tencent.polaris.api.pojo.InstanceGauge; -import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.pojo.Node; +import com.tencent.polaris.client.remote.ServiceAddressRepository; import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.plugins.stat.common.model.*; @@ -45,7 +47,11 @@ import org.slf4j.Logger; import java.io.IOException; -import java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -80,8 +86,6 @@ public class PrometheusReporter implements StatReporter, PluginConfigProvider, H private ScheduledExecutorService executorService; - private PushGateway pushGateway; - private Extensions extensions; private boolean enable; @@ -92,6 +96,10 @@ public class PrometheusReporter implements StatReporter, PluginConfigProvider, H private Map handlers; + private ServiceAddressRepository serviceAddressRepository; + + private final Map pushGatewayMap = new ConcurrentHashMap<>(); + public PrometheusReporter() { this.container = new StatInfoCollectorContainer(); this.sampleMapping = new HashMap<>(); @@ -118,6 +126,10 @@ public void postContextInit(Extensions extensions) throws PolarisException { this.enable = extensions.getConfiguration().getGlobal().getStatReporter().isEnable(); this.executorService = Executors.newScheduledThreadPool(4, new NamedThreadFactory(getName())); this.port = extensions.getConfiguration().getGlobal().getAdmin().getPort(); + + this.serviceAddressRepository = new ServiceAddressRepository(Collections.singletonList(this.config.getAddress()), + extensions.getValueContext().getClientId(), extensions, new ServiceKey(config.getNamespace(), config.getService())); + this.initHandle(); } @@ -282,15 +294,7 @@ private void doAggregation() { private void startSchedulePushTask() { - if (StringUtils.isBlank(config.getAddress())) { - List addresses = extensions.getConfiguration().getGlobal().getServerConnector().getAddresses(); - if (CollectionUtils.isNotEmpty(addresses)) { - String address = addresses.get(0); - config.setAddress(address.split(":")[0] + ":" + 9091); - } - } - - if (null != container && null != executorService && null != sampleMapping) { + if (null != serviceAddressRepository && null != container && null != executorService && null != sampleMapping) { this.executorService.scheduleWithFixedDelay(this::doPush, config.getPushInterval(), config.getPushInterval(), @@ -300,6 +304,7 @@ private void startSchedulePushTask() { } private void doPush() { + String address = serviceAddressRepository.getServiceAddress(); try { CommonHandler.putDataFromContainerInOrder(sampleMapping, container.getInsCollector(), container.getInsCollector().getCurrentRevision(), @@ -311,9 +316,12 @@ private void doPush() { container.getRateLimitCollector().getCurrentRevision(), SystemMetricLabelOrder.RATELIMIT_GAUGE_LABEL_ORDER); try { - if (Objects.isNull(pushGateway)) { - LOGGER.info("init push-gateway {} ", config.getAddress()); - pushGateway = new PushGateway(config.getAddress()); + PushGateway pushGateway; + if (pushGatewayMap.containsKey(address)) { + pushGateway = pushGatewayMap.get(address); + } else { + pushGateway = pushGatewayMap.computeIfAbsent(address, k -> new PushGateway(address)); + LOGGER.info("init push-gateway {} ", address); } if (config.isOpenGzip()) { pushGateway.pushAddByGzip(promRegistry, CommonHandler.PUSH_DEFAULT_JOB_NAME, @@ -323,11 +331,11 @@ private void doPush() { Collections.singletonMap(CommonHandler.PUSH_GROUP_KEY, instanceID)); } - LOGGER.info("push result to push-gateway {} success, open gzip {}", config.getAddress(), config.isOpenGzip()); + LOGGER.info("push result to push-gateway {} success, open gzip {}", address, config.isOpenGzip()); } catch (IOException exception) { LOGGER.error("push result to push-gateway {} open gzip {} encountered exception, exception:{}", - config.getAddress(), config.isOpenGzip(), exception.getMessage()); - pushGateway = null; + address, config.isOpenGzip(), exception.getMessage()); + pushGatewayMap.remove(address); return; } @@ -339,7 +347,7 @@ private void doPush() { } } catch (Exception e) { LOGGER.error("push result to push-gateway {} open gzip {} encountered exception, exception:{}", - config.getAddress(), config.isOpenGzip(), e.getMessage()); + address, config.isOpenGzip(), e.getMessage()); } } @@ -359,14 +367,6 @@ public void setPromRegistry(CollectorRegistry promRegistry) { this.promRegistry = promRegistry; } - public PushGateway getPushGateway() { - return pushGateway; - } - - public void setPushGateway(PushGateway pushGateway) { - this.pushGateway = pushGateway; - } - String getSdkIP() { return sdkIP; } @@ -390,4 +390,14 @@ public Map getHandlers() { } return handlers; } + + @JustForTest + void setServiceAddressRepository(ServiceAddressRepository serviceAddressRepository) { + this.serviceAddressRepository = serviceAddressRepository; + } + + @JustForTest + Map getPushGatewayMap() { + return pushGatewayMap; + } } diff --git a/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/test/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporterTest.java b/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/test/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporterTest.java index d1feabe93..a89379238 100644 --- a/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/test/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporterTest.java +++ b/polaris-plugins/polaris-plugins-observability/stat-prometheus/src/test/java/com/tencent/polaris/plugins/stat/prometheus/plugin/PrometheusReporterTest.java @@ -20,11 +20,9 @@ import com.tencent.polaris.api.plugin.stat.CircuitBreakGauge; import com.tencent.polaris.api.plugin.stat.DefaultCircuitBreakResult; import com.tencent.polaris.api.plugin.stat.StatInfo; -import com.tencent.polaris.api.pojo.CircuitBreakerStatus; -import com.tencent.polaris.api.pojo.InstanceGauge; -import com.tencent.polaris.api.pojo.RetStatus; -import com.tencent.polaris.api.pojo.ServiceInfo; +import com.tencent.polaris.api.pojo.*; import com.tencent.polaris.api.rpc.ServiceCallResult; +import com.tencent.polaris.client.remote.ServiceAddressRepository; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.plugins.stat.common.model.MetricValueAggregationStrategy; import com.tencent.polaris.plugins.stat.common.model.MetricValueAggregationStrategyCollections; @@ -71,7 +69,9 @@ public void setUp() { handler.setEnable(true); handler.setSdkIP("127.0.0.1"); handler.setConfig(config); - handler.setPushGateway(pgw); + handler.setServiceAddressRepository(new ServiceAddressRepository(Collections.singletonList(PUSH_DEFAULT_ADDRESS), + null, null, new ServiceKey("Polaris", "polaris.pushgateway"))); + handler.getPushGatewayMap().put(PUSH_DEFAULT_ADDRESS, pgw); handler.setExecutorService(Executors.newScheduledThreadPool(4)); handler.initHandle(); } diff --git a/pom.xml b/pom.xml index 94868ad29..c2041d433 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ 3.4.4 1.2.17 4.13.1 + 1.14.19 4.9.0 3.16.1 -Xmx2048m @@ -139,9 +140,21 @@ ${junit.version} test + + net.bytebuddy + byte-buddy + ${byte-buddy.version} + test + + + net.bytebuddy + byte-buddy-agent + ${byte-buddy.version} + test + org.mockito - mockito-core + mockito-inline ${mockito.version} test