From 12a08d5c0a9c289fba7c4eb6881f6eed6fb9a77c Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Fri, 24 May 2024 13:40:22 +0800 Subject: [PATCH 1/9] feat:support consul config. --- .../api/config/plugin/DefaultPlugins.java | 14 + .../api/config/verify/DefaultValues.java | 450 ++++++++-------- .../configuration/ConnectorConfigImpl.java | 241 ++++----- .../internal/ConfigFilePersistentHandler.java | 482 +++++++++--------- .../polaris-configuration-factory/pom.xml | 6 + .../src/test/resources/polaris.yml | 2 +- polaris-dependencies/pom.xml | 6 + .../consul-configuration-connector/pom.xml | 44 ++ .../consul/ConsulConfigConstants.java | 35 ++ .../connector/consul/ConsulConfigContext.java | 87 ++++ .../consul/ConsulConfigFileConnector.java | 336 ++++++++++++ .../connector/consul/RefreshEventData.java | 74 +++ .../consul/utils/ConsulConfigFileUtils.java | 40 ++ ...i.plugin.configuration.ConfigFileConnector | 1 + .../LocalFileConfigFileConnector.java | 466 ++++++++--------- .../polaris/PolarisConfigFileConnector.java | 4 +- .../pom.xml | 1 + 17 files changed, 1449 insertions(+), 840 deletions(-) create mode 100644 polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/pom.xml create mode 100644 polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java create mode 100644 polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java create mode 100644 polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java create mode 100644 polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/RefreshEventData.java create mode 100644 polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/utils/ConsulConfigFileUtils.java create mode 100644 polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.configuration.ConfigFileConnector diff --git a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java index 30cab6710..5e2ad602f 100644 --- a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java +++ b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java @@ -65,4 +65,18 @@ public interface DefaultPlugins { */ String CIRCUIT_BREAKER_COMPOSITE = "composite"; + /** + * polaris 配置中心连接器插件名 + */ + String POLARIS_FILE_CONNECTOR_TYPE = "polaris"; + + /** + * 本地配置连接器插件名 + */ + String LOCAL_FILE_CONNECTOR_TYPE = "local"; + + /** + * consul 配置中心连接器插件名 + */ + String CONSUL_FILE_CONNECTOR_TYPE = "consul"; } diff --git a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/verify/DefaultValues.java b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/verify/DefaultValues.java index dcf45052b..fbb8a5a1d 100644 --- a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/verify/DefaultValues.java +++ b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/verify/DefaultValues.java @@ -31,231 +31,229 @@ */ public interface DefaultValues { - /** - * 默认API调用的超时时间, 1s - */ - long DEFAULT_API_INVOKE_TIMEOUT_MS = Duration.parse("PT5S").toMillis(); - - /** - * 默认API重试间隔, 500ms - */ - long DEFAULT_API_RETRY_INTERVAL_MS = 500; - - - /** - * 默认API上报间隔, 30s - */ - long DEFAULT_API_REPORT_INTERVAL_MS = Duration.parse("PT30S").toMillis(); - - /** - * 默认的服务超时淘汰时间,1H - */ - long DEFAULT_SERVICE_EXPIRE_TIME_MS = Duration.parse("PT1H").toMillis(); - - /** - * 最小服务超时淘汰时间,5s - */ - long MIN_SERVICE_EXPIRE_TIME_MS = Duration.parse("PT5S").toMillis(); - - /** - * 默认的服务刷新间隔, 2s - */ - long MIN_SERVICE_REFRESH_INTERVAL_MS = Duration.parse("PT2S").toMillis(); - - /** - * 默认消息的超时时间 - */ - long DEFAULT_SERVER_MSG_TIMEOUT_MS = 1000; - - /** - * 默认消息的超时时间 - */ - long DEFAULT_SERVER_SERVICE_REFRESH_INTERVAL_MS = 60000; - - /** - * 默认SDK往Server连接超时时间间隔 - */ - long DEFAULT_SERVER_CONNECT_TIMEOUT_MS = 50; - - /** - * 默认发送队列的buffer大小,支持的最大瞬时并发度,默认10W - */ - int DEFAULT_REQUEST_QUEUE_SIZE = 100000; - - /** - * 默认server的切换时间时间 - */ - long DEFAULT_SERVER_SWITCH_INTERVAL_MS = Duration.parse("PT10M").toMillis(); - - /** - * 默认空闲连接过期时间 - */ - long DEFAULT_CONNECTION_IDLE_TIMEOUT_MS = Duration.parse("PT60S").toMillis(); - - /** - * 默认缓存持久化目录 - */ - String DEFAULT_CACHE_PERSIST_DIR = "./polaris/backup"; - - String CONFIG_FILE_DEFAULT_CACHE_PERSIST_DIR = DEFAULT_CACHE_PERSIST_DIR + "/config"; - - /** - * 是否打开熔断能力,默认true - */ - boolean DEFAULT_CIRCUIT_BREAKER_ENABLE = true; - - /** - * 是否打开健康探测能力,默认true - */ - boolean DEFAULT_OUTLIER_DETECT_ENABLE = true; - - /** - * 默认熔断节点检查周期 - */ - long DEFAULT_CIRCUIT_BREAKER_CHECK_PERIOD_MS = 30 * 1000; - - /** - * 最小节点检查周期 - */ - long MIN_CIRCUIT_BREAKER_CHECK_PERIOD_MS = 1000; - - /** - * 默认熔断周期,被熔断后多久变为半开 - */ - long DEFAULT_SLEEP_WINDOW_MS = 30 * 1000; - - /** - * 最小熔断周期 - */ - long MIN_SLEEP_WINDOW_MS = 1000; - - /** - * 默认恢复周期,半开后按多久的统计窗口进行恢复统计 - */ - long DEFAULT_RECOVER_WINDOW_MS = 60 * 1000; - - /** - * 最小恢复周期 - */ - long MIN_RECOVER_WINDOW_MS = 10 * 6000; - - /** - * 默认恢复统计的滑桶数 - */ - int DEFAULT_RECOVER_NUM_BUCKETS = 10; - - /** - * 最小恢复统计的滑桶数 - */ - int MIN_RECOVER_NUM_BUCKETS = 1; - - /** - * 半开状态后分配的探测请求数 - */ - int DEFAULT_REQUEST_COUNT_AFTER_HALF_OPEN = 10; - - /** - * 半开状态后恢复的成功请求数 - */ - int DEFAULT_SUCCESS_COUNT_AFTER_HALF_OPEN = 8; - - /** - * 默认的服务端连接器插件 - */ - String DEFAULT_SERVERCONNECTOR = DefaultPlugins.SERVER_CONNECTOR_GRPC; - - /** - * 默认本地缓存策略 - */ - String DEFAULT_LOCALCACHE = DefaultPlugins.LOCAL_REGISTRY_IN_MEMORY; - - /** - * 默认负载均衡器 - */ - String DEFAULT_LOADBALANCER = LoadBalanceConfig.LOAD_BALANCE_WEIGHTED_RANDOM; - - /** - * 默认错误率熔断器 - */ - String DEFAULT_CIRCUITBREAKER_ERRRATE = DefaultPlugins.CIRCUIT_BREAKER_ERROR_RATE; - - /** - * 默认持续错误熔断器 - */ - String DEFAULT_CIRCUITBREAKER_ERRCOUNT = DefaultPlugins.CIRCUIT_BREAKER_ERROR_COUNT; - - /** - * 默认健康探测手段,tcp - */ - String DEFAULT_HEALTH_CHECKER_TCP = "tcp"; - - /** - * 默认健康探测手段,udp - */ - String DEFAULT_HEALTH_CHECKER_UDP = "udp"; - - /** - * 默认健康探测手段,http - */ - String DEFAULT_HEALTH_CHECKER_HTTP = "http"; - - /** - * 默认权重值 - */ - int DEFAULT_WEIGHT = 100; - - /** - * 默认负载均衡放开限制的故障节点上限阈值 - */ - double DEFAULT_MAX_EJECT_PRECENT_THRESHOLD = 0.9; - - /** - * 默认最大重试次数,默认不重试 - */ - int DEFAULT_MAX_RETRY_TIMES = 0; - - /** - * 默认缓存最大写重试次数 - */ - int DEFAULT_PERSIST_MAX_WRITE_RETRY = 1; - - /** - * 默认缓存最大读重试次数 - */ - int DEFAULT_PERSIST_MAX_READ_RETRY = 0; - - /** - * 默认缓存持久化时间间隔 - */ - long DEFAULT_PERSIST_RETRY_INTERVAL_MS = 500; - - /** - * 默认健康探测周期 - */ - long DEFAULT_OUTLIER_DETECT_INTERVAL_MS = Duration.parse("PT10S").toMillis(); - - /** - * 最小定时任务轮询周期 - */ - long MIN_TIMING_INTERVAL_MS = 100; - - String DEFAULT_SYSTEM_NAMESPACE = "Polaris"; - String DEFAULT_SYSTEM_REFRESH_INTERVAL = "10m"; - - String DEFAULT_BUILTIN_DISCOVER = "polaris.builtin"; - - /** - * 默认连接器协议 - */ - String DEFAULT_DISCOVER_PROTOCOL = "grpc"; - - /** - * 默认启用本地缓存机制 - */ - boolean DEFAULT_PERSIST_ENABLE = true; - - String LOCAL_FILE_CONNECTOR_TYPE = "localFile"; - - String PATTERN_CONFIG_FILE = "%s#%s#%s.yaml"; + /** + * 默认API调用的超时时间, 1s + */ + long DEFAULT_API_INVOKE_TIMEOUT_MS = Duration.parse("PT5S").toMillis(); + + /** + * 默认API重试间隔, 500ms + */ + long DEFAULT_API_RETRY_INTERVAL_MS = 500; + + + /** + * 默认API上报间隔, 30s + */ + long DEFAULT_API_REPORT_INTERVAL_MS = Duration.parse("PT30S").toMillis(); + + /** + * 默认的服务超时淘汰时间,1H + */ + long DEFAULT_SERVICE_EXPIRE_TIME_MS = Duration.parse("PT1H").toMillis(); + + /** + * 最小服务超时淘汰时间,5s + */ + long MIN_SERVICE_EXPIRE_TIME_MS = Duration.parse("PT5S").toMillis(); + + /** + * 默认的服务刷新间隔, 2s + */ + long MIN_SERVICE_REFRESH_INTERVAL_MS = Duration.parse("PT2S").toMillis(); + + /** + * 默认消息的超时时间 + */ + long DEFAULT_SERVER_MSG_TIMEOUT_MS = 1000; + + /** + * 默认消息的超时时间 + */ + long DEFAULT_SERVER_SERVICE_REFRESH_INTERVAL_MS = 60000; + + /** + * 默认SDK往Server连接超时时间间隔 + */ + long DEFAULT_SERVER_CONNECT_TIMEOUT_MS = 50; + + /** + * 默认发送队列的buffer大小,支持的最大瞬时并发度,默认10W + */ + int DEFAULT_REQUEST_QUEUE_SIZE = 100000; + + /** + * 默认server的切换时间时间 + */ + long DEFAULT_SERVER_SWITCH_INTERVAL_MS = Duration.parse("PT10M").toMillis(); + + /** + * 默认空闲连接过期时间 + */ + long DEFAULT_CONNECTION_IDLE_TIMEOUT_MS = Duration.parse("PT60S").toMillis(); + + /** + * 默认缓存持久化目录 + */ + String DEFAULT_CACHE_PERSIST_DIR = "./polaris/backup"; + + String CONFIG_FILE_DEFAULT_CACHE_PERSIST_DIR = DEFAULT_CACHE_PERSIST_DIR + "/config"; + + /** + * 是否打开熔断能力,默认true + */ + boolean DEFAULT_CIRCUIT_BREAKER_ENABLE = true; + + /** + * 是否打开健康探测能力,默认true + */ + boolean DEFAULT_OUTLIER_DETECT_ENABLE = true; + + /** + * 默认熔断节点检查周期 + */ + long DEFAULT_CIRCUIT_BREAKER_CHECK_PERIOD_MS = 30 * 1000; + + /** + * 最小节点检查周期 + */ + long MIN_CIRCUIT_BREAKER_CHECK_PERIOD_MS = 1000; + + /** + * 默认熔断周期,被熔断后多久变为半开 + */ + long DEFAULT_SLEEP_WINDOW_MS = 30 * 1000; + + /** + * 最小熔断周期 + */ + long MIN_SLEEP_WINDOW_MS = 1000; + + /** + * 默认恢复周期,半开后按多久的统计窗口进行恢复统计 + */ + long DEFAULT_RECOVER_WINDOW_MS = 60 * 1000; + + /** + * 最小恢复周期 + */ + long MIN_RECOVER_WINDOW_MS = 10 * 6000; + + /** + * 默认恢复统计的滑桶数 + */ + int DEFAULT_RECOVER_NUM_BUCKETS = 10; + + /** + * 最小恢复统计的滑桶数 + */ + int MIN_RECOVER_NUM_BUCKETS = 1; + + /** + * 半开状态后分配的探测请求数 + */ + int DEFAULT_REQUEST_COUNT_AFTER_HALF_OPEN = 10; + + /** + * 半开状态后恢复的成功请求数 + */ + int DEFAULT_SUCCESS_COUNT_AFTER_HALF_OPEN = 8; + + /** + * 默认的服务端连接器插件 + */ + String DEFAULT_SERVERCONNECTOR = DefaultPlugins.SERVER_CONNECTOR_GRPC; + + /** + * 默认本地缓存策略 + */ + String DEFAULT_LOCALCACHE = DefaultPlugins.LOCAL_REGISTRY_IN_MEMORY; + + /** + * 默认负载均衡器 + */ + String DEFAULT_LOADBALANCER = LoadBalanceConfig.LOAD_BALANCE_WEIGHTED_RANDOM; + + /** + * 默认错误率熔断器 + */ + String DEFAULT_CIRCUITBREAKER_ERRRATE = DefaultPlugins.CIRCUIT_BREAKER_ERROR_RATE; + + /** + * 默认持续错误熔断器 + */ + String DEFAULT_CIRCUITBREAKER_ERRCOUNT = DefaultPlugins.CIRCUIT_BREAKER_ERROR_COUNT; + + /** + * 默认健康探测手段,tcp + */ + String DEFAULT_HEALTH_CHECKER_TCP = "tcp"; + + /** + * 默认健康探测手段,udp + */ + String DEFAULT_HEALTH_CHECKER_UDP = "udp"; + + /** + * 默认健康探测手段,http + */ + String DEFAULT_HEALTH_CHECKER_HTTP = "http"; + + /** + * 默认权重值 + */ + int DEFAULT_WEIGHT = 100; + + /** + * 默认负载均衡放开限制的故障节点上限阈值 + */ + double DEFAULT_MAX_EJECT_PRECENT_THRESHOLD = 0.9; + + /** + * 默认最大重试次数,默认不重试 + */ + int DEFAULT_MAX_RETRY_TIMES = 0; + + /** + * 默认缓存最大写重试次数 + */ + int DEFAULT_PERSIST_MAX_WRITE_RETRY = 1; + + /** + * 默认缓存最大读重试次数 + */ + int DEFAULT_PERSIST_MAX_READ_RETRY = 0; + + /** + * 默认缓存持久化时间间隔 + */ + long DEFAULT_PERSIST_RETRY_INTERVAL_MS = 500; + + /** + * 默认健康探测周期 + */ + long DEFAULT_OUTLIER_DETECT_INTERVAL_MS = Duration.parse("PT10S").toMillis(); + + /** + * 最小定时任务轮询周期 + */ + long MIN_TIMING_INTERVAL_MS = 100; + + String DEFAULT_SYSTEM_NAMESPACE = "Polaris"; + String DEFAULT_SYSTEM_REFRESH_INTERVAL = "10m"; + + String DEFAULT_BUILTIN_DISCOVER = "polaris.builtin"; + + /** + * 默认连接器协议 + */ + String DEFAULT_DISCOVER_PROTOCOL = "grpc"; + + /** + * 默认启用本地缓存机制 + */ + boolean DEFAULT_PERSIST_ENABLE = true; + + String PATTERN_CONFIG_FILE = "%s#%s#%s.yaml"; } diff --git a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/factory/config/configuration/ConnectorConfigImpl.java b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/factory/config/configuration/ConnectorConfigImpl.java index 431f897c9..b7444260b 100644 --- a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/factory/config/configuration/ConnectorConfigImpl.java +++ b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/factory/config/configuration/ConnectorConfigImpl.java @@ -10,6 +10,8 @@ import com.tencent.polaris.factory.util.ConfigUtils; import com.tencent.polaris.factory.util.TimeStrJsonDeserializer; +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.*; + /** * 配置中心连接器配置 * @@ -17,122 +19,127 @@ */ public class ConnectorConfigImpl extends ServerConnectorConfigImpl implements ConnectorConfig { - @JsonProperty - private String connectorType; - - @JsonProperty - private Boolean persistEnable = true; - - @JsonProperty - private String persistDir; - - @JsonProperty - private Integer persistMaxWriteRetry = 1; - - @JsonProperty - private Integer persistMaxReadRetry = 0; - - @JsonProperty - private Boolean fallbackToLocalCache = true; - - @JsonProperty - @JsonDeserialize(using = TimeStrJsonDeserializer.class) - private Long persistRetryInterval = 1000L; - - @JsonProperty - private Integer configFileGroupThreadNum = 10; - - @Override - public void verify() { - ConfigUtils.validateString(connectorType, "configConnectorType"); - if (StringUtils.isBlank(persistDir)) { - persistDir = DefaultValues.CONFIG_FILE_DEFAULT_CACHE_PERSIST_DIR; - } - if (!DefaultValues.LOCAL_FILE_CONNECTOR_TYPE.equals(connectorType)) { - super.verify(); - } - } - - @Override - public void setDefault(Object defaultObject) { - if (defaultObject == null) { - return; - } - if (defaultObject instanceof ServerConnectorConfig) { - ServerConnectorConfig serverConnectorConfig = (ServerConnectorConfig) defaultObject; - super.setDefault(serverConnectorConfig); - } - if (defaultObject instanceof ConnectorConfig) { - ConnectorConfig connectorConfig = (ConnectorConfig) defaultObject; - if (connectorType == null) { - this.connectorType = connectorConfig.getConnectorType(); - } - } - } - - @Override - public String getConnectorType() { - return connectorType; - } - - public void setConnectorType(String connectorType) { - this.connectorType = connectorType; - } - - public Boolean getPersistEnable() { - return persistEnable; - } - - public void setPersistEnable(Boolean persistEnable) { - this.persistEnable = persistEnable; - } - - public String getPersistDir() { - return persistDir; - } - - public void setPersistDir(String persistDir) { - this.persistDir = persistDir; - } - - public Integer getPersistMaxWriteRetry() { - return persistMaxWriteRetry; - } - - public void setPersistMaxWriteRetry(Integer persistMaxWriteRetry) { - this.persistMaxWriteRetry = persistMaxWriteRetry; - } - - public Integer getPersistMaxReadRetry() { - return persistMaxReadRetry; - } - - public void setPersistMaxReadRetry(Integer persistMaxReadRetry) { - this.persistMaxReadRetry = persistMaxReadRetry; - } - - public Long getPersistRetryInterval() { - return persistRetryInterval; - } - - public void setPersistRetryInterval(Long persistRetryInterval) { - this.persistRetryInterval = persistRetryInterval; - } - - public Boolean getFallbackToLocalCache() { - return fallbackToLocalCache; - } - - public void setFallbackToLocalCache(Boolean fallbackToLocalCache) { - this.fallbackToLocalCache = fallbackToLocalCache; - } - - public Integer getConfigFileGroupThreadNum() { - return configFileGroupThreadNum; - } - - public void setConfigFileGroupThreadNum(Integer configFileGroupThreadNum) { - this.configFileGroupThreadNum = configFileGroupThreadNum; - } + @JsonProperty + private String connectorType; + + @JsonProperty + private Boolean persistEnable = true; + + @JsonProperty + private String persistDir; + + @JsonProperty + private Integer persistMaxWriteRetry = 1; + + @JsonProperty + private Integer persistMaxReadRetry = 0; + + @JsonProperty + private Boolean fallbackToLocalCache = true; + + @JsonProperty + @JsonDeserialize(using = TimeStrJsonDeserializer.class) + private Long persistRetryInterval = 1000L; + + @JsonProperty + private Integer configFileGroupThreadNum = 10; + + @Override + public void verify() { + ConfigUtils.validateString(connectorType, "configConnectorType"); + if (StringUtils.isBlank(persistDir)) { + persistDir = DefaultValues.CONFIG_FILE_DEFAULT_CACHE_PERSIST_DIR; + } + if (!StringUtils.equals(connectorType, POLARIS_FILE_CONNECTOR_TYPE) + && !StringUtils.equals(connectorType, LOCAL_FILE_CONNECTOR_TYPE) + && !StringUtils.equals(connectorType, CONSUL_FILE_CONNECTOR_TYPE)) { + throw new IllegalArgumentException(String.format("Unsupported config data source []%s", connectorType)); + } + if (!LOCAL_FILE_CONNECTOR_TYPE.equals(connectorType)) { + super.verify(); + } + } + + @Override + public void setDefault(Object defaultObject) { + if (defaultObject == null) { + return; + } + if (defaultObject instanceof ServerConnectorConfig) { + ServerConnectorConfig serverConnectorConfig = (ServerConnectorConfig) defaultObject; + super.setDefault(serverConnectorConfig); + } + if (defaultObject instanceof ConnectorConfig) { + ConnectorConfig connectorConfig = (ConnectorConfig) defaultObject; + if (connectorType == null) { + this.connectorType = connectorConfig.getConnectorType(); + } + } + } + + @Override + public String getConnectorType() { + return connectorType; + } + + public void setConnectorType(String connectorType) { + this.connectorType = connectorType; + } + + public Boolean getPersistEnable() { + return persistEnable; + } + + public void setPersistEnable(Boolean persistEnable) { + this.persistEnable = persistEnable; + } + + public String getPersistDir() { + return persistDir; + } + + public void setPersistDir(String persistDir) { + this.persistDir = persistDir; + } + + public Integer getPersistMaxWriteRetry() { + return persistMaxWriteRetry; + } + + public void setPersistMaxWriteRetry(Integer persistMaxWriteRetry) { + this.persistMaxWriteRetry = persistMaxWriteRetry; + } + + public Integer getPersistMaxReadRetry() { + return persistMaxReadRetry; + } + + public void setPersistMaxReadRetry(Integer persistMaxReadRetry) { + this.persistMaxReadRetry = persistMaxReadRetry; + } + + public Long getPersistRetryInterval() { + return persistRetryInterval; + } + + public void setPersistRetryInterval(Long persistRetryInterval) { + this.persistRetryInterval = persistRetryInterval; + } + + public Boolean getFallbackToLocalCache() { + return fallbackToLocalCache; + } + + public void setFallbackToLocalCache(Boolean fallbackToLocalCache) { + this.fallbackToLocalCache = fallbackToLocalCache; + } + + public Integer getConfigFileGroupThreadNum() { + return configFileGroupThreadNum; + } + + public void setConfigFileGroupThreadNum(Integer configFileGroupThreadNum) { + this.configFileGroupThreadNum = configFileGroupThreadNum; + } } diff --git a/polaris-configuration/polaris-configuration-client/src/main/java/com/tencent/polaris/configuration/client/internal/ConfigFilePersistentHandler.java b/polaris-configuration/polaris-configuration-client/src/main/java/com/tencent/polaris/configuration/client/internal/ConfigFilePersistentHandler.java index 67f2d61b6..cafae96f2 100644 --- a/polaris-configuration/polaris-configuration-client/src/main/java/com/tencent/polaris/configuration/client/internal/ConfigFilePersistentHandler.java +++ b/polaris-configuration/polaris-configuration-client/src/main/java/com/tencent/polaris/configuration/client/internal/ConfigFilePersistentHandler.java @@ -18,14 +18,18 @@ package com.tencent.polaris.configuration.client.internal; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.RandomAccessFile; -import java.io.UnsupportedEncodingException; +import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import com.tencent.polaris.api.plugin.configuration.ConfigFile; +import com.tencent.polaris.api.utils.ThreadPoolUtils; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.client.util.NamedThreadFactory; +import com.tencent.polaris.client.util.Utils; +import com.tencent.polaris.factory.util.FileUtils; +import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; +import org.yaml.snakeyaml.Yaml; + +import java.io.*; import java.net.URLEncoder; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; @@ -37,18 +41,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; -import com.tencent.polaris.api.config.verify.DefaultValues; -import com.tencent.polaris.client.api.SDKContext; -import com.tencent.polaris.client.util.NamedThreadFactory; -import com.tencent.polaris.client.util.Utils; -import com.tencent.polaris.api.plugin.configuration.ConfigFile; -import com.tencent.polaris.api.utils.ThreadPoolUtils; -import com.tencent.polaris.factory.util.FileUtils; -import com.tencent.polaris.logging.LoggerFactory; -import org.slf4j.Logger; -import org.yaml.snakeyaml.Yaml; - +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.LOCAL_FILE_CONNECTOR_TYPE; import static com.tencent.polaris.api.config.verify.DefaultValues.PATTERN_CONFIG_FILE; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; @@ -61,256 +54,247 @@ */ public class ConfigFilePersistentHandler { - private static final Logger LOG = LoggerFactory.getLogger(ConfigFilePersistentHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(ConfigFilePersistentHandler.class); - private final String persistDirPath; - private final int maxWriteRetry; - private final int maxReadRetry; - private final long retryInterval; - private final boolean isAllowPersist; - private final String connectorType; - private static final ExecutorService persistExecutor = Executors - .newSingleThreadExecutor(new NamedThreadFactory("configFile-persistent-handler")); + private final String persistDirPath; + private final int maxWriteRetry; + private final int maxReadRetry; + private final long retryInterval; + private final boolean isAllowPersist; + private final String connectorType; + private static final ExecutorService persistExecutor = Executors + .newSingleThreadExecutor(new NamedThreadFactory("configFile-persistent-handler")); - public ConfigFilePersistentHandler(SDKContext sdkContext) throws IOException { - String persistDir = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistDir(); - this.maxReadRetry = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistMaxReadRetry(); - this.maxWriteRetry = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistMaxWriteRetry(); - this.retryInterval = sdkContext.getConfig().getConfigFile().getServerConnector() - .getPersistRetryInterval(); - this.isAllowPersist = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistEnable(); - this.connectorType = sdkContext.getConfig().getConfigFile().getServerConnector().getConnectorType(); - this.persistDirPath = Utils.translatePath(persistDir); - FileUtils.dirPathCheck(this.persistDirPath); - } + public ConfigFilePersistentHandler(SDKContext sdkContext) throws IOException { + String persistDir = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistDir(); + this.maxReadRetry = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistMaxReadRetry(); + this.maxWriteRetry = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistMaxWriteRetry(); + this.retryInterval = sdkContext.getConfig().getConfigFile().getServerConnector() + .getPersistRetryInterval(); + this.isAllowPersist = sdkContext.getConfig().getConfigFile().getServerConnector().getPersistEnable(); + this.connectorType = sdkContext.getConfig().getConfigFile().getServerConnector().getConnectorType(); + this.persistDirPath = Utils.translatePath(persistDir); + FileUtils.dirPathCheck(this.persistDirPath); + } - private boolean isAllowPersistToFile() { - return isAllowPersist && !DefaultValues.LOCAL_FILE_CONNECTOR_TYPE.equals(connectorType); - } + private boolean isAllowPersistToFile() { + return isAllowPersist && !LOCAL_FILE_CONNECTOR_TYPE.equals(connectorType); + } - public void asyncDeleteConfigFile(ConfigFile configFile) { - if (!persistExecutor.isShutdown() && isAllowPersistToFile()) { - persistExecutor.execute(new DeleteTask(configFile)); - } - } + public void asyncDeleteConfigFile(ConfigFile configFile) { + if (!persistExecutor.isShutdown() && isAllowPersistToFile()) { + persistExecutor.execute(new DeleteTask(configFile)); + } + } - /** - * 删除服务缓存数据 - * - * @param configFile config metadata - */ - public void deleteFileConfig(ConfigFile configFile) { - String fileName = configFileToFileName(configFile); - String persistFilePath = persistDirPath + File.separator + fileName; - try { - Files.deleteIfExists(FileSystems.getDefault().getPath(persistFilePath)); - } - catch (IOException e) { - LOG.error("fail to delete cache file {}", persistFilePath); - } - String lockFileName = fileName + ".lock"; - String persistFileLockPath = persistDirPath + File.separator + lockFileName; - try { - Files.deleteIfExists(FileSystems.getDefault().getPath(persistFileLockPath)); - } - catch (IOException e) { - LOG.error("fail to delete cache lock file {}", persistFileLockPath); - } - } + /** + * 删除服务缓存数据 + * + * @param configFile config metadata + */ + public void deleteFileConfig(ConfigFile configFile) { + String fileName = configFileToFileName(configFile); + String persistFilePath = persistDirPath + File.separator + fileName; + try { + Files.deleteIfExists(FileSystems.getDefault().getPath(persistFilePath)); + } catch (IOException e) { + LOG.error("fail to delete cache file {}", persistFilePath); + } + String lockFileName = fileName + ".lock"; + String persistFileLockPath = persistDirPath + File.separator + lockFileName; + try { + Files.deleteIfExists(FileSystems.getDefault().getPath(persistFileLockPath)); + } catch (IOException e) { + LOG.error("fail to delete cache lock file {}", persistFileLockPath); + } + } - public void asyncSaveConfigFile(ConfigFile configFile) { - if (!persistExecutor.isShutdown() && isAllowPersistToFile()) { - persistExecutor.execute(new SaveTask(configFile)); - } - } + public void asyncSaveConfigFile(ConfigFile configFile) { + if (!persistExecutor.isShutdown() && isAllowPersistToFile()) { + persistExecutor.execute(new SaveTask(configFile)); + } + } - /** - * 持久化配置文件 - * - * @param configFile config file - */ - public void saveConfigFile(ConfigFile configFile) { - int retryTimes = 0; - LOG.info("start to save config file {}", configFile); - while (retryTimes <= maxWriteRetry) { - retryTimes++; - Path path = doSaveConfigFile(configFile); - if (null == path) { - continue; - } - LOG.info("end to save config file {} to {}", configFile, path); - return; - } - LOG.error("fail to persist config file {} after retry {}", configFile, retryTimes); - } + /** + * 持久化配置文件 + * + * @param configFile config file + */ + public void saveConfigFile(ConfigFile configFile) { + int retryTimes = 0; + LOG.info("start to save config file {}", configFile); + while (retryTimes <= maxWriteRetry) { + retryTimes++; + Path path = doSaveConfigFile(configFile); + if (null == path) { + continue; + } + LOG.info("end to save config file {} to {}", configFile, path); + return; + } + LOG.error("fail to persist config file {} after retry {}", configFile, retryTimes); + } - private static String configFileToFileName(ConfigFile configFile) { - try { - String encodedNamespace = URLEncoder.encode(configFile.getNamespace(), "UTF-8"); - String encodedFileGroup = URLEncoder.encode(configFile.getFileGroup(), "UTF-8"); - String encodeFileName = URLEncoder.encode(configFile.getFileName(), "UTF-8"); - return String.format(PATTERN_CONFIG_FILE, encodedNamespace, encodedFileGroup, encodeFileName); - } - catch (UnsupportedEncodingException e) { - throw new AssertionError("UTF-8 is unknown"); - } - } + private static String configFileToFileName(ConfigFile configFile) { + try { + String encodedNamespace = URLEncoder.encode(configFile.getNamespace(), "UTF-8"); + String encodedFileGroup = URLEncoder.encode(configFile.getFileGroup(), "UTF-8"); + String encodeFileName = URLEncoder.encode(configFile.getFileName(), "UTF-8"); + return String.format(PATTERN_CONFIG_FILE, encodedNamespace, encodedFileGroup, encodeFileName); + } catch (UnsupportedEncodingException e) { + throw new AssertionError("UTF-8 is unknown"); + } + } - private void writeTmpFile(File persistTmpFile, File persistLockFile, ConfigFile configFile) throws IOException { - try (RandomAccessFile raf = new RandomAccessFile(persistLockFile, "rw"); - FileChannel channel = raf.getChannel()) { - FileLock lock = channel.tryLock(); - if (lock == null) { - throw new IOException("fail to lock file " + persistTmpFile - .getAbsolutePath() + ", ignore and retry later"); - } - //执行保存 - try { - doWriteTmpFile(persistTmpFile, configFile); - } - finally { - lock.release(); - } - } - } + private void writeTmpFile(File persistTmpFile, File persistLockFile, ConfigFile configFile) throws IOException { + try (RandomAccessFile raf = new RandomAccessFile(persistLockFile, "rw"); + FileChannel channel = raf.getChannel()) { + FileLock lock = channel.tryLock(); + if (lock == null) { + throw new IOException("fail to lock file " + persistTmpFile + .getAbsolutePath() + ", ignore and retry later"); + } + //执行保存 + try { + doWriteTmpFile(persistTmpFile, configFile); + } finally { + lock.release(); + } + } + } - private void doWriteTmpFile(File persistTmpFile, ConfigFile configFile) throws IOException { - if (!persistTmpFile.exists()) { - if (!persistTmpFile.createNewFile()) { - LOG.warn("tmp file {} already exists", persistTmpFile.getAbsolutePath()); - } - } - try (FileOutputStream outputFile = new FileOutputStream(persistTmpFile)) { - String jsonAsYaml = new YAMLMapper().writeValueAsString(configFile); - outputFile.write(jsonAsYaml.getBytes(StandardCharsets.UTF_8)); - outputFile.flush(); - } - } + private void doWriteTmpFile(File persistTmpFile, ConfigFile configFile) throws IOException { + if (!persistTmpFile.exists()) { + if (!persistTmpFile.createNewFile()) { + LOG.warn("tmp file {} already exists", persistTmpFile.getAbsolutePath()); + } + } + try (FileOutputStream outputFile = new FileOutputStream(persistTmpFile)) { + String jsonAsYaml = new YAMLMapper().writeValueAsString(configFile); + outputFile.write(jsonAsYaml.getBytes(StandardCharsets.UTF_8)); + outputFile.flush(); + } + } - private Path doSaveConfigFile(ConfigFile configFile) { - String fileName = configFileToFileName(configFile); - String tmpFileName = fileName + ".tmp"; - String lockFileName = fileName + ".lock"; - String persistFilePathStr = persistDirPath + File.separator + fileName; - Path persistPath = FileSystems.getDefault().getPath(persistFilePathStr); - File persistTmpFile = new File(persistDirPath + File.separator + tmpFileName); - File persistLockFile = new File(persistDirPath + File.separator + lockFileName); - try { - if (!persistLockFile.exists()) { - if (!persistLockFile.createNewFile()) { - LOG.warn("lock file {} already exists", persistLockFile.getAbsolutePath()); - } - } - writeTmpFile(persistTmpFile, persistLockFile, configFile); - Files.move(FileSystems.getDefault().getPath(persistTmpFile.getAbsolutePath()), - persistPath, REPLACE_EXISTING, ATOMIC_MOVE); - } - catch (IOException e) { - LOG.error("fail to write file :" + persistTmpFile, e); - return null; - } - return persistPath.toAbsolutePath(); - } + private Path doSaveConfigFile(ConfigFile configFile) { + String fileName = configFileToFileName(configFile); + String tmpFileName = fileName + ".tmp"; + String lockFileName = fileName + ".lock"; + String persistFilePathStr = persistDirPath + File.separator + fileName; + Path persistPath = FileSystems.getDefault().getPath(persistFilePathStr); + File persistTmpFile = new File(persistDirPath + File.separator + tmpFileName); + File persistLockFile = new File(persistDirPath + File.separator + lockFileName); + try { + if (!persistLockFile.exists()) { + if (!persistLockFile.createNewFile()) { + LOG.warn("lock file {} already exists", persistLockFile.getAbsolutePath()); + } + } + writeTmpFile(persistTmpFile, persistLockFile, configFile); + Files.move(FileSystems.getDefault().getPath(persistTmpFile.getAbsolutePath()), + persistPath, REPLACE_EXISTING, ATOMIC_MOVE); + } catch (IOException e) { + LOG.error("fail to write file :" + persistTmpFile, e); + return null; + } + return persistPath.toAbsolutePath(); + } - /** - * 从缓存目录加载缓存的配置文件 - * - * @param configFile 配置文件 - * @return 配置文件 - */ - public ConfigFile loadPersistedConfigFile(ConfigFile configFile) { - String fileName = configFileToFileName(configFile); - String persistFilePathStr = persistDirPath + File.separator + fileName; - Path persistPath = FileSystems.getDefault().getPath(persistFilePathStr); - int retryTimes = 0; - ConfigFile resConfigFile = null; - while (retryTimes <= maxReadRetry) { - retryTimes++; - resConfigFile = loadConfigFile(persistPath.toFile(), configFile); - if (null == resConfigFile) { - Utils.sleepUninterrupted(retryInterval); - continue; - } - break; - } - if (null == resConfigFile) { - LOG.debug("fail to read config file from {} after retry {} times", fileName, retryTimes); - return null; - } - return resConfigFile; - } + /** + * 从缓存目录加载缓存的配置文件 + * + * @param configFile 配置文件 + * @return 配置文件 + */ + public ConfigFile loadPersistedConfigFile(ConfigFile configFile) { + String fileName = configFileToFileName(configFile); + String persistFilePathStr = persistDirPath + File.separator + fileName; + Path persistPath = FileSystems.getDefault().getPath(persistFilePathStr); + int retryTimes = 0; + ConfigFile resConfigFile = null; + while (retryTimes <= maxReadRetry) { + retryTimes++; + resConfigFile = loadConfigFile(persistPath.toFile(), configFile); + if (null == resConfigFile) { + Utils.sleepUninterrupted(retryInterval); + continue; + } + break; + } + if (null == resConfigFile) { + LOG.debug("fail to read config file from {} after retry {} times", fileName, retryTimes); + return null; + } + return resConfigFile; + } - private ConfigFile loadConfigFile(File persistFile, ConfigFile configFile) { - if (null == persistFile || !persistFile.exists()) { - return null; - } - InputStream inputStream = null; - InputStreamReader reader = null; - Yaml yaml = new Yaml(); - try { - inputStream = new FileInputStream(persistFile); - reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); - ConfigFile resConfigFile = new ConfigFile(configFile.getNamespace(), - configFile.getFileGroup(), configFile.getFileName()); - Map jsonMap = yaml.load(reader); - resConfigFile.setContent(jsonMap.get("content").toString()); - resConfigFile.setMd5(jsonMap.get("md5").toString()); - resConfigFile.setVersion(Long.valueOf(String.valueOf(jsonMap.get("version")))); - return resConfigFile; - } - catch (IOException e) { - LOG.warn("fail to read file :" + persistFile.getAbsoluteFile(), e); - return null; - } - finally { - if (null != reader) { - try { - reader.close(); - } - catch (IOException e) { - LOG.warn("fail to close reader for :" + persistFile.getAbsoluteFile(), e); - } - } - if (null != inputStream) { - try { - inputStream.close(); - } - catch (IOException e) { - LOG.warn("fail to close stream for :" + persistFile.getAbsoluteFile(), e); - } - } - } - } + private ConfigFile loadConfigFile(File persistFile, ConfigFile configFile) { + if (null == persistFile || !persistFile.exists()) { + return null; + } + InputStream inputStream = null; + InputStreamReader reader = null; + Yaml yaml = new Yaml(); + try { + inputStream = new FileInputStream(persistFile); + reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); + ConfigFile resConfigFile = new ConfigFile(configFile.getNamespace(), + configFile.getFileGroup(), configFile.getFileName()); + Map jsonMap = yaml.load(reader); + resConfigFile.setContent(jsonMap.get("content").toString()); + resConfigFile.setMd5(jsonMap.get("md5").toString()); + resConfigFile.setVersion(Long.valueOf(String.valueOf(jsonMap.get("version")))); + return resConfigFile; + } catch (IOException e) { + LOG.warn("fail to read file :" + persistFile.getAbsoluteFile(), e); + return null; + } finally { + if (null != reader) { + try { + reader.close(); + } catch (IOException e) { + LOG.warn("fail to close reader for :" + persistFile.getAbsoluteFile(), e); + } + } + if (null != inputStream) { + try { + inputStream.close(); + } catch (IOException e) { + LOG.warn("fail to close stream for :" + persistFile.getAbsoluteFile(), e); + } + } + } + } - private class DeleteTask implements Runnable { + private class DeleteTask implements Runnable { - private ConfigFile configFile; + private ConfigFile configFile; - public DeleteTask(ConfigFile configFile) { - this.configFile = configFile; - } + public DeleteTask(ConfigFile configFile) { + this.configFile = configFile; + } - @Override - public void run() { - deleteFileConfig(configFile); - } - } + @Override + public void run() { + deleteFileConfig(configFile); + } + } - private class SaveTask implements Runnable { + private class SaveTask implements Runnable { - private ConfigFile configFile; + private ConfigFile configFile; - public SaveTask(ConfigFile configFile) { - this.configFile = configFile; - } + public SaveTask(ConfigFile configFile) { + this.configFile = configFile; + } - @Override - public void run() { - saveConfigFile(configFile); - } - } + @Override + public void run() { + saveConfigFile(configFile); + } + } - protected void doDestroy() { - ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{persistExecutor}); + protected void doDestroy() { + ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{persistExecutor}); } } diff --git a/polaris-configuration/polaris-configuration-factory/pom.xml b/polaris-configuration/polaris-configuration-factory/pom.xml index 801d00f9a..75aaa8057 100644 --- a/polaris-configuration/polaris-configuration-factory/pom.xml +++ b/polaris-configuration/polaris-configuration-factory/pom.xml @@ -38,6 +38,12 @@ ${project.version} + + com.tencent.polaris + consul-configuration-connector + ${project.version} + + com.tencent.polaris diff --git a/polaris-configuration/polaris-configuration-factory/src/test/resources/polaris.yml b/polaris-configuration/polaris-configuration-factory/src/test/resources/polaris.yml index e9b1bc8ca..e7d4edf52 100644 --- a/polaris-configuration/polaris-configuration-factory/src/test/resources/polaris.yml +++ b/polaris-configuration/polaris-configuration-factory/src/test/resources/polaris.yml @@ -22,7 +22,7 @@ consumer: config: serverConnector: connectorType: polaris - #connectorType: localFile + #connectorType: local #persistEnable: true #fallbackToLocalCache: true #persistDir: test/custom/config diff --git a/polaris-dependencies/pom.xml b/polaris-dependencies/pom.xml index b63cb5453..b1031f382 100644 --- a/polaris-dependencies/pom.xml +++ b/polaris-dependencies/pom.xml @@ -206,6 +206,12 @@ ${project.version} + + com.tencent.polaris + consul-configuration-connector + ${project.version} + + com.tencent.polaris diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/pom.xml b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/pom.xml new file mode 100644 index 000000000..8521b10ca --- /dev/null +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/pom.xml @@ -0,0 +1,44 @@ + + + + polaris-plugins-configuration-connector + com.tencent.polaris + ${revision} + ../pom.xml + + 4.0.0 + + consul-configuration-connector + + + + com.tencent.polaris + connector-common + ${project.version} + + + com.ecwid.consul + consul-api + ${consul.version} + + + servlet-api + javax.servlet + + + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java new file mode 100644 index 000000000..22f70fdb7 --- /dev/null +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java @@ -0,0 +1,35 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.configuration.connector.consul; + +/** + * Constants for consul config. + * + * @author Haotian Zhang + */ +public interface ConsulConfigConstants { + + String WAIT_TIME_KEY = "waitTime"; + + String DELAY_KEY = "delay"; + + /** + * 空值CONSUL Index常量,设为-1L时,Consul会立即返回 + */ + Long EMPTY_VALUE_CONSUL_INDEX = -1L; +} diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java new file mode 100644 index 000000000..8a21317af --- /dev/null +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java @@ -0,0 +1,87 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.configuration.connector.consul; + +import com.tencent.polaris.api.config.global.ServerConnectorConfig; + +/** + * Context of consul config server connector. + * + * @author Haotian Zhang + */ +public class ConsulConfigContext { + + private ServerConnectorConfig connectorConfig; + + /** + * The number of seconds to wait (or block) for watch query, defaults to 55. + * Needs to be less than default ConsulClient (defaults to 60). To increase + * ConsulClient timeout create a ConsulClient bean with a custom ConsulRawClient + * with a custom HttpClient. + */ + private int waitTime = 55; + + /** + * The value of the fixed delay for the watch in millis. Defaults to 1000. + */ + private int delay = 1000; + + private String aclToken = ""; + + public ServerConnectorConfig getConnectorConfig() { + return connectorConfig; + } + + public void setConnectorConfig(ServerConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + } + + public int getWaitTime() { + return this.waitTime; + } + + public void setWaitTime(int waitTime) { + this.waitTime = waitTime; + } + + public int getDelay() { + return this.delay; + } + + public void setDelay(int delay) { + this.delay = delay; + } + + public String getAclToken() { + return aclToken; + } + + public void setAclToken(String aclToken) { + this.aclToken = aclToken; + } + + @Override + public String toString() { + return "ConsulConfigContext{" + + "connectorConfig=" + connectorConfig + + ", waitTime=" + waitTime + + ", delay=" + delay + + ", aclToken='" + aclToken + '\'' + + '}'; + } +} diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java new file mode 100644 index 000000000..22544d6f1 --- /dev/null +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java @@ -0,0 +1,336 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.configuration.connector.consul; + +import com.ecwid.consul.v1.*; +import com.ecwid.consul.v1.kv.model.GetValue; +import com.tencent.polaris.api.exception.ErrorCode; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.exception.ServerCodes; +import com.tencent.polaris.api.exception.ServerErrorResponseException; +import com.tencent.polaris.api.plugin.PluginType; +import com.tencent.polaris.api.plugin.common.InitContext; +import com.tencent.polaris.api.plugin.common.PluginTypes; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.plugin.configuration.ConfigFile; +import com.tencent.polaris.api.plugin.configuration.ConfigFileConnector; +import com.tencent.polaris.api.plugin.configuration.ConfigFileResponse; +import com.tencent.polaris.api.plugin.configuration.ConfigPublishFile; +import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.util.NamedThreadFactory; +import com.tencent.polaris.factory.config.configuration.ConnectorConfigImpl; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.plugins.configuration.connector.consul.utils.ConsulConfigFileUtils; +import com.tencent.polaris.specification.api.v1.model.CodeProto; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.CONSUL_FILE_CONNECTOR_TYPE; + +/** + * Consul config file connector. + * + * @author Haotian Zhang + */ +public class ConsulConfigFileConnector implements ConfigFileConnector { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigFileConnector.class); + + /** + * If server connector initialized. + */ + private boolean initialized = false; + + private final AtomicBoolean running = new AtomicBoolean(false); + + private ConsulClient consulClient; + + private ScheduledExecutorService scheduledExecutorService; + + private final Map> watchFutures = new ConcurrentHashMap<>(); + + public final Map consulIndexes = new ConcurrentHashMap<>(); + + public final Map consulModifyIndexes = new ConcurrentHashMap<>(); + + private ConsulConfigContext consulConfigContext; + + /** + * config file changed queue + */ + private final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1024); + + @Override + public void init(InitContext ctx) throws PolarisException { + if (!initialized) { + // init consul client + ConnectorConfigImpl connectorConfig = ctx.getConfig().getConfigFile().getServerConnector(); + String address = connectorConfig.getAddresses().get(0); + int lastIndex = address.lastIndexOf(":"); + String agentHost = address.substring(0, lastIndex); + int agentPort = Integer.parseInt(address.substring(lastIndex + 1)); + LOGGER.info("Connect to consul config server : [{}].", address); + consulClient = new ConsulClient(new ConsulRawClient(agentHost, agentPort)); + + // init consul config context. + consulConfigContext = new ConsulConfigContext(); + Map metadata = connectorConfig.getMetadata(); + if (CollectionUtils.isNotEmpty(metadata)) { + String waitTimeStr = metadata.get(ConsulConfigConstants.WAIT_TIME_KEY); + if (StringUtils.isNotBlank(waitTimeStr)) { + try { + int waitTime = Integer.parseInt(waitTimeStr); + consulConfigContext.setWaitTime(waitTime); + } catch (Exception e) { + LOGGER.warn("wait time string {} is not integer.", waitTimeStr, e); + } + } + + String delayStr = metadata.get(ConsulConfigConstants.DELAY_KEY); + if (StringUtils.isNotBlank(delayStr)) { + try { + int delay = Integer.parseInt(delayStr); + consulConfigContext.setDelay(delay); + } catch (Exception e) { + LOGGER.warn("delay string {} is not integer.", delayStr, e); + } + } + + String tokenStr = connectorConfig.getToken(); + if (StringUtils.isNotBlank(tokenStr)) { + consulConfigContext.setAclToken(tokenStr); + } + } + + // init watch executor. + this.scheduledExecutorService = Executors.newScheduledThreadPool(8, new NamedThreadFactory("consul-configuration-watch")); + + initialized = true; + LOGGER.info("Consul config file connector is initialized."); + } else { + LOGGER.warn("Consul config file connector is already initialized."); + } + } + + @Override + public ConfigFileResponse getConfigFile(ConfigFile configFile) { + if (this.running.get()) { + String keyPrefix = ConsulConfigFileUtils.toConsulKVKeyPrefix(configFile); + if (!watchFutures.containsKey(keyPrefix)) { + this.watchFutures.put(keyPrefix, this.scheduledExecutorService.scheduleWithFixedDelay(() -> { + try { + ConfigFileResponse configFileResponse = getKVValues(configFile, keyPrefix); + if (configFileResponse != null && configFileResponse.getCode() == CodeProto.Code.ExecuteSuccess.getNumber()) { + blockingQueue.offer(new RefreshEventData(keyPrefix, configFile, configFileResponse)); + } else { + if (configFileResponse != null) { + LOGGER.debug("Watch consul config '{}' with {}.", keyPrefix, configFileResponse.getMessage()); + } else { + LOGGER.debug("Watch consul config '{}' do nothing.", keyPrefix); + } + } + } catch (Exception exception) { + LOGGER.error("Watch consul config '{}' failed.", keyPrefix, exception); + } + }, consulConfigContext.getDelay(), consulConfigContext.getDelay(), TimeUnit.MICROSECONDS)); + } + return getKVValues(configFile, keyPrefix); + } + return new ConfigFileResponse(ServerCodes.NOT_FOUND_RESOURCE, "config file not found.", null); + } + + private ConfigFileResponse getKVValues(ConfigFile configFile, String keyPrefix) { + // 使用default值逻辑处理 + Long currentIndex = this.consulIndexes.getOrDefault(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); + Long currentModifyIndex = this.consulModifyIndexes.getOrDefault(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); + + // use the consul ACL token if found + String aclToken = consulConfigContext.getAclToken(); + if (StringUtils.isEmpty(aclToken)) { + aclToken = null; + } + + try { + Response> response = this.consulClient.getKVValues(keyPrefix, aclToken, + new QueryParams(consulConfigContext.getWaitTime(), currentIndex)); + ConfigFile resultConfigFile = new ConfigFile(configFile.getNamespace(), configFile.getFileGroup(), configFile.getFileName()); + return handleResponse(resultConfigFile, keyPrefix, currentIndex, currentModifyIndex, response); + } catch (OperationException operationException) { + handleOperationException(keyPrefix, currentIndex, currentModifyIndex, operationException); + } catch (Exception exception) { + handleException(keyPrefix, currentIndex, currentModifyIndex, exception); + } + return null; + } + + private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefix, Long currentIndex, + Long currentModifyIndex, Response> response) { + if (response.getValue() == null) { + LOGGER.warn("watching consul for keyPrefix '{}' with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); + return new ConfigFileResponse(CodeProto.Code.NotFoundResource.getNumber(), "config file not found.", null); + } + + LOGGER.debug("watching consul for keyPrefix '{}' with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); + int code = CodeProto.Code.ExecuteSuccess.getNumber(); + String message = "execute success"; + if (response.getValue() != null) { + Long newIndex = response.getConsulIndex(); + // 新增一个ModifyIndex用于记录实际内容变更 + Long newModifyIndex = CollectionUtils.isEmpty(response.getValue()) ? ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX + : response.getValue().get(0).getModifyIndex(); + // 由于精确监听,直接判断newIndex与currentIndex一致性 + if (newIndex != null && !newIndex.equals(currentIndex)) { + // 根据currentModifyIndex和newModifyIndex判断内容是否实际发生了变化 + if (!newModifyIndex.equals(currentModifyIndex)) { + LOGGER.info("KeyPrefix '{}' has new index {} and modify index {} with old index {} and old modify index {}", + keyPrefix, newIndex, newModifyIndex, currentIndex, currentModifyIndex); + + } else if (LOGGER.isDebugEnabled()) { + code = CodeProto.Code.DataNoChange.getNumber(); + message = "config data is no change"; + LOGGER.debug("KeyPrefix '{}' not modified with new index {}, index {} and modify index {}", + keyPrefix, newIndex, currentIndex, currentModifyIndex); + } + // 在Consul中不存在自定义KEY时,此处的逻辑可以避免response实时返回,不断的触发retry + this.consulIndexes.put(keyPrefix, newIndex); + } else if (LOGGER.isDebugEnabled()) { + code = CodeProto.Code.DataNoChange.getNumber(); + message = "config data is no change"; + LOGGER.debug("KeyPrefix '{}' unchanged with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); + } + } + transferFromGetValueList(configFile, response.getValue()); + return new ConfigFileResponse(code, message, configFile); + } + + private void transferFromGetValueList(ConfigFile configFile, List getValueList) { + if (CollectionUtils.isEmpty(getValueList)) { + return; + } + + // 只取第一个 + GetValue firstValue = getValueList.get(0); + String decodedValue = firstValue.getDecodedValue(); + configFile.setContent(decodedValue); + configFile.setMd5(DigestUtils.md5Hex(decodedValue)); + configFile.setVersion(firstValue.getModifyIndex()); + } + + private void handleOperationException(String keyPrefix, Long currentIndex, + Long currentModifyIndex, OperationException operationException) { + LOGGER.error("KeyPrefix '{}' with operation exception with index {} and modify index {}.", + keyPrefix, currentIndex, currentModifyIndex, operationException); + throw ServerErrorResponseException.build(CodeProto.Code.ExecuteException.getNumber(), operationException.toString()); + } + + private void handleException(String keyPrefix, Long currentIndex, Long currentModifyIndex, Exception exception) { + LOGGER.error("KeyPrefix '{}' with exception with index {} and modify index {}.", + keyPrefix, currentIndex, currentModifyIndex, exception); + throw ServerErrorResponseException.build(CodeProto.Code.ExecuteException.getNumber(), exception.toString()); + } + + @Override + public ConfigFileResponse watchConfigFiles(List configFiles) { + try { + while (true) { + RefreshEventData refreshEventData = blockingQueue.take(); + Optional optional = configFiles.stream() + .filter(configFile -> StringUtils.equals(refreshEventData.getKeyPrefix(), ConsulConfigFileUtils.toConsulKVKeyPrefix(configFile))) + .findFirst(); + if (optional.isPresent()) { + return refreshEventData.getConfigFileResponse(); + } + } + } catch (InterruptedException e) { + LOGGER.warn("watch consul config file interrupt.", e); + } + return null; + } + + @Override + public String getName() { + return CONSUL_FILE_CONNECTOR_TYPE; + } + + @Override + public PluginType getType() { + return PluginTypes.CONFIG_FILE_CONNECTOR.getBaseType(); + } + + @Override + public void postContextInit(Extensions extensions) throws PolarisException { + if (!this.running.compareAndSet(false, true)) { + throw new PolarisException(ErrorCode.PLUGIN_ERROR, "start consul config file connector failed."); + } + } + + @Override + public void destroy() { + if (this.running.compareAndSet(true, false)) { + // cancel watch future. + if (CollectionUtils.isEmpty(this.watchFutures)) { + for (ScheduledFuture watchFuture : this.watchFutures.values()) { + watchFuture.cancel(true); + } + } + + // shutdown scheduledExecutorService. + if (this.scheduledExecutorService != null && !this.scheduledExecutorService.isShutdown()) { + this.scheduledExecutorService.shutdown(); + try { + if (!this.scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS)) { + this.scheduledExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOGGER.error("shutdown scheduledExecutorService failed.", e); + this.scheduledExecutorService.shutdownNow(); + } + } + LOGGER.info("Consul config file connector is destroyed."); + } else { + LOGGER.info("Consul config file connector is not in running state."); + } + } + + @Override + public ConfigFileResponse createConfigFile(ConfigFile configFile) { + throw new UnsupportedOperationException("not support createConfigFile"); + } + + @Override + public ConfigFileResponse updateConfigFile(ConfigFile configFile) { + throw new UnsupportedOperationException("not support updateConfigFile"); + } + + @Override + public ConfigFileResponse releaseConfigFile(ConfigFile configFile) { + throw new UnsupportedOperationException("not support releaseConfigFile"); + } + + @Override + public ConfigFileResponse upsertAndPublishConfigFile(ConfigPublishFile request) { + throw new UnsupportedOperationException("not support upsertAndPublishConfigFile"); + } +} diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/RefreshEventData.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/RefreshEventData.java new file mode 100644 index 000000000..3f166c798 --- /dev/null +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/RefreshEventData.java @@ -0,0 +1,74 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.configuration.connector.consul; + +import com.tencent.polaris.api.plugin.configuration.ConfigFile; +import com.tencent.polaris.api.plugin.configuration.ConfigFileResponse; + +import java.util.Objects; + +/** + * @author Haotian Zhang + */ +public class RefreshEventData { + private final String keyPrefix; + + private final ConfigFile configFile; + + private final ConfigFileResponse configFileResponse; + + public RefreshEventData(String keyPrefix, ConfigFile configFile, ConfigFileResponse configFileResponse) { + this.keyPrefix = keyPrefix; + this.configFile = configFile; + this.configFileResponse = configFileResponse; + } + + public String getKeyPrefix() { + return keyPrefix; + } + + public ConfigFile getConfigFile() { + return configFile; + } + + public ConfigFileResponse getConfigFileResponse() { + return configFileResponse; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RefreshEventData that = (RefreshEventData) o; + return Objects.equals(keyPrefix, that.keyPrefix) && Objects.equals(configFile, that.configFile) && Objects.equals(configFileResponse, that.configFileResponse); + } + + @Override + public int hashCode() { + return Objects.hash(keyPrefix, configFile, configFileResponse); + } + + @Override + public String toString() { + return "RefreshEventData{" + + "keyPrefix='" + keyPrefix + '\'' + + ", configFile=" + configFile + + ", configFileResponse=" + configFileResponse + + '}'; + } +} diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/utils/ConsulConfigFileUtils.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/utils/ConsulConfigFileUtils.java new file mode 100644 index 000000000..5efef6729 --- /dev/null +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/utils/ConsulConfigFileUtils.java @@ -0,0 +1,40 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.configuration.connector.consul.utils; + +import com.tencent.polaris.api.plugin.configuration.ConfigFile; +import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + +/** + * @author Haotian Zhang + */ +public class ConsulConfigFileUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigFileUtils.class); + + public static String toConsulKVKeyPrefix(ConfigFile configFile) { + String key = "/" + configFile.getNamespace() + + "/" + configFile.getFileGroup() + + "/" + configFile.getFileName(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Consul config file key: {}", key); + } + return key; + } +} diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.configuration.ConfigFileConnector b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.configuration.ConfigFileConnector new file mode 100644 index 000000000..fc4a4ed5a --- /dev/null +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.configuration.ConfigFileConnector @@ -0,0 +1 @@ +com.tencent.polaris.plugins.configuration.connector.consul.ConsulConfigFileConnector diff --git a/polaris-plugins/polaris-plugins-configuration-connector/local-file-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/localfile/LocalFileConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/local-file-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/localfile/LocalFileConfigFileConnector.java index e6f76fab0..b61485e0f 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/local-file-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/localfile/LocalFileConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/local-file-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/localfile/LocalFileConfigFileConnector.java @@ -17,32 +17,6 @@ package com.tencent.polaris.plugins.configuration.connector.localfile; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; -import java.nio.file.ClosedWatchServiceException; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import com.tencent.polaris.api.config.verify.DefaultValues; import com.tencent.polaris.api.exception.ErrorCode; import com.tencent.polaris.api.exception.PolarisException; @@ -62,251 +36,251 @@ import org.slf4j.Logger; import org.yaml.snakeyaml.Yaml; -import static com.tencent.polaris.api.config.verify.DefaultValues.LOCAL_FILE_CONNECTOR_TYPE; +import java.io.*; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.*; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.LOCAL_FILE_CONNECTOR_TYPE; import static com.tencent.polaris.api.config.verify.DefaultValues.PATTERN_CONFIG_FILE; -import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; -import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; -import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.*; /** * @author DoubleLuXu 2022-09-20 */ public class LocalFileConfigFileConnector implements ConfigFileConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(LocalFileConfigFileConnector.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LocalFileConfigFileConnector.class); - private ExecutorService executorService; - private WatchService watcher; - private String persistDirPath; - private Path dir; - /** - * config file changed queue - */ - private BlockingQueue blockingQueue = new ArrayBlockingQueue(1024); + private ExecutorService executorService; + private WatchService watcher; + private String persistDirPath; + private Path dir; + /** + * config file changed queue + */ + private BlockingQueue blockingQueue = new ArrayBlockingQueue(1024); - @Override - public void init(InitContext ctx) throws PolarisException { - if (!Objects.equals(ctx.getConfig().getConfigFile().getServerConnector().getConnectorType(), LOCAL_FILE_CONNECTOR_TYPE)) { - return; - } - String dirPath = ctx.getConfig().getConfigFile().getServerConnector().getPersistDir(); - if (StringUtils.isBlank(dirPath)) { - dirPath = DefaultValues.CONFIG_FILE_DEFAULT_CACHE_PERSIST_DIR; - } - this.persistDirPath = Utils.translatePath(dirPath); - this.dir = Paths.get(this.persistDirPath); - try { - FileUtils.dirPathCheck(this.persistDirPath); - } - catch (IOException ex) { - throw new PolarisException(ErrorCode.INVALID_CONFIG, ex.getMessage(), ex); - } - this.executorService = Executors.newSingleThreadExecutor(); - try { - this.watcher = FileSystems.getDefault().newWatchService(); - this.dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); - LOGGER.info("init local file config connector,watch dir:[{}].", persistDirPath); - } - catch (IOException e) { - LOGGER.error("file system watch path: " + persistDirPath + " error.", e); - throw new PolarisException(ErrorCode.UNKNOWN_SERVER_ERROR, ""); - } - watchFileChange(); - } + @Override + public void init(InitContext ctx) throws PolarisException { + if (!Objects.equals(ctx.getConfig().getConfigFile().getServerConnector().getConnectorType(), LOCAL_FILE_CONNECTOR_TYPE)) { + return; + } + String dirPath = ctx.getConfig().getConfigFile().getServerConnector().getPersistDir(); + if (StringUtils.isBlank(dirPath)) { + dirPath = DefaultValues.CONFIG_FILE_DEFAULT_CACHE_PERSIST_DIR; + } + this.persistDirPath = Utils.translatePath(dirPath); + this.dir = Paths.get(this.persistDirPath); + try { + FileUtils.dirPathCheck(this.persistDirPath); + } catch (IOException ex) { + throw new PolarisException(ErrorCode.INVALID_CONFIG, ex.getMessage(), ex); + } + this.executorService = Executors.newSingleThreadExecutor(); + try { + this.watcher = FileSystems.getDefault().newWatchService(); + this.dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + LOGGER.info("init local file config connector,watch dir:[{}].", persistDirPath); + } catch (IOException e) { + LOGGER.error("file system watch path: " + persistDirPath + " error.", e); + throw new PolarisException(ErrorCode.UNKNOWN_SERVER_ERROR, ""); + } + watchFileChange(); + } - @Override - public ConfigFileResponse getConfigFile(ConfigFile configFile) { - String configFileName = configFileToFileName(configFile); - ConfigFile configFileRes = loadConfigFile(configFileName); - if (configFileRes != null) { - return new ConfigFileResponse(ServerCodes.EXECUTE_SUCCESS, "success.", configFileRes); - } - return new ConfigFileResponse(ServerCodes.NOT_FOUND_RESOURCE, "config file not found.", null); - } + @Override + public ConfigFileResponse getConfigFile(ConfigFile configFile) { + String configFileName = configFileToFileName(configFile); + ConfigFile configFileRes = loadConfigFile(configFileName); + if (configFileRes != null) { + return new ConfigFileResponse(ServerCodes.EXECUTE_SUCCESS, "success.", configFileRes); + } + return new ConfigFileResponse(ServerCodes.NOT_FOUND_RESOURCE, "config file not found.", null); + } - @Override - public ConfigFileResponse watchConfigFiles(List configFiles) { - try { - while (true) { - ConfigFileChange configFileChange = blockingQueue.take(); - Optional optional = configFiles.stream().filter(item -> configFileToFileName(item) - .equals(configFileChange.getFileName())).findFirst(); - if (optional.isPresent()) { - return getConfigFile(optional.get()); - } - } - } - catch (InterruptedException e) { - LOGGER.warn("config file watch interrupt " + e.getMessage()); - } - return null; - } + @Override + public ConfigFileResponse watchConfigFiles(List configFiles) { + try { + while (true) { + ConfigFileChange configFileChange = blockingQueue.take(); + Optional optional = configFiles.stream().filter(item -> configFileToFileName(item) + .equals(configFileChange.getFileName())).findFirst(); + if (optional.isPresent()) { + return getConfigFile(optional.get()); + } + } + } catch (InterruptedException e) { + LOGGER.warn("config file watch interrupt " + e.getMessage()); + } + return null; + } - @Override - public ConfigFileResponse createConfigFile(ConfigFile configFile) { - throw new UnsupportedOperationException("not support createConfigFile"); - } + @Override + public ConfigFileResponse createConfigFile(ConfigFile configFile) { + throw new UnsupportedOperationException("not support createConfigFile"); + } - @Override - public ConfigFileResponse updateConfigFile(ConfigFile configFile) { - throw new UnsupportedOperationException("not support updateConfigFile"); - } + @Override + public ConfigFileResponse updateConfigFile(ConfigFile configFile) { + throw new UnsupportedOperationException("not support updateConfigFile"); + } - @Override - public ConfigFileResponse releaseConfigFile(ConfigFile configFile) { - throw new UnsupportedOperationException("not support releaseConfigFile"); - } + @Override + public ConfigFileResponse releaseConfigFile(ConfigFile configFile) { + throw new UnsupportedOperationException("not support releaseConfigFile"); + } - @Override - public ConfigFileResponse upsertAndPublishConfigFile(ConfigPublishFile request) { - throw new UnsupportedOperationException("not support upsertAndPublishConfigFile"); - } + @Override + public ConfigFileResponse upsertAndPublishConfigFile(ConfigPublishFile request) { + throw new UnsupportedOperationException("not support upsertAndPublishConfigFile"); + } - @Override - public String getName() { - return LOCAL_FILE_CONNECTOR_TYPE; - } + @Override + public String getName() { + return LOCAL_FILE_CONNECTOR_TYPE; + } - @Override - public PluginType getType() { - return PluginTypes.CONFIG_FILE_CONNECTOR.getBaseType(); - } + @Override + public PluginType getType() { + return PluginTypes.CONFIG_FILE_CONNECTOR.getBaseType(); + } - @Override - public void postContextInit(Extensions extensions) throws PolarisException { + @Override + public void postContextInit(Extensions extensions) throws PolarisException { - } + } - @Override - public void destroy() { - if (this.watcher != null) { - try { - this.watcher.close(); - LOGGER.info("watcher close success."); - } - catch (IOException e) { - LOGGER.error("watcher close error.", e); - } - } - if (this.executorService != null) { - this.executorService.shutdown(); - } - } + @Override + public void destroy() { + if (this.watcher != null) { + try { + this.watcher.close(); + LOGGER.info("watcher close success."); + } catch (IOException e) { + LOGGER.error("watcher close error.", e); + } + } + if (this.executorService != null) { + this.executorService.shutdown(); + } + } - private void watchFileChange() { - executorService.execute(() -> { - while (true) { - WatchKey key = null; - try { - key = watcher.take(); - } - catch (InterruptedException e) { - LOGGER.error("file watcher take interrupted.", e); - } - catch (ClosedWatchServiceException e) { - LOGGER.warn("file watcher closed.", e); - return; - } - List> watchEvents = key.pollEvents(); - for (WatchEvent event : watchEvents) { - LOGGER.info("watched file event:{}:{}/{}.", event.kind(), this.dir.toAbsolutePath(), - event.context()); - if (StandardWatchEventKinds.ENTRY_CREATE == event.kind()) { - blockingQueue.offer(new ConfigFileChange(ConfigFileChange.ChangeType.CREATE, - event.context().toString())); - } - if (StandardWatchEventKinds.ENTRY_MODIFY == event.kind()) { - blockingQueue.offer(new ConfigFileChange(ConfigFileChange.ChangeType.UPDATE, - event.context().toString())); - } - if (StandardWatchEventKinds.ENTRY_DELETE == event.kind()) { - blockingQueue.offer(new ConfigFileChange(ConfigFileChange.ChangeType.DELETE, - event.context().toString())); - } - } - key.reset(); - } - }); - } + private void watchFileChange() { + executorService.execute(() -> { + while (true) { + WatchKey key = null; + try { + key = watcher.take(); + } catch (InterruptedException e) { + LOGGER.error("file watcher take interrupted.", e); + } catch (ClosedWatchServiceException e) { + LOGGER.warn("file watcher closed.", e); + return; + } + List> watchEvents = key.pollEvents(); + for (WatchEvent event : watchEvents) { + LOGGER.info("watched file event:{}:{}/{}.", event.kind(), this.dir.toAbsolutePath(), + event.context()); + if (StandardWatchEventKinds.ENTRY_CREATE == event.kind()) { + blockingQueue.offer(new ConfigFileChange(ConfigFileChange.ChangeType.CREATE, + event.context().toString())); + } + if (StandardWatchEventKinds.ENTRY_MODIFY == event.kind()) { + blockingQueue.offer(new ConfigFileChange(ConfigFileChange.ChangeType.UPDATE, + event.context().toString())); + } + if (StandardWatchEventKinds.ENTRY_DELETE == event.kind()) { + blockingQueue.offer(new ConfigFileChange(ConfigFileChange.ChangeType.DELETE, + event.context().toString())); + } + } + key.reset(); + } + }); + } - public ConfigFile loadConfigFile(String fileName) { - String persistFilePathStr = persistDirPath + File.separator + fileName; - Path persistPath = FileSystems.getDefault().getPath(persistFilePathStr); - File persistFile = persistPath.toFile(); - if (null == persistFile || !persistFile.exists()) { - return null; - } - return loadConfigFile(persistPath.toFile(), fileNameToConfigFile(fileName)); - } + public ConfigFile loadConfigFile(String fileName) { + String persistFilePathStr = persistDirPath + File.separator + fileName; + Path persistPath = FileSystems.getDefault().getPath(persistFilePathStr); + File persistFile = persistPath.toFile(); + if (null == persistFile || !persistFile.exists()) { + return null; + } + return loadConfigFile(persistPath.toFile(), fileNameToConfigFile(fileName)); + } - private static final String CACHE_SUFFIX = ".yaml"; + private static final String CACHE_SUFFIX = ".yaml"; - private static String configFileToFileName(ConfigFile configFile) { - try { - String encodedNamespace = URLEncoder.encode(configFile.getNamespace(), "UTF-8"); - String encodedFileGroup = URLEncoder.encode(configFile.getFileGroup(), "UTF-8"); - String encodeFileName = URLEncoder.encode(configFile.getFileName(), "UTF-8"); - return String.format(PATTERN_CONFIG_FILE, encodedNamespace, encodedFileGroup, encodeFileName); - } - catch (UnsupportedEncodingException e) { - throw new AssertionError("UTF-8 is unknown"); - } - } + private static String configFileToFileName(ConfigFile configFile) { + try { + String encodedNamespace = URLEncoder.encode(configFile.getNamespace(), "UTF-8"); + String encodedFileGroup = URLEncoder.encode(configFile.getFileGroup(), "UTF-8"); + String encodeFileName = URLEncoder.encode(configFile.getFileName(), "UTF-8"); + return String.format(PATTERN_CONFIG_FILE, encodedNamespace, encodedFileGroup, encodeFileName); + } catch (UnsupportedEncodingException e) { + throw new AssertionError("UTF-8 is unknown"); + } + } - private ConfigFile fileNameToConfigFile(String fileName) { - fileName = fileName.substring(0, fileName.length() - CACHE_SUFFIX.length()); - String[] pieces = fileName.split("#"); - try { - String namespace = URLDecoder.decode(pieces[0], "UTF-8"); - String fileGroup = URLDecoder.decode(pieces[1], "UTF-8"); - String configFileName = URLDecoder.decode(pieces[2], "UTF-8"); - return new ConfigFile(namespace, fileGroup, configFileName); - } - catch (UnsupportedEncodingException e) { - throw new AssertionError("UTF-8 is unknown"); - } - } + private ConfigFile fileNameToConfigFile(String fileName) { + fileName = fileName.substring(0, fileName.length() - CACHE_SUFFIX.length()); + String[] pieces = fileName.split("#"); + try { + String namespace = URLDecoder.decode(pieces[0], "UTF-8"); + String fileGroup = URLDecoder.decode(pieces[1], "UTF-8"); + String configFileName = URLDecoder.decode(pieces[2], "UTF-8"); + return new ConfigFile(namespace, fileGroup, configFileName); + } catch (UnsupportedEncodingException e) { + throw new AssertionError("UTF-8 is unknown"); + } + } - private ConfigFile loadConfigFile(File persistFile, ConfigFile configFile) { - if (null == persistFile || !persistFile.exists()) { - return null; - } - InputStream inputStream = null; - InputStreamReader reader = null; - Yaml yaml = new Yaml(); - try { - inputStream = new FileInputStream(persistFile); - reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); - ConfigFile resConfigFile = new ConfigFile(configFile.getNamespace(), - configFile.getFileGroup(), configFile.getFileName()); - Map jsonMap = yaml.load(reader); - resConfigFile.setContent(jsonMap.get("content").toString()); - resConfigFile.setMd5(jsonMap.get("md5").toString()); - resConfigFile.setVersion(Long.valueOf(String.valueOf(jsonMap.get("version")))); - return resConfigFile; - } - catch (IOException e) { - LOGGER.warn("fail to read file :" + persistFile.getAbsoluteFile(), e); - return null; - } - finally { - if (null != reader) { - try { - reader.close(); - } - catch (IOException e) { - LOGGER.warn("fail to close reader for :" + persistFile.getAbsoluteFile(), e); - } - } - if (null != inputStream) { - try { - inputStream.close(); - } - catch (IOException e) { - LOGGER.warn("fail to close stream for :" + persistFile.getAbsoluteFile(), e); - } - } - } - } + private ConfigFile loadConfigFile(File persistFile, ConfigFile configFile) { + if (null == persistFile || !persistFile.exists()) { + return null; + } + InputStream inputStream = null; + InputStreamReader reader = null; + Yaml yaml = new Yaml(); + try { + inputStream = new FileInputStream(persistFile); + reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); + ConfigFile resConfigFile = new ConfigFile(configFile.getNamespace(), + configFile.getFileGroup(), configFile.getFileName()); + Map jsonMap = yaml.load(reader); + resConfigFile.setContent(jsonMap.get("content").toString()); + resConfigFile.setMd5(jsonMap.get("md5").toString()); + resConfigFile.setVersion(Long.valueOf(String.valueOf(jsonMap.get("version")))); + return resConfigFile; + } catch (IOException e) { + LOGGER.warn("fail to read file :" + persistFile.getAbsoluteFile(), e); + return null; + } finally { + if (null != reader) { + try { + reader.close(); + } catch (IOException e) { + LOGGER.warn("fail to close reader for :" + persistFile.getAbsoluteFile(), e); + } + } + if (null != inputStream) { + try { + inputStream.close(); + } catch (IOException e) { + LOGGER.warn("fail to close stream for :" + persistFile.getAbsoluteFile(), e); + } + } + } + } } diff --git a/polaris-plugins/polaris-plugins-configuration-connector/polaris-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/polaris/PolarisConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/polaris-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/polaris/PolarisConfigFileConnector.java index 5a01fc67a..dbdd47102 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/polaris-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/polaris/PolarisConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/polaris-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/polaris/PolarisConfigFileConnector.java @@ -38,6 +38,8 @@ import java.util.List; +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.POLARIS_FILE_CONNECTOR_TYPE; + /** * @author lepdou 2022-03-02 */ @@ -237,7 +239,7 @@ public ConfigFileResponse upsertAndPublishConfigFile(ConfigPublishFile request) @Override public String getName() { - return "polaris"; + return POLARIS_FILE_CONNECTOR_TYPE; } private ConfigFile transferFromDTO(ConfigFileProto.ClientConfigFileInfo configFileDTO) { diff --git a/polaris-plugins/polaris-plugins-configuration-connector/pom.xml b/polaris-plugins/polaris-plugins-configuration-connector/pom.xml index 125c24214..93291577e 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/pom.xml +++ b/polaris-plugins/polaris-plugins-configuration-connector/pom.xml @@ -18,6 +18,7 @@ polaris-configuration-connector local-file-configuration-connector + consul-configuration-connector From 485b679d811debc16f80f7aaacc0a1e61977e136 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Fri, 24 May 2024 19:55:56 +0800 Subject: [PATCH 2/9] =?UTF-8?q?feat:=E6=B7=BB=E5=8A=A0equalsIgnoreCase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../polaris/api/utils/StringUtils.java | 10 ++--- .../polaris/api/utils/StringUtilsTest.java | 39 +++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/StringUtils.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/StringUtils.java index 8c6af0ac2..8f96f1cbf 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/StringUtils.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/StringUtils.java @@ -17,11 +17,7 @@ package com.tencent.polaris.api.utils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Objects; -import java.util.StringJoiner; +import java.util.*; public class StringUtils { @@ -54,6 +50,10 @@ public static boolean equals(String str1, String str2) { return Objects.equals(str1, str2); } + public static boolean equalsIgnoreCase(String str1, String str2) { + return equals(str1, str2) || (str1 != null && str1.equalsIgnoreCase(str2)); + } + public static boolean isEmpty(String str) { return str == null || str.length() == 0; } diff --git a/polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java b/polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java new file mode 100644 index 000000000..75d0869b4 --- /dev/null +++ b/polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java @@ -0,0 +1,39 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.api.utils; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for {@link StringUtils} + * + * @author Haotian Zhang + */ +public class StringUtilsTest { + + @Test + public void testIsEmpty() { + assertThat(StringUtils.equalsIgnoreCase(null, null)).isTrue(); + assertThat(StringUtils.equalsIgnoreCase(null, "abc")).isFalse(); + assertThat(StringUtils.equalsIgnoreCase("abc", null)).isFalse(); + assertThat(StringUtils.equalsIgnoreCase("abc", "abc")).isTrue(); + assertThat(StringUtils.equalsIgnoreCase("abc", "ABC")).isTrue(); + } +} From bc2efc6b0110be379ac8e19bc9220218b9a006f0 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Fri, 24 May 2024 19:56:02 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=B8=A4=E6=AC=A1?= =?UTF-8?q?=E6=8B=89=E5=8F=96=E9=85=8D=E7=BD=AE=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consul/ConsulConfigConstants.java | 2 ++ .../consul/ConsulConfigFileConnector.java | 31 ++++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java index 22f70fdb7..6f321d0c0 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java @@ -32,4 +32,6 @@ public interface ConsulConfigConstants { * 空值CONSUL Index常量,设为-1L时,Consul会立即返回 */ Long EMPTY_VALUE_CONSUL_INDEX = -1L; + + String CONFIG_FILE_DELETED_MESSAGE = "config file deleted."; } diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java index 22544d6f1..d95645aab 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java @@ -75,6 +75,8 @@ public class ConsulConfigFileConnector implements ConfigFileConnector { public final Map consulModifyIndexes = new ConcurrentHashMap<>(); + public final Map responseCache = new ConcurrentHashMap<>(); + private ConsulConfigContext consulConfigContext; /** @@ -154,17 +156,25 @@ public ConfigFileResponse getConfigFile(ConfigFile configFile) { } catch (Exception exception) { LOGGER.error("Watch consul config '{}' failed.", keyPrefix, exception); } - }, consulConfigContext.getDelay(), consulConfigContext.getDelay(), TimeUnit.MICROSECONDS)); + }, consulConfigContext.getDelay(), consulConfigContext.getDelay(), TimeUnit.MILLISECONDS)); + } + if (responseCache.containsKey(keyPrefix)) { + ConfigFileResponse configFileResponse = responseCache.get(keyPrefix); + if (StringUtils.equals(configFileResponse.getMessage(), ConsulConfigConstants.CONFIG_FILE_DELETED_MESSAGE)) { + responseCache.remove(keyPrefix); + } + return configFileResponse; } return getKVValues(configFile, keyPrefix); } - return new ConfigFileResponse(ServerCodes.NOT_FOUND_RESOURCE, "config file not found.", null); + return new ConfigFileResponse(ServerCodes.NOT_FOUND_RESOURCE, ConsulConfigConstants.CONFIG_FILE_DELETED_MESSAGE, null); } private ConfigFileResponse getKVValues(ConfigFile configFile, String keyPrefix) { // 使用default值逻辑处理 Long currentIndex = this.consulIndexes.getOrDefault(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); Long currentModifyIndex = this.consulModifyIndexes.getOrDefault(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); + LOGGER.debug("watching consul for keyPrefix '{}' with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); // use the consul ACL token if found String aclToken = consulConfigContext.getAclToken(); @@ -188,11 +198,17 @@ private ConfigFileResponse getKVValues(ConfigFile configFile, String keyPrefix) private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefix, Long currentIndex, Long currentModifyIndex, Response> response) { if (response.getValue() == null) { - LOGGER.warn("watching consul for keyPrefix '{}' with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); + if (responseCache.containsKey(keyPrefix)) { + // 在Consul中不存在自定义KEY时,此处的逻辑可以避免response实时返回,不断的触发retry + this.consulIndexes.put(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); + this.consulModifyIndexes.put(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); + configFile.setVersion(response.getConsulIndex()); + return new ConfigFileResponse(CodeProto.Code.ExecuteSuccess.getNumber(), + ConsulConfigConstants.CONFIG_FILE_DELETED_MESSAGE, configFile); + } return new ConfigFileResponse(CodeProto.Code.NotFoundResource.getNumber(), "config file not found.", null); } - LOGGER.debug("watching consul for keyPrefix '{}' with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); int code = CodeProto.Code.ExecuteSuccess.getNumber(); String message = "execute success"; if (response.getValue() != null) { @@ -204,9 +220,8 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi if (newIndex != null && !newIndex.equals(currentIndex)) { // 根据currentModifyIndex和newModifyIndex判断内容是否实际发生了变化 if (!newModifyIndex.equals(currentModifyIndex)) { - LOGGER.info("KeyPrefix '{}' has new index {} and modify index {} with old index {} and old modify index {}", + LOGGER.info("KeyPrefix '{}' has new index {} and new modify index {} with old index {} and old modify index {}", keyPrefix, newIndex, newModifyIndex, currentIndex, currentModifyIndex); - } else if (LOGGER.isDebugEnabled()) { code = CodeProto.Code.DataNoChange.getNumber(); message = "config data is no change"; @@ -215,6 +230,7 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi } // 在Consul中不存在自定义KEY时,此处的逻辑可以避免response实时返回,不断的触发retry this.consulIndexes.put(keyPrefix, newIndex); + this.consulModifyIndexes.put(keyPrefix, newModifyIndex); } else if (LOGGER.isDebugEnabled()) { code = CodeProto.Code.DataNoChange.getNumber(); message = "config data is no change"; @@ -255,11 +271,12 @@ private void handleException(String keyPrefix, Long currentIndex, Long currentMo public ConfigFileResponse watchConfigFiles(List configFiles) { try { while (true) { - RefreshEventData refreshEventData = blockingQueue.take(); + RefreshEventData refreshEventData = blockingQueue.poll(30, TimeUnit.SECONDS); Optional optional = configFiles.stream() .filter(configFile -> StringUtils.equals(refreshEventData.getKeyPrefix(), ConsulConfigFileUtils.toConsulKVKeyPrefix(configFile))) .findFirst(); if (optional.isPresent()) { + responseCache.put(refreshEventData.getKeyPrefix(), refreshEventData.getConfigFileResponse()); return refreshEventData.getConfigFileResponse(); } } From 5782569a8017df0bc1dccbdeac4b7248af64ccb5 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Mon, 27 May 2024 15:10:58 +0800 Subject: [PATCH 4/9] fix NPE. --- .../consul/ConsulConfigFileConnector.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java index d95645aab..945efa378 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java @@ -199,10 +199,10 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi Long currentModifyIndex, Response> response) { if (response.getValue() == null) { if (responseCache.containsKey(keyPrefix)) { - // 在Consul中不存在自定义KEY时,此处的逻辑可以避免response实时返回,不断的触发retry - this.consulIndexes.put(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); + Long newIndex = response.getConsulIndex(); + this.consulIndexes.put(keyPrefix, newIndex); this.consulModifyIndexes.put(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); - configFile.setVersion(response.getConsulIndex()); + configFile.setVersion(newIndex); return new ConfigFileResponse(CodeProto.Code.ExecuteSuccess.getNumber(), ConsulConfigConstants.CONFIG_FILE_DELETED_MESSAGE, configFile); } @@ -272,12 +272,14 @@ public ConfigFileResponse watchConfigFiles(List configFiles) { try { while (true) { RefreshEventData refreshEventData = blockingQueue.poll(30, TimeUnit.SECONDS); - Optional optional = configFiles.stream() - .filter(configFile -> StringUtils.equals(refreshEventData.getKeyPrefix(), ConsulConfigFileUtils.toConsulKVKeyPrefix(configFile))) - .findFirst(); - if (optional.isPresent()) { - responseCache.put(refreshEventData.getKeyPrefix(), refreshEventData.getConfigFileResponse()); - return refreshEventData.getConfigFileResponse(); + if (refreshEventData != null) { + Optional optional = configFiles.stream() + .filter(configFile -> StringUtils.equals(refreshEventData.getKeyPrefix(), ConsulConfigFileUtils.toConsulKVKeyPrefix(configFile))) + .findFirst(); + if (optional.isPresent()) { + responseCache.put(refreshEventData.getKeyPrefix(), refreshEventData.getConfigFileResponse()); + return refreshEventData.getConfigFileResponse(); + } } } } catch (InterruptedException e) { From 5556681e11938f44e72e114dbff303bd668a4b7c Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Mon, 27 May 2024 17:02:45 +0800 Subject: [PATCH 5/9] fix error retry sleep. --- .../consul/ConsulConfigConstants.java | 2 ++ .../connector/consul/ConsulConfigContext.java | 11 +++++++ .../consul/ConsulConfigFileConnector.java | 29 ++++++++++++++++++- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java index 6f321d0c0..9213bfc7e 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigConstants.java @@ -27,6 +27,8 @@ public interface ConsulConfigConstants { String WAIT_TIME_KEY = "waitTime"; String DELAY_KEY = "delay"; + + String CONSUL_ERROR_SLEEP_KEY = "consulErrorSleep"; /** * 空值CONSUL Index常量,设为-1L时,Consul会立即返回 diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java index 8a21317af..532634440 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigContext.java @@ -41,6 +41,8 @@ public class ConsulConfigContext { */ private int delay = 1000; + private long consulErrorSleep = 60000L; + private String aclToken = ""; public ServerConnectorConfig getConnectorConfig() { @@ -67,6 +69,14 @@ public void setDelay(int delay) { this.delay = delay; } + public long getConsulErrorSleep() { + return consulErrorSleep; + } + + public void setConsulErrorSleep(long consulErrorSleep) { + this.consulErrorSleep = consulErrorSleep; + } + public String getAclToken() { return aclToken; } @@ -81,6 +91,7 @@ public String toString() { "connectorConfig=" + connectorConfig + ", waitTime=" + waitTime + ", delay=" + delay + + ", consulErrorSleep=" + consulErrorSleep + ", aclToken='" + aclToken + '\'' + '}'; } diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java index 945efa378..36187c5ac 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java @@ -41,6 +41,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; @@ -120,6 +121,16 @@ public void init(InitContext ctx) throws PolarisException { } } + String consulErrorSleepStr = metadata.get(ConsulConfigConstants.CONSUL_ERROR_SLEEP_KEY); + if (StringUtils.isNotBlank(consulErrorSleepStr)) { + try { + long consulErrorSleep = Long.parseLong(consulErrorSleepStr); + consulConfigContext.setConsulErrorSleep(consulErrorSleep); + } catch (Exception e) { + LOGGER.warn("delay string {} is not integer.", consulErrorSleepStr, e); + } + } + String tokenStr = connectorConfig.getToken(); if (StringUtils.isNotBlank(tokenStr)) { consulConfigContext.setAclToken(tokenStr); @@ -238,7 +249,12 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi } } transferFromGetValueList(configFile, response.getValue()); - return new ConfigFileResponse(code, message, configFile); + ConfigFileResponse configFileResponse = new ConfigFileResponse(code, message, configFile); + // for first time + if (!responseCache.containsKey(keyPrefix)) { + responseCache.put(keyPrefix, configFileResponse); + } + return configFileResponse; } private void transferFromGetValueList(ConfigFile configFile, List getValueList) { @@ -252,18 +268,29 @@ private void transferFromGetValueList(ConfigFile configFile, List getV configFile.setContent(decodedValue); configFile.setMd5(DigestUtils.md5Hex(decodedValue)); configFile.setVersion(firstValue.getModifyIndex()); + configFile.setReleaseTime(new Date()); } private void handleOperationException(String keyPrefix, Long currentIndex, Long currentModifyIndex, OperationException operationException) { LOGGER.error("KeyPrefix '{}' with operation exception with index {} and modify index {}.", keyPrefix, currentIndex, currentModifyIndex, operationException); + try { + Thread.sleep(consulConfigContext.getConsulErrorSleep()); + } catch (Exception e) { + LOGGER.error("error in sleep, msg: " + e.getMessage()); + } throw ServerErrorResponseException.build(CodeProto.Code.ExecuteException.getNumber(), operationException.toString()); } private void handleException(String keyPrefix, Long currentIndex, Long currentModifyIndex, Exception exception) { LOGGER.error("KeyPrefix '{}' with exception with index {} and modify index {}.", keyPrefix, currentIndex, currentModifyIndex, exception); + try { + Thread.sleep(consulConfigContext.getConsulErrorSleep()); + } catch (Exception e) { + LOGGER.error("error in sleep, msg: " + e.getMessage()); + } throw ServerErrorResponseException.build(CodeProto.Code.ExecuteException.getNumber(), exception.toString()); } From 8dacd3e7577443d9b79da6ca2181380fc194329a Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Wed, 29 May 2024 17:23:18 +0800 Subject: [PATCH 6/9] add some logs. --- .../connector/consul/ConsulConfigFileConnector.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java index 36187c5ac..f91cfc725 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java @@ -152,6 +152,7 @@ public ConfigFileResponse getConfigFile(ConfigFile configFile) { if (this.running.get()) { String keyPrefix = ConsulConfigFileUtils.toConsulKVKeyPrefix(configFile); if (!watchFutures.containsKey(keyPrefix)) { + LOGGER.info("Start watching consul config for keyPrefix '{}'", keyPrefix); this.watchFutures.put(keyPrefix, this.scheduledExecutorService.scheduleWithFixedDelay(() -> { try { ConfigFileResponse configFileResponse = getKVValues(configFile, keyPrefix); @@ -185,7 +186,7 @@ private ConfigFileResponse getKVValues(ConfigFile configFile, String keyPrefix) // 使用default值逻辑处理 Long currentIndex = this.consulIndexes.getOrDefault(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); Long currentModifyIndex = this.consulModifyIndexes.getOrDefault(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); - LOGGER.debug("watching consul for keyPrefix '{}' with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); + LOGGER.debug("Get consul config for keyPrefix '{}' with index {} and modify index {}", keyPrefix, currentIndex, currentModifyIndex); // use the consul ACL token if found String aclToken = consulConfigContext.getAclToken(); @@ -214,6 +215,7 @@ private ConfigFileResponse handleResponse(ConfigFile configFile, String keyPrefi this.consulIndexes.put(keyPrefix, newIndex); this.consulModifyIndexes.put(keyPrefix, ConsulConfigConstants.EMPTY_VALUE_CONSUL_INDEX); configFile.setVersion(newIndex); + LOGGER.info("consul config file '{}' has been deleted.", keyPrefix); return new ConfigFileResponse(CodeProto.Code.ExecuteSuccess.getNumber(), ConsulConfigConstants.CONFIG_FILE_DELETED_MESSAGE, configFile); } From 552f4e6fe112c9dc9d80857883810ddf3af1b269 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Mon, 3 Jun 2024 19:23:03 +0800 Subject: [PATCH 7/9] set config connector token. --- .../connector/consul/ConsulConfigFileConnector.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java index f91cfc725..67c16422f 100644 --- a/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java +++ b/polaris-plugins/polaris-plugins-configuration-connector/consul-configuration-connector/src/main/java/com/tencent/polaris/plugins/configuration/connector/consul/ConsulConfigFileConnector.java @@ -99,6 +99,11 @@ public void init(InitContext ctx) throws PolarisException { // init consul config context. consulConfigContext = new ConsulConfigContext(); + // token + String tokenStr = connectorConfig.getToken(); + if (StringUtils.isNotBlank(tokenStr)) { + consulConfigContext.setAclToken(tokenStr); + } Map metadata = connectorConfig.getMetadata(); if (CollectionUtils.isNotEmpty(metadata)) { String waitTimeStr = metadata.get(ConsulConfigConstants.WAIT_TIME_KEY); @@ -130,11 +135,6 @@ public void init(InitContext ctx) throws PolarisException { LOGGER.warn("delay string {} is not integer.", consulErrorSleepStr, e); } } - - String tokenStr = connectorConfig.getToken(); - if (StringUtils.isNotBlank(tokenStr)) { - consulConfigContext.setAclToken(tokenStr); - } } // init watch executor. From 2aec5bec183ac1cece64f47e34892d08990f9658 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Tue, 4 Jun 2024 20:08:38 +0800 Subject: [PATCH 8/9] update junit test. --- .../java/com/tencent/polaris/api/utils/StringUtilsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java b/polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java index 75d0869b4..d691ac22b 100644 --- a/polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java +++ b/polaris-common/polaris-model/src/test/java/com/tencent/polaris/api/utils/StringUtilsTest.java @@ -29,7 +29,7 @@ public class StringUtilsTest { @Test - public void testIsEmpty() { + public void testEqualsIgnoreCase() { assertThat(StringUtils.equalsIgnoreCase(null, null)).isTrue(); assertThat(StringUtils.equalsIgnoreCase(null, "abc")).isFalse(); assertThat(StringUtils.equalsIgnoreCase("abc", null)).isFalse(); From 3ee61767d9b3f619f7d513031cfebc8158218479 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Thu, 6 Jun 2024 19:28:14 +0800 Subject: [PATCH 9/9] fix consul discovery bug. --- .../composite/CompositeServiceUpdateTask.java | 8 ++++ .../plugins/connector/grpc/GrpcConnector.java | 42 ++++--------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java index 40ce2a0b4..fca24928a 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java @@ -201,6 +201,14 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { } Service.Builder newServiceBuilder = Service.newBuilder() .mergeFrom(newDiscoverResponseBuilder.getService()); + if (newDiscoverResponseBuilder.getService() != null) { + if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getNamespace().getValue())) { + newServiceBuilder.setNamespace(StringValue.of(serviceEventKey.getNamespace())); + } + if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getName().getValue())) { + newServiceBuilder.setName(StringValue.of(serviceEventKey.getService())); + } + } newServiceBuilder.setRevision(StringValue.of(compositeRevision.getCompositeRevisionString())); newDiscoverResponseBuilder.setService(newServiceBuilder.build()); newDiscoverResponseBuilder.clearInstances(); diff --git a/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcConnector.java b/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcConnector.java index 0aec1401e..52d3e64d9 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcConnector.java +++ b/polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcConnector.java @@ -31,18 +31,7 @@ import com.tencent.polaris.api.plugin.common.InitContext; import com.tencent.polaris.api.plugin.common.PluginTypes; import com.tencent.polaris.api.plugin.compose.Extensions; -import com.tencent.polaris.api.plugin.server.CommonProviderRequest; -import com.tencent.polaris.api.plugin.server.CommonProviderResponse; -import com.tencent.polaris.api.plugin.server.CommonServiceContractRequest; -import com.tencent.polaris.api.plugin.server.InterfaceDescriptor; -import com.tencent.polaris.api.plugin.server.ReportClientRequest; -import com.tencent.polaris.api.plugin.server.ReportClientResponse; -import com.tencent.polaris.api.plugin.server.ReportServiceContractRequest; -import com.tencent.polaris.api.plugin.server.ReportServiceContractResponse; -import com.tencent.polaris.api.plugin.server.ServerConnector; -import com.tencent.polaris.api.plugin.server.ServerEvent; -import com.tencent.polaris.api.plugin.server.ServiceEventHandler; -import com.tencent.polaris.api.plugin.server.TargetServer; +import com.tencent.polaris.api.plugin.server.*; import com.tencent.polaris.api.pojo.ServiceEventKey; import com.tencent.polaris.api.pojo.ServiceEventKey.EventType; import com.tencent.polaris.api.pojo.ServiceKey; @@ -58,36 +47,18 @@ import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Type; import com.tencent.polaris.plugins.connector.grpc.Connection.ConnID; import com.tencent.polaris.specification.api.v1.model.ModelProto; -import com.tencent.polaris.specification.api.v1.service.manage.ClientProto; +import com.tencent.polaris.specification.api.v1.service.manage.*; import com.tencent.polaris.specification.api.v1.service.manage.ClientProto.Client; import com.tencent.polaris.specification.api.v1.service.manage.ClientProto.StatInfo; -import com.tencent.polaris.specification.api.v1.service.manage.PolarisGRPCGrpc; -import com.tencent.polaris.specification.api.v1.service.manage.PolarisServiceContractGRPCGrpc; -import com.tencent.polaris.specification.api.v1.service.manage.RequestProto; import com.tencent.polaris.specification.api.v1.service.manage.RequestProto.DiscoverRequest; -import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto; import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto.DiscoverResponse; -import com.tencent.polaris.specification.api.v1.service.manage.ServiceContractProto; -import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto; import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto.Service; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static com.tencent.polaris.specification.api.v1.model.CodeProto.Code.ExecuteSuccess; @@ -400,6 +371,11 @@ private ServiceProto.Instance buildRegisterInstanceRequest(CommonProviderRequest instanceBuilder.setPriority(UInt32Value.newBuilder().setValue(req.getPriority()).build()); } if (null != req.getMetadata()) { + for (Map.Entry entry : req.getMetadata().entrySet()) { + if (StringUtils.isBlank(entry.getValue())) { + entry.setValue(""); + } + } instanceBuilder.putAllMetadata(req.getMetadata()); } if (null != req.getTtl()) {