diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/ServicesByProto.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/ServicesByProto.java index 9f9d6d041..39266f4c9 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/ServicesByProto.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/ServicesByProto.java @@ -25,6 +25,7 @@ import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto; import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -36,6 +37,8 @@ public class ServicesByProto implements Services, RegistryCacheValue { private final List services; + private final List originServicesList; + private final boolean initialized; private final boolean loadedFromFile; @@ -46,6 +49,7 @@ public class ServicesByProto implements Services, RegistryCacheValue { public ServicesByProto() { this.services = Collections.emptyList(); + this.originServicesList = Collections.emptyList(); this.initialized = false; this.loadedFromFile = false; this.hashCode = 0; @@ -53,6 +57,7 @@ public ServicesByProto() { public ServicesByProto(List services) { this.services = services; + this.originServicesList = new ArrayList<>(); this.initialized = true; this.loadedFromFile = false; this.hashCode = 0; @@ -62,6 +67,7 @@ public ServicesByProto(ResponseProto.DiscoverResponse response, boolean loadFrom List tmpServices = response.getServicesList(); this.services = new ArrayList<>(); + this.originServicesList = new ArrayList<>(); this.svcKey = new ServiceKey("", ""); if (CollectionUtils.isNotEmpty(tmpServices)) { @@ -74,10 +80,11 @@ public ServicesByProto(ResponseProto.DiscoverResponse response, boolean loadFrom .metadata(service.getMetadataMap()) .revision(service.getRevision().getValue()) .build()); + originServicesList.add(service); }); } - this.hashCode = Objects.hash(response.getServicesList()); + this.hashCode = Objects.hash(tmpServices); this.initialized = true; this.loadedFromFile = loadFromFile; } @@ -115,6 +122,10 @@ public List getServices() { return services; } + public List getOriginServicesList() { + return originServicesList; + } + public int getHashCode() { return hashCode; } @@ -122,11 +133,12 @@ public int getHashCode() { @Override public String toString() { return "ServicesByProto{" + - "svcKey=" + svcKey + - ", services=" + services + + "services=" + services + + ", originServicesList=" + originServicesList + ", initialized=" + initialized + ", loadedFromFile=" + loadedFromFile + ", hashCode=" + hashCode + + ", svcKey=" + svcKey + '}'; } } diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/CommonInstancesRequest.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/CommonInstancesRequest.java index 51d45be59..0d8f1ccdd 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/CommonInstancesRequest.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/CommonInstancesRequest.java @@ -20,22 +20,13 @@ import com.tencent.polaris.api.config.Configuration; import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; import com.tencent.polaris.api.plugin.route.RouteInfo; -import com.tencent.polaris.api.pojo.ServiceEventKey; +import com.tencent.polaris.api.pojo.*; import com.tencent.polaris.api.pojo.ServiceEventKey.EventType; -import com.tencent.polaris.api.pojo.ServiceEventKeysProvider; -import com.tencent.polaris.api.pojo.ServiceInfo; -import com.tencent.polaris.api.pojo.ServiceInstances; -import com.tencent.polaris.api.pojo.ServiceKey; -import com.tencent.polaris.api.pojo.SourceService; -import com.tencent.polaris.api.rpc.Criteria; -import com.tencent.polaris.api.rpc.GetAllInstancesRequest; -import com.tencent.polaris.api.rpc.GetHealthyInstancesRequest; -import com.tencent.polaris.api.rpc.GetInstancesRequest; -import com.tencent.polaris.api.rpc.GetOneInstanceRequest; -import com.tencent.polaris.api.rpc.RequestBaseEntity; +import com.tencent.polaris.api.rpc.*; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.flow.BaseFlow; import com.tencent.polaris.client.flow.FlowControlParam; + import java.util.HashSet; import java.util.Set; @@ -67,7 +58,7 @@ public class CommonInstancesRequest implements ServiceEventKeysProvider, FlowCon /** * 构造函数 * - * @param request 请求 + * @param request 请求 * @param configuration 配置 */ public CommonInstancesRequest(GetAllInstancesRequest request, Configuration configuration) { @@ -84,7 +75,7 @@ public CommonInstancesRequest(GetAllInstancesRequest request, Configuration conf /** * 构造函数,获取健康的全部实例。只保留:isolatedRouter,recoverRouter * - * @param request 请求 + * @param request 请求 * @param configuration 配置 */ public CommonInstancesRequest(GetHealthyInstancesRequest request, Configuration configuration) { @@ -92,9 +83,7 @@ public CommonInstancesRequest(GetHealthyInstancesRequest request, Configuration dstInstanceEventKey = new ServiceEventKey(dstSvcKey, EventType.INSTANCE); svcEventKeys.add(dstInstanceEventKey); - dstRuleEventKey = new ServiceEventKey(dstSvcKey, EventType.ROUTING); - svcEventKeys.add(dstRuleEventKey); - + dstRuleEventKey = null; srcRuleEventKey = null; ServiceInfo dstServiceInfo = new ServiceInfo(); @@ -127,7 +116,7 @@ public CommonInstancesRequest(GetHealthyInstancesRequest request, Configuration /** * 构造函数 * - * @param request 请求 + * @param request 请求 * @param configuration 配置 */ public CommonInstancesRequest(GetOneInstanceRequest request, Configuration configuration) { @@ -160,7 +149,7 @@ public CommonInstancesRequest(GetOneInstanceRequest request, Configuration confi /** * 构造函数 * - * @param request 请求 + * @param request 请求 * @param configuration 配置 */ public CommonInstancesRequest(GetInstancesRequest request, Configuration configuration) { @@ -193,7 +182,7 @@ public CommonInstancesRequest(GetInstancesRequest request, Configuration configu } public CommonInstancesRequest(ServiceEventKey dstInstanceEventKey, ServiceEventKey dstRuleEventKey, ServiceEventKey srcRuleEventKey, - RouteInfo routeInfo, Criteria criteria, RequestBaseEntity request, Configuration configuration) { + RouteInfo routeInfo, Criteria criteria, RequestBaseEntity request, Configuration configuration) { this.srcRuleEventKey = srcRuleEventKey; if (null != srcRuleEventKey) { svcEventKeys.add(srcRuleEventKey); diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerEvent.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerEvent.java index 95922c612..1f60720f4 100644 --- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerEvent.java +++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/server/ServerEvent.java @@ -23,6 +23,8 @@ import java.util.Optional; +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_GRPC; + /** * 服务变更事件 * @@ -44,16 +46,23 @@ public class ServerEvent { private Object value; /** * Polaris的版本号 - * + *

* 如果修改了value中的版本号,那么将原来Polaris的版本号保存在这里, * 主要用于 CompositeServiceUpdateTask。 */ private Optional polarisRevision = Optional.empty(); + private String connectorType; + public ServerEvent(ServiceEventKey serviceEventKey, Object value, PolarisException error) { + this(serviceEventKey, value, error, SERVER_CONNECTOR_GRPC); + } + + public ServerEvent(ServiceEventKey serviceEventKey, Object value, PolarisException error, String connectorType) { this.serviceEventKey = serviceEventKey; this.value = value; this.error = error; + this.connectorType = connectorType; } public ServiceEventKey getServiceEventKey() { @@ -83,4 +92,12 @@ public PolarisException getError() { public void setError(PolarisException error) { this.error = error; } + + public String getConnectorType() { + return connectorType; + } + + public void setConnectorType(String connectorType) { + this.connectorType = connectorType; + } } diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java index 67c16422f..613db3655 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java @@ -41,10 +41,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -87,6 +84,9 @@ public class ConsulConfigFileConnector implements ConfigFileConnector { @Override public void init(InitContext ctx) throws PolarisException { + if (!Objects.equals(ctx.getConfig().getConfigFile().getServerConnector().getConnectorType(), CONSUL_FILE_CONNECTOR_TYPE)) { + return; + } if (!initialized) { // init consul client ConnectorConfigImpl connectorConfig = ctx.getConfig().getConfigFile().getServerConnector(); @@ -235,7 +235,7 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi if (!newModifyIndex.equals(currentModifyIndex)) { LOGGER.info("KeyPrefix '{}' has new index {} and new modify index {} with old index {} and old modify index {}", keyPrefix, newIndex, newModifyIndex, currentIndex, currentModifyIndex); - } else if (LOGGER.isDebugEnabled()) { + } else { code = CodeProto.Code.DataNoChange.getNumber(); message = "config data is no change"; LOGGER.debug("KeyPrefix '{}' not modified with new index {}, index {} and modify index {}", @@ -244,7 +244,7 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi // 在Consul中不存在自定义KEY时,此处的逻辑可以避免response实时返回,不断的触发retry this.consulIndexes.put(keyPrefix, newIndex); this.consulModifyIndexes.put(keyPrefix, newModifyIndex); - } else if (LOGGER.isDebugEnabled()) { + } else { code = CodeProto.Code.DataNoChange.getNumber(); message = "config data is no change"; LOGGER.debug("KeyPrefix '{}' unchanged with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); diff --git a/polaris-plugins/polaris-plugins-connector/connector-common/pom.xml b/polaris-plugins/polaris-plugins-connector/connector-common/pom.xml index af34aac7b..8dfa9b4ad 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-common/pom.xml +++ b/polaris-plugins/polaris-plugins-connector/connector-common/pom.xml @@ -1,7 +1,7 @@ + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> polaris-plugins-connector com.tencent.polaris @@ -17,6 +17,11 @@ Polaris Plugins Client Connector Common JAR + + com.google.guava + guava + ${guava.version} + org.slf4j slf4j-api diff --git a/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/ServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/ServiceUpdateTask.java index ecddb092e..0d6dd63aa 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/ServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/ServiceUpdateTask.java @@ -25,10 +25,11 @@ import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Status; import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Type; +import org.slf4j.Logger; + import java.util.Random; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.slf4j.Logger; /** * Abstract service update task class. @@ -86,6 +87,10 @@ public boolean setStatus(Status last, Status current) { return taskStatus.compareAndSet(last, current); } + public void setLastUpdateTime(long currentTime) { + lastUpdateTime.set(currentTime); + } + public Type getTaskType() { return taskType.get(); } @@ -105,10 +110,17 @@ public void run() { /** * Business to be executed. - * - * @throws Throwable */ - protected abstract void execute() throws Throwable; + public void execute() { + + } + + /** + * Business with serviceUpdateTask to be executed. + */ + public void execute(ServiceUpdateTask serviceUpdateTask) { + + } /** * Throwable to be handled. @@ -149,13 +161,18 @@ public boolean needUpdate() { } @Override - @SuppressWarnings("checkstyle:all") public String toString() { return "ServiceUpdateTask{" + - "taskType=" + taskType.get() + + "serviceEventHandler=" + serviceEventHandler + + ", serverConnector=" + serverConnector + + ", targetClusterType=" + targetClusterType.get() + + ", taskType=" + taskType.get() + ", taskStatus=" + taskStatus.get() + ", serviceEventKey=" + serviceEventKey + - ", targetClusterType=" + targetClusterType.get() + + ", lastUpdateTime=" + lastUpdateTime + + ", successUpdates=" + successUpdates + + ", refreshIntervalMs=" + refreshIntervalMs + + ", eventHandler=" + eventHandler + '}'; } diff --git a/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/constant/ConnectorConstant.java b/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/constant/ConnectorConstant.java new file mode 100644 index 000000000..1bb738b38 --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/constant/ConnectorConstant.java @@ -0,0 +1,34 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.connector.common.constant; + +import com.google.common.collect.Lists; + +import java.util.List; + +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.*; + +/** + * @author Haotian Zhang + */ +public interface ConnectorConstant { + + String SERVER_CONNECTOR_TYPE = "SERVER_CONNECTOR_TYPE"; + + List ORDER_LIST = Lists.newArrayList(SERVER_CONNECTOR_GRPC, SERVER_CONNECTOR_CONSUL, SERVER_CONNECTOR_NACOS); +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/constant/ConsulConstant.java b/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/constant/ConsulConstant.java index ea22d3914..c5754224a 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/constant/ConsulConstant.java +++ b/polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/constant/ConsulConstant.java @@ -33,7 +33,7 @@ interface MetadataMapKey { String IP_ADDRESS_KEY = "IP_ADDRESS_KEY"; String PREFER_IP_ADDRESS_KEY = "PREFER_IP_ADDRESS_KEY"; - + String TAGS_KEY = "TAGS_KEY"; String CHECK_KEY = "CHECK_KEY"; @@ -41,6 +41,10 @@ interface MetadataMapKey { String QUERY_TAG_KEY = "QUERY_TAG_KEY"; String QUERY_PASSING_KEY = "QUERY_PASSING_KEY"; + + String WAIT_TIME_KEY = "waitTime"; + + String CONSUL_ERROR_SLEEP_KEY = "consulErrorSleep"; } } diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/pom.xml b/polaris-plugins/polaris-plugins-connector/connector-composite/pom.xml index 35b08bf71..1ccd58cba 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/pom.xml +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/pom.xml @@ -1,7 +1,7 @@ + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> polaris-plugins-connector com.tencent.polaris @@ -22,6 +22,11 @@ connector-polaris-grpc ${project.version} + + com.tencent.polaris + connector-consul + ${project.version} + com.tencent.polaris connector-common diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java index a247162aa..1be73c060 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeConnector.java @@ -25,15 +25,7 @@ import com.tencent.polaris.api.plugin.common.InitContext; import com.tencent.polaris.api.plugin.common.PluginTypes; import com.tencent.polaris.api.plugin.compose.Extensions; -import com.tencent.polaris.api.plugin.server.CommonProviderRequest; -import com.tencent.polaris.api.plugin.server.CommonProviderResponse; -import com.tencent.polaris.api.plugin.server.CommonServiceContractRequest; -import com.tencent.polaris.api.plugin.server.ReportClientRequest; -import com.tencent.polaris.api.plugin.server.ReportClientResponse; -import com.tencent.polaris.api.plugin.server.ReportServiceContractRequest; -import com.tencent.polaris.api.plugin.server.ReportServiceContractResponse; -import com.tencent.polaris.api.plugin.server.ServerConnector; -import com.tencent.polaris.api.plugin.server.ServiceEventHandler; +import com.tencent.polaris.api.plugin.server.*; import com.tencent.polaris.api.pojo.ServiceEventKey; import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.client.pojo.ServiceRuleByProto; @@ -42,6 +34,7 @@ import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.plugins.connector.common.DestroyableServerConnector; import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant; import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Type; import com.tencent.polaris.plugins.connector.composite.zero.TestConnectivityTask; import com.tencent.polaris.plugins.connector.composite.zero.TestConnectivityTaskManager; @@ -269,14 +262,16 @@ public boolean isInitialized() { @Override protected void submitServiceHandler(ServiceUpdateTask updateTask, long delayMs) { - LOG.debug("[ServerConnector]task for service {} has been scheduled discover", updateTask); - sendDiscoverExecutor.schedule(updateTask, delayMs, TimeUnit.MILLISECONDS); + LOG.debug("[CompositeServerConnector]task for service {} has been scheduled discover", updateTask); + if (updateTask.setStatus(ServiceUpdateTaskConstant.Status.READY, ServiceUpdateTaskConstant.Status.RUNNING)) { + sendDiscoverExecutor.schedule(updateTask, delayMs, TimeUnit.MILLISECONDS); + } } protected boolean submitTestConnectivityTask(ServiceUpdateTask updateTask, ResponseProto.DiscoverResponse discoverResponse) { if (updateTask instanceof CompositeServiceUpdateTask && isZeroProtectionEnabled() && isNeedTestConnectivity()) { - LOG.debug("[ServerConnector]task for service {} has been scheduled test connectivity.", + LOG.debug("[CompositeServerConnector]task for service {} has been scheduled test connectivity.", updateTask.getServiceEventKey()); return testConnectivityTaskManager.submitTask(new TestConnectivityTask((CompositeServiceUpdateTask) updateTask, discoverResponse, zeroProtectionConfig)); diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java index ebd926e02..0371bd257 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeRevision.java @@ -17,13 +17,9 @@ package com.tencent.polaris.plugins.connector.composite; -import com.google.common.collect.Lists; import com.tencent.polaris.api.utils.StringUtils; -import java.util.List; -import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; -import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_GRPC; -import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_NACOS; +import static com.tencent.polaris.plugins.connector.common.constant.ConnectorConstant.ORDER_LIST; /** * Revision handler for multi-discovery server. @@ -32,8 +28,6 @@ */ public class CompositeRevision { - private static final List ORDER_LIST = Lists.newArrayList(SERVER_CONNECTOR_GRPC, SERVER_CONNECTOR_CONSUL, SERVER_CONNECTOR_NACOS); - private static final String BIG_SEPARATOR = ";"; private static final String LIL_SEPARATOR = ":"; @@ -43,7 +37,7 @@ public class CompositeRevision { /** * Set revision of corresponding server connector by name. * - * @param name name of server connector + * @param name name of server connector * @param revision revision */ public void setRevision(String name, String revision) { @@ -52,6 +46,13 @@ public void setRevision(String name, String revision) { } } + public String getRevision(String name) { + if (ORDER_LIST.contains(name)) { + return content[ORDER_LIST.indexOf(name)]; + } + return ""; + } + /** * Generate composite revision string. * @@ -69,4 +70,18 @@ public String getCompositeRevisionString() { } return revision.toString(); } + + public static CompositeRevision of(String revision) { + CompositeRevision compositeRevision = new CompositeRevision(); + String[] bigs = revision.split(BIG_SEPARATOR); + for (String big : bigs) { + if (StringUtils.isNotBlank(big)) { + String[] lils = big.split(LIL_SEPARATOR); + if (lils.length == 2) { + compositeRevision.content[ORDER_LIST.indexOf(lils[0])] = lils[1]; + } + } + } + return compositeRevision; + } } diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java index fca24928a..2213a7860 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java @@ -32,12 +32,15 @@ import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.pojo.ServiceInstancesByProto; +import com.tencent.polaris.client.pojo.ServicesByProto; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.plugins.connector.common.DestroyableServerConnector; import com.tencent.polaris.plugins.connector.common.ServiceInstancesResponse; import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant; import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Status; import com.tencent.polaris.plugins.connector.composite.zero.InstanceListMeta; +import com.tencent.polaris.plugins.connector.consul.ConsulServiceUpdateTask; import com.tencent.polaris.plugins.connector.grpc.GrpcServiceUpdateTask; import com.tencent.polaris.specification.api.v1.model.ModelProto; import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto.DiscoverResponse; @@ -45,10 +48,13 @@ import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto.Service; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_GRPC; +import static com.tencent.polaris.plugins.connector.common.constant.ConnectorConstant.ORDER_LIST; +import static com.tencent.polaris.plugins.connector.common.constant.ConnectorConstant.SERVER_CONNECTOR_TYPE; /** * Scheduled task for updating service information. @@ -62,20 +68,49 @@ public class CompositeServiceUpdateTask extends ServiceUpdateTask { private final InstanceListMeta instanceListMeta = new InstanceListMeta(); + private boolean isAsync = false; + + private String mainConnectorType = SERVER_CONNECTOR_GRPC; + + private boolean ifMainConnectorTypeSet = false; + + private final Map subServiceUpdateTaskMap = new ConcurrentHashMap<>(); + public CompositeServiceUpdateTask(ServiceEventHandler handler, DestroyableServerConnector connector) { super(handler, connector); + CompositeConnector compositeConnector = (CompositeConnector) connector; + for (DestroyableServerConnector sc : compositeConnector.getServerConnectors()) { + if (SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { + subServiceUpdateTaskMap.put(SERVER_CONNECTOR_GRPC, new GrpcServiceUpdateTask(serviceEventHandler, sc)); + isAsync = true; + mainConnectorType = SERVER_CONNECTOR_GRPC; + ifMainConnectorTypeSet = true; + } + if (SERVER_CONNECTOR_CONSUL.equals(sc.getName()) && sc.isDiscoveryEnable()) { + subServiceUpdateTaskMap.put(SERVER_CONNECTOR_CONSUL, new ConsulServiceUpdateTask(serviceEventHandler, sc)); + isAsync = true; + if (!ifMainConnectorTypeSet) { + mainConnectorType = sc.getName(); + ifMainConnectorTypeSet = true; + } + } + } } @Override - protected void execute() { - CompositeConnector connector = (CompositeConnector) serverConnector; - for (DestroyableServerConnector sc : connector.getServerConnectors()) { - if (SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { - GrpcServiceUpdateTask grpcServiceUpdateTask = new GrpcServiceUpdateTask(serviceEventHandler, sc); - grpcServiceUpdateTask.execute(this); - return; + public void execute() { + for (ServiceUpdateTask serviceUpdateTask : subServiceUpdateTaskMap.values()) { + if ((serviceUpdateTask.getTaskType() == ServiceUpdateTaskConstant.Type.FIRST && serviceUpdateTask.getTaskStatus() == Status.READY) + || serviceUpdateTask.needUpdate()) { + serviceUpdateTask.setStatus(ServiceUpdateTaskConstant.Status.READY, ServiceUpdateTaskConstant.Status.RUNNING); + serviceUpdateTask.execute(this); } } + if (isAsync && ifMainConnectorTypeSet + && (StringUtils.equals(mainConnectorType, SERVER_CONNECTOR_GRPC) + || (serviceEventKey.getEventType().equals(EventType.INSTANCE) || serviceEventKey.getEventType().equals(EventType.SERVICE)))) { + return; + } boolean svcDeleted = this.notifyServerEvent( new ServerEvent(serviceEventKey, DiscoverResponse.newBuilder().build(), null)); if (!svcDeleted) { @@ -91,7 +126,15 @@ protected void handle(Throwable throwable) { @Override public boolean notifyServerEvent(ServerEvent serverEvent) { taskStatus.compareAndSet(Status.RUNNING, Status.READY); - lastUpdateTime.set(System.currentTimeMillis()); + LOG.debug("[CompositeServerConnector]task for service {} has been notified", this); + long currentTimeStamp = System.currentTimeMillis(); + lastUpdateTime.set(currentTimeStamp); + String serverEventConnectorType = serverEvent.getConnectorType(); + ServiceUpdateTask subTask = subServiceUpdateTaskMap.get(serverEventConnectorType); + if (subTask != null) { + subTask.setStatus(Status.RUNNING, Status.READY); + subTask.setLastUpdateTime(currentTimeStamp); + } boolean shouldTest = false; try { if (serverEvent.getValue() instanceof DiscoverResponse) { @@ -101,129 +144,141 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { CompositeConnector connector = (CompositeConnector) serverConnector; if (EventType.INSTANCE.equals(serviceEventKey.getEventType())) { - // Get instance information list except polaris. - List extendInstanceList = new ArrayList<>(); + // load current instance map split by connector type. CompositeRevision compositeRevision = new CompositeRevision(); - compositeRevision.setRevision(SERVER_CONNECTOR_GRPC, - discoverResponse.getService().getRevision().getValue()); + Object value = getEventHandler().getValue(); + Map> instancesMap = new HashMap<>(); + if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServiceInstancesByProto) { + ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; + compositeRevision = CompositeRevision.of(cacheValue.getRevision()); + List oldInstancesList = cacheValue.getOriginInstancesList(); + for (Instance oldInstance : oldInstancesList) { + String serverConnectorType = oldInstance.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); + if (!instancesMap.containsKey(serverConnectorType)) { + instancesMap.put(serverConnectorType, new ArrayList<>()); + } + instancesMap.get(serverConnectorType).add(oldInstance); + } + } + + // 按照事件来源更新对应的revision + compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue()); + // 按照事件来源更新对应的列表 + List serverEventInstancesList = instancesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>()); + serverEventInstancesList.clear(); + serverEventInstancesList.addAll(discoverResponse.getInstancesList()); + // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 + if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) { + serverEvent.setPolarisRevision(discoverResponse.getService().getRevision().getValue()); + } + + // Get instance information list except polaris. for (DestroyableServerConnector sc : connector.getServerConnectors()) { if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { ServiceInstancesResponse serviceInstancesResponse = sc.syncGetServiceInstances(this); if (serviceInstancesResponse != null) { compositeRevision.setRevision(sc.getName(), serviceInstancesResponse.getRevision()); - extendInstanceList.addAll(serviceInstancesResponse.getServiceInstanceList()); + List tempServiceInstanceList = serviceInstancesResponse.getServiceInstanceList(); + if (CollectionUtils.isNotEmpty(tempServiceInstanceList)) { + // 将 NO_CHANGE 响应转为 SUCCESS 响应,用于多个发现结果的合并 + newDiscoverResponseBuilder + .setCode(UInt32Value.newBuilder().setValue(ServerCodes.EXECUTE_SUCCESS).build()); + } + List tempServerEventInstancesList = instancesMap.computeIfAbsent(sc.getName(), key -> new ArrayList<>()); + tempServerEventInstancesList.clear(); + for (DefaultInstance e : tempServiceInstanceList) { + Instance.Builder instanceBuilder = Instance.newBuilder() + .setNamespace(StringValue.of(serviceEventKey.getNamespace())) + .setService(StringValue.of(e.getService())) + .setHost(StringValue.of(e.getHost())) + .setPort(UInt32Value.of(e.getPort())) + .setHealthy(BoolValue.of(e.isHealthy())) + .setIsolate(BoolValue.of(e.isIsolated())); + // set Id + if (StringUtils.isNotBlank(e.getId())) { + instanceBuilder.setId(StringValue.of(e.getId())); + } else { + String id = + e.getService() + "-" + e.getHost().replace(".", "-") + "-" + e.getPort(); + instanceBuilder.setId(StringValue.of(id)); + LOG.info("Instance with name {} host {} port {} doesn't have id.", e.getService() + , e.getHost(), e.getPort()); + } + // set location + ModelProto.Location.Builder locationBuilder = ModelProto.Location.newBuilder(); + if (StringUtils.isNotBlank(e.getRegion())) { + locationBuilder.setRegion(StringValue.of(e.getRegion())); + } + if (StringUtils.isNotBlank(e.getZone())) { + locationBuilder.setZone(StringValue.of(e.getZone())); + } + if (StringUtils.isNotBlank(e.getCampus())) { + locationBuilder.setCampus(StringValue.of(e.getCampus())); + } + instanceBuilder.setLocation(locationBuilder.build()); + // set metadata + if (CollectionUtils.isNotEmpty(e.getMetadata())) { + instanceBuilder.putAllMetadata(e.getMetadata()); + } + // set Protocol + if (StringUtils.isNotBlank(e.getProtocol())) { + instanceBuilder.setProtocol(StringValue.of(e.getProtocol())); + } + // set Version + if (StringUtils.isNotBlank(e.getVersion())) { + instanceBuilder.setVersion(StringValue.of(e.getVersion())); + } + tempServerEventInstancesList.add(instanceBuilder.build()); + } } } } // Merge instance information list if needed. - if (CollectionUtils.isNotEmpty(extendInstanceList)) { - // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 - serverEvent.setPolarisRevision(discoverResponse.getService().getRevision().getValue()); - if (discoverResponse.getCode().getValue() == ServerCodes.DATA_NO_CHANGE) { - // 将 NO_CHANGE 响应转为 SUCCESS 响应,用于多个发现结果的合并 - newDiscoverResponseBuilder - .setCode(UInt32Value.newBuilder().setValue(ServerCodes.EXECUTE_SUCCESS).build()); - Object value = getEventHandler().getValue(); - if (value instanceof ServiceInstancesByProto) { - // Add local cache when DATA_NO_CHANGE - ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; - newDiscoverResponseBuilder.clearInstances(); - newDiscoverResponseBuilder.addAllInstances(cacheValue.getOriginInstancesList()); - } - } - List polarisInstanceList = newDiscoverResponseBuilder.getInstancesList(); - List finalInstanceBuilderList = new ArrayList<>(); - for (Instance i : polarisInstanceList) { - finalInstanceBuilderList.add(Instance.newBuilder().mergeFrom(i)); - } - for (DefaultInstance e : extendInstanceList) { - boolean needAdd = true; - // 看看北极星的实例列表是否存在 - for (Instance.Builder f : finalInstanceBuilderList) { - if (StringUtils.equals(e.getHost(), f.getHost().getValue()) && e.getPort() == f.getPort() - .getValue()) { - // 北极星服务实例状态和拓展服务实例状态,有一个可用即可用 - f.setHealthy(BoolValue.of(e.isHealthy() || f.getHealthy().getValue())); - f.setIsolate(BoolValue.of(e.isIsolated() && f.getIsolate().getValue())); - needAdd = false; - break; + newDiscoverResponseBuilder.clearInstances(); + List finalInstanceList = new ArrayList<>(instancesMap.get(serverEventConnectorType)); + for (String type : ORDER_LIST) { + if (!StringUtils.equals(type, serverEventConnectorType)) { + List instances = instancesMap.get(type); + if (CollectionUtils.isNotEmpty(instances)) { + for (Instance newInstance : instances) { + boolean needAdd = true; + for (Instance existInstance : finalInstanceList) { + if (StringUtils.equals(newInstance.getHost().getValue(), existInstance.getHost().getValue()) && + Objects.equals(newInstance.getPort().getValue(), existInstance.getPort().getValue())) { + needAdd = false; + break; + } + } + if (needAdd) { + finalInstanceList.add(newInstance); + } } } - if (needAdd) { - Instance.Builder instanceBuilder = Instance.newBuilder() - .setNamespace(StringValue.of(serviceEventKey.getNamespace())) - .setService(StringValue.of(e.getService())) - .setHost(StringValue.of(e.getHost())) - .setPort(UInt32Value.of(e.getPort())) - .setHealthy(BoolValue.of(e.isHealthy())) - .setIsolate(BoolValue.of(e.isIsolated())); - // set Id - if (StringUtils.isNotBlank(e.getId())) { - instanceBuilder.setId(StringValue.of(e.getId())); - } else { - String id = - e.getService() + "-" + e.getHost().replace(".", "-") + "-" + e.getPort(); - instanceBuilder.setId(StringValue.of(id)); - LOG.info("Instance with name {} host {} port {} doesn't have id.", e.getService() - , e.getHost(), e.getPort()); - } - // set location - ModelProto.Location.Builder locationBuilder = ModelProto.Location.newBuilder(); - if (StringUtils.isNotBlank(e.getRegion())) { - locationBuilder.setRegion(StringValue.of(e.getRegion())); - } - if (StringUtils.isNotBlank(e.getZone())) { - locationBuilder.setZone(StringValue.of(e.getZone())); - } - if (StringUtils.isNotBlank(e.getCampus())) { - locationBuilder.setCampus(StringValue.of(e.getCampus())); - } - instanceBuilder.setLocation(locationBuilder.build()); - // set metadata - if (CollectionUtils.isNotEmpty(e.getMetadata())) { - instanceBuilder.putAllMetadata(e.getMetadata()); - } - // set Protocol - if (StringUtils.isNotBlank(e.getProtocol())) { - instanceBuilder.setProtocol(StringValue.of(e.getProtocol())); - } - // set Version - if (StringUtils.isNotBlank(e.getVersion())) { - instanceBuilder.setVersion(StringValue.of(e.getVersion())); - } - finalInstanceBuilderList.add(instanceBuilder); - } } - List finalInstanceList = new ArrayList<>(); - for (Instance.Builder i : finalInstanceBuilderList) { - finalInstanceList.add(i.build()); + } + Service.Builder newServiceBuilder = Service.newBuilder() + .mergeFrom(newDiscoverResponseBuilder.getService()); + if (newDiscoverResponseBuilder.getService() != null) { + if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getNamespace().getValue())) { + newServiceBuilder.setNamespace(StringValue.of(serviceEventKey.getNamespace())); } - Service.Builder newServiceBuilder = Service.newBuilder() - .mergeFrom(newDiscoverResponseBuilder.getService()); - if (newDiscoverResponseBuilder.getService() != null) { - if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getNamespace().getValue())) { - newServiceBuilder.setNamespace(StringValue.of(serviceEventKey.getNamespace())); - } - if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getName().getValue())) { - newServiceBuilder.setName(StringValue.of(serviceEventKey.getService())); - } + if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getName().getValue())) { + newServiceBuilder.setName(StringValue.of(serviceEventKey.getService())); } - newServiceBuilder.setRevision(StringValue.of(compositeRevision.getCompositeRevisionString())); - newDiscoverResponseBuilder.setService(newServiceBuilder.build()); - newDiscoverResponseBuilder.clearInstances(); - newDiscoverResponseBuilder.addAllInstances(finalInstanceList); } + newServiceBuilder.setRevision(StringValue.of(compositeRevision.getCompositeRevisionString())); + newDiscoverResponseBuilder.setService(newServiceBuilder.build()); + newDiscoverResponseBuilder.addAllInstances(finalInstanceList); // zero instance protect. if (!newDiscoverResponseBuilder.getInstancesList().isEmpty()) { serverEvent.setError(null); } else if (newDiscoverResponseBuilder.getCode().getValue() != ServerCodes.DATA_NO_CHANGE && connector.isZeroProtectionEnabled()) { - Object value = getEventHandler().getValue(); if (value instanceof ServiceInstancesByProto) { ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.DATA_NO_CHANGE)); - Service.Builder newServiceBuilder = Service.newBuilder() + newServiceBuilder = Service.newBuilder() .mergeFrom(newDiscoverResponseBuilder.getService()); newServiceBuilder.setRevision(StringValue.of(cacheValue.getRevision())); newDiscoverResponseBuilder.setService(newServiceBuilder.build()); @@ -237,34 +292,74 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { } instanceListMeta.setLastRevision(newDiscoverResponseBuilder.getService().getRevision().getValue()); } else if (EventType.SERVICE.equals(serviceEventKey.getEventType())) { + // load current instance map split by connector type. + CompositeRevision compositeRevision = new CompositeRevision(); + Object value = getEventHandler().getValue(); + Map> servicesMap = new HashMap<>(); + if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServicesByProto) { + ServicesByProto cacheValue = (ServicesByProto) value; + compositeRevision = CompositeRevision.of(cacheValue.getRevision()); + List oldInstancesList = cacheValue.getOriginServicesList(); + for (Service oldService : oldInstancesList) { + String serverConnectorType = oldService.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); + if (!servicesMap.containsKey(serverConnectorType)) { + servicesMap.put(serverConnectorType, new ArrayList<>()); + } + servicesMap.get(serverConnectorType).add(oldService); + } + } + + // 按照事件来源更新对应的revision + compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue()); + // 按照事件来源更新对应的列表 + List serverEventServicesList = servicesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>()); + serverEventServicesList.clear(); + serverEventServicesList.addAll(discoverResponse.getServicesList()); + // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 + if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) { + serverEvent.setPolarisRevision(discoverResponse.getService().getRevision().getValue()); + } + // Get service information list except polaris. - List extendServiceList = new ArrayList<>(); for (DestroyableServerConnector sc : connector.getServerConnectors()) { if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { Services services = sc.syncGetServices(this); - if (extendServiceList.isEmpty()) { - extendServiceList.addAll(services.getServices()); - } else { - // TODO 多数据源合并去重 + if (services != null) { + compositeRevision.setRevision(sc.getName(), services.getRevision()); + List tempServiceList = services.getServices(); + List tempServerEventServicesList = servicesMap.computeIfAbsent(sc.getName(), key -> new ArrayList<>()); + tempServerEventServicesList.clear(); + for (ServiceInfo serviceInfo : tempServiceList) { + Service service = Service.newBuilder() + .setNamespace(StringValue.of(serviceEventKey.getNamespace())) + .setName(StringValue.of(serviceInfo.getService())) + .build(); + tempServerEventServicesList.add(service); + } } } } - // Merge service information list - List polarisServiceList = discoverResponse.getServicesList(); - for (ServiceInfo i : extendServiceList) { - boolean needAdd = true; - for (Service j : polarisServiceList) { - if (i.getService().equals(j.getName().getValue())) { - needAdd = false; - break; + // Merge service information list if needed. + newDiscoverResponseBuilder.clearServices(); + List finalServiceList = new ArrayList<>(servicesMap.get(serverEventConnectorType)); + for (String type : ORDER_LIST) { + if (!StringUtils.equals(type, serverEventConnectorType)) { + List services = servicesMap.get(type); + if (CollectionUtils.isNotEmpty(services)) { + for (Service newService : services) { + boolean needAdd = true; + for (Service existService : finalServiceList) { + if (StringUtils.equals(newService.getName().getValue(), existService.getName().getValue())) { + needAdd = false; + break; + } + } + if (needAdd) { + finalServiceList.add(newService); + } + } } - } - if (needAdd) { - Service service = Service.newBuilder() - .setNamespace(StringValue.of(serviceEventKey.getNamespace())) - .setName(StringValue.of(i.getService())) - .build(); - newDiscoverResponseBuilder.addServices(service); + newDiscoverResponseBuilder.addAllServices(finalServiceList); } } if (!newDiscoverResponseBuilder.getServicesList().isEmpty()) { @@ -287,7 +382,11 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { if (null == serverEvent.getError()) { successUpdates.addAndGet(1); } - return getEventHandler().onEventUpdate(serverEvent); + boolean svcDeleted = getEventHandler().onEventUpdate(serverEvent); + if (!svcDeleted && subTask != null) { + subTask.setType(ServiceUpdateTaskConstant.Type.FIRST, ServiceUpdateTaskConstant.Type.LONG_RUNNING); + } + return svcDeleted; } public boolean notifyServerEventWithRevisionChecking(ServerEvent serverEvent, String revision) { diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java index fe48a8203..e4939ef4c 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java @@ -24,32 +24,30 @@ import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.ConsulRawClient; import com.ecwid.consul.v1.OperationException; -import com.ecwid.consul.v1.QueryParams; import com.ecwid.consul.v1.agent.model.NewService; import com.ecwid.consul.v1.agent.model.NewService.Check; -import com.ecwid.consul.v1.health.model.HealthService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.reflect.TypeToken; import com.tencent.polaris.api.config.global.ServerConnectorConfig; import com.tencent.polaris.api.exception.ErrorCode; import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.exception.RetriableException; -import com.tencent.polaris.api.exception.ServerErrorResponseException; import com.tencent.polaris.api.plugin.PluginType; import com.tencent.polaris.api.plugin.common.InitContext; import com.tencent.polaris.api.plugin.common.PluginTypes; import com.tencent.polaris.api.plugin.compose.Extensions; import com.tencent.polaris.api.plugin.server.*; -import com.tencent.polaris.api.pojo.*; +import com.tencent.polaris.api.pojo.ServiceEventKey; +import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.StringUtils; -import com.tencent.polaris.client.pojo.ServicesByProto; import com.tencent.polaris.factory.config.global.ServerConnectorConfigImpl; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.plugins.connector.common.DestroyableServerConnector; -import com.tencent.polaris.plugins.connector.common.ServiceInstancesResponse; import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.consul.service.ConsulService; +import com.tencent.polaris.plugins.connector.consul.service.InstanceService; +import com.tencent.polaris.plugins.connector.consul.service.ServiceService; import org.slf4j.Logger; import java.util.*; @@ -57,8 +55,6 @@ import static com.ecwid.consul.json.GsonFactory.getGson; import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; import static com.tencent.polaris.plugins.connector.common.constant.ConsulConstant.MetadataMapKey.*; -import static com.tencent.polaris.plugins.connector.consul.ConsulServerUtils.findHost; -import static com.tencent.polaris.plugins.connector.consul.ConsulServerUtils.getMetadata; /** * An implement of {@link ServerConnector} to connect to Consul Server.It provides methods to manage resources @@ -80,9 +76,6 @@ public class ConsulAPIConnector extends DestroyableServerConnector { */ private boolean initialized = false; - /** - * - */ private boolean ieRegistered = false; private String id; @@ -99,6 +92,8 @@ public class ConsulAPIConnector extends DestroyableServerConnector { private List lastServices = new ArrayList<>(); + private final Map consulServiceMap = new HashMap<>(); + @Override public String getName() { return SERVER_CONNECTOR_CONSUL; @@ -129,6 +124,14 @@ public PluginType getType() { return PluginTypes.SERVER_CONNECTOR.getBaseType(); } + public Map getConsulServiceMap() { + return consulServiceMap; + } + + public ConsulService getConsulService(ServiceEventKey.EventType eventType) { + return consulServiceMap.get(eventType); + } + @Override public void init(InitContext ctx) throws PolarisException { if (!initialized) { @@ -196,9 +199,38 @@ private void initActually(InitContext ctx, ServerConnectorConfig connectorConfig if (metadata.containsKey(QUERY_PASSING_KEY) && StringUtils.isNotBlank(metadata.get(QUERY_PASSING_KEY))) { consulContext.setQueryPassing(Boolean.valueOf(metadata.get(QUERY_PASSING_KEY))); } + if (metadata.containsKey(WAIT_TIME_KEY) && StringUtils.isNotBlank(metadata.get(WAIT_TIME_KEY))) { + String waitTimeStr = metadata.get(WAIT_TIME_KEY); + try { + int waitTime = Integer.parseInt(waitTimeStr); + consulContext.setWaitTime(waitTime); + } catch (Exception e) { + LOG.warn("wait time string {} is not integer.", waitTimeStr, e); + } + } + if (metadata.containsKey(CONSUL_ERROR_SLEEP_KEY) && StringUtils.isNotBlank(metadata.get(CONSUL_ERROR_SLEEP_KEY))) { + String consulErrorSleepStr = metadata.get(CONSUL_ERROR_SLEEP_KEY); + try { + long consulErrorSleep = Long.parseLong(consulErrorSleepStr); + consulContext.setConsulErrorSleep(consulErrorSleep); + } catch (Exception e) { + LOG.warn("delay string {} is not integer.", consulErrorSleepStr, e); + } + } + + // init consul service + consulServiceMap.put(ServiceEventKey.EventType.INSTANCE, new InstanceService(consulClient, consulRawClient, consulContext, "consul-instance", mapper)); + consulServiceMap.put(ServiceEventKey.EventType.SERVICE, new ServiceService(consulClient, consulRawClient, consulContext, "consul-service", mapper)); initialized = true; } + @Override + protected void doDestroy() { + for (ConsulService consulService : consulServiceMap.values()) { + consulService.destroy(); + } + } + @Override public void postContextInit(Extensions ctx) throws PolarisException { // do nothing @@ -346,86 +378,6 @@ public void heartbeat(CommonProviderRequest req) throws PolarisException { } } - @Override - public ServiceInstancesResponse syncGetServiceInstances(ServiceUpdateTask serviceUpdateTask) { - List instanceList = new ArrayList<>(); - String serviceId = serviceUpdateTask.getServiceEventKey().getService(); - String tag = consulContext.getQueryTag(); - String token = consulContext.getAclToken(); - boolean onlyPassing = consulContext.getQueryPassing(); - UrlParameters tokenParam = StringUtils.isNotBlank(token) ? new SingleUrlParameters("token", token) : null; - UrlParameters tagParams = StringUtils.isNotBlank(tag) ? new SingleUrlParameters("tag", tag) : null; - UrlParameters passingParams = onlyPassing ? new SingleUrlParameters("passing") : null; - UrlParameters nsTypeParam = new SingleUrlParameters("nsType", "DEF_AND_GLOBAL"); - - try { - HttpResponse rawResponse = consulRawClient.makeGetRequest("/v1/health/service/" + serviceId, tagParams, - passingParams, QueryParams.DEFAULT, tokenParam, nsTypeParam); - List value; - if (rawResponse.getStatusCode() == 200) { - value = getGson().fromJson(rawResponse.getContent(), - new TypeToken>() { - }.getType()); - } else { - String rawResponseStr = ""; - try { - rawResponseStr = mapper.writeValueAsString(rawResponse); - } catch (JsonProcessingException ignore) { - } - LOG.error("get service server list occur error. serviceId: {}. RawResponse: {}", serviceId, - rawResponseStr); - throw new OperationException(rawResponse); - } - if (CollectionUtils.isNotEmpty(value)) { - for (HealthService service : value) { - DefaultInstance instance = new DefaultInstance(); - String host = findHost(service); - instance.setId(service.getService().getId()); - instance.setService(service.getService().getService()); - instance.setHost(host); - instance.setPort(service.getService().getPort()); - instance.setMetadata(getMetadata(service)); - instance.setHealthy(true); - instance.setIsolated(false); - instanceList.add(instance); - } - } - return new ServiceInstancesResponse(String.valueOf(rawResponse.getConsulIndex()), instanceList); - } catch (ConsulException e) { - LOG.error("Get service instances of {} sync failed.", serviceId, e); - throw ServerErrorResponseException.build(ErrorCode.SERVER_USER_ERROR.ordinal(), - String.format("Get service instances of %s sync failed.", - serviceUpdateTask.getServiceEventKey().getServiceKey())); - } - } - - @Override - public Services syncGetServices(ServiceUpdateTask serviceUpdateTask) { - Services services = new ServicesByProto(new ArrayList<>()); - try { - List serviceList; - String aclToken = consulContext.getAclToken(); - if (StringUtils.isNotBlank(aclToken)) { - serviceList = new ArrayList<>(consulClient.getCatalogServices(QueryParams.DEFAULT, aclToken).getValue() - .keySet()); - } else { - serviceList = new ArrayList<>(consulClient.getCatalogServices(QueryParams.DEFAULT).getValue() - .keySet()); - } - - for (String s : serviceList) { - ServiceInfo serviceInfo = new ServiceInfo(); - serviceInfo.setService(s); - services.getServices().add(serviceInfo); - } - } catch (ConsulException e) { - throw ServerErrorResponseException.build(ErrorCode.SERVER_USER_ERROR.ordinal(), - String.format("Get services of %s instances sync failed.", - serviceUpdateTask.getServiceEventKey().getServiceKey())); - } - return services; - } - @Override public ReportClientResponse reportClient(ReportClientRequest req) throws PolarisException { return null; diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulContext.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulContext.java index 3feaed38d..de9e683a1 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulContext.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulContext.java @@ -52,6 +52,16 @@ public class ConsulContext { private Boolean queryPassing; + private long consulErrorSleep; + + /** + * The number of seconds to wait (or block) for watch query, defaults to 55. + * Needs to be less than default ConsulClient (defaults to 60). To increase + * ConsulClient timeout create a ConsulClient bean with a custom ConsulRawClient + * with a custom HttpClient. + */ + private int waitTime; + public ConsulContext() { serviceName = ""; instanceId = ""; @@ -63,6 +73,8 @@ public ConsulContext() { checkId = ""; queryTag = ""; queryPassing = true; + consulErrorSleep = 60000L; + waitTime = 55; } public ServerConnectorConfig getConnectorConfig() { @@ -152,4 +164,20 @@ public Boolean getQueryPassing() { public void setQueryPassing(Boolean queryPassing) { this.queryPassing = queryPassing; } + + public long getConsulErrorSleep() { + return consulErrorSleep; + } + + public void setConsulErrorSleep(long consulErrorSleep) { + this.consulErrorSleep = consulErrorSleep; + } + + public int getWaitTime() { + return waitTime; + } + + public void setWaitTime(int waitTime) { + this.waitTime = waitTime; + } } diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServerUtils.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServerUtils.java index 8e6ab1797..2b2985680 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServerUtils.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServerUtils.java @@ -29,6 +29,9 @@ import java.util.List; import java.util.Map; +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; +import static com.tencent.polaris.plugins.connector.common.constant.ConnectorConstant.SERVER_CONNECTOR_TYPE; + /** * Copy from spring-cloud-consul-discovery. * {@link org.springframework.cloud.consul.discovery.ConsulServerUtils} @@ -76,6 +79,7 @@ public static Map getMetadata(HealthService healthService) { metadata.put(entry.getKey(), entry.getValue()); } } + metadata.put(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_CONSUL); return metadata; } diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java new file mode 100644 index 000000000..2aa770694 --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulServiceUpdateTask.java @@ -0,0 +1,73 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.connector.consul; + +import com.tencent.polaris.api.plugin.server.ServerEvent; +import com.tencent.polaris.api.plugin.server.ServiceEventHandler; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.plugins.connector.common.DestroyableServerConnector; +import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant; +import com.tencent.polaris.plugins.connector.consul.service.ConsulService; +import org.slf4j.Logger; + +/** + * Consul service update task. + * + * @author Haotian Zhang + */ +public class ConsulServiceUpdateTask extends ServiceUpdateTask { + + private static final Logger LOG = LoggerFactory.getLogger(ConsulServiceUpdateTask.class); + + public ConsulServiceUpdateTask(ServiceEventHandler handler, + DestroyableServerConnector connector) { + super(handler, connector); + } + + @Override + public void execute() { + execute(this); + } + + @Override + public void execute(ServiceUpdateTask serviceUpdateTask) { + if (serviceUpdateTask.getTaskType() == ServiceUpdateTaskConstant.Type.FIRST) { + LOG.info("[ConsulAPIConnector]start to run first task {}", serviceUpdateTask); + } else { + LOG.debug("[ConsulAPIConnector]start to run task {}", serviceUpdateTask); + } + if (serverConnector instanceof ConsulAPIConnector) { + ConsulAPIConnector consulAPIConnector = (ConsulAPIConnector) serverConnector; + ConsulService consulService = consulAPIConnector.getConsulService(serviceUpdateTask.getServiceEventKey().getEventType()); + if (consulService != null) { + consulService.asyncSendRequest(serviceUpdateTask); + } + } + } + + @Override + protected void handle(Throwable throwable) { + LOG.error("Consul service task execute error.", throwable); + } + + @Override + public boolean notifyServerEvent(ServerEvent serverEvent) { + return false; + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java new file mode 100644 index 000000000..5f7d4822b --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ConsulService.java @@ -0,0 +1,77 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.connector.consul.service; + +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.ConsulRawClient; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.tencent.polaris.api.control.Destroyable; +import com.tencent.polaris.api.utils.ThreadPoolUtils; +import com.tencent.polaris.client.util.NamedThreadFactory; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.consul.ConsulContext; +import org.slf4j.Logger; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author Haotian Zhang + */ +public abstract class ConsulService extends Destroyable { + + private static final Logger LOG = LoggerFactory.getLogger(ConsulService.class); + + protected final ConsulClient consulClient; + + protected final ConsulRawClient consulRawClient; + + protected final ConsulContext consulContext; + + protected final ObjectMapper mapper; + + protected final ExecutorService refreshExecutor; + + public ConsulService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext, + String threadName, ObjectMapper mapper) { + this.consulClient = consulClient; + this.consulRawClient = consulRawClient; + this.consulContext = consulContext; + this.mapper = mapper; + NamedThreadFactory threadFactory = new NamedThreadFactory(threadName); + this.refreshExecutor = Executors.newFixedThreadPool(8, threadFactory); + } + + public void asyncSendRequest(ServiceUpdateTask serviceUpdateTask) { + this.refreshExecutor.execute(() -> { + try { + sendRequest(serviceUpdateTask); + } catch (Throwable throwable) { + LOG.error("Send request with throwable.", throwable); + } + }); + } + + protected abstract void sendRequest(ServiceUpdateTask serviceUpdateTask); + + @Override + protected void doDestroy() { + ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{refreshExecutor}); + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java new file mode 100644 index 000000000..13eaafc18 --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/InstanceService.java @@ -0,0 +1,193 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.connector.consul.service; + +import com.ecwid.consul.ConsulException; +import com.ecwid.consul.SingleUrlParameters; +import com.ecwid.consul.UrlParameters; +import com.ecwid.consul.transport.HttpResponse; +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.ConsulRawClient; +import com.ecwid.consul.v1.OperationException; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.health.model.HealthService; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.reflect.TypeToken; +import com.google.protobuf.BoolValue; +import com.google.protobuf.StringValue; +import com.google.protobuf.UInt32Value; +import com.tencent.polaris.api.exception.ErrorCode; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.exception.ServerCodes; +import com.tencent.polaris.api.exception.ServerErrorResponseException; +import com.tencent.polaris.api.plugin.server.ServerEvent; +import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.consul.ConsulContext; +import com.tencent.polaris.specification.api.v1.model.ModelProto; +import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto; +import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static com.ecwid.consul.json.GsonFactory.getGson; +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; +import static com.tencent.polaris.plugins.connector.consul.ConsulServerUtils.findHost; +import static com.tencent.polaris.plugins.connector.consul.ConsulServerUtils.getMetadata; + +/** + * @author Haotian Zhang + */ +public class InstanceService extends ConsulService { + + private static final Logger LOG = LoggerFactory.getLogger(InstanceService.class); + + private final Map serviceConsulIndex = new ConcurrentHashMap<>(); + + public InstanceService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext, + String threadName, ObjectMapper mapper) { + super(consulClient, consulRawClient, consulContext, threadName, mapper); + } + + @Override + public void sendRequest(ServiceUpdateTask serviceUpdateTask) { + String namespace = serviceUpdateTask.getServiceEventKey().getNamespace(); + String serviceId = serviceUpdateTask.getServiceEventKey().getService(); + String tag = consulContext.getQueryTag(); + String token = consulContext.getAclToken(); + boolean onlyPassing = consulContext.getQueryPassing(); + UrlParameters tokenParam = StringUtils.isNotBlank(token) ? new SingleUrlParameters("token", token) : null; + UrlParameters tagParams = StringUtils.isNotBlank(tag) ? new SingleUrlParameters("tag", tag) : null; + UrlParameters passingParams = onlyPassing ? new SingleUrlParameters("passing") : null; + UrlParameters nsTypeParam = new SingleUrlParameters("nsType", "DEF_AND_GLOBAL"); + Long index = getServersConsulIndex(serviceId); + int code = ServerCodes.DATA_NO_CHANGE; + QueryParams queryParams = new QueryParams(consulContext.getWaitTime(), index); + try { + LOG.debug("Begin Get service instances of {} sync", serviceId); + HttpResponse rawResponse = consulRawClient.makeGetRequest("/v1/health/service/" + serviceId, tagParams, + passingParams, tokenParam, nsTypeParam, queryParams); + if (rawResponse != null) { + if (!index.equals(rawResponse.getConsulIndex())) { + code = ServerCodes.EXECUTE_SUCCESS; + } + LOG.debug("raw response: " + rawResponse.getContent() + " ; onlyPassing: " + onlyPassing); + List value; + if (rawResponse.getStatusCode() == 200) { + value = getGson().fromJson(rawResponse.getContent(), + new TypeToken>() { + }.getType()); + } else { + String rawResponseStr = ""; + try { + rawResponseStr = mapper.writeValueAsString(rawResponse); + } catch (JsonProcessingException ignore) { + } + LOG.error("get service server list occur error. serviceId: {}. RawResponse: {}", serviceId, + rawResponseStr); + throw new OperationException(rawResponse); + } + ServiceProto.Service.Builder newServiceBuilder = ServiceProto.Service.newBuilder(); + newServiceBuilder.setNamespace(StringValue.of(namespace)); + newServiceBuilder.setName(StringValue.of(serviceUpdateTask.getServiceEventKey().getService())); + newServiceBuilder.setRevision(StringValue.of(String.valueOf(rawResponse.getConsulIndex()))); + ServiceProto.Service service = newServiceBuilder.build(); + List instanceList = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(value)) { + for (HealthService healthService : value) { + ServiceProto.Instance.Builder instanceBuilder = ServiceProto.Instance.newBuilder() + .setNamespace(StringValue.of(namespace)) + .setService(StringValue.of(serviceId)) + .setHost(StringValue.of(findHost(healthService))) + .setPort(UInt32Value.of(healthService.getService().getPort())) + .setHealthy(BoolValue.of(true)) + .setIsolate(BoolValue.of(false)); + // set Id + if (StringUtils.isNotBlank(healthService.getService().getId())) { + instanceBuilder.setId(StringValue.of(healthService.getService().getId())); + } else { + String id = + serviceId + "-" + findHost(healthService).replace(".", "-") + "-" + healthService.getService().getPort(); + instanceBuilder.setId(StringValue.of(id)); + LOG.info("Instance with name {} host {} port {} doesn't have id.", serviceId + , findHost(healthService), healthService.getService().getPort()); + } + // set location + instanceBuilder.setLocation(ModelProto.Location.newBuilder().build()); + // set metadata + Map metadata = getMetadata(healthService); + if (CollectionUtils.isNotEmpty(metadata)) { + instanceBuilder.putAllMetadata(metadata); + } + instanceList.add(instanceBuilder.build()); + } + } + + ResponseProto.DiscoverResponse.Builder newDiscoverResponseBuilder = ResponseProto.DiscoverResponse.newBuilder(); + newDiscoverResponseBuilder.setService(service); + newDiscoverResponseBuilder.addAllInstances(instanceList); + newDiscoverResponseBuilder.setCode(UInt32Value.of(code)); + + ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL); + boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent); + // 即使无服务,也要更新 index + if (rawResponse.getConsulIndex() != null) { + setServersConsulIndex(serviceId, index, rawResponse.getConsulIndex()); + } + if (!svcDeleted) { + serviceUpdateTask.addUpdateTaskSet(); + } + } + } catch (ConsulException e) { + LOG.error("Get service instances of {} sync failed. Will sleep for {} ms.", serviceId, consulContext.getConsulErrorSleep(), e); + try { + Thread.sleep(consulContext.getConsulErrorSleep()); + } catch (Exception e1) { + LOG.error("error in sleep, msg: " + e.getMessage()); + } + PolarisException error = ServerErrorResponseException.build(ErrorCode.NETWORK_ERROR.getCode(), + String.format("Get service instances of %s sync failed.", + serviceUpdateTask.getServiceEventKey().getServiceKey())); + ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), null, error, SERVER_CONNECTOR_CONSUL); + serviceUpdateTask.notifyServerEvent(serverEvent); + } catch (Throwable throwable) { + LOG.error("Get service instances of {} sync failed.", serviceId, throwable); + } + } + + private Long getServersConsulIndex(String serviceId) { + Long index = serviceConsulIndex.get(serviceId); + if (index != null) { + return index; + } + setServersConsulIndex(serviceId, null, -1L); + return -1L; + } + + private void setServersConsulIndex(String serviceId, Long lastIndex, Long newIndex) { + LOG.debug("serviceId: " + serviceId, "; lastIndex: " + lastIndex + "; newIndex: " + newIndex); + serviceConsulIndex.put(serviceId, newIndex); + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java new file mode 100644 index 000000000..f5f5022bb --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/ServiceService.java @@ -0,0 +1,126 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.connector.consul.service; + +import com.ecwid.consul.ConsulException; +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.ConsulRawClient; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.Response; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.StringValue; +import com.google.protobuf.UInt32Value; +import com.tencent.polaris.api.exception.ErrorCode; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.exception.ServerCodes; +import com.tencent.polaris.api.exception.ServerErrorResponseException; +import com.tencent.polaris.api.plugin.server.ServerEvent; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.consul.ConsulContext; +import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto; +import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; +import static com.tencent.polaris.plugins.connector.common.constant.ConnectorConstant.SERVER_CONNECTOR_TYPE; + +/** + * @author Haotian Zhang + */ +public class ServiceService extends ConsulService { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceService.class); + + private final AtomicLong catalogConsulIndex = new AtomicLong(-1L); + + public ServiceService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext, + String threadName, ObjectMapper mapper) { + super(consulClient, consulRawClient, consulContext, threadName, mapper); + } + + @Override + public void sendRequest(ServiceUpdateTask serviceUpdateTask) { + try { + Long index = catalogConsulIndex.get(); + QueryParams queryParams = new QueryParams(consulContext.getWaitTime(), index); + String aclToken = consulContext.getAclToken(); + Response>> response; + int code = ServerCodes.DATA_NO_CHANGE; + if (StringUtils.isNotBlank(aclToken)) { + response = consulClient.getCatalogServices(queryParams, aclToken); + } else { + response = consulClient.getCatalogServices(queryParams); + } + + Long consulIndex = response.getConsulIndex(); + if (!index.equals(consulIndex)) { + code = ServerCodes.EXECUTE_SUCCESS; + } + + String namespace = serviceUpdateTask.getServiceEventKey().getNamespace(); + + ServiceProto.Service.Builder newServiceBuilder = ServiceProto.Service.newBuilder(); + newServiceBuilder.setNamespace(StringValue.of(namespace)); + newServiceBuilder.setName(StringValue.of(serviceUpdateTask.getServiceEventKey().getService())); + newServiceBuilder.setRevision(StringValue.of(String.valueOf(consulIndex))); + ServiceProto.Service newService = newServiceBuilder.build(); + + List orginalServiceList = new ArrayList<>(response.getValue().keySet()); + List serviceList = new ArrayList<>(); + for (String s : orginalServiceList) { + ServiceProto.Service service = ServiceProto.Service.newBuilder() + .setNamespace(StringValue.of(namespace)) + .setName(StringValue.of(s)) + .putMetadata(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_CONSUL).build(); + serviceList.add(service); + } + + ResponseProto.DiscoverResponse.Builder newDiscoverResponseBuilder = ResponseProto.DiscoverResponse.newBuilder(); + newDiscoverResponseBuilder.setService(newService); + newDiscoverResponseBuilder.addAllServices(serviceList); + newDiscoverResponseBuilder.setCode(UInt32Value.of(code)); + + ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL); + boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent); + if (consulIndex != null) { + catalogConsulIndex.set(consulIndex); + } + if (!svcDeleted) { + serviceUpdateTask.addUpdateTaskSet(); + } + } catch (ConsulException e) { + LOG.error("Get services sync failed. Will sleep for {} ms.", consulContext.getConsulErrorSleep(), e); + try { + Thread.sleep(consulContext.getConsulErrorSleep()); + } catch (Exception e1) { + LOG.error("error in sleep, msg: " + e.getMessage()); + } + PolarisException error = ServerErrorResponseException.build(ErrorCode.NETWORK_ERROR.getCode(), + "Get services sync failed."); + ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), null, error, SERVER_CONNECTOR_CONSUL); + serviceUpdateTask.notifyServerEvent(serverEvent); + } + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-nacos/src/main/java/com/tencent/polaris/plugins/connector/nacos/NacosConnector.java b/polaris-plugins/polaris-plugins-connector/connector-nacos/src/main/java/com/tencent/polaris/plugins/connector/nacos/NacosConnector.java index b4f886a4a..c0a5ac782 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-nacos/src/main/java/com/tencent/polaris/plugins/connector/nacos/NacosConnector.java +++ b/polaris-plugins/polaris-plugins-connector/connector-nacos/src/main/java/com/tencent/polaris/plugins/connector/nacos/NacosConnector.java @@ -28,7 +28,6 @@ import com.alibaba.nacos.api.naming.pojo.ListView; import com.alibaba.nacos.common.utils.MD5Utils; import com.tencent.polaris.api.config.global.ServerConnectorConfig; -import com.tencent.polaris.api.config.plugin.DefaultPlugins; import com.tencent.polaris.api.exception.ErrorCode; import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.exception.RetriableException; @@ -37,19 +36,8 @@ import com.tencent.polaris.api.plugin.common.InitContext; import com.tencent.polaris.api.plugin.common.PluginTypes; import com.tencent.polaris.api.plugin.compose.Extensions; -import com.tencent.polaris.api.plugin.server.CommonProviderRequest; -import com.tencent.polaris.api.plugin.server.CommonProviderResponse; -import com.tencent.polaris.api.plugin.server.ReportClientRequest; -import com.tencent.polaris.api.plugin.server.ReportClientResponse; -import com.tencent.polaris.api.plugin.server.ReportServiceContractRequest; -import com.tencent.polaris.api.plugin.server.ReportServiceContractResponse; -import com.tencent.polaris.api.plugin.server.ServerConnector; -import com.tencent.polaris.api.plugin.server.ServiceEventHandler; -import com.tencent.polaris.api.pojo.DefaultInstance; -import com.tencent.polaris.api.pojo.ServiceEventKey; -import com.tencent.polaris.api.pojo.ServiceInfo; -import com.tencent.polaris.api.pojo.ServiceKey; -import com.tencent.polaris.api.pojo.Services; +import com.tencent.polaris.api.plugin.server.*; +import com.tencent.polaris.api.pojo.*; import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.pojo.ServicesByProto; @@ -61,21 +49,14 @@ import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static com.alibaba.nacos.api.common.Constants.DEFAULT_GROUP; import static com.alibaba.nacos.api.common.Constants.GROUP; +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_NACOS; +import static com.tencent.polaris.plugins.connector.common.constant.ConnectorConstant.SERVER_CONNECTOR_TYPE; /** * An implement of {@link ServerConnector} to connect to Nacos Server. @@ -140,7 +121,7 @@ public class NacosConnector extends DestroyableServerConnector { @Override public String getName() { - return DefaultPlugins.SERVER_CONNECTOR_NACOS; + return SERVER_CONNECTOR_NACOS; } @Override @@ -154,7 +135,7 @@ public void init(InitContext ctx) throws PolarisException { List serverConnectorConfigs = ctx.getConfig().getGlobal().getServerConnectors(); if (CollectionUtils.isNotEmpty(serverConnectorConfigs)) { for (ServerConnectorConfigImpl serverConnectorConfig : serverConnectorConfigs) { - if (DefaultPlugins.SERVER_CONNECTOR_NACOS.equals(serverConnectorConfig.getProtocol())) { + if (SERVER_CONNECTOR_NACOS.equals(serverConnectorConfig.getProtocol())) { initActually(ctx, serverConnectorConfig); } } @@ -357,6 +338,7 @@ public ServiceInstancesResponse syncGetServiceInstances(ServiceUpdateTask servic String region = instance.getMetadata().getOrDefault("region", ""); String zone = instance.getMetadata().getOrDefault("zone", ""); String campus = instance.getMetadata().getOrDefault("campus", ""); + instance.getMetadata().put(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_NACOS); if (StringUtils.isNotEmpty(region)) { instance.setRegion(region); @@ -411,6 +393,8 @@ public Services syncGetServices(ServiceUpdateTask serviceUpdateTask) { ServiceInfo serviceInfo = new ServiceInfo(); serviceInfo.setNamespace(namespace); serviceInfo.setService(name); + serviceInfo.setMetadata(new HashMap<>()); + serviceInfo.getMetadata().put(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_NACOS); services.getServices().add(serviceInfo); }); diff --git a/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcServiceUpdateTask.java index 61d5045a7..3f31e4db9 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcServiceUpdateTask.java @@ -27,9 +27,10 @@ import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Status; import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Type; +import org.slf4j.Logger; + import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.slf4j.Logger; public class GrpcServiceUpdateTask extends ServiceUpdateTask { @@ -51,6 +52,7 @@ public void execute() { execute(this); } + @Override public void execute(ServiceUpdateTask serviceUpdateTask) { try { if (serviceUpdateTask.getTaskType() == Type.FIRST) { diff --git a/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/test/java/com/tencent/polaris/plugins/connector/grpc/SpecStreamClientTest.java b/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/test/java/com/tencent/polaris/plugins/connector/grpc/SpecStreamClientTest.java index d5b7224bf..4ee48a456 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/test/java/com/tencent/polaris/plugins/connector/grpc/SpecStreamClientTest.java +++ b/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/test/java/com/tencent/polaris/plugins/connector/grpc/SpecStreamClientTest.java @@ -125,7 +125,7 @@ public MockServiceUpdateTask(ServiceEventHandler handler, BiConsumer3.21.7 2.6 1.15 - 29.0-android + 32.0.1-jre 1.7.25 1.2.13 2.19.0