Skip to content

feat:support stat and event report with service discovery. #599

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Tencent is pleased to support the open source community by making polaris-java available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.tencent.polaris.client.remote;

import com.tencent.polaris.annonation.JustForTest;
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
import com.tencent.polaris.api.config.verify.DefaultValues;
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.flow.BaseFlow;
import com.tencent.polaris.client.pojo.Node;
import com.tencent.polaris.client.util.CommonValidator;
import com.tencent.polaris.logging.LoggerFactory;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.List;

/**
* Repository for service addresses.
*
* @author Haotian Zhang
*/
public class ServiceAddressRepository {

private static final Logger LOG = LoggerFactory.getLogger(ServiceAddressRepository.class);

private final List<Node> nodes;

private int curIndex;

private final String clientId;

private final Extensions extensions;

private final ServiceKey remoteCluster;

private final List<String> routers;

private final String lbPolicy;

private final String protocol;

public ServiceAddressRepository(List<String> addresses, String clientId, Extensions extensions,
ServiceKey remoteCluster) {
this(addresses, clientId, extensions, remoteCluster, null, null, null);
this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_METADATA);
this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY);
}

public ServiceAddressRepository(List<String> addresses, String clientId, Extensions extensions,
ServiceKey remoteCluster, List<String> routers, String lbPolicy, String protocol) {
// to ip addresses.
this.nodes = new ArrayList<>();
if (CollectionUtils.isNotEmpty(addresses)) {
for (String address : addresses) {
if (StringUtils.isNotBlank(address)) {
int colonIdx = address.lastIndexOf(":");
if (colonIdx > 0 && colonIdx < address.length() - 1) {
String host = address.substring(0, colonIdx);
try {
int port = Integer.parseInt(address.substring(colonIdx + 1));
nodes.add(new Node(host, port));
} catch (NumberFormatException e) {
LOG.warn("Invalid port number in address: {}", address);
}
} else {
LOG.warn("Invalid address format, expected 'host:port': {}", address);
}
}
}
}
this.curIndex = 0;

// from discovery.
this.clientId = clientId;
this.extensions = extensions;
CommonValidator.validateNamespaceService(remoteCluster.getNamespace(), remoteCluster.getService());
this.remoteCluster = remoteCluster;
if (CollectionUtils.isEmpty(routers)) {
this.routers = new ArrayList<>();
} else {
this.routers = routers;
}
if (StringUtils.isBlank(lbPolicy)) {
this.lbPolicy = DefaultValues.DEFAULT_LOADBALANCER;
} else {
this.lbPolicy = lbPolicy;
}
if (StringUtils.isBlank(protocol)) {
this.protocol = "http";
} else {
this.protocol = protocol;
}
}

public String getServiceAddress() throws PolarisException {
Node node = getServiceAddressNode();
return node.getHostPort();
}

public Node getServiceAddressNode() throws PolarisException {
if (CollectionUtils.isNotEmpty(nodes)) {
Node node = nodes.get(Math.abs(curIndex % nodes.size()));
curIndex = (curIndex + 1) % Integer.MAX_VALUE;
if (LOG.isDebugEnabled()) {
LOG.debug("success to get instance, instance is {}:{}", node.getHost(), node.getPort());
}
return node;
}
Instance instance = getDiscoverInstance();
if (LOG.isDebugEnabled()) {
LOG.debug("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort());
}
return new Node(instance.getHost(), instance.getPort());
}

private Instance getDiscoverInstance() throws PolarisException {
Instance instance = BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol, clientId);
LOG.info("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort());
return instance;
}

@JustForTest
List<Node> getNodes() {
return nodes;
}

@JustForTest
List<String> getRouters() {
return routers;
}

@JustForTest
String getLbPolicy() {
return lbPolicy;
}

@JustForTest
String getProtocol() {
return protocol;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Tencent is pleased to support the open source community by making polaris-java available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.tencent.polaris.client.remote;

import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
import com.tencent.polaris.api.exception.ErrorCode;
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.client.flow.BaseFlow;
import com.tencent.polaris.client.pojo.Node;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;

/**
* Test for {@link ServiceAddressRepository}.
*
* @author Haotian Zhang
*/
@RunWith(MockitoJUnitRunner.class)
public class ServiceAddressRepositoryTest {

private static MockedStatic<BaseFlow> mockedBaseFlow;

@Mock
private Extensions extensions;

@Mock
private static Instance mockInstance;

private final ServiceKey remoteCluster = new ServiceKey("test-namespace", "test-service");
private final String clientId = "test-client";

@BeforeClass
public static void beforeClass() {
mockedBaseFlow = Mockito.mockStatic(BaseFlow.class);
}

@Before
public void setUp() {
when(mockInstance.getHost()).thenReturn("1.2.3.4");
when(mockInstance.getPort()).thenReturn(8080);
}

@AfterClass
public static void AfterClass() {
if (mockedBaseFlow != null) {
mockedBaseFlow.close();
}
}

@Test
public void testConstructorWithDefaultParams() {
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
ServiceAddressRepository repository = new ServiceAddressRepository(
addresses, clientId, extensions, remoteCluster);

assertNotNull(repository);
assertEquals(2, repository.getNodes().size());
assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_METADATA, repository.getRouters().get(0));
assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY, repository.getRouters().get(1));
assertEquals("http", repository.getProtocol());
}

@Test
public void testConstructorWithCustomParams() {
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
List<String> routers = Arrays.asList("custom-router1", "custom-router2");
String lbPolicy = "custom-lb";
String protocol = "grpc";

ServiceAddressRepository repository = new ServiceAddressRepository(
addresses, clientId, extensions, remoteCluster, routers, lbPolicy, protocol);

assertNotNull(repository);
assertEquals(2, repository.getNodes().size());
assertEquals("custom-router1", repository.getRouters().get(0));
assertEquals("custom-router2", repository.getRouters().get(1));
assertEquals("custom-lb", repository.getLbPolicy());
assertEquals("grpc", repository.getProtocol());
}

@Test
public void testConstructorWithEmptyAddresses() {
ServiceAddressRepository repository = new ServiceAddressRepository(
null, clientId, extensions, remoteCluster);

assertNotNull(repository);
assertTrue(repository.getNodes().isEmpty());
}

@Test
public void testConstructorWithInvalidAddresses() {
List<String> addresses = Arrays.asList("host1", "host2:", ":8080", "host3:invalid", "");
ServiceAddressRepository repository = new ServiceAddressRepository(
addresses, clientId, extensions, remoteCluster);

assertNotNull(repository);
assertTrue(repository.getNodes().isEmpty());
}

@Test
public void testGetServiceAddressNodeWithLocalNodes() throws PolarisException {
List<String> addresses = Arrays.asList("host1:8080", "host2:9090", "host3:7070");
ServiceAddressRepository repository = new ServiceAddressRepository(
addresses, clientId, extensions, remoteCluster);

// First call
Node node1 = repository.getServiceAddressNode();
assertEquals("host1", node1.getHost());
assertEquals(8080, node1.getPort());

// Second call - should round robin
Node node2 = repository.getServiceAddressNode();
assertEquals("host2", node2.getHost());
assertEquals(9090, node2.getPort());

// Third call
Node node3 = repository.getServiceAddressNode();
assertEquals("host3", node3.getHost());
assertEquals(7070, node3.getPort());

// Fourth call - should wrap around
Node node4 = repository.getServiceAddressNode();
assertEquals("host1", node4.getHost());
assertEquals(8080, node4.getPort());
}

@Test
public void testGetServiceAddressNodeWithEmptyNodes() throws PolarisException {
ServiceAddressRepository repository = new ServiceAddressRepository(
Collections.emptyList(), clientId, extensions, remoteCluster);

mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
any(), any(), anyList(), anyString(), anyString(), anyString()))
.thenReturn(mockInstance);

Node node = repository.getServiceAddressNode();
assertEquals("1.2.3.4", node.getHost());
assertEquals(8080, node.getPort());
}

@Test(expected = PolarisException.class)
public void testGetServiceAddressNodeWithDiscoveryFailure() throws PolarisException {
ServiceAddressRepository repository = new ServiceAddressRepository(
Collections.emptyList(), clientId, extensions, remoteCluster);

mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
any(), any(), anyList(), anyString(), anyString(), anyString()))
.thenThrow(new PolarisException(ErrorCode.INSTANCE_NOT_FOUND, "Discovery failed"));

repository.getServiceAddressNode();
}

@Test
public void testGetServiceAddress() throws PolarisException {
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
ServiceAddressRepository repository = new ServiceAddressRepository(
addresses, clientId, extensions, remoteCluster);

String address1 = repository.getServiceAddress();
assertTrue(address1.equals("host1:8080") || address1.equals("host2:9090"));

String address2 = repository.getServiceAddress();
assertNotEquals(address1, address2); // Should be different due to round robin
}

@Test
public void testGetServiceAddressWithDiscovery() throws PolarisException {
ServiceAddressRepository repository = new ServiceAddressRepository(
Collections.emptyList(), clientId, extensions, remoteCluster);

mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
any(), any(), anyList(), anyString(), anyString(), anyString()))
.thenReturn(mockInstance);

String address = repository.getServiceAddress();
assertEquals("1.2.3.4:8080", address);
}
}
Loading