From 3c49a68d9ce3860339681ef75c48c8d2bf4c370e Mon Sep 17 00:00:00 2001
From: Haotian Zhang <928016560@qq.com>
Date: Wed, 12 Feb 2025 17:19:29 +0800
Subject: [PATCH 1/2] feat:support push gateway event.
---
.../polaris-assembly-factory/pom.xml | 5 +
.../polaris-circuitbreaker-factory/pom.xml | 5 +
.../main/resources/conf/default-config.yml | 8 +
.../api/config/plugin/DefaultPlugins.java | 5 +
.../polaris-configuration-factory/pom.xml | 5 +
polaris-dependencies/pom.xml | 5 +
.../client/flow/DefaultDiscoveryFlow.java | 2 +
.../polaris-discovery-factory/pom.xml | 5 +
polaris-distribution/polaris-all/pom.xml | 6 +
polaris-plugins/polaris-plugin-api/pom.xml | 9 +-
.../api/plugin/common/ValueContext.java | 23 +-
.../polaris/api/plugin/event/FlowEvent.java | 66 ++++-
.../api/plugin/event/FlowEventConstants.java | 21 +-
.../composite/ResourceCounters.java | 16 +-
.../composite/utils/CircuitBreakerUtils.java | 16 +-
.../lossless/common/LosslessEventUtils.java | 54 ++++
.../deregister/DeregisterLosslessPolicy.java | 12 +-
.../HealthCheckRegisterLosslessPolicy.java | 10 +
.../event/logger/LoggerEventReporter.java | 14 +-
.../event-pushgateway/pom.xml | 37 +++
.../pushgateway/PushGatewayEventReporter.java | 231 ++++++++++++++++++
.../PushGatewayEventReporterConfig.java | 100 ++++++++
.../pushgateway/PushGatewayEventRequest.java | 48 ++++
.../pushgateway/PushGatewayEventResponse.java | 65 +++++
...ent.polaris.api.plugin.event.EventReporter | 1 +
.../PushGatewayEventRequestTest.java | 67 +++++
.../plugins/event/tsf/TsfEventReporter.java | 3 +-
.../polaris-plugins-observability/pom.xml | 1 +
.../client/utils/RateLimiterEventUtils.java | 28 ++-
.../polaris-ratelimit-factory/pom.xml | 5 +
pom.xml | 2 +-
31 files changed, 840 insertions(+), 35 deletions(-)
create mode 100644 polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java
create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml
create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java
create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java
create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java
create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java
create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter
create mode 100644 polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java
diff --git a/polaris-assembly/polaris-assembly-factory/pom.xml b/polaris-assembly/polaris-assembly-factory/pom.xml
index b962e1545..dd6ef9cbe 100644
--- a/polaris-assembly/polaris-assembly-factory/pom.xml
+++ b/polaris-assembly/polaris-assembly-factory/pom.xml
@@ -89,6 +89,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml
index 30bd2bfa4..959baef7a 100644
--- a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml
+++ b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml
@@ -111,6 +111,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
diff --git a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml
index 396042b68..38549d5ec 100644
--- a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml
+++ b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml
@@ -92,6 +92,14 @@ global:
tsf:
# 描述:TSF 事件上报开关
enable: false
+ # 描述:PushGateway 事件上报插件配置
+ pushgateway:
+ # 描述:PushGateway 事件上报开关
+ enable: false
+ # 描述:PushGateway 事件上报队列长度
+ eventQueueSize: 10000;
+ # 描述:PushGateway 事件上报最大批量大小
+ maxBatchSize: 100
# 描述:Admin相关的配置
admin:
# 描述:Admin的监听的IP
diff --git a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java
index e6dbb19dd..c774cc29f 100644
--- a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java
+++ b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java
@@ -90,6 +90,11 @@ public interface DefaultPlugins {
*/
String TSF_EVENT_REPORTER_TYPE = "tsf";
+ /**
+ * PushGateway 事件上报插件名
+ */
+ String PUSH_GATEWAY_EVENT_REPORTER_TYPE = "pushgateway";
+
/**
* 黑白名单鉴权插件名
*/
diff --git a/polaris-configuration/polaris-configuration-factory/pom.xml b/polaris-configuration/polaris-configuration-factory/pom.xml
index 88fd8f59c..8e0e80e5d 100644
--- a/polaris-configuration/polaris-configuration-factory/pom.xml
+++ b/polaris-configuration/polaris-configuration-factory/pom.xml
@@ -131,6 +131,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
com.tencent.polaris
diff --git a/polaris-dependencies/pom.xml b/polaris-dependencies/pom.xml
index ea89e61cd..720be65e6 100644
--- a/polaris-dependencies/pom.xml
+++ b/polaris-dependencies/pom.xml
@@ -329,6 +329,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
com.tencent.polaris
diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java
index 91af95a36..f035ae245 100644
--- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java
+++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java
@@ -153,6 +153,8 @@ private InstanceRegisterResponse doRegister(InstanceRegisterRequest req, Mapevent-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
diff --git a/polaris-distribution/polaris-all/pom.xml b/polaris-distribution/polaris-all/pom.xml
index 8728d81c8..f933d827c 100644
--- a/polaris-distribution/polaris-all/pom.xml
+++ b/polaris-distribution/polaris-all/pom.xml
@@ -53,6 +53,11 @@
bcpkix-jdk15to18
${bouncycastle.version}
+
+ io.opentelemetry
+ opentelemetry-api
+ ${otel.version}
+
@@ -180,6 +185,7 @@
org.bouncycastle:*
+ io.opentelemetry:*
diff --git a/polaris-plugins/polaris-plugin-api/pom.xml b/polaris-plugins/polaris-plugin-api/pom.xml
index 940dd89bf..92791a7e7 100644
--- a/polaris-plugins/polaris-plugin-api/pom.xml
+++ b/polaris-plugins/polaris-plugin-api/pom.xml
@@ -1,7 +1,7 @@
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
polaris-plugins
com.tencent.polaris
@@ -16,6 +16,11 @@
Polaris Plugin API JAR
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ ${jackson.version}
+
com.tencent.polaris
polaris-config
diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java
index 21df8e4f3..c8fcda264 100644
--- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java
+++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java
@@ -18,9 +18,10 @@
package com.tencent.polaris.api.plugin.common;
import com.tencent.polaris.logging.LoggerFactory;
+import org.slf4j.Logger;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
/**
* 用于主流程传递kv数据的上下文对象,线程安全
@@ -33,8 +34,12 @@ public class ValueContext {
private static final String KEY_HOST = "key_host";
+ private static final String KEY_PORT = "key_port";
+
private static final String KEY_CLIENT_ID = "key_clientId";
+ private static final String KEY_INSTANCE_ID = "key_instanceId";
+
private static final String KEY_ENGINE = "key_engine";
private static final String KEY_SERVER_CONNECTOR_PROTOCOL = "key_serverConnectorProtocol";
@@ -88,6 +93,14 @@ public void setHost(String host) {
setValue(KEY_HOST, host);
}
+ public Integer getPort() {
+ return getValue(KEY_PORT);
+ }
+
+ public void setPort(Integer port) {
+ setValue(KEY_PORT, port);
+ }
+
public String getClientId() {
return getValue(KEY_CLIENT_ID);
}
@@ -96,6 +109,14 @@ public void setClientId(String clientId) {
setValue(KEY_CLIENT_ID, clientId);
}
+ public String getInstanceId() {
+ return getValue(KEY_INSTANCE_ID);
+ }
+
+ public void setInstanceId(String instanceId) {
+ setValue(KEY_INSTANCE_ID, instanceId);
+ }
+
public String getServerConnectorProtocol() {
return getValue(KEY_SERVER_CONNECTOR_PROTOCOL);
}
diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java
index 348945624..5d5c07673 100644
--- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java
+++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java
@@ -17,9 +17,11 @@
package com.tencent.polaris.api.plugin.event;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.tencent.polaris.api.pojo.ServiceEventKey;
-import java.time.Instant;
+import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
@@ -33,71 +35,98 @@ public class FlowEvent {
/**
* 事件类型
*/
+ @JsonProperty("event_type")
private final ServiceEventKey.EventType eventType;
+ /**
+ * 事件名称
+ */
+ @JsonProperty("event_name")
+ private final FlowEventConstants.EventName eventName;
+
/**
* 时间戳
*/
- private final Instant timestamp;
+ @JsonProperty("event_time")
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss:SSSS")
+ private final LocalDateTime timestamp;
/**
* 客户端ID
*/
+ @JsonProperty("client_id")
private final String clientId;
/**
* 客户端IP
*/
+ @JsonProperty("client_ip")
private final String clientIp;
/**
* 被调命名空间
*/
+ @JsonProperty("namespace")
private final String namespace;
/**
* 被调服务名
*/
+ @JsonProperty("service")
private final String service;
/**
* 接口协议
*/
+ @JsonProperty("api_protocol")
private final String apiProtocol;
/**
* 接口路径
*/
+ @JsonProperty("api_path")
private final String apiPath;
/**
* 接口方法
*/
+ @JsonProperty("api_method")
private final String apiMethod;
+ /**
+ * 实例ID
+ */
+ @JsonProperty("instance_id")
+ private final String instanceId;
+
/**
* 实例IP
*/
+ @JsonProperty("host")
private final String host;
/**
* 实例端口
*/
+ @JsonProperty("port")
private final String port;
/**
* 主调命名空间
*/
+ @JsonProperty("source_namespace")
private final String sourceNamespace;
/**
* 主调服务名
*/
+ @JsonProperty("source_service")
private final String sourceService;
/**
* 主调标签
*/
+ @JsonProperty("labels")
private final String labels;
/**
@@ -123,12 +152,14 @@ public class FlowEvent {
/**
* 状态转换原因
*/
+ @JsonProperty("reason")
private final String reason;
private final Map additionalParams;
private FlowEvent(Builder builder) {
this.eventType = builder.eventType;
+ this.eventName = builder.eventName;
this.timestamp = builder.timestamp;
this.clientId = builder.clientId;
this.clientIp = builder.clientIp;
@@ -137,6 +168,7 @@ private FlowEvent(Builder builder) {
this.apiProtocol = builder.apiProtocol;
this.apiPath = builder.apiPath;
this.apiMethod = builder.apiMethod;
+ this.instanceId = builder.instanceId;
this.host = builder.host;
if (builder.port != null) {
this.port = String.valueOf(builder.port);
@@ -156,7 +188,8 @@ private FlowEvent(Builder builder) {
public static class Builder {
private ServiceEventKey.EventType eventType;
- private Instant timestamp;
+ private FlowEventConstants.EventName eventName;
+ private LocalDateTime timestamp;
private String clientId = "";
private String clientIp = "";
private String namespace = "";
@@ -164,6 +197,7 @@ public static class Builder {
private String apiProtocol = "";
private String apiPath = "";
private String apiMethod = "";
+ private String instanceId = "";
private String host = "";
private Integer port;
private String sourceNamespace = "";
@@ -180,7 +214,12 @@ public Builder withEventType(ServiceEventKey.EventType eventType) {
return this;
}
- public Builder withTimestamp(Instant timestamp) {
+ public Builder withEventName(FlowEventConstants.EventName eventName) {
+ this.eventName = eventName;
+ return this;
+ }
+
+ public Builder withTimestamp(LocalDateTime timestamp) {
this.timestamp = timestamp;
return this;
}
@@ -220,6 +259,11 @@ public Builder withApiMethod(String apiMethod) {
return this;
}
+ public Builder withInstanceId(String instanceId) {
+ this.instanceId = instanceId;
+ return this;
+ }
+
public Builder withHost(String host) {
this.host = host;
return this;
@@ -279,7 +323,11 @@ public ServiceEventKey.EventType getEventType() {
return eventType;
}
- public Instant getTimestamp() {
+ public FlowEventConstants.EventName getEventName() {
+ return eventName;
+ }
+
+ public LocalDateTime getTimestamp() {
return timestamp;
}
@@ -311,6 +359,10 @@ public String getApiMethod() {
return apiMethod;
}
+ public String getInstanceId() {
+ return instanceId;
+ }
+
public String getHost() {
return host;
}
@@ -359,6 +411,7 @@ public Map getAdditionalParams() {
public String toString() {
return "FlowEvent{" +
"eventType=" + eventType +
+ ", eventName=" + eventName +
", timestamp=" + timestamp +
", clientId='" + clientId + '\'' +
", clientIp='" + clientIp + '\'' +
@@ -367,8 +420,9 @@ public String toString() {
", apiProtocol='" + apiProtocol + '\'' +
", apiPath='" + apiPath + '\'' +
", apiMethod='" + apiMethod + '\'' +
+ ", instanceId='" + instanceId + '\'' +
", host='" + host + '\'' +
- ", port=" + port +
+ ", port='" + port + '\'' +
", sourceNamespace='" + sourceNamespace + '\'' +
", sourceService='" + sourceService + '\'' +
", labels='" + labels + '\'' +
diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java
index be06aa86c..9df90c2d5 100644
--- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java
+++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java
@@ -22,6 +22,26 @@
*/
public class FlowEventConstants {
+ public enum EventName {
+ LosslessOnlineStart,
+ LosslessOnlineEnd,
+ LosslessWarmupStart,
+ LosslessWarmupEnd,
+ LosslessOfflineStart,
+
+ InstanceThreadEnd,
+
+ CircuitBreakerOpen,
+ CircuitBreakerHalfOpen,
+ CircuitBreakerClose,
+ CircuitBreakerDestroy,
+
+ RateLimitStart,
+ RateLimitEnd,
+
+ UNKNOWN,
+ }
+
public enum Status {
// circuit breaker status
OPEN,
@@ -47,5 +67,4 @@ public enum ResourceType {
UNKNOWN,
}
-
}
diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java
index b9e053b47..268b45065 100644
--- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java
+++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java
@@ -34,9 +34,9 @@
import com.tencent.polaris.api.pojo.*;
import com.tencent.polaris.api.pojo.CircuitBreakerStatus.FallbackInfo;
import com.tencent.polaris.api.pojo.CircuitBreakerStatus.Status;
-import com.tencent.polaris.api.utils.TrieUtil;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
+import com.tencent.polaris.api.utils.TrieUtil;
import com.tencent.polaris.client.flow.BaseFlow;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.plugins.circuitbreaker.composite.trigger.ConsecutiveCounter;
@@ -48,7 +48,7 @@
import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto.*;
import org.slf4j.Logger;
-import java.time.Instant;
+import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -339,17 +339,23 @@ private void reportEvent(CircuitBreakerStatus.Status previousStatus, CircuitBrea
if (extensions == null) {
return;
}
+
+ FlowEventConstants.Status currentFlowEventStatus = CircuitBreakerUtils.parseFlowEventStatus(currentStatus);
+ FlowEventConstants.Status previousFlowEventStatus = CircuitBreakerUtils.parseFlowEventStatus(previousStatus);
+
FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder()
.withEventType(ServiceEventKey.EventType.CIRCUIT_BREAKING)
- .withTimestamp(Instant.now())
+ .withEventName(CircuitBreakerUtils.parseFlowEventName(currentFlowEventStatus, previousFlowEventStatus))
+ .withTimestamp(LocalDateTime.now())
.withClientId(extensions.getValueContext().getClientId())
.withClientIp(extensions.getValueContext().getHost())
.withNamespace(resource.getService().getNamespace())
.withService(resource.getService().getService())
+ .withInstanceId(extensions.getValueContext().getInstanceId())
.withSourceNamespace(resource.getCallerService().getNamespace())
.withSourceService(resource.getService().getService())
- .withCurrentStatus(CircuitBreakerUtils.parseFlowEventStatus(currentStatus))
- .withPreviousStatus(CircuitBreakerUtils.parseFlowEventStatus(previousStatus))
+ .withCurrentStatus(currentFlowEventStatus)
+ .withPreviousStatus(previousFlowEventStatus)
.withRuleName(ruleName);
if (StringUtils.isNotBlank(reason)) {
flowEventBuilder.withReason(reason);
diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java
index 323f39529..552cafdd9 100644
--- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java
+++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java
@@ -36,7 +36,7 @@ public class CircuitBreakerUtils {
public static long DEFAULT_ERROR_RATE_INTERVAL_MS = 60 * 1000;
- public static long MIN_CLEANUP_INTERVAL = 60 * 1000;
+ public static long MIN_CLEANUP_INTERVAL = 60 * 1000;
public static boolean checkRule(CircuitBreakerProto.CircuitBreakerRule rule) {
return checkLevel(rule.getLevel());
@@ -110,6 +110,20 @@ public static FlowEventConstants.Status parseFlowEventStatus(CircuitBreakerStatu
}
}
+ public static FlowEventConstants.EventName parseFlowEventName(FlowEventConstants.Status currentStatus, FlowEventConstants.Status previousStatus) {
+ if (currentStatus == FlowEventConstants.Status.OPEN) {
+ return FlowEventConstants.EventName.CircuitBreakerOpen;
+ } else if (currentStatus == FlowEventConstants.Status.HALF_OPEN) {
+ return FlowEventConstants.EventName.CircuitBreakerHalfOpen;
+ } else if (currentStatus == FlowEventConstants.Status.CLOSE) {
+ return FlowEventConstants.EventName.CircuitBreakerClose;
+ } else if (currentStatus == FlowEventConstants.Status.DESTROY) {
+ return FlowEventConstants.EventName.CircuitBreakerDestroy;
+ } else {
+ return FlowEventConstants.EventName.UNKNOWN;
+ }
+ }
+
public static String getApiCircuitBreakerName(String targetNamespaceId, String serviceName, String path, String method) {
return targetNamespaceId + "#" + serviceName + "#" + getFormatApi(path, method);
}
diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java b/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java
new file mode 100644
index 000000000..11373c26d
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugin.lossless.common;
+
+import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+import com.tencent.polaris.api.plugin.event.FlowEventConstants;
+import com.tencent.polaris.api.pojo.ServiceEventKey;
+import com.tencent.polaris.client.flow.BaseFlow;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author Haotian Zhang
+ */
+public class LosslessEventUtils {
+
+ public static void reportEvent(Extensions extensions, String namespace, String service, String host,
+ int port, FlowEventConstants.EventName eventName) {
+ if (extensions == null) {
+ return;
+ }
+
+ FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder()
+ .withEventType(ServiceEventKey.EventType.LOSSLESS)
+ .withEventName(eventName)
+ .withTimestamp(LocalDateTime.now())
+ .withClientId(extensions.getValueContext().getClientId())
+ .withClientIp(extensions.getValueContext().getHost())
+ .withNamespace(namespace)
+ .withService(service)
+ .withInstanceId(extensions.getValueContext().getInstanceId())
+ .withHost(host)
+ .withPort(port);
+
+ FlowEvent flowEvent = flowEventBuilder.build();
+ BaseFlow.reportFlowEvent(extensions, flowEvent);
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java b/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java
index 3d0517056..86c7142ae 100644
--- a/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java
+++ b/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java
@@ -27,6 +27,7 @@
import com.tencent.polaris.api.plugin.common.PluginTypes;
import com.tencent.polaris.api.plugin.common.ValueContext;
import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.FlowEventConstants;
import com.tencent.polaris.api.plugin.lossless.InstanceProperties;
import com.tencent.polaris.api.plugin.lossless.LosslessActionProvider;
import com.tencent.polaris.api.plugin.lossless.LosslessPolicy;
@@ -37,19 +38,14 @@
import com.tencent.polaris.client.util.HttpServerUtils;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.logging.LoggingConsts;
+import com.tencent.polaris.plugin.lossless.common.LosslessEventUtils;
import com.tencent.polaris.plugin.lossless.common.LosslessUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LosslessProto;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class DeregisterLosslessPolicy implements LosslessPolicy, HttpServerAware {
@@ -197,6 +193,8 @@ private void doLosslessDeregister(BaseInstance instance, LosslessActionProvider
event.setBaseInstance(instance);
event.setEventName(EVENT_LOSSLESS_DEREGISTER);
EVENT_LOG.info(event.toString());
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessOfflineStart);
}
private boolean isLosslessOfflineEnable(BaseInstance instance) {
diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java b/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java
index 18ea0c029..1d8aed604 100644
--- a/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java
+++ b/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java
@@ -27,6 +27,7 @@
import com.tencent.polaris.api.plugin.common.PluginTypes;
import com.tencent.polaris.api.plugin.common.ValueContext;
import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.FlowEventConstants;
import com.tencent.polaris.api.plugin.lossless.InstanceProperties;
import com.tencent.polaris.api.plugin.lossless.LosslessActionProvider;
import com.tencent.polaris.api.plugin.lossless.LosslessPolicy;
@@ -38,6 +39,7 @@
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.logging.LoggingConsts;
+import com.tencent.polaris.plugin.lossless.common.LosslessEventUtils;
import com.tencent.polaris.plugin.lossless.common.LosslessUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LosslessProto;
import org.slf4j.Logger;
@@ -181,6 +183,8 @@ private void doLosslessRegister(
}
LOG.info("[HealthCheckRegisterLosslessPolicy] do losslessRegister for instance {}", instance);
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessOnlineStart);
if (!losslessActionProvider.isEnableHealthCheck()) {
long delayRegisterInterval = getDelayRegisterInterval(instance);
LOG.info("[HealthCheckRegisterLosslessPolicy] health check disabled, start lossless register after {}ms, plugin {}",
@@ -216,6 +220,8 @@ private void doRegister(BaseInstance instance, LosslessActionProvider losslessAc
} else {
EVENT_LOG.info(new Event(clientId, instance, EVENT_DIRECT_REGISTER).toString());
}
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessOnlineEnd);
int warmupInterval = getWarmupInterval(instance);
if (warmupInterval > 0) {
@@ -226,11 +232,15 @@ private void doRegister(BaseInstance instance, LosslessActionProvider losslessAc
Event warmupStartEvent = new Event(clientId, instance, EVENT_LOSSLESS_WARMUP_START);
EVENT_LOG.info(warmupStartEvent.toString());
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessWarmupStart);
healthCheckExecutor.schedule(() -> {
LOG.info("[HealthCheckRegisterLosslessPolicy] warmup for instance {} finished", instance);
Event warmupEndEvent = new Event(clientId, instance, EVENT_LOSSLESS_WARMUP_END);
EVENT_LOG.info(warmupEndEvent.toString());
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessWarmupEnd);
}, warmupInterval, TimeUnit.MILLISECONDS);
} else {
LOG.info("[HealthCheckRegisterLosslessPolicy] no warmup for instance {}", instance);
diff --git a/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java
index f10d27d39..f81046223 100644
--- a/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java
+++ b/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java
@@ -95,6 +95,11 @@ public String convertMessage(FlowEvent event) {
eventType = event.getEventType().name();
}
+ String eventName = "";
+ if (event.getEventName() != null) {
+ eventName = event.getEventName().name();
+ }
+
String currentStatus = "";
if (event.getCurrentStatus() != null) {
currentStatus = event.getCurrentStatus().name();
@@ -109,11 +114,12 @@ public String convertMessage(FlowEvent event) {
if (event.getResourceType() != null) {
resourceType = event.getResourceType().name();
}
- return eventType + "|" + formattedDateTime + "|" + event.getClientId() + "|"
+ return eventType + "|" + eventName + "|" + formattedDateTime + "|" + event.getClientId() + "|"
+ event.getClientIp() + "|" + event.getNamespace() + "|" + event.getService() + "|"
+ event.getApiProtocol() + "|" + event.getApiPath() + "|" + event.getApiMethod() + "|"
- + event.getHost() + "|" + event.getPort() + "|" + event.getSourceNamespace() + "|"
- + event.getSourceService() + "|" + event.getLabels() + "|" + currentStatus + "|" + previousStatus + "|"
- + resourceType + "|" + event.getRuleName() + "|" + event.getReason();
+ + event.getInstanceId() + "|" + event.getHost() + "|" + event.getPort() + "|"
+ + event.getSourceNamespace() + "|" + event.getSourceService() + "|" + event.getLabels() + "|"
+ + currentStatus + "|" + previousStatus + "|" + resourceType + "|" + event.getRuleName() + "|"
+ + event.getReason();
}
}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml
new file mode 100644
index 000000000..1b3fbf539
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml
@@ -0,0 +1,37 @@
+
+
+ 4.0.0
+
+ polaris-plugins-observability
+ com.tencent.polaris
+ ${revision}
+ ../pom.xml
+
+
+ event-pushgateway
+ Polaris Plugins Observability Push Gateway Event Reporter
+ Polaris Plugins Observability Push Gateway Event Reporter JAR
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+
+
+
+ com.google.code.gson
+ gson
+ ${gson.version}
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ provided
+
+
+
\ No newline at end of file
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java
new file mode 100644
index 000000000..d302bca25
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java
@@ -0,0 +1,231 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.tencent.polaris.api.config.global.EventReporterConfig;
+import com.tencent.polaris.api.config.plugin.DefaultPlugins;
+import com.tencent.polaris.api.config.plugin.PluginConfigProvider;
+import com.tencent.polaris.api.config.verify.Verifier;
+import com.tencent.polaris.api.exception.PolarisException;
+import com.tencent.polaris.api.plugin.PluginType;
+import com.tencent.polaris.api.plugin.common.InitContext;
+import com.tencent.polaris.api.plugin.common.PluginTypes;
+import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.EventReporter;
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+import com.tencent.polaris.api.utils.CollectionUtils;
+import com.tencent.polaris.api.utils.StringUtils;
+import com.tencent.polaris.api.utils.ThreadPoolUtils;
+import com.tencent.polaris.client.util.NamedThreadFactory;
+import com.tencent.polaris.logging.LoggerFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Polaris push gateway event reporter.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventReporter implements EventReporter, PluginConfigProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PushGatewayEventReporter.class);
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private BlockingQueue eventQueue;
+
+ private volatile boolean init = true;
+
+ private PushGatewayEventReporterConfig config;
+
+ private URI eventUri;
+
+ private final ScheduledExecutorService eventExecutors = new ScheduledThreadPoolExecutor(1,
+ new NamedThreadFactory("event-pushgateway"));
+
+ @Override
+ public boolean isEnabled() {
+ return config.isEnable();
+ }
+
+ @Override
+ public boolean reportEvent(FlowEvent flowEvent) {
+ if (eventUri == null) {
+ LOG.warn("build event request url fail, can not sent event.");
+ return false;
+ }
+ // 如果满了就抛出异常
+ try {
+ eventQueue.add(flowEvent);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add push gateway event to event queue: {}", flowEvent);
+ }
+ } catch (Throwable throwable) {
+ LOG.warn("Event queue is full. Log this event and drop it. event={}, error={}.",
+ flowEvent, throwable.getMessage(), throwable);
+ }
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return DefaultPlugins.PUSH_GATEWAY_EVENT_REPORTER_TYPE;
+ }
+
+ @Override
+ public Class extends Verifier> getPluginConfigClazz() {
+ return PushGatewayEventReporterConfig.class;
+ }
+
+ @Override
+ public PluginType getType() {
+ return PluginTypes.EVENT_REPORTER.getBaseType();
+ }
+
+ @Override
+ public void init(InitContext ctx) throws PolarisException {
+ EventReporterConfig eventReporterConfig = ctx.getConfig().getGlobal().getEventReporter();
+ if (eventReporterConfig != null && CollectionUtils.isNotEmpty(eventReporterConfig.getReporters())) {
+ for (String reporter : eventReporterConfig.getReporters()) {
+ if (StringUtils.equals(getName(), reporter)) {
+ this.config = ctx.getConfig().getGlobal().getEventReporter()
+ .getPluginConfig(getName(), PushGatewayEventReporterConfig.class);
+ if (config.isEnable()) {
+ init = false;
+ }
+ return;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void postContextInit(Extensions ctx) throws PolarisException {
+ if (!init) {
+ synchronized (this) {
+ if (!init) {
+ init = true;
+ try {
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+ eventQueue = new LinkedBlockingQueue<>(config.getEventQueueSize());
+
+ // 分割主机名和端口号
+ if (StringUtils.isBlank(config.getAddress())) {
+ throw new RuntimeException("PushGateway address is empty.");
+ }
+ String[] parts = config.getAddress().split(":");
+ if (parts.length != 2) {
+ throw new RuntimeException(String.format("PushGateway address %s format error.", config.getAddress()));
+ }
+ String host = parts[0];
+ int port = Integer.parseInt(parts[1]);
+ eventUri = new URIBuilder()
+ .setHost(host)
+ .setPort(port)
+ .setScheme("http")
+ .setPath("/polaris/client/events")
+ .build();
+ LOG.info("PushGateway event reporter init with uri: {}", eventUri);
+
+ eventExecutors.scheduleWithFixedDelay(new PushGatewayEventTask(), 1000, 1000, TimeUnit.MILLISECONDS);
+ LOG.info("PushGateway event reporter starts reporting task.");
+ } catch (URISyntaxException e) {
+ LOG.error("Build event request url fail.", e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{eventExecutors});
+ }
+
+ class PushGatewayEventTask implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ // 每次把eventQueue发空结束
+ while (CollectionUtils.isNotEmpty(eventQueue)) {
+ List eventDataList = new ArrayList<>();
+ PushGatewayEventRequest request = new PushGatewayEventRequest();
+
+ eventQueue.drainTo(eventDataList, config.getMaxBatchSize());
+ request.setBatch(eventDataList);
+
+ postPushGatewayEvent(request);
+ }
+ } catch (Throwable e) {
+ LOG.warn("Push gateway event reporter task fail.", e);
+ }
+ }
+
+ private void postPushGatewayEvent(PushGatewayEventRequest request) {
+ StringEntity postBody = null;
+ RequestConfig config = RequestConfig.custom().setConnectTimeout(2000).setConnectionRequestTimeout(10000).setSocketTimeout(10000).build();
+ try (CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build()) {
+ HttpPost httpPost = new HttpPost(eventUri);
+ postBody = new StringEntity(mapper.writeValueAsString(request));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("postPushGatewayEvent body:{}", postBody);
+ }
+ httpPost.setEntity(postBody);
+ httpPost.setHeader("Content-Type", "application/json");
+ HttpResponse httpResponse;
+
+ httpResponse = httpClient.execute(httpPost);
+
+ if (200 != httpResponse.getStatusLine().getStatusCode()) {
+ String resultString = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
+ throw new RuntimeException("Report push gateway event failed. Response = [" + resultString + "].");
+ } else {
+ if (LOG.isDebugEnabled()) {
+ String resultString = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
+ LOG.info("Report push gateway event success. Response is : {}", resultString);
+ } else {
+ LOG.info("Report push gateway event success.");
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Report push gateway event failed, postBody:{}.", postBody, e);
+ }
+ }
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java
new file mode 100644
index 000000000..94ea9d519
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.tencent.polaris.api.config.verify.Verifier;
+import com.tencent.polaris.api.utils.StringUtils;
+import com.tencent.polaris.factory.util.ConfigUtils;
+
+/**
+ * Polaris push gateway event reporter config.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventReporterConfig implements Verifier {
+
+ @JsonProperty
+ private Boolean enable;
+
+ @JsonProperty
+ private String address;
+
+ @JsonProperty
+ private Integer eventQueueSize = 1000;
+
+ @JsonProperty
+ private Integer maxBatchSize = 100;
+
+ @Override
+ public void verify() {
+ ConfigUtils.validateNull(enable, "global.eventReporter.plugin.pushgateway.enable");
+ if (!enable) {
+ return;
+ }
+ ConfigUtils.validateString(address, "global.eventReporter.plugin.pushgateway.address");
+ }
+
+ @Override
+ public void setDefault(Object defaultObject) {
+ if (defaultObject instanceof PushGatewayEventReporterConfig) {
+ PushGatewayEventReporterConfig pushGatewayEventReporterConfig = (PushGatewayEventReporterConfig) defaultObject;
+ if (null == enable) {
+ setEnable(pushGatewayEventReporterConfig.isEnable());
+ }
+ if (StringUtils.isBlank(address)) {
+ setAddress(pushGatewayEventReporterConfig.getAddress());
+ }
+ }
+ }
+
+ public boolean isEnable() {
+ if (null == enable) {
+ enable = false;
+ }
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public Integer getEventQueueSize() {
+ return eventQueueSize;
+ }
+
+ public void setEventQueueSize(Integer eventQueueSize) {
+ this.eventQueueSize = eventQueueSize;
+ }
+
+ public Integer getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ public void setMaxBatchSize(Integer maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java
new file mode 100644
index 000000000..7a5a34ab1
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java
@@ -0,0 +1,48 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Polaris push gateway event request.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventRequest {
+
+ private List batch = new ArrayList<>();
+
+ public List getBatch() {
+ return batch;
+ }
+
+ public void setBatch(List batch) {
+ this.batch = batch;
+ }
+
+ @Override
+ public String toString() {
+ return "PushGatewayEventRequest{" +
+ "batch=" + batch +
+ '}';
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java
new file mode 100644
index 000000000..0678d93e1
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+/**
+ * Polaris push gateway event response.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventResponse {
+
+ private String requestId;
+
+ private int code;
+
+ private String info;
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getInfo() {
+ return info;
+ }
+
+ public void setInfo(String info) {
+ this.info = info;
+ }
+
+ @Override
+ public String toString() {
+ return "PushGatewayEventResponse{" +
+ "requestId='" + requestId + '\'' +
+ ", code=" + code +
+ ", info='" + info + '\'' +
+ '}';
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter
new file mode 100644
index 000000000..55a65137a
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter
@@ -0,0 +1 @@
+com.tencent.polaris.plugins.event.pushgateway.PushGatewayEventReporter
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java
new file mode 100644
index 000000000..95e0fa694
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java
@@ -0,0 +1,67 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+import com.tencent.polaris.api.pojo.ServiceEventKey;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+
+import static com.tencent.polaris.api.plugin.event.FlowEventConstants.EventName.LosslessOnlineStart;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link PushGatewayEventRequest}.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventRequestTest {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Test
+ public void test() throws JsonProcessingException {
+ FlowEvent flowEvent = new FlowEvent.Builder()
+ .withEventType(ServiceEventKey.EventType.LOSSLESS)
+ .withEventName(LosslessOnlineStart)
+ .withTimestamp(LocalDateTime.of(2025, 1, 13, 11, 58, 42))
+ .withClientId("test-client")
+ .withClientIp("1.2.3.4")
+ .withNamespace("test-namespace")
+ .withService("test-service")
+ .withInstanceId("test-instance")
+ .withHost("1.2.3.4")
+ .withPort(8080)
+ .build();
+
+ PushGatewayEventRequest request = new PushGatewayEventRequest();
+ request.getBatch().add(flowEvent);
+
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+ assertThat(mapper.writeValueAsString(request)).isEqualTo("{\"batch\":[{\"currentStatus\":null,\"previousStatus\":null,\"resourceType\":null,\"ruleName\":\"\",\"additionalParams\":{},\"event_type\":\"LOSSLESS\",\"event_name\":\"LosslessOnlineStart\",\"event_time\":\"2025-01-13 11:58:42:0000\",\"client_id\":\"test-client\",\"client_ip\":\"1.2.3.4\",\"namespace\":\"test-namespace\",\"service\":\"test-service\",\"api_protocol\":\"\",\"api_path\":\"\",\"api_method\":\"\",\"instance_id\":\"test-instance\",\"host\":\"1.2.3.4\",\"port\":\"8080\",\"source_namespace\":\"\",\"source_service\":\"\",\"labels\":\"\",\"reason\":\"\"}]}");
+ }
+}
\ No newline at end of file
diff --git a/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java
index e111343a5..bf985c879 100644
--- a/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java
+++ b/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java
@@ -57,6 +57,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -115,7 +116,7 @@ private boolean reportV1Event(FlowEvent flowEvent) {
}
TsfEventData eventData = new TsfEventData();
- eventData.setOccurTime(flowEvent.getTimestamp().getEpochSecond());
+ eventData.setOccurTime(flowEvent.getTimestamp().atZone(ZoneId.systemDefault()).toInstant().getEpochSecond());
eventData.setEventName(TsfEventDataUtils.convertEventName(flowEvent));
Byte status = TsfEventDataUtils.convertStatus(flowEvent);
if (status == null || status == -1) {
diff --git a/polaris-plugins/polaris-plugins-observability/pom.xml b/polaris-plugins/polaris-plugins-observability/pom.xml
index 565759f03..0b57b24b6 100644
--- a/polaris-plugins/polaris-plugins-observability/pom.xml
+++ b/polaris-plugins/polaris-plugins-observability/pom.xml
@@ -21,6 +21,7 @@
trace-otel
event-logger
event-tsf
+ event-pushgateway
diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java
index 91f59031a..6a610888f 100644
--- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java
+++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java
@@ -27,7 +27,7 @@
import com.tencent.polaris.client.flow.BaseFlow;
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto;
-import java.time.Instant;
+import java.time.LocalDateTime;
import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.RULE_DETAIL_KEY;
import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.RULE_ID_KEY;
@@ -43,23 +43,29 @@ public static void reportEvent(Extensions extensions, ServiceKey serviceKey, Rat
if (extensions == null) {
return;
}
+
+ FlowEventConstants.Status currentFlowEventStatus = parseFlowEventStatus(currentCode);
+ FlowEventConstants.Status previousFlowEventStatus = parseFlowEventStatus(previousCode);
+
FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder()
.withEventType(ServiceEventKey.EventType.RATE_LIMITING)
- .withTimestamp(Instant.now())
+ .withEventName(parseFlowEventName(currentFlowEventStatus, previousFlowEventStatus))
+ .withTimestamp(LocalDateTime.now())
.withClientId(extensions.getValueContext().getClientId())
.withClientIp(extensions.getValueContext().getHost())
.withNamespace(serviceKey.getNamespace())
.withService(serviceKey.getService())
.withApiPath(rule.getMethod().getValue().getValue())
+ .withInstanceId(extensions.getValueContext().getInstanceId())
.withHost(extensions.getValueContext().getHost())
+ .withPort(extensions.getValueContext().getPort())
.withSourceNamespace(sourceNamespace)
.withSourceService(sourceService)
.withLabels(labels)
- .withCurrentStatus(parseFlowEventStatus(currentCode))
- .withPreviousStatus(parseFlowEventStatus(previousCode))
+ .withCurrentStatus(currentFlowEventStatus)
+ .withPreviousStatus(previousFlowEventStatus)
.withResourceType(parseFlowEventResourceType(rule.getResource()))
- .withRuleName(rule.getName().getValue())
- .withReason(reason);
+ .withRuleName(rule.getName().getValue());
if (StringUtils.isNotBlank(reason)) {
flowEventBuilder.withReason(reason);
}
@@ -92,6 +98,16 @@ private static FlowEventConstants.Status parseFlowEventStatus(QuotaResult.Code c
}
}
+ private static FlowEventConstants.EventName parseFlowEventName(FlowEventConstants.Status currentStatus, FlowEventConstants.Status previousStatus) {
+ if (currentStatus == FlowEventConstants.Status.LIMITED && previousStatus == FlowEventConstants.Status.UNLIMITED) {
+ return FlowEventConstants.EventName.RateLimitStart;
+ } else if (currentStatus == FlowEventConstants.Status.UNLIMITED && previousStatus == FlowEventConstants.Status.LIMITED) {
+ return FlowEventConstants.EventName.RateLimitEnd;
+ } else {
+ return FlowEventConstants.EventName.UNKNOWN;
+ }
+ }
+
private static FlowEventConstants.ResourceType parseFlowEventResourceType(RateLimitProto.Rule.Resource resource) {
switch (resource) {
case QPS:
diff --git a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml
index 10f673240..eaaa113b2 100644
--- a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml
+++ b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml
@@ -118,6 +118,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
com.tencent.polaris
diff --git a/pom.xml b/pom.xml
index 334c3af2a..e5320c38f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
- 2.0.0.1
+ 2.0.1.0-SNAPSHOT
${maven.build.timestamp}
false
From 7821cd59fed322bc25f3108e46e4b9b5c4f77e95 Mon Sep 17 00:00:00 2001
From: Haotian Zhang <928016560@qq.com>
Date: Thu, 13 Feb 2025 16:36:45 +0800
Subject: [PATCH 2/2] remove instance id.
---
.../client/flow/DefaultDiscoveryFlow.java | 2 -
.../api/plugin/common/ValueContext.java | 20 ---
.../polaris/api/plugin/event/FlowEvent.java | 18 ---
.../composite/ResourceCounters.java | 108 ++--------------
.../utils/CircuitBreakerEventUtils.java | 122 ++++++++++++++++++
.../lossless/common/LosslessEventUtils.java | 1 -
.../event/logger/LoggerEventReporter.java | 7 +-
.../PushGatewayEventRequestTest.java | 3 +-
.../client/utils/RateLimiterEventUtils.java | 2 -
9 files changed, 139 insertions(+), 144 deletions(-)
create mode 100644 polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerEventUtils.java
diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java
index f035ae245..91af95a36 100644
--- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java
+++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/DefaultDiscoveryFlow.java
@@ -153,8 +153,6 @@ private InstanceRegisterResponse doRegister(InstanceRegisterRequest req, Map