Skip to content

Commit 3aa56a3

Browse files
feat:support stat and event report with service discovery. (#599)
1 parent 5a9753c commit 3aa56a3

File tree

13 files changed

+586
-73
lines changed

13 files changed

+586
-73
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making polaris-java available.
3+
*
4+
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.client.remote;
19+
20+
import com.tencent.polaris.annonation.JustForTest;
21+
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
22+
import com.tencent.polaris.api.config.verify.DefaultValues;
23+
import com.tencent.polaris.api.exception.PolarisException;
24+
import com.tencent.polaris.api.plugin.compose.Extensions;
25+
import com.tencent.polaris.api.pojo.Instance;
26+
import com.tencent.polaris.api.pojo.ServiceKey;
27+
import com.tencent.polaris.api.utils.CollectionUtils;
28+
import com.tencent.polaris.api.utils.StringUtils;
29+
import com.tencent.polaris.client.flow.BaseFlow;
30+
import com.tencent.polaris.client.pojo.Node;
31+
import com.tencent.polaris.client.util.CommonValidator;
32+
import com.tencent.polaris.logging.LoggerFactory;
33+
import org.slf4j.Logger;
34+
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
38+
/**
39+
* Repository for service addresses.
40+
*
41+
* @author Haotian Zhang
42+
*/
43+
public class ServiceAddressRepository {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(ServiceAddressRepository.class);
46+
47+
private final List<Node> nodes;
48+
49+
private int curIndex;
50+
51+
private final String clientId;
52+
53+
private final Extensions extensions;
54+
55+
private final ServiceKey remoteCluster;
56+
57+
private final List<String> routers;
58+
59+
private final String lbPolicy;
60+
61+
private final String protocol;
62+
63+
public ServiceAddressRepository(List<String> addresses, String clientId, Extensions extensions,
64+
ServiceKey remoteCluster) {
65+
this(addresses, clientId, extensions, remoteCluster, null, null, null);
66+
this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_METADATA);
67+
this.routers.add(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY);
68+
}
69+
70+
public ServiceAddressRepository(List<String> addresses, String clientId, Extensions extensions,
71+
ServiceKey remoteCluster, List<String> routers, String lbPolicy, String protocol) {
72+
// to ip addresses.
73+
this.nodes = new ArrayList<>();
74+
if (CollectionUtils.isNotEmpty(addresses)) {
75+
for (String address : addresses) {
76+
if (StringUtils.isNotBlank(address)) {
77+
int colonIdx = address.lastIndexOf(":");
78+
if (colonIdx > 0 && colonIdx < address.length() - 1) {
79+
String host = address.substring(0, colonIdx);
80+
try {
81+
int port = Integer.parseInt(address.substring(colonIdx + 1));
82+
nodes.add(new Node(host, port));
83+
} catch (NumberFormatException e) {
84+
LOG.warn("Invalid port number in address: {}", address);
85+
}
86+
} else {
87+
LOG.warn("Invalid address format, expected 'host:port': {}", address);
88+
}
89+
}
90+
}
91+
}
92+
this.curIndex = 0;
93+
94+
// from discovery.
95+
this.clientId = clientId;
96+
this.extensions = extensions;
97+
CommonValidator.validateNamespaceService(remoteCluster.getNamespace(), remoteCluster.getService());
98+
this.remoteCluster = remoteCluster;
99+
if (CollectionUtils.isEmpty(routers)) {
100+
this.routers = new ArrayList<>();
101+
} else {
102+
this.routers = routers;
103+
}
104+
if (StringUtils.isBlank(lbPolicy)) {
105+
this.lbPolicy = DefaultValues.DEFAULT_LOADBALANCER;
106+
} else {
107+
this.lbPolicy = lbPolicy;
108+
}
109+
if (StringUtils.isBlank(protocol)) {
110+
this.protocol = "http";
111+
} else {
112+
this.protocol = protocol;
113+
}
114+
}
115+
116+
public String getServiceAddress() throws PolarisException {
117+
Node node = getServiceAddressNode();
118+
return node.getHostPort();
119+
}
120+
121+
public Node getServiceAddressNode() throws PolarisException {
122+
if (CollectionUtils.isNotEmpty(nodes)) {
123+
Node node = nodes.get(Math.abs(curIndex % nodes.size()));
124+
curIndex = (curIndex + 1) % Integer.MAX_VALUE;
125+
if (LOG.isDebugEnabled()) {
126+
LOG.debug("success to get instance, instance is {}:{}", node.getHost(), node.getPort());
127+
}
128+
return node;
129+
}
130+
Instance instance = getDiscoverInstance();
131+
if (LOG.isDebugEnabled()) {
132+
LOG.debug("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort());
133+
}
134+
return new Node(instance.getHost(), instance.getPort());
135+
}
136+
137+
private Instance getDiscoverInstance() throws PolarisException {
138+
Instance instance = BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol, clientId);
139+
LOG.info("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(), instance.getPort());
140+
return instance;
141+
}
142+
143+
@JustForTest
144+
List<Node> getNodes() {
145+
return nodes;
146+
}
147+
148+
@JustForTest
149+
List<String> getRouters() {
150+
return routers;
151+
}
152+
153+
@JustForTest
154+
String getLbPolicy() {
155+
return lbPolicy;
156+
}
157+
158+
@JustForTest
159+
String getProtocol() {
160+
return protocol;
161+
}
162+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making polaris-java available.
3+
*
4+
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.client.remote;
19+
20+
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
21+
import com.tencent.polaris.api.exception.ErrorCode;
22+
import com.tencent.polaris.api.exception.PolarisException;
23+
import com.tencent.polaris.api.plugin.compose.Extensions;
24+
import com.tencent.polaris.api.pojo.Instance;
25+
import com.tencent.polaris.api.pojo.ServiceKey;
26+
import com.tencent.polaris.client.flow.BaseFlow;
27+
import com.tencent.polaris.client.pojo.Node;
28+
import org.junit.AfterClass;
29+
import org.junit.Before;
30+
import org.junit.BeforeClass;
31+
import org.junit.Test;
32+
import org.junit.runner.RunWith;
33+
import org.mockito.Mock;
34+
import org.mockito.MockedStatic;
35+
import org.mockito.Mockito;
36+
import org.mockito.junit.MockitoJUnitRunner;
37+
38+
import java.util.Arrays;
39+
import java.util.Collections;
40+
import java.util.List;
41+
42+
import static org.junit.Assert.*;
43+
import static org.mockito.ArgumentMatchers.*;
44+
import static org.mockito.Mockito.when;
45+
46+
/**
47+
* Test for {@link ServiceAddressRepository}.
48+
*
49+
* @author Haotian Zhang
50+
*/
51+
@RunWith(MockitoJUnitRunner.class)
52+
public class ServiceAddressRepositoryTest {
53+
54+
private static MockedStatic<BaseFlow> mockedBaseFlow;
55+
56+
@Mock
57+
private Extensions extensions;
58+
59+
@Mock
60+
private static Instance mockInstance;
61+
62+
private final ServiceKey remoteCluster = new ServiceKey("test-namespace", "test-service");
63+
private final String clientId = "test-client";
64+
65+
@BeforeClass
66+
public static void beforeClass() {
67+
mockedBaseFlow = Mockito.mockStatic(BaseFlow.class);
68+
}
69+
70+
@Before
71+
public void setUp() {
72+
when(mockInstance.getHost()).thenReturn("1.2.3.4");
73+
when(mockInstance.getPort()).thenReturn(8080);
74+
}
75+
76+
@AfterClass
77+
public static void AfterClass() {
78+
if (mockedBaseFlow != null) {
79+
mockedBaseFlow.close();
80+
}
81+
}
82+
83+
@Test
84+
public void testConstructorWithDefaultParams() {
85+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
86+
ServiceAddressRepository repository = new ServiceAddressRepository(
87+
addresses, clientId, extensions, remoteCluster);
88+
89+
assertNotNull(repository);
90+
assertEquals(2, repository.getNodes().size());
91+
assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_METADATA, repository.getRouters().get(0));
92+
assertEquals(ServiceRouterConfig.DEFAULT_ROUTER_NEARBY, repository.getRouters().get(1));
93+
assertEquals("http", repository.getProtocol());
94+
}
95+
96+
@Test
97+
public void testConstructorWithCustomParams() {
98+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
99+
List<String> routers = Arrays.asList("custom-router1", "custom-router2");
100+
String lbPolicy = "custom-lb";
101+
String protocol = "grpc";
102+
103+
ServiceAddressRepository repository = new ServiceAddressRepository(
104+
addresses, clientId, extensions, remoteCluster, routers, lbPolicy, protocol);
105+
106+
assertNotNull(repository);
107+
assertEquals(2, repository.getNodes().size());
108+
assertEquals("custom-router1", repository.getRouters().get(0));
109+
assertEquals("custom-router2", repository.getRouters().get(1));
110+
assertEquals("custom-lb", repository.getLbPolicy());
111+
assertEquals("grpc", repository.getProtocol());
112+
}
113+
114+
@Test
115+
public void testConstructorWithEmptyAddresses() {
116+
ServiceAddressRepository repository = new ServiceAddressRepository(
117+
null, clientId, extensions, remoteCluster);
118+
119+
assertNotNull(repository);
120+
assertTrue(repository.getNodes().isEmpty());
121+
}
122+
123+
@Test
124+
public void testConstructorWithInvalidAddresses() {
125+
List<String> addresses = Arrays.asList("host1", "host2:", ":8080", "host3:invalid", "");
126+
ServiceAddressRepository repository = new ServiceAddressRepository(
127+
addresses, clientId, extensions, remoteCluster);
128+
129+
assertNotNull(repository);
130+
assertTrue(repository.getNodes().isEmpty());
131+
}
132+
133+
@Test
134+
public void testGetServiceAddressNodeWithLocalNodes() throws PolarisException {
135+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090", "host3:7070");
136+
ServiceAddressRepository repository = new ServiceAddressRepository(
137+
addresses, clientId, extensions, remoteCluster);
138+
139+
// First call
140+
Node node1 = repository.getServiceAddressNode();
141+
assertEquals("host1", node1.getHost());
142+
assertEquals(8080, node1.getPort());
143+
144+
// Second call - should round robin
145+
Node node2 = repository.getServiceAddressNode();
146+
assertEquals("host2", node2.getHost());
147+
assertEquals(9090, node2.getPort());
148+
149+
// Third call
150+
Node node3 = repository.getServiceAddressNode();
151+
assertEquals("host3", node3.getHost());
152+
assertEquals(7070, node3.getPort());
153+
154+
// Fourth call - should wrap around
155+
Node node4 = repository.getServiceAddressNode();
156+
assertEquals("host1", node4.getHost());
157+
assertEquals(8080, node4.getPort());
158+
}
159+
160+
@Test
161+
public void testGetServiceAddressNodeWithEmptyNodes() throws PolarisException {
162+
ServiceAddressRepository repository = new ServiceAddressRepository(
163+
Collections.emptyList(), clientId, extensions, remoteCluster);
164+
165+
mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
166+
any(), any(), anyList(), anyString(), anyString(), anyString()))
167+
.thenReturn(mockInstance);
168+
169+
Node node = repository.getServiceAddressNode();
170+
assertEquals("1.2.3.4", node.getHost());
171+
assertEquals(8080, node.getPort());
172+
}
173+
174+
@Test(expected = PolarisException.class)
175+
public void testGetServiceAddressNodeWithDiscoveryFailure() throws PolarisException {
176+
ServiceAddressRepository repository = new ServiceAddressRepository(
177+
Collections.emptyList(), clientId, extensions, remoteCluster);
178+
179+
mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
180+
any(), any(), anyList(), anyString(), anyString(), anyString()))
181+
.thenThrow(new PolarisException(ErrorCode.INSTANCE_NOT_FOUND, "Discovery failed"));
182+
183+
repository.getServiceAddressNode();
184+
}
185+
186+
@Test
187+
public void testGetServiceAddress() throws PolarisException {
188+
List<String> addresses = Arrays.asList("host1:8080", "host2:9090");
189+
ServiceAddressRepository repository = new ServiceAddressRepository(
190+
addresses, clientId, extensions, remoteCluster);
191+
192+
String address1 = repository.getServiceAddress();
193+
assertTrue(address1.equals("host1:8080") || address1.equals("host2:9090"));
194+
195+
String address2 = repository.getServiceAddress();
196+
assertNotEquals(address1, address2); // Should be different due to round robin
197+
}
198+
199+
@Test
200+
public void testGetServiceAddressWithDiscovery() throws PolarisException {
201+
ServiceAddressRepository repository = new ServiceAddressRepository(
202+
Collections.emptyList(), clientId, extensions, remoteCluster);
203+
204+
mockedBaseFlow.when(() -> BaseFlow.commonGetOneInstance(
205+
any(), any(), anyList(), anyString(), anyString(), anyString()))
206+
.thenReturn(mockInstance);
207+
208+
String address = repository.getServiceAddress();
209+
assertEquals("1.2.3.4:8080", address);
210+
}
211+
}

0 commit comments

Comments
 (0)