Skip to content

feat:support consul service update task. #525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto;
import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -36,6 +37,8 @@ public class ServicesByProto implements Services, RegistryCacheValue {

private final List<ServiceInfo> services;

private final List<ServiceProto.Service> originServicesList;

private final boolean initialized;

private final boolean loadedFromFile;
Expand All @@ -46,13 +49,15 @@ public class ServicesByProto implements Services, RegistryCacheValue {

public ServicesByProto() {
this.services = Collections.emptyList();
this.originServicesList = Collections.emptyList();
this.initialized = false;
this.loadedFromFile = false;
this.hashCode = 0;
}

public ServicesByProto(List<ServiceInfo> services) {
this.services = services;
this.originServicesList = new ArrayList<>();
this.initialized = true;
this.loadedFromFile = false;
this.hashCode = 0;
Expand All @@ -62,6 +67,7 @@ public ServicesByProto(ResponseProto.DiscoverResponse response, boolean loadFrom
List<ServiceProto.Service> tmpServices = response.getServicesList();

this.services = new ArrayList<>();
this.originServicesList = new ArrayList<>();
this.svcKey = new ServiceKey("", "");

if (CollectionUtils.isNotEmpty(tmpServices)) {
Expand All @@ -74,10 +80,11 @@ public ServicesByProto(ResponseProto.DiscoverResponse response, boolean loadFrom
.metadata(service.getMetadataMap())
.revision(service.getRevision().getValue())
.build());
originServicesList.add(service);
});
}

this.hashCode = Objects.hash(response.getServicesList());
this.hashCode = Objects.hash(tmpServices);
this.initialized = true;
this.loadedFromFile = loadFromFile;
}
Expand Down Expand Up @@ -115,18 +122,23 @@ public List<ServiceInfo> getServices() {
return services;
}

public List<ServiceProto.Service> getOriginServicesList() {
return originServicesList;
}

public int getHashCode() {
return hashCode;
}

@Override
public String toString() {
return "ServicesByProto{" +
"svcKey=" + svcKey +
", services=" + services +
"services=" + services +
", originServicesList=" + originServicesList +
", initialized=" + initialized +
", loadedFromFile=" + loadedFromFile +
", hashCode=" + hashCode +
", svcKey=" + svcKey +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,13 @@
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
import com.tencent.polaris.api.plugin.route.RouteInfo;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.pojo.*;
import com.tencent.polaris.api.pojo.ServiceEventKey.EventType;
import com.tencent.polaris.api.pojo.ServiceEventKeysProvider;
import com.tencent.polaris.api.pojo.ServiceInfo;
import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.pojo.SourceService;
import com.tencent.polaris.api.rpc.Criteria;
import com.tencent.polaris.api.rpc.GetAllInstancesRequest;
import com.tencent.polaris.api.rpc.GetHealthyInstancesRequest;
import com.tencent.polaris.api.rpc.GetInstancesRequest;
import com.tencent.polaris.api.rpc.GetOneInstanceRequest;
import com.tencent.polaris.api.rpc.RequestBaseEntity;
import com.tencent.polaris.api.rpc.*;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.flow.BaseFlow;
import com.tencent.polaris.client.flow.FlowControlParam;

import java.util.HashSet;
import java.util.Set;

Expand Down Expand Up @@ -67,7 +58,7 @@ public class CommonInstancesRequest implements ServiceEventKeysProvider, FlowCon
/**
* 构造函数
*
* @param request 请求
* @param request 请求
* @param configuration 配置
*/
public CommonInstancesRequest(GetAllInstancesRequest request, Configuration configuration) {
Expand All @@ -84,17 +75,15 @@ public CommonInstancesRequest(GetAllInstancesRequest request, Configuration conf
/**
* 构造函数,获取健康的全部实例。只保留:isolatedRouter,recoverRouter
*
* @param request 请求
* @param request 请求
* @param configuration 配置
*/
public CommonInstancesRequest(GetHealthyInstancesRequest request, Configuration configuration) {
ServiceKey dstSvcKey = new ServiceKey(request.getNamespace(), request.getService());
dstInstanceEventKey = new ServiceEventKey(dstSvcKey, EventType.INSTANCE);
svcEventKeys.add(dstInstanceEventKey);

dstRuleEventKey = new ServiceEventKey(dstSvcKey, EventType.ROUTING);
svcEventKeys.add(dstRuleEventKey);

dstRuleEventKey = null;
srcRuleEventKey = null;

ServiceInfo dstServiceInfo = new ServiceInfo();
Expand Down Expand Up @@ -127,7 +116,7 @@ public CommonInstancesRequest(GetHealthyInstancesRequest request, Configuration
/**
* 构造函数
*
* @param request 请求
* @param request 请求
* @param configuration 配置
*/
public CommonInstancesRequest(GetOneInstanceRequest request, Configuration configuration) {
Expand Down Expand Up @@ -160,7 +149,7 @@ public CommonInstancesRequest(GetOneInstanceRequest request, Configuration confi
/**
* 构造函数
*
* @param request 请求
* @param request 请求
* @param configuration 配置
*/
public CommonInstancesRequest(GetInstancesRequest request, Configuration configuration) {
Expand Down Expand Up @@ -193,7 +182,7 @@ public CommonInstancesRequest(GetInstancesRequest request, Configuration configu
}

public CommonInstancesRequest(ServiceEventKey dstInstanceEventKey, ServiceEventKey dstRuleEventKey, ServiceEventKey srcRuleEventKey,
RouteInfo routeInfo, Criteria criteria, RequestBaseEntity request, Configuration configuration) {
RouteInfo routeInfo, Criteria criteria, RequestBaseEntity request, Configuration configuration) {
this.srcRuleEventKey = srcRuleEventKey;
if (null != srcRuleEventKey) {
svcEventKeys.add(srcRuleEventKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.util.Optional;

import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_GRPC;

/**
* 服务变更事件
*
Expand All @@ -44,16 +46,23 @@ public class ServerEvent {
private Object value;
/**
* Polaris的版本号
*
* <p>
* 如果修改了value中的版本号,那么将原来Polaris的版本号保存在这里,
* 主要用于 CompositeServiceUpdateTask。
*/
private Optional<String> polarisRevision = Optional.empty();

private String connectorType;

public ServerEvent(ServiceEventKey serviceEventKey, Object value, PolarisException error) {
this(serviceEventKey, value, error, SERVER_CONNECTOR_GRPC);
}

public ServerEvent(ServiceEventKey serviceEventKey, Object value, PolarisException error, String connectorType) {
this.serviceEventKey = serviceEventKey;
this.value = value;
this.error = error;
this.connectorType = connectorType;
}

public ServiceEventKey getServiceEventKey() {
Expand Down Expand Up @@ -83,4 +92,12 @@ public PolarisException getError() {
public void setError(PolarisException error) {
this.error = error;
}

public String getConnectorType() {
return connectorType;
}

public void setConnectorType(String connectorType) {
this.connectorType = connectorType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -87,6 +84,9 @@ public class ConsulConfigFileConnector implements ConfigFileConnector {

@Override
public void init(InitContext ctx) throws PolarisException {
if (!Objects.equals(ctx.getConfig().getConfigFile().getServerConnector().getConnectorType(), CONSUL_FILE_CONNECTOR_TYPE)) {
return;
}
if (!initialized) {
// init consul client
ConnectorConfigImpl connectorConfig = ctx.getConfig().getConfigFile().getServerConnector();
Expand Down Expand Up @@ -235,7 +235,7 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi
if (!newModifyIndex.equals(currentModifyIndex)) {
LOGGER.info("KeyPrefix '{}' has new index {} and new modify index {} with old index {} and old modify index {}",
keyPrefix, newIndex, newModifyIndex, currentIndex, currentModifyIndex);
} else if (LOGGER.isDebugEnabled()) {
} else {
code = CodeProto.Code.DataNoChange.getNumber();
message = "config data is no change";
LOGGER.debug("KeyPrefix '{}' not modified with new index {}, index {} and modify index {}",
Expand All @@ -244,7 +244,7 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi
// 在Consul中不存在自定义KEY时,此处的逻辑可以避免response实时返回,不断的触发retry
this.consulIndexes.put(keyPrefix, newIndex);
this.consulModifyIndexes.put(keyPrefix, newModifyIndex);
} else if (LOGGER.isDebugEnabled()) {
} else {
code = CodeProto.Code.DataNoChange.getNumber();
message = "config data is no change";
LOGGER.debug("KeyPrefix '{}' unchanged with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>polaris-plugins-connector</artifactId>
<groupId>com.tencent.polaris</groupId>
Expand All @@ -17,6 +17,11 @@
<description>Polaris Plugins Client Connector Common JAR</description>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Status;
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Type;
import org.slf4j.Logger;

import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

/**
* Abstract service update task class.
Expand Down Expand Up @@ -86,6 +87,10 @@ public boolean setStatus(Status last, Status current) {
return taskStatus.compareAndSet(last, current);
}

public void setLastUpdateTime(long currentTime) {
lastUpdateTime.set(currentTime);
}

public Type getTaskType() {
return taskType.get();
}
Expand All @@ -105,10 +110,17 @@ public void run() {

/**
* Business to be executed.
*
* @throws Throwable
*/
protected abstract void execute() throws Throwable;
public void execute() {

}

/**
* Business with serviceUpdateTask to be executed.
*/
public void execute(ServiceUpdateTask serviceUpdateTask) {

}

/**
* Throwable to be handled.
Expand Down Expand Up @@ -149,13 +161,18 @@ public boolean needUpdate() {
}

@Override
@SuppressWarnings("checkstyle:all")
public String toString() {
return "ServiceUpdateTask{" +
"taskType=" + taskType.get() +
"serviceEventHandler=" + serviceEventHandler +
", serverConnector=" + serverConnector +
", targetClusterType=" + targetClusterType.get() +
", taskType=" + taskType.get() +
", taskStatus=" + taskStatus.get() +
", serviceEventKey=" + serviceEventKey +
", targetClusterType=" + targetClusterType.get() +
", lastUpdateTime=" + lastUpdateTime +
", successUpdates=" + successUpdates +
", refreshIntervalMs=" + refreshIntervalMs +
", eventHandler=" + eventHandler +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.tencent.polaris.plugins.connector.common.constant;

import com.google.common.collect.Lists;

import java.util.List;

import static com.tencent.polaris.api.config.plugin.DefaultPlugins.*;

/**
* @author Haotian Zhang
*/
public interface ConnectorConstant {

String SERVER_CONNECTOR_TYPE = "SERVER_CONNECTOR_TYPE";

List<String> ORDER_LIST = Lists.newArrayList(SERVER_CONNECTOR_GRPC, SERVER_CONNECTOR_CONSUL, SERVER_CONNECTOR_NACOS);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ interface MetadataMapKey {
String IP_ADDRESS_KEY = "IP_ADDRESS_KEY";

String PREFER_IP_ADDRESS_KEY = "PREFER_IP_ADDRESS_KEY";

String TAGS_KEY = "TAGS_KEY";

String CHECK_KEY = "CHECK_KEY";

String QUERY_TAG_KEY = "QUERY_TAG_KEY";

String QUERY_PASSING_KEY = "QUERY_PASSING_KEY";

String WAIT_TIME_KEY = "waitTime";

String CONSUL_ERROR_SLEEP_KEY = "consulErrorSleep";
}

}
Loading
Loading