From 3c49a68d9ce3860339681ef75c48c8d2bf4c370e Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Wed, 12 Feb 2025 17:19:29 +0800 Subject: [PATCH 1/2] feat:support push gateway event. --- .../polaris-assembly-factory/pom.xml | 5 + .../polaris-circuitbreaker-factory/pom.xml | 5 + .../main/resources/conf/default-config.yml | 8 + .../api/config/plugin/DefaultPlugins.java | 5 + .../polaris-configuration-factory/pom.xml | 5 + polaris-dependencies/pom.xml | 5 + .../client/flow/DefaultDiscoveryFlow.java | 2 + .../polaris-discovery-factory/pom.xml | 5 + polaris-distribution/polaris-all/pom.xml | 6 + polaris-plugins/polaris-plugin-api/pom.xml | 9 +- .../api/plugin/common/ValueContext.java | 23 +- .../polaris/api/plugin/event/FlowEvent.java | 66 ++++- .../api/plugin/event/FlowEventConstants.java | 21 +- .../composite/ResourceCounters.java | 16 +- .../composite/utils/CircuitBreakerUtils.java | 16 +- .../lossless/common/LosslessEventUtils.java | 54 ++++ .../deregister/DeregisterLosslessPolicy.java | 12 +- .../HealthCheckRegisterLosslessPolicy.java | 10 + .../event/logger/LoggerEventReporter.java | 14 +- .../event-pushgateway/pom.xml | 37 +++ .../pushgateway/PushGatewayEventReporter.java | 231 ++++++++++++++++++ .../PushGatewayEventReporterConfig.java | 100 ++++++++ .../pushgateway/PushGatewayEventRequest.java | 48 ++++ .../pushgateway/PushGatewayEventResponse.java | 65 +++++ ...ent.polaris.api.plugin.event.EventReporter | 1 + .../PushGatewayEventRequestTest.java | 67 +++++ .../plugins/event/tsf/TsfEventReporter.java | 3 +- .../polaris-plugins-observability/pom.xml | 1 + .../client/utils/RateLimiterEventUtils.java | 28 ++- .../polaris-ratelimit-factory/pom.xml | 5 + pom.xml | 2 +- 31 files changed, 840 insertions(+), 35 deletions(-) create mode 100644 polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java diff --git a/polaris-assembly/polaris-assembly-factory/pom.xml b/polaris-assembly/polaris-assembly-factory/pom.xml index b962e1545..dd6ef9cbe 100644 --- a/polaris-assembly/polaris-assembly-factory/pom.xml +++ b/polaris-assembly/polaris-assembly-factory/pom.xml @@ -89,6 +89,11 @@ event-tsf ${project.version} + + com.tencent.polaris + event-pushgateway + ${project.version} + diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml index 30bd2bfa4..959baef7a 100644 --- a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml +++ b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml @@ -111,6 +111,11 @@ event-tsf ${project.version} + + com.tencent.polaris + event-pushgateway + ${project.version} + diff --git a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml index 396042b68..38549d5ec 100644 --- a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml +++ b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml @@ -92,6 +92,14 @@ global: tsf: # 描述:TSF 事件上报开关 enable: false + # 描述:PushGateway 事件上报插件配置 + pushgateway: + # 描述:PushGateway 事件上报开关 + enable: false + # 描述:PushGateway 事件上报队列长度 + eventQueueSize: 10000; + # 描述:PushGateway 事件上报最大批量大小 + maxBatchSize: 100 # 描述:Admin相关的配置 admin: # 描述:Admin的监听的IP diff --git a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java index e6dbb19dd..c774cc29f 100644 --- a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java +++ b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java @@ -90,6 +90,11 @@ public interface DefaultPlugins { */ String TSF_EVENT_REPORTER_TYPE = "tsf"; + /** + * PushGateway 事件上报插件名 + */ + String PUSH_GATEWAY_EVENT_REPORTER_TYPE = "pushgateway"; + /** * 黑白名单鉴权插件名 */ diff --git a/polaris-configuration/polaris-configuration-factory/pom.xml b/polaris-configuration/polaris-configuration-factory/pom.xml index 88fd8f59c..8e0e80e5d 100644 --- a/polaris-configuration/polaris-configuration-factory/pom.xml +++ b/polaris-configuration/polaris-configuration-factory/pom.xml @@ -131,6 +131,11 @@ event-tsf ${project.version} + + com.tencent.polaris + event-pushgateway + ${project.version} + com.tencent.polaris diff --git a/polaris-dependencies/pom.xml b/polaris-dependencies/pom.xml index ea89e61cd..720be65e6 100644 --- a/polaris-dependencies/pom.xml +++ b/polaris-dependencies/pom.xml @@ -329,6 +329,11 @@ event-tsf ${project.version} + + com.tencent.polaris + event-pushgateway + ${project.version} + com.tencent.polaris diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java index 91af95a36..f035ae245 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java @@ -153,6 +153,8 @@ private InstanceRegisterResponse doRegister(InstanceRegisterRequest req, Mapevent-tsf ${project.version} + + com.tencent.polaris + event-pushgateway + ${project.version} + diff --git a/polaris-distribution/polaris-all/pom.xml b/polaris-distribution/polaris-all/pom.xml index 8728d81c8..f933d827c 100644 --- a/polaris-distribution/polaris-all/pom.xml +++ b/polaris-distribution/polaris-all/pom.xml @@ -53,6 +53,11 @@ bcpkix-jdk15to18 ${bouncycastle.version} + + io.opentelemetry + opentelemetry-api + ${otel.version} + @@ -180,6 +185,7 @@ org.bouncycastle:* + io.opentelemetry:* diff --git a/polaris-plugins/polaris-plugin-api/pom.xml b/polaris-plugins/polaris-plugin-api/pom.xml index 940dd89bf..92791a7e7 100644 --- a/polaris-plugins/polaris-plugin-api/pom.xml +++ b/polaris-plugins/polaris-plugin-api/pom.xml @@ -1,7 +1,7 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> polaris-plugins com.tencent.polaris @@ -16,6 +16,11 @@ Polaris Plugin API JAR + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + com.tencent.polaris polaris-config diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java index 21df8e4f3..c8fcda264 100644 --- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java +++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java @@ -18,9 +18,10 @@ package com.tencent.polaris.api.plugin.common; import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; /** * 用于主流程传递kv数据的上下文对象,线程安全 @@ -33,8 +34,12 @@ public class ValueContext { private static final String KEY_HOST = "key_host"; + private static final String KEY_PORT = "key_port"; + private static final String KEY_CLIENT_ID = "key_clientId"; + private static final String KEY_INSTANCE_ID = "key_instanceId"; + private static final String KEY_ENGINE = "key_engine"; private static final String KEY_SERVER_CONNECTOR_PROTOCOL = "key_serverConnectorProtocol"; @@ -88,6 +93,14 @@ public void setHost(String host) { setValue(KEY_HOST, host); } + public Integer getPort() { + return getValue(KEY_PORT); + } + + public void setPort(Integer port) { + setValue(KEY_PORT, port); + } + public String getClientId() { return getValue(KEY_CLIENT_ID); } @@ -96,6 +109,14 @@ public void setClientId(String clientId) { setValue(KEY_CLIENT_ID, clientId); } + public String getInstanceId() { + return getValue(KEY_INSTANCE_ID); + } + + public void setInstanceId(String instanceId) { + setValue(KEY_INSTANCE_ID, instanceId); + } + public String getServerConnectorProtocol() { return getValue(KEY_SERVER_CONNECTOR_PROTOCOL); } diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java index 348945624..5d5c07673 100644 --- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java +++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java @@ -17,9 +17,11 @@ package com.tencent.polaris.api.plugin.event; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; import com.tencent.polaris.api.pojo.ServiceEventKey; -import java.time.Instant; +import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; @@ -33,71 +35,98 @@ public class FlowEvent { /** * 事件类型 */ + @JsonProperty("event_type") private final ServiceEventKey.EventType eventType; + /** + * 事件名称 + */ + @JsonProperty("event_name") + private final FlowEventConstants.EventName eventName; + /** * 时间戳 */ - private final Instant timestamp; + @JsonProperty("event_time") + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss:SSSS") + private final LocalDateTime timestamp; /** * 客户端ID */ + @JsonProperty("client_id") private final String clientId; /** * 客户端IP */ + @JsonProperty("client_ip") private final String clientIp; /** * 被调命名空间 */ + @JsonProperty("namespace") private final String namespace; /** * 被调服务名 */ + @JsonProperty("service") private final String service; /** * 接口协议 */ + @JsonProperty("api_protocol") private final String apiProtocol; /** * 接口路径 */ + @JsonProperty("api_path") private final String apiPath; /** * 接口方法 */ + @JsonProperty("api_method") private final String apiMethod; + /** + * 实例ID + */ + @JsonProperty("instance_id") + private final String instanceId; + /** * 实例IP */ + @JsonProperty("host") private final String host; /** * 实例端口 */ + @JsonProperty("port") private final String port; /** * 主调命名空间 */ + @JsonProperty("source_namespace") private final String sourceNamespace; /** * 主调服务名 */ + @JsonProperty("source_service") private final String sourceService; /** * 主调标签 */ + @JsonProperty("labels") private final String labels; /** @@ -123,12 +152,14 @@ public class FlowEvent { /** * 状态转换原因 */ + @JsonProperty("reason") private final String reason; private final Map additionalParams; private FlowEvent(Builder builder) { this.eventType = builder.eventType; + this.eventName = builder.eventName; this.timestamp = builder.timestamp; this.clientId = builder.clientId; this.clientIp = builder.clientIp; @@ -137,6 +168,7 @@ private FlowEvent(Builder builder) { this.apiProtocol = builder.apiProtocol; this.apiPath = builder.apiPath; this.apiMethod = builder.apiMethod; + this.instanceId = builder.instanceId; this.host = builder.host; if (builder.port != null) { this.port = String.valueOf(builder.port); @@ -156,7 +188,8 @@ private FlowEvent(Builder builder) { public static class Builder { private ServiceEventKey.EventType eventType; - private Instant timestamp; + private FlowEventConstants.EventName eventName; + private LocalDateTime timestamp; private String clientId = ""; private String clientIp = ""; private String namespace = ""; @@ -164,6 +197,7 @@ public static class Builder { private String apiProtocol = ""; private String apiPath = ""; private String apiMethod = ""; + private String instanceId = ""; private String host = ""; private Integer port; private String sourceNamespace = ""; @@ -180,7 +214,12 @@ public Builder withEventType(ServiceEventKey.EventType eventType) { return this; } - public Builder withTimestamp(Instant timestamp) { + public Builder withEventName(FlowEventConstants.EventName eventName) { + this.eventName = eventName; + return this; + } + + public Builder withTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; return this; } @@ -220,6 +259,11 @@ public Builder withApiMethod(String apiMethod) { return this; } + public Builder withInstanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + public Builder withHost(String host) { this.host = host; return this; @@ -279,7 +323,11 @@ public ServiceEventKey.EventType getEventType() { return eventType; } - public Instant getTimestamp() { + public FlowEventConstants.EventName getEventName() { + return eventName; + } + + public LocalDateTime getTimestamp() { return timestamp; } @@ -311,6 +359,10 @@ public String getApiMethod() { return apiMethod; } + public String getInstanceId() { + return instanceId; + } + public String getHost() { return host; } @@ -359,6 +411,7 @@ public Map getAdditionalParams() { public String toString() { return "FlowEvent{" + "eventType=" + eventType + + ", eventName=" + eventName + ", timestamp=" + timestamp + ", clientId='" + clientId + '\'' + ", clientIp='" + clientIp + '\'' + @@ -367,8 +420,9 @@ public String toString() { ", apiProtocol='" + apiProtocol + '\'' + ", apiPath='" + apiPath + '\'' + ", apiMethod='" + apiMethod + '\'' + + ", instanceId='" + instanceId + '\'' + ", host='" + host + '\'' + - ", port=" + port + + ", port='" + port + '\'' + ", sourceNamespace='" + sourceNamespace + '\'' + ", sourceService='" + sourceService + '\'' + ", labels='" + labels + '\'' + diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java index be06aa86c..9df90c2d5 100644 --- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java +++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java @@ -22,6 +22,26 @@ */ public class FlowEventConstants { + public enum EventName { + LosslessOnlineStart, + LosslessOnlineEnd, + LosslessWarmupStart, + LosslessWarmupEnd, + LosslessOfflineStart, + + InstanceThreadEnd, + + CircuitBreakerOpen, + CircuitBreakerHalfOpen, + CircuitBreakerClose, + CircuitBreakerDestroy, + + RateLimitStart, + RateLimitEnd, + + UNKNOWN, + } + public enum Status { // circuit breaker status OPEN, @@ -47,5 +67,4 @@ public enum ResourceType { UNKNOWN, } - } diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java index b9e053b47..268b45065 100644 --- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java +++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java @@ -34,9 +34,9 @@ import com.tencent.polaris.api.pojo.*; import com.tencent.polaris.api.pojo.CircuitBreakerStatus.FallbackInfo; import com.tencent.polaris.api.pojo.CircuitBreakerStatus.Status; -import com.tencent.polaris.api.utils.TrieUtil; import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.api.utils.TrieUtil; import com.tencent.polaris.client.flow.BaseFlow; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.plugins.circuitbreaker.composite.trigger.ConsecutiveCounter; @@ -48,7 +48,7 @@ import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto.*; import org.slf4j.Logger; -import java.time.Instant; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -339,17 +339,23 @@ private void reportEvent(CircuitBreakerStatus.Status previousStatus, CircuitBrea if (extensions == null) { return; } + + FlowEventConstants.Status currentFlowEventStatus = CircuitBreakerUtils.parseFlowEventStatus(currentStatus); + FlowEventConstants.Status previousFlowEventStatus = CircuitBreakerUtils.parseFlowEventStatus(previousStatus); + FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder() .withEventType(ServiceEventKey.EventType.CIRCUIT_BREAKING) - .withTimestamp(Instant.now()) + .withEventName(CircuitBreakerUtils.parseFlowEventName(currentFlowEventStatus, previousFlowEventStatus)) + .withTimestamp(LocalDateTime.now()) .withClientId(extensions.getValueContext().getClientId()) .withClientIp(extensions.getValueContext().getHost()) .withNamespace(resource.getService().getNamespace()) .withService(resource.getService().getService()) + .withInstanceId(extensions.getValueContext().getInstanceId()) .withSourceNamespace(resource.getCallerService().getNamespace()) .withSourceService(resource.getService().getService()) - .withCurrentStatus(CircuitBreakerUtils.parseFlowEventStatus(currentStatus)) - .withPreviousStatus(CircuitBreakerUtils.parseFlowEventStatus(previousStatus)) + .withCurrentStatus(currentFlowEventStatus) + .withPreviousStatus(previousFlowEventStatus) .withRuleName(ruleName); if (StringUtils.isNotBlank(reason)) { flowEventBuilder.withReason(reason); diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java index 323f39529..552cafdd9 100644 --- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java +++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java @@ -36,7 +36,7 @@ public class CircuitBreakerUtils { public static long DEFAULT_ERROR_RATE_INTERVAL_MS = 60 * 1000; - public static long MIN_CLEANUP_INTERVAL = 60 * 1000; + public static long MIN_CLEANUP_INTERVAL = 60 * 1000; public static boolean checkRule(CircuitBreakerProto.CircuitBreakerRule rule) { return checkLevel(rule.getLevel()); @@ -110,6 +110,20 @@ public static FlowEventConstants.Status parseFlowEventStatus(CircuitBreakerStatu } } + public static FlowEventConstants.EventName parseFlowEventName(FlowEventConstants.Status currentStatus, FlowEventConstants.Status previousStatus) { + if (currentStatus == FlowEventConstants.Status.OPEN) { + return FlowEventConstants.EventName.CircuitBreakerOpen; + } else if (currentStatus == FlowEventConstants.Status.HALF_OPEN) { + return FlowEventConstants.EventName.CircuitBreakerHalfOpen; + } else if (currentStatus == FlowEventConstants.Status.CLOSE) { + return FlowEventConstants.EventName.CircuitBreakerClose; + } else if (currentStatus == FlowEventConstants.Status.DESTROY) { + return FlowEventConstants.EventName.CircuitBreakerDestroy; + } else { + return FlowEventConstants.EventName.UNKNOWN; + } + } + public static String getApiCircuitBreakerName(String targetNamespaceId, String serviceName, String path, String method) { return targetNamespaceId + "#" + serviceName + "#" + getFormatApi(path, method); } diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java b/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java new file mode 100644 index 000000000..11373c26d --- /dev/null +++ b/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java @@ -0,0 +1,54 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugin.lossless.common; + +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.plugin.event.FlowEvent; +import com.tencent.polaris.api.plugin.event.FlowEventConstants; +import com.tencent.polaris.api.pojo.ServiceEventKey; +import com.tencent.polaris.client.flow.BaseFlow; + +import java.time.LocalDateTime; + +/** + * @author Haotian Zhang + */ +public class LosslessEventUtils { + + public static void reportEvent(Extensions extensions, String namespace, String service, String host, + int port, FlowEventConstants.EventName eventName) { + if (extensions == null) { + return; + } + + FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder() + .withEventType(ServiceEventKey.EventType.LOSSLESS) + .withEventName(eventName) + .withTimestamp(LocalDateTime.now()) + .withClientId(extensions.getValueContext().getClientId()) + .withClientIp(extensions.getValueContext().getHost()) + .withNamespace(namespace) + .withService(service) + .withInstanceId(extensions.getValueContext().getInstanceId()) + .withHost(host) + .withPort(port); + + FlowEvent flowEvent = flowEventBuilder.build(); + BaseFlow.reportFlowEvent(extensions, flowEvent); + } +} diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java b/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java index 3d0517056..86c7142ae 100644 --- a/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java +++ b/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java @@ -27,6 +27,7 @@ import com.tencent.polaris.api.plugin.common.PluginTypes; import com.tencent.polaris.api.plugin.common.ValueContext; import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.plugin.event.FlowEventConstants; import com.tencent.polaris.api.plugin.lossless.InstanceProperties; import com.tencent.polaris.api.plugin.lossless.LosslessActionProvider; import com.tencent.polaris.api.plugin.lossless.LosslessPolicy; @@ -37,19 +38,14 @@ import com.tencent.polaris.client.util.HttpServerUtils; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.logging.LoggingConsts; +import com.tencent.polaris.plugin.lossless.common.LosslessEventUtils; import com.tencent.polaris.plugin.lossless.common.LosslessUtils; import com.tencent.polaris.specification.api.v1.traffic.manage.LosslessProto; import org.slf4j.Logger; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class DeregisterLosslessPolicy implements LosslessPolicy, HttpServerAware { @@ -197,6 +193,8 @@ private void doLosslessDeregister(BaseInstance instance, LosslessActionProvider event.setBaseInstance(instance); event.setEventName(EVENT_LOSSLESS_DEREGISTER); EVENT_LOG.info(event.toString()); + LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(), + instance.getPort(), FlowEventConstants.EventName.LosslessOfflineStart); } private boolean isLosslessOfflineEnable(BaseInstance instance) { diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java b/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java index 18ea0c029..1d8aed604 100644 --- a/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java +++ b/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java @@ -27,6 +27,7 @@ import com.tencent.polaris.api.plugin.common.PluginTypes; import com.tencent.polaris.api.plugin.common.ValueContext; import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.plugin.event.FlowEventConstants; import com.tencent.polaris.api.plugin.lossless.InstanceProperties; import com.tencent.polaris.api.plugin.lossless.LosslessActionProvider; import com.tencent.polaris.api.plugin.lossless.LosslessPolicy; @@ -38,6 +39,7 @@ import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.logging.LoggingConsts; +import com.tencent.polaris.plugin.lossless.common.LosslessEventUtils; import com.tencent.polaris.plugin.lossless.common.LosslessUtils; import com.tencent.polaris.specification.api.v1.traffic.manage.LosslessProto; import org.slf4j.Logger; @@ -181,6 +183,8 @@ private void doLosslessRegister( } LOG.info("[HealthCheckRegisterLosslessPolicy] do losslessRegister for instance {}", instance); + LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(), + instance.getPort(), FlowEventConstants.EventName.LosslessOnlineStart); if (!losslessActionProvider.isEnableHealthCheck()) { long delayRegisterInterval = getDelayRegisterInterval(instance); LOG.info("[HealthCheckRegisterLosslessPolicy] health check disabled, start lossless register after {}ms, plugin {}", @@ -216,6 +220,8 @@ private void doRegister(BaseInstance instance, LosslessActionProvider losslessAc } else { EVENT_LOG.info(new Event(clientId, instance, EVENT_DIRECT_REGISTER).toString()); } + LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(), + instance.getPort(), FlowEventConstants.EventName.LosslessOnlineEnd); int warmupInterval = getWarmupInterval(instance); if (warmupInterval > 0) { @@ -226,11 +232,15 @@ private void doRegister(BaseInstance instance, LosslessActionProvider losslessAc Event warmupStartEvent = new Event(clientId, instance, EVENT_LOSSLESS_WARMUP_START); EVENT_LOG.info(warmupStartEvent.toString()); + LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(), + instance.getPort(), FlowEventConstants.EventName.LosslessWarmupStart); healthCheckExecutor.schedule(() -> { LOG.info("[HealthCheckRegisterLosslessPolicy] warmup for instance {} finished", instance); Event warmupEndEvent = new Event(clientId, instance, EVENT_LOSSLESS_WARMUP_END); EVENT_LOG.info(warmupEndEvent.toString()); + LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(), + instance.getPort(), FlowEventConstants.EventName.LosslessWarmupEnd); }, warmupInterval, TimeUnit.MILLISECONDS); } else { LOG.info("[HealthCheckRegisterLosslessPolicy] no warmup for instance {}", instance); diff --git a/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java index f10d27d39..f81046223 100644 --- a/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java +++ b/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java @@ -95,6 +95,11 @@ public String convertMessage(FlowEvent event) { eventType = event.getEventType().name(); } + String eventName = ""; + if (event.getEventName() != null) { + eventName = event.getEventName().name(); + } + String currentStatus = ""; if (event.getCurrentStatus() != null) { currentStatus = event.getCurrentStatus().name(); @@ -109,11 +114,12 @@ public String convertMessage(FlowEvent event) { if (event.getResourceType() != null) { resourceType = event.getResourceType().name(); } - return eventType + "|" + formattedDateTime + "|" + event.getClientId() + "|" + return eventType + "|" + eventName + "|" + formattedDateTime + "|" + event.getClientId() + "|" + event.getClientIp() + "|" + event.getNamespace() + "|" + event.getService() + "|" + event.getApiProtocol() + "|" + event.getApiPath() + "|" + event.getApiMethod() + "|" - + event.getHost() + "|" + event.getPort() + "|" + event.getSourceNamespace() + "|" - + event.getSourceService() + "|" + event.getLabels() + "|" + currentStatus + "|" + previousStatus + "|" - + resourceType + "|" + event.getRuleName() + "|" + event.getReason(); + + event.getInstanceId() + "|" + event.getHost() + "|" + event.getPort() + "|" + + event.getSourceNamespace() + "|" + event.getSourceService() + "|" + event.getLabels() + "|" + + currentStatus + "|" + previousStatus + "|" + resourceType + "|" + event.getRuleName() + "|" + + event.getReason(); } } diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml new file mode 100644 index 000000000..1b3fbf539 --- /dev/null +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + polaris-plugins-observability + com.tencent.polaris + ${revision} + ../pom.xml + + + event-pushgateway + Polaris Plugins Observability Push Gateway Event Reporter + Polaris Plugins Observability Push Gateway Event Reporter JAR + + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + com.google.code.gson + gson + ${gson.version} + + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + \ No newline at end of file diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java new file mode 100644 index 000000000..d302bca25 --- /dev/null +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java @@ -0,0 +1,231 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.event.pushgateway; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.tencent.polaris.api.config.global.EventReporterConfig; +import com.tencent.polaris.api.config.plugin.DefaultPlugins; +import com.tencent.polaris.api.config.plugin.PluginConfigProvider; +import com.tencent.polaris.api.config.verify.Verifier; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.plugin.PluginType; +import com.tencent.polaris.api.plugin.common.InitContext; +import com.tencent.polaris.api.plugin.common.PluginTypes; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.plugin.event.EventReporter; +import com.tencent.polaris.api.plugin.event.FlowEvent; +import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.api.utils.ThreadPoolUtils; +import com.tencent.polaris.client.util.NamedThreadFactory; +import com.tencent.polaris.logging.LoggerFactory; +import org.apache.http.HttpResponse; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * Polaris push gateway event reporter. + * + * @author Haotian Zhang + */ +public class PushGatewayEventReporter implements EventReporter, PluginConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(PushGatewayEventReporter.class); + + private final ObjectMapper mapper = new ObjectMapper(); + + private BlockingQueue eventQueue; + + private volatile boolean init = true; + + private PushGatewayEventReporterConfig config; + + private URI eventUri; + + private final ScheduledExecutorService eventExecutors = new ScheduledThreadPoolExecutor(1, + new NamedThreadFactory("event-pushgateway")); + + @Override + public boolean isEnabled() { + return config.isEnable(); + } + + @Override + public boolean reportEvent(FlowEvent flowEvent) { + if (eventUri == null) { + LOG.warn("build event request url fail, can not sent event."); + return false; + } + // 如果满了就抛出异常 + try { + eventQueue.add(flowEvent); + if (LOG.isDebugEnabled()) { + LOG.debug("add push gateway event to event queue: {}", flowEvent); + } + } catch (Throwable throwable) { + LOG.warn("Event queue is full. Log this event and drop it. event={}, error={}.", + flowEvent, throwable.getMessage(), throwable); + } + return true; + } + + @Override + public String getName() { + return DefaultPlugins.PUSH_GATEWAY_EVENT_REPORTER_TYPE; + } + + @Override + public Class getPluginConfigClazz() { + return PushGatewayEventReporterConfig.class; + } + + @Override + public PluginType getType() { + return PluginTypes.EVENT_REPORTER.getBaseType(); + } + + @Override + public void init(InitContext ctx) throws PolarisException { + EventReporterConfig eventReporterConfig = ctx.getConfig().getGlobal().getEventReporter(); + if (eventReporterConfig != null && CollectionUtils.isNotEmpty(eventReporterConfig.getReporters())) { + for (String reporter : eventReporterConfig.getReporters()) { + if (StringUtils.equals(getName(), reporter)) { + this.config = ctx.getConfig().getGlobal().getEventReporter() + .getPluginConfig(getName(), PushGatewayEventReporterConfig.class); + if (config.isEnable()) { + init = false; + } + return; + } + } + } + } + + @Override + public void postContextInit(Extensions ctx) throws PolarisException { + if (!init) { + synchronized (this) { + if (!init) { + init = true; + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + eventQueue = new LinkedBlockingQueue<>(config.getEventQueueSize()); + + // 分割主机名和端口号 + if (StringUtils.isBlank(config.getAddress())) { + throw new RuntimeException("PushGateway address is empty."); + } + String[] parts = config.getAddress().split(":"); + if (parts.length != 2) { + throw new RuntimeException(String.format("PushGateway address %s format error.", config.getAddress())); + } + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + eventUri = new URIBuilder() + .setHost(host) + .setPort(port) + .setScheme("http") + .setPath("/polaris/client/events") + .build(); + LOG.info("PushGateway event reporter init with uri: {}", eventUri); + + eventExecutors.scheduleWithFixedDelay(new PushGatewayEventTask(), 1000, 1000, TimeUnit.MILLISECONDS); + LOG.info("PushGateway event reporter starts reporting task."); + } catch (URISyntaxException e) { + LOG.error("Build event request url fail.", e); + } + } + } + } + } + + @Override + public void destroy() { + ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{eventExecutors}); + } + + class PushGatewayEventTask implements Runnable { + + @Override + public void run() { + try { + // 每次把eventQueue发空结束 + while (CollectionUtils.isNotEmpty(eventQueue)) { + List eventDataList = new ArrayList<>(); + PushGatewayEventRequest request = new PushGatewayEventRequest(); + + eventQueue.drainTo(eventDataList, config.getMaxBatchSize()); + request.setBatch(eventDataList); + + postPushGatewayEvent(request); + } + } catch (Throwable e) { + LOG.warn("Push gateway event reporter task fail.", e); + } + } + + private void postPushGatewayEvent(PushGatewayEventRequest request) { + StringEntity postBody = null; + RequestConfig config = RequestConfig.custom().setConnectTimeout(2000).setConnectionRequestTimeout(10000).setSocketTimeout(10000).build(); + try (CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build()) { + HttpPost httpPost = new HttpPost(eventUri); + postBody = new StringEntity(mapper.writeValueAsString(request)); + if (LOG.isDebugEnabled()) { + LOG.debug("postPushGatewayEvent body:{}", postBody); + } + httpPost.setEntity(postBody); + httpPost.setHeader("Content-Type", "application/json"); + HttpResponse httpResponse; + + httpResponse = httpClient.execute(httpPost); + + if (200 != httpResponse.getStatusLine().getStatusCode()) { + String resultString = EntityUtils.toString(httpResponse.getEntity(), "utf-8"); + throw new RuntimeException("Report push gateway event failed. Response = [" + resultString + "]."); + } else { + if (LOG.isDebugEnabled()) { + String resultString = EntityUtils.toString(httpResponse.getEntity(), "utf-8"); + LOG.info("Report push gateway event success. Response is : {}", resultString); + } else { + LOG.info("Report push gateway event success."); + } + } + } catch (Exception e) { + LOG.warn("Report push gateway event failed, postBody:{}.", postBody, e); + } + } + } +} diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java new file mode 100644 index 000000000..94ea9d519 --- /dev/null +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java @@ -0,0 +1,100 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.event.pushgateway; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.tencent.polaris.api.config.verify.Verifier; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.factory.util.ConfigUtils; + +/** + * Polaris push gateway event reporter config. + * + * @author Haotian Zhang + */ +public class PushGatewayEventReporterConfig implements Verifier { + + @JsonProperty + private Boolean enable; + + @JsonProperty + private String address; + + @JsonProperty + private Integer eventQueueSize = 1000; + + @JsonProperty + private Integer maxBatchSize = 100; + + @Override + public void verify() { + ConfigUtils.validateNull(enable, "global.eventReporter.plugin.pushgateway.enable"); + if (!enable) { + return; + } + ConfigUtils.validateString(address, "global.eventReporter.plugin.pushgateway.address"); + } + + @Override + public void setDefault(Object defaultObject) { + if (defaultObject instanceof PushGatewayEventReporterConfig) { + PushGatewayEventReporterConfig pushGatewayEventReporterConfig = (PushGatewayEventReporterConfig) defaultObject; + if (null == enable) { + setEnable(pushGatewayEventReporterConfig.isEnable()); + } + if (StringUtils.isBlank(address)) { + setAddress(pushGatewayEventReporterConfig.getAddress()); + } + } + } + + public boolean isEnable() { + if (null == enable) { + enable = false; + } + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public Integer getEventQueueSize() { + return eventQueueSize; + } + + public void setEventQueueSize(Integer eventQueueSize) { + this.eventQueueSize = eventQueueSize; + } + + public Integer getMaxBatchSize() { + return maxBatchSize; + } + + public void setMaxBatchSize(Integer maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } +} diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java new file mode 100644 index 000000000..7a5a34ab1 --- /dev/null +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java @@ -0,0 +1,48 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.event.pushgateway; + +import com.tencent.polaris.api.plugin.event.FlowEvent; + +import java.util.ArrayList; +import java.util.List; + +/** + * Polaris push gateway event request. + * + * @author Haotian Zhang + */ +public class PushGatewayEventRequest { + + private List batch = new ArrayList<>(); + + public List getBatch() { + return batch; + } + + public void setBatch(List batch) { + this.batch = batch; + } + + @Override + public String toString() { + return "PushGatewayEventRequest{" + + "batch=" + batch + + '}'; + } +} diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java new file mode 100644 index 000000000..0678d93e1 --- /dev/null +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java @@ -0,0 +1,65 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.event.pushgateway; + +/** + * Polaris push gateway event response. + * + * @author Haotian Zhang + */ +public class PushGatewayEventResponse { + + private String requestId; + + private int code; + + private String info; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getInfo() { + return info; + } + + public void setInfo(String info) { + this.info = info; + } + + @Override + public String toString() { + return "PushGatewayEventResponse{" + + "requestId='" + requestId + '\'' + + ", code=" + code + + ", info='" + info + '\'' + + '}'; + } +} diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter new file mode 100644 index 000000000..55a65137a --- /dev/null +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter @@ -0,0 +1 @@ +com.tencent.polaris.plugins.event.pushgateway.PushGatewayEventReporter diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java new file mode 100644 index 000000000..95e0fa694 --- /dev/null +++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java @@ -0,0 +1,67 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.event.pushgateway; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.tencent.polaris.api.plugin.event.FlowEvent; +import com.tencent.polaris.api.pojo.ServiceEventKey; +import org.junit.Test; + +import java.time.LocalDateTime; + +import static com.tencent.polaris.api.plugin.event.FlowEventConstants.EventName.LosslessOnlineStart; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for {@link PushGatewayEventRequest}. + * + * @author Haotian Zhang + */ +public class PushGatewayEventRequestTest { + + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void test() throws JsonProcessingException { + FlowEvent flowEvent = new FlowEvent.Builder() + .withEventType(ServiceEventKey.EventType.LOSSLESS) + .withEventName(LosslessOnlineStart) + .withTimestamp(LocalDateTime.of(2025, 1, 13, 11, 58, 42)) + .withClientId("test-client") + .withClientIp("1.2.3.4") + .withNamespace("test-namespace") + .withService("test-service") + .withInstanceId("test-instance") + .withHost("1.2.3.4") + .withPort(8080) + .build(); + + PushGatewayEventRequest request = new PushGatewayEventRequest(); + request.getBatch().add(flowEvent); + + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + assertThat(mapper.writeValueAsString(request)).isEqualTo("{\"batch\":[{\"currentStatus\":null,\"previousStatus\":null,\"resourceType\":null,\"ruleName\":\"\",\"additionalParams\":{},\"event_type\":\"LOSSLESS\",\"event_name\":\"LosslessOnlineStart\",\"event_time\":\"2025-01-13 11:58:42:0000\",\"client_id\":\"test-client\",\"client_ip\":\"1.2.3.4\",\"namespace\":\"test-namespace\",\"service\":\"test-service\",\"api_protocol\":\"\",\"api_path\":\"\",\"api_method\":\"\",\"instance_id\":\"test-instance\",\"host\":\"1.2.3.4\",\"port\":\"8080\",\"source_namespace\":\"\",\"source_service\":\"\",\"labels\":\"\",\"reason\":\"\"}]}"); + } +} \ No newline at end of file diff --git a/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java index e111343a5..bf985c879 100644 --- a/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java +++ b/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java @@ -57,6 +57,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -115,7 +116,7 @@ private boolean reportV1Event(FlowEvent flowEvent) { } TsfEventData eventData = new TsfEventData(); - eventData.setOccurTime(flowEvent.getTimestamp().getEpochSecond()); + eventData.setOccurTime(flowEvent.getTimestamp().atZone(ZoneId.systemDefault()).toInstant().getEpochSecond()); eventData.setEventName(TsfEventDataUtils.convertEventName(flowEvent)); Byte status = TsfEventDataUtils.convertStatus(flowEvent); if (status == null || status == -1) { diff --git a/polaris-plugins/polaris-plugins-observability/pom.xml b/polaris-plugins/polaris-plugins-observability/pom.xml index 565759f03..0b57b24b6 100644 --- a/polaris-plugins/polaris-plugins-observability/pom.xml +++ b/polaris-plugins/polaris-plugins-observability/pom.xml @@ -21,6 +21,7 @@ trace-otel event-logger event-tsf + event-pushgateway diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java index 91f59031a..6a610888f 100644 --- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java +++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java @@ -27,7 +27,7 @@ import com.tencent.polaris.client.flow.BaseFlow; import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto; -import java.time.Instant; +import java.time.LocalDateTime; import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.RULE_DETAIL_KEY; import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.RULE_ID_KEY; @@ -43,23 +43,29 @@ public static void reportEvent(Extensions extensions, ServiceKey serviceKey, Rat if (extensions == null) { return; } + + FlowEventConstants.Status currentFlowEventStatus = parseFlowEventStatus(currentCode); + FlowEventConstants.Status previousFlowEventStatus = parseFlowEventStatus(previousCode); + FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder() .withEventType(ServiceEventKey.EventType.RATE_LIMITING) - .withTimestamp(Instant.now()) + .withEventName(parseFlowEventName(currentFlowEventStatus, previousFlowEventStatus)) + .withTimestamp(LocalDateTime.now()) .withClientId(extensions.getValueContext().getClientId()) .withClientIp(extensions.getValueContext().getHost()) .withNamespace(serviceKey.getNamespace()) .withService(serviceKey.getService()) .withApiPath(rule.getMethod().getValue().getValue()) + .withInstanceId(extensions.getValueContext().getInstanceId()) .withHost(extensions.getValueContext().getHost()) + .withPort(extensions.getValueContext().getPort()) .withSourceNamespace(sourceNamespace) .withSourceService(sourceService) .withLabels(labels) - .withCurrentStatus(parseFlowEventStatus(currentCode)) - .withPreviousStatus(parseFlowEventStatus(previousCode)) + .withCurrentStatus(currentFlowEventStatus) + .withPreviousStatus(previousFlowEventStatus) .withResourceType(parseFlowEventResourceType(rule.getResource())) - .withRuleName(rule.getName().getValue()) - .withReason(reason); + .withRuleName(rule.getName().getValue()); if (StringUtils.isNotBlank(reason)) { flowEventBuilder.withReason(reason); } @@ -92,6 +98,16 @@ private static FlowEventConstants.Status parseFlowEventStatus(QuotaResult.Code c } } + private static FlowEventConstants.EventName parseFlowEventName(FlowEventConstants.Status currentStatus, FlowEventConstants.Status previousStatus) { + if (currentStatus == FlowEventConstants.Status.LIMITED && previousStatus == FlowEventConstants.Status.UNLIMITED) { + return FlowEventConstants.EventName.RateLimitStart; + } else if (currentStatus == FlowEventConstants.Status.UNLIMITED && previousStatus == FlowEventConstants.Status.LIMITED) { + return FlowEventConstants.EventName.RateLimitEnd; + } else { + return FlowEventConstants.EventName.UNKNOWN; + } + } + private static FlowEventConstants.ResourceType parseFlowEventResourceType(RateLimitProto.Rule.Resource resource) { switch (resource) { case QPS: diff --git a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml index 10f673240..eaaa113b2 100644 --- a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml +++ b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml @@ -118,6 +118,11 @@ event-tsf ${project.version} + + com.tencent.polaris + event-pushgateway + ${project.version} + com.tencent.polaris diff --git a/pom.xml b/pom.xml index 334c3af2a..e5320c38f 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ - 2.0.0.1 + 2.0.1.0-SNAPSHOT ${maven.build.timestamp} false From 7821cd59fed322bc25f3108e46e4b9b5c4f77e95 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Thu, 13 Feb 2025 16:36:45 +0800 Subject: [PATCH 2/2] remove instance id. --- .../client/flow/DefaultDiscoveryFlow.java | 2 - .../api/plugin/common/ValueContext.java | 20 --- .../polaris/api/plugin/event/FlowEvent.java | 18 --- .../composite/ResourceCounters.java | 108 ++-------------- .../utils/CircuitBreakerEventUtils.java | 122 ++++++++++++++++++ .../lossless/common/LosslessEventUtils.java | 1 - .../event/logger/LoggerEventReporter.java | 7 +- .../PushGatewayEventRequestTest.java | 3 +- .../client/utils/RateLimiterEventUtils.java | 2 - 9 files changed, 139 insertions(+), 144 deletions(-) create mode 100644 polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerEventUtils.java diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java index f035ae245..91af95a36 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java @@ -153,8 +153,6 @@ private InstanceRegisterResponse doRegister(InstanceRegisterRequest req, Map