diff --git a/polaris-assembly/polaris-assembly-factory/pom.xml b/polaris-assembly/polaris-assembly-factory/pom.xml
index b962e1545..dd6ef9cbe 100644
--- a/polaris-assembly/polaris-assembly-factory/pom.xml
+++ b/polaris-assembly/polaris-assembly-factory/pom.xml
@@ -89,6 +89,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml
index 30bd2bfa4..959baef7a 100644
--- a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml
+++ b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml
@@ -111,6 +111,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
diff --git a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml
index 396042b68..38549d5ec 100644
--- a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml
+++ b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml
@@ -92,6 +92,14 @@ global:
tsf:
# 描述:TSF 事件上报开关
enable: false
+ # 描述:PushGateway 事件上报插件配置
+ pushgateway:
+ # 描述:PushGateway 事件上报开关
+ enable: false
+ # 描述:PushGateway 事件上报队列长度
+ eventQueueSize: 10000;
+ # 描述:PushGateway 事件上报最大批量大小
+ maxBatchSize: 100
# 描述:Admin相关的配置
admin:
# 描述:Admin的监听的IP
diff --git a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java
index e6dbb19dd..c774cc29f 100644
--- a/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java
+++ b/polaris-common/polaris-config/src/main/java/com/tencent/polaris/api/config/plugin/DefaultPlugins.java
@@ -90,6 +90,11 @@ public interface DefaultPlugins {
*/
String TSF_EVENT_REPORTER_TYPE = "tsf";
+ /**
+ * PushGateway 事件上报插件名
+ */
+ String PUSH_GATEWAY_EVENT_REPORTER_TYPE = "pushgateway";
+
/**
* 黑白名单鉴权插件名
*/
diff --git a/polaris-configuration/polaris-configuration-factory/pom.xml b/polaris-configuration/polaris-configuration-factory/pom.xml
index 88fd8f59c..8e0e80e5d 100644
--- a/polaris-configuration/polaris-configuration-factory/pom.xml
+++ b/polaris-configuration/polaris-configuration-factory/pom.xml
@@ -131,6 +131,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
com.tencent.polaris
diff --git a/polaris-dependencies/pom.xml b/polaris-dependencies/pom.xml
index ea89e61cd..720be65e6 100644
--- a/polaris-dependencies/pom.xml
+++ b/polaris-dependencies/pom.xml
@@ -329,6 +329,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
com.tencent.polaris
diff --git a/polaris-discovery/polaris-discovery-factory/pom.xml b/polaris-discovery/polaris-discovery-factory/pom.xml
index 29b7b47b3..77cbec49f 100644
--- a/polaris-discovery/polaris-discovery-factory/pom.xml
+++ b/polaris-discovery/polaris-discovery-factory/pom.xml
@@ -188,6 +188,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
diff --git a/polaris-distribution/polaris-all/pom.xml b/polaris-distribution/polaris-all/pom.xml
index 8728d81c8..f933d827c 100644
--- a/polaris-distribution/polaris-all/pom.xml
+++ b/polaris-distribution/polaris-all/pom.xml
@@ -53,6 +53,11 @@
bcpkix-jdk15to18
${bouncycastle.version}
+
+ io.opentelemetry
+ opentelemetry-api
+ ${otel.version}
+
@@ -180,6 +185,7 @@
org.bouncycastle:*
+ io.opentelemetry:*
diff --git a/polaris-plugins/polaris-plugin-api/pom.xml b/polaris-plugins/polaris-plugin-api/pom.xml
index 940dd89bf..92791a7e7 100644
--- a/polaris-plugins/polaris-plugin-api/pom.xml
+++ b/polaris-plugins/polaris-plugin-api/pom.xml
@@ -1,7 +1,7 @@
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
polaris-plugins
com.tencent.polaris
@@ -16,6 +16,11 @@
Polaris Plugin API JAR
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ ${jackson.version}
+
com.tencent.polaris
polaris-config
diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java
index 21df8e4f3..a7eb664fc 100644
--- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java
+++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/common/ValueContext.java
@@ -18,9 +18,10 @@
package com.tencent.polaris.api.plugin.common;
import com.tencent.polaris.logging.LoggerFactory;
+import org.slf4j.Logger;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
/**
* 用于主流程传递kv数据的上下文对象,线程安全
diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java
index 348945624..40261c59e 100644
--- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java
+++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEvent.java
@@ -17,9 +17,11 @@
package com.tencent.polaris.api.plugin.event;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.tencent.polaris.api.pojo.ServiceEventKey;
-import java.time.Instant;
+import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
@@ -33,71 +35,92 @@ public class FlowEvent {
/**
* 事件类型
*/
+ @JsonProperty("event_type")
private final ServiceEventKey.EventType eventType;
+ /**
+ * 事件名称
+ */
+ @JsonProperty("event_name")
+ private final FlowEventConstants.EventName eventName;
+
/**
* 时间戳
*/
- private final Instant timestamp;
+ @JsonProperty("event_time")
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss:SSSS")
+ private final LocalDateTime timestamp;
/**
* 客户端ID
*/
+ @JsonProperty("client_id")
private final String clientId;
/**
* 客户端IP
*/
+ @JsonProperty("client_ip")
private final String clientIp;
/**
* 被调命名空间
*/
+ @JsonProperty("namespace")
private final String namespace;
/**
* 被调服务名
*/
+ @JsonProperty("service")
private final String service;
/**
* 接口协议
*/
+ @JsonProperty("api_protocol")
private final String apiProtocol;
/**
* 接口路径
*/
+ @JsonProperty("api_path")
private final String apiPath;
/**
* 接口方法
*/
+ @JsonProperty("api_method")
private final String apiMethod;
/**
* 实例IP
*/
+ @JsonProperty("host")
private final String host;
/**
* 实例端口
*/
+ @JsonProperty("port")
private final String port;
/**
* 主调命名空间
*/
+ @JsonProperty("source_namespace")
private final String sourceNamespace;
/**
* 主调服务名
*/
+ @JsonProperty("source_service")
private final String sourceService;
/**
* 主调标签
*/
+ @JsonProperty("labels")
private final String labels;
/**
@@ -123,12 +146,14 @@ public class FlowEvent {
/**
* 状态转换原因
*/
+ @JsonProperty("reason")
private final String reason;
private final Map additionalParams;
private FlowEvent(Builder builder) {
this.eventType = builder.eventType;
+ this.eventName = builder.eventName;
this.timestamp = builder.timestamp;
this.clientId = builder.clientId;
this.clientIp = builder.clientIp;
@@ -156,7 +181,8 @@ private FlowEvent(Builder builder) {
public static class Builder {
private ServiceEventKey.EventType eventType;
- private Instant timestamp;
+ private FlowEventConstants.EventName eventName;
+ private LocalDateTime timestamp;
private String clientId = "";
private String clientIp = "";
private String namespace = "";
@@ -180,7 +206,12 @@ public Builder withEventType(ServiceEventKey.EventType eventType) {
return this;
}
- public Builder withTimestamp(Instant timestamp) {
+ public Builder withEventName(FlowEventConstants.EventName eventName) {
+ this.eventName = eventName;
+ return this;
+ }
+
+ public Builder withTimestamp(LocalDateTime timestamp) {
this.timestamp = timestamp;
return this;
}
@@ -279,7 +310,11 @@ public ServiceEventKey.EventType getEventType() {
return eventType;
}
- public Instant getTimestamp() {
+ public FlowEventConstants.EventName getEventName() {
+ return eventName;
+ }
+
+ public LocalDateTime getTimestamp() {
return timestamp;
}
@@ -359,6 +394,7 @@ public Map getAdditionalParams() {
public String toString() {
return "FlowEvent{" +
"eventType=" + eventType +
+ ", eventName=" + eventName +
", timestamp=" + timestamp +
", clientId='" + clientId + '\'' +
", clientIp='" + clientIp + '\'' +
@@ -368,7 +404,7 @@ public String toString() {
", apiPath='" + apiPath + '\'' +
", apiMethod='" + apiMethod + '\'' +
", host='" + host + '\'' +
- ", port=" + port +
+ ", port='" + port + '\'' +
", sourceNamespace='" + sourceNamespace + '\'' +
", sourceService='" + sourceService + '\'' +
", labels='" + labels + '\'' +
diff --git a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java
index be06aa86c..9df90c2d5 100644
--- a/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java
+++ b/polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/event/FlowEventConstants.java
@@ -22,6 +22,26 @@
*/
public class FlowEventConstants {
+ public enum EventName {
+ LosslessOnlineStart,
+ LosslessOnlineEnd,
+ LosslessWarmupStart,
+ LosslessWarmupEnd,
+ LosslessOfflineStart,
+
+ InstanceThreadEnd,
+
+ CircuitBreakerOpen,
+ CircuitBreakerHalfOpen,
+ CircuitBreakerClose,
+ CircuitBreakerDestroy,
+
+ RateLimitStart,
+ RateLimitEnd,
+
+ UNKNOWN,
+ }
+
public enum Status {
// circuit breaker status
OPEN,
@@ -47,5 +67,4 @@ public enum ResourceType {
UNKNOWN,
}
-
}
diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java
index b9e053b47..b9b1d93b9 100644
--- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java
+++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/ResourceCounters.java
@@ -26,29 +26,28 @@
import com.tencent.polaris.api.plugin.circuitbreaker.entity.Resource;
import com.tencent.polaris.api.plugin.common.PluginTypes;
import com.tencent.polaris.api.plugin.compose.Extensions;
-import com.tencent.polaris.api.plugin.event.FlowEvent;
-import com.tencent.polaris.api.plugin.event.FlowEventConstants;
import com.tencent.polaris.api.plugin.stat.DefaultCircuitBreakResult;
import com.tencent.polaris.api.plugin.stat.StatInfo;
import com.tencent.polaris.api.plugin.stat.StatReporter;
-import com.tencent.polaris.api.pojo.*;
+import com.tencent.polaris.api.pojo.CircuitBreakerStatus;
import com.tencent.polaris.api.pojo.CircuitBreakerStatus.FallbackInfo;
import com.tencent.polaris.api.pojo.CircuitBreakerStatus.Status;
-import com.tencent.polaris.api.utils.TrieUtil;
-import com.tencent.polaris.api.utils.CollectionUtils;
+import com.tencent.polaris.api.pojo.HalfOpenStatus;
+import com.tencent.polaris.api.pojo.RetStatus;
+import com.tencent.polaris.api.pojo.TrieNode;
import com.tencent.polaris.api.utils.StringUtils;
-import com.tencent.polaris.client.flow.BaseFlow;
+import com.tencent.polaris.api.utils.TrieUtil;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.plugins.circuitbreaker.composite.trigger.ConsecutiveCounter;
import com.tencent.polaris.plugins.circuitbreaker.composite.trigger.CounterOptions;
import com.tencent.polaris.plugins.circuitbreaker.composite.trigger.ErrRateCounter;
import com.tencent.polaris.plugins.circuitbreaker.composite.trigger.TriggerCounter;
+import com.tencent.polaris.plugins.circuitbreaker.composite.utils.CircuitBreakerEventUtils;
import com.tencent.polaris.plugins.circuitbreaker.composite.utils.CircuitBreakerUtils;
import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto;
import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto.*;
import org.slf4j.Logger;
-import java.time.Instant;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -58,7 +57,6 @@
import java.util.regex.Pattern;
import static com.tencent.polaris.api.plugin.cache.CacheConstants.API_ID;
-import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.*;
import static com.tencent.polaris.logging.LoggingConsts.LOGGING_CIRCUIT_BREAKER;
/**
@@ -204,7 +202,8 @@ private void toOpen(CircuitBreakerStatus preStatus, String circuitBreaker, Strin
CB_LOG.info("previous status {}, current status {}, resource {}, rule {}", preStatus.getStatus(),
newStatus.getStatus(), resource, circuitBreaker);
reportCircuitStatus();
- reportEvent(preStatus.getStatus(), newStatus.getStatus(), circuitBreaker, reason);
+ CircuitBreakerEventUtils.reportEvent(extensions, resource, currentActiveRule,
+ preStatus.getStatus(), newStatus.getStatus(), circuitBreaker, reason);
long sleepWindow = CircuitBreakerUtils.getSleepWindowMilli(currentActiveRule, circuitBreakerConfig);
// add callback after timeout
stateChangeExecutors.schedule(new Runnable() {
@@ -233,7 +232,8 @@ public void openToHalfOpen() {
circuitBreakerStatus.getStatus(),
halfOpenStatus.getStatus(), resource, circuitBreakerStatus.getCircuitBreaker());
circuitBreakerStatusReference.set(halfOpenStatus);
- reportEvent(circuitBreakerStatus.getStatus(), halfOpenStatus.getStatus(), circuitBreakerStatus.getCircuitBreaker());
+ CircuitBreakerEventUtils.reportEvent(extensions, resource, currentActiveRule,
+ circuitBreakerStatus.getStatus(), halfOpenStatus.getStatus(), circuitBreakerStatus.getCircuitBreaker(), null);
reportCircuitStatus();
}
}
@@ -256,7 +256,8 @@ public void halfOpenToClose() {
for (TriggerCounter triggerCounter : counters) {
triggerCounter.resume();
}
- reportEvent(circuitBreakerStatus.getStatus(), newStatus.getStatus(), circuitBreakerStatus.getCircuitBreaker());
+ CircuitBreakerEventUtils.reportEvent(extensions, resource, currentActiveRule,
+ circuitBreakerStatus.getStatus(), newStatus.getStatus(), circuitBreakerStatus.getCircuitBreaker(), null);
reportCircuitStatus();
}
}
@@ -330,84 +331,6 @@ public CircuitBreakerStatus getCircuitBreakerStatus() {
return circuitBreakerStatusReference.get();
}
- private void reportEvent(CircuitBreakerStatus.Status previousStatus, CircuitBreakerStatus.Status currentStatus, String ruleName) {
- reportEvent(previousStatus, currentStatus, ruleName, null);
- }
-
- private void reportEvent(CircuitBreakerStatus.Status previousStatus, CircuitBreakerStatus.Status currentStatus,
- String ruleName, String reason) {
- if (extensions == null) {
- return;
- }
- FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder()
- .withEventType(ServiceEventKey.EventType.CIRCUIT_BREAKING)
- .withTimestamp(Instant.now())
- .withClientId(extensions.getValueContext().getClientId())
- .withClientIp(extensions.getValueContext().getHost())
- .withNamespace(resource.getService().getNamespace())
- .withService(resource.getService().getService())
- .withSourceNamespace(resource.getCallerService().getNamespace())
- .withSourceService(resource.getService().getService())
- .withCurrentStatus(CircuitBreakerUtils.parseFlowEventStatus(currentStatus))
- .withPreviousStatus(CircuitBreakerUtils.parseFlowEventStatus(previousStatus))
- .withRuleName(ruleName);
- if (StringUtils.isNotBlank(reason)) {
- flowEventBuilder.withReason(reason);
- }
- String isolationObject = "";
- switch (resource.getLevel()) {
- case SERVICE:
- flowEventBuilder = flowEventBuilder.withResourceType(FlowEventConstants.ResourceType.SERVICE);
- isolationObject = CircuitBreakerUtils.getServiceCircuitBreakerName(
- resource.getService().getNamespace(), resource.getService().getService());
- break;
- case METHOD:
- MethodResource methodResource = (MethodResource) resource;
- flowEventBuilder = flowEventBuilder.withResourceType(FlowEventConstants.ResourceType.METHOD)
- .withApiProtocol(methodResource.getProtocol())
- .withApiPath(methodResource.getPath())
- .withApiMethod(methodResource.getMethod());
- isolationObject = CircuitBreakerUtils.getApiCircuitBreakerName(
- methodResource.getService().getNamespace(), methodResource.getService().getService(),
- methodResource.getPath(), methodResource.getMethod());
- break;
- case INSTANCE:
- InstanceResource instanceResource = (InstanceResource) resource;
- flowEventBuilder = flowEventBuilder.withResourceType(FlowEventConstants.ResourceType.INSTANCE)
- .withHost(instanceResource.getHost())
- .withPort(instanceResource.getPort());
- isolationObject = CircuitBreakerUtils.getInstanceCircuitBreakerName(
- instanceResource.getHost(), instanceResource.getPort());
- break;
- }
-
- FlowEvent flowEvent = flowEventBuilder.build();
-
- String failureRate = "";
- String slowCallRate = "";
- if (CollectionUtils.isNotEmpty(currentActiveRule.getBlockConfigsList())) {
- for (BlockConfig blockConfig : currentActiveRule.getBlockConfigsList()) {
- if (CollectionUtils.isNotEmpty(blockConfig.getTriggerConditionsList())) {
- if (StringUtils.equals(blockConfig.getName(), "failure")) {
- failureRate = String.valueOf(blockConfig.getTriggerConditions(0).getErrorPercent());
- } else if (StringUtils.equals(blockConfig.getName(), "slow")) {
- slowCallRate = String.valueOf(blockConfig.getTriggerConditions(0).getErrorPercent());
- }
- }
- }
- }
- if (StringUtils.isNotBlank(isolationObject)) {
- flowEvent.getAdditionalParams().put(ISOLATION_OBJECT_KEY, isolationObject);
- }
- if (StringUtils.isNotBlank(failureRate)) {
- flowEvent.getAdditionalParams().put(FAILURE_RATE_KEY, failureRate);
- }
- if (StringUtils.isNotBlank(slowCallRate)) {
- flowEvent.getAdditionalParams().put(SLOW_CALL_DURATION_KEY, slowCallRate);
- }
- BaseFlow.reportFlowEvent(extensions, flowEvent);
- }
-
public void reportCircuitStatus() {
if (Objects.isNull(extensions)) {
return;
@@ -473,7 +396,8 @@ private void toDestroy() {
for (TriggerCounter triggerCounter : counters) {
triggerCounter.resume();
}
- reportEvent(circuitBreakerStatus.getStatus(), Status.DESTROY, circuitBreakerStatus.getCircuitBreaker());
+ CircuitBreakerEventUtils.reportEvent(extensions, resource, currentActiveRule,
+ circuitBreakerStatus.getStatus(), Status.CLOSE, circuitBreakerStatus.getCircuitBreaker(), null);
reportCircuitStatus();
}
}
diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerEventUtils.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerEventUtils.java
new file mode 100644
index 000000000..f0353c840
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerEventUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.circuitbreaker.composite.utils;
+
+import com.tencent.polaris.api.plugin.circuitbreaker.entity.InstanceResource;
+import com.tencent.polaris.api.plugin.circuitbreaker.entity.MethodResource;
+import com.tencent.polaris.api.plugin.circuitbreaker.entity.Resource;
+import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+import com.tencent.polaris.api.plugin.event.FlowEventConstants;
+import com.tencent.polaris.api.pojo.CircuitBreakerStatus;
+import com.tencent.polaris.api.pojo.ServiceEventKey;
+import com.tencent.polaris.api.utils.CollectionUtils;
+import com.tencent.polaris.api.utils.StringUtils;
+import com.tencent.polaris.client.flow.BaseFlow;
+import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto;
+
+import java.time.LocalDateTime;
+
+import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.*;
+
+/**
+ * @author Haotian Zhang
+ */
+public class CircuitBreakerEventUtils {
+
+ public static void reportEvent(Extensions extensions, Resource resource,
+ CircuitBreakerProto.CircuitBreakerRule currentActiveRule,
+ CircuitBreakerStatus.Status previousStatus, CircuitBreakerStatus.Status currentStatus,
+ String ruleName, String reason) {
+ if (extensions == null) {
+ return;
+ }
+
+ FlowEventConstants.Status currentFlowEventStatus = CircuitBreakerUtils.parseFlowEventStatus(currentStatus);
+ FlowEventConstants.Status previousFlowEventStatus = CircuitBreakerUtils.parseFlowEventStatus(previousStatus);
+
+ FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder()
+ .withEventType(ServiceEventKey.EventType.CIRCUIT_BREAKING)
+ .withEventName(CircuitBreakerUtils.parseFlowEventName(currentFlowEventStatus, previousFlowEventStatus))
+ .withTimestamp(LocalDateTime.now())
+ .withClientId(extensions.getValueContext().getClientId())
+ .withClientIp(extensions.getValueContext().getHost())
+ .withNamespace(resource.getService().getNamespace())
+ .withService(resource.getService().getService())
+ .withSourceNamespace(resource.getCallerService().getNamespace())
+ .withSourceService(resource.getService().getService())
+ .withCurrentStatus(currentFlowEventStatus)
+ .withPreviousStatus(previousFlowEventStatus)
+ .withRuleName(ruleName);
+ if (StringUtils.isNotBlank(reason)) {
+ flowEventBuilder.withReason(reason);
+ }
+ String isolationObject = "";
+ switch (resource.getLevel()) {
+ case SERVICE:
+ flowEventBuilder = flowEventBuilder.withResourceType(FlowEventConstants.ResourceType.SERVICE);
+ isolationObject = CircuitBreakerUtils.getServiceCircuitBreakerName(
+ resource.getService().getNamespace(), resource.getService().getService());
+ break;
+ case METHOD:
+ MethodResource methodResource = (MethodResource) resource;
+ flowEventBuilder = flowEventBuilder.withResourceType(FlowEventConstants.ResourceType.METHOD)
+ .withApiProtocol(methodResource.getProtocol())
+ .withApiPath(methodResource.getPath())
+ .withApiMethod(methodResource.getMethod());
+ isolationObject = CircuitBreakerUtils.getApiCircuitBreakerName(
+ methodResource.getService().getNamespace(), methodResource.getService().getService(),
+ methodResource.getPath(), methodResource.getMethod());
+ break;
+ case INSTANCE:
+ InstanceResource instanceResource = (InstanceResource) resource;
+ flowEventBuilder = flowEventBuilder.withResourceType(FlowEventConstants.ResourceType.INSTANCE)
+ .withHost(instanceResource.getHost())
+ .withPort(instanceResource.getPort());
+ isolationObject = CircuitBreakerUtils.getInstanceCircuitBreakerName(
+ instanceResource.getHost(), instanceResource.getPort());
+ break;
+ }
+
+ FlowEvent flowEvent = flowEventBuilder.build();
+
+ String failureRate = "";
+ String slowCallRate = "";
+ if (CollectionUtils.isNotEmpty(currentActiveRule.getBlockConfigsList())) {
+ for (CircuitBreakerProto.BlockConfig blockConfig : currentActiveRule.getBlockConfigsList()) {
+ if (CollectionUtils.isNotEmpty(blockConfig.getTriggerConditionsList())) {
+ if (StringUtils.equals(blockConfig.getName(), "failure")) {
+ failureRate = String.valueOf(blockConfig.getTriggerConditions(0).getErrorPercent());
+ } else if (StringUtils.equals(blockConfig.getName(), "slow")) {
+ slowCallRate = String.valueOf(blockConfig.getTriggerConditions(0).getErrorPercent());
+ }
+ }
+ }
+ }
+ if (StringUtils.isNotBlank(isolationObject)) {
+ flowEvent.getAdditionalParams().put(ISOLATION_OBJECT_KEY, isolationObject);
+ }
+ if (StringUtils.isNotBlank(failureRate)) {
+ flowEvent.getAdditionalParams().put(FAILURE_RATE_KEY, failureRate);
+ }
+ if (StringUtils.isNotBlank(slowCallRate)) {
+ flowEvent.getAdditionalParams().put(SLOW_CALL_DURATION_KEY, slowCallRate);
+ }
+ BaseFlow.reportFlowEvent(extensions, flowEvent);
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java
index 323f39529..552cafdd9 100644
--- a/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java
+++ b/polaris-plugins/polaris-plugins-circuitbreaker/circuitbreaker-composite/src/main/java/com/tencent/polaris/plugins/circuitbreaker/composite/utils/CircuitBreakerUtils.java
@@ -36,7 +36,7 @@ public class CircuitBreakerUtils {
public static long DEFAULT_ERROR_RATE_INTERVAL_MS = 60 * 1000;
- public static long MIN_CLEANUP_INTERVAL = 60 * 1000;
+ public static long MIN_CLEANUP_INTERVAL = 60 * 1000;
public static boolean checkRule(CircuitBreakerProto.CircuitBreakerRule rule) {
return checkLevel(rule.getLevel());
@@ -110,6 +110,20 @@ public static FlowEventConstants.Status parseFlowEventStatus(CircuitBreakerStatu
}
}
+ public static FlowEventConstants.EventName parseFlowEventName(FlowEventConstants.Status currentStatus, FlowEventConstants.Status previousStatus) {
+ if (currentStatus == FlowEventConstants.Status.OPEN) {
+ return FlowEventConstants.EventName.CircuitBreakerOpen;
+ } else if (currentStatus == FlowEventConstants.Status.HALF_OPEN) {
+ return FlowEventConstants.EventName.CircuitBreakerHalfOpen;
+ } else if (currentStatus == FlowEventConstants.Status.CLOSE) {
+ return FlowEventConstants.EventName.CircuitBreakerClose;
+ } else if (currentStatus == FlowEventConstants.Status.DESTROY) {
+ return FlowEventConstants.EventName.CircuitBreakerDestroy;
+ } else {
+ return FlowEventConstants.EventName.UNKNOWN;
+ }
+ }
+
public static String getApiCircuitBreakerName(String targetNamespaceId, String serviceName, String path, String method) {
return targetNamespaceId + "#" + serviceName + "#" + getFormatApi(path, method);
}
diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java b/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java
new file mode 100644
index 000000000..a17e329e8
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-lossless/lossless-common/src/main/java/com/tencent/polaris/plugin/lossless/common/LosslessEventUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugin.lossless.common;
+
+import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+import com.tencent.polaris.api.plugin.event.FlowEventConstants;
+import com.tencent.polaris.api.pojo.ServiceEventKey;
+import com.tencent.polaris.client.flow.BaseFlow;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author Haotian Zhang
+ */
+public class LosslessEventUtils {
+
+ public static void reportEvent(Extensions extensions, String namespace, String service, String host,
+ int port, FlowEventConstants.EventName eventName) {
+ if (extensions == null) {
+ return;
+ }
+
+ FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder()
+ .withEventType(ServiceEventKey.EventType.LOSSLESS)
+ .withEventName(eventName)
+ .withTimestamp(LocalDateTime.now())
+ .withClientId(extensions.getValueContext().getClientId())
+ .withClientIp(extensions.getValueContext().getHost())
+ .withNamespace(namespace)
+ .withService(service)
+ .withHost(host)
+ .withPort(port);
+
+ FlowEvent flowEvent = flowEventBuilder.build();
+ BaseFlow.reportFlowEvent(extensions, flowEvent);
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java b/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java
index 3d0517056..86c7142ae 100644
--- a/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java
+++ b/polaris-plugins/polaris-plugins-lossless/lossless-deregister/src/main/java/com/tencent/polaris/plugin/lossless/deregister/DeregisterLosslessPolicy.java
@@ -27,6 +27,7 @@
import com.tencent.polaris.api.plugin.common.PluginTypes;
import com.tencent.polaris.api.plugin.common.ValueContext;
import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.FlowEventConstants;
import com.tencent.polaris.api.plugin.lossless.InstanceProperties;
import com.tencent.polaris.api.plugin.lossless.LosslessActionProvider;
import com.tencent.polaris.api.plugin.lossless.LosslessPolicy;
@@ -37,19 +38,14 @@
import com.tencent.polaris.client.util.HttpServerUtils;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.logging.LoggingConsts;
+import com.tencent.polaris.plugin.lossless.common.LosslessEventUtils;
import com.tencent.polaris.plugin.lossless.common.LosslessUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LosslessProto;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class DeregisterLosslessPolicy implements LosslessPolicy, HttpServerAware {
@@ -197,6 +193,8 @@ private void doLosslessDeregister(BaseInstance instance, LosslessActionProvider
event.setBaseInstance(instance);
event.setEventName(EVENT_LOSSLESS_DEREGISTER);
EVENT_LOG.info(event.toString());
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessOfflineStart);
}
private boolean isLosslessOfflineEnable(BaseInstance instance) {
diff --git a/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java b/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java
index 18ea0c029..1d8aed604 100644
--- a/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java
+++ b/polaris-plugins/polaris-plugins-lossless/lossless-register/src/main/java/com/tencent/polaris/plugin/lossless/register/HealthCheckRegisterLosslessPolicy.java
@@ -27,6 +27,7 @@
import com.tencent.polaris.api.plugin.common.PluginTypes;
import com.tencent.polaris.api.plugin.common.ValueContext;
import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.FlowEventConstants;
import com.tencent.polaris.api.plugin.lossless.InstanceProperties;
import com.tencent.polaris.api.plugin.lossless.LosslessActionProvider;
import com.tencent.polaris.api.plugin.lossless.LosslessPolicy;
@@ -38,6 +39,7 @@
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.logging.LoggingConsts;
+import com.tencent.polaris.plugin.lossless.common.LosslessEventUtils;
import com.tencent.polaris.plugin.lossless.common.LosslessUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LosslessProto;
import org.slf4j.Logger;
@@ -181,6 +183,8 @@ private void doLosslessRegister(
}
LOG.info("[HealthCheckRegisterLosslessPolicy] do losslessRegister for instance {}", instance);
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessOnlineStart);
if (!losslessActionProvider.isEnableHealthCheck()) {
long delayRegisterInterval = getDelayRegisterInterval(instance);
LOG.info("[HealthCheckRegisterLosslessPolicy] health check disabled, start lossless register after {}ms, plugin {}",
@@ -216,6 +220,8 @@ private void doRegister(BaseInstance instance, LosslessActionProvider losslessAc
} else {
EVENT_LOG.info(new Event(clientId, instance, EVENT_DIRECT_REGISTER).toString());
}
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessOnlineEnd);
int warmupInterval = getWarmupInterval(instance);
if (warmupInterval > 0) {
@@ -226,11 +232,15 @@ private void doRegister(BaseInstance instance, LosslessActionProvider losslessAc
Event warmupStartEvent = new Event(clientId, instance, EVENT_LOSSLESS_WARMUP_START);
EVENT_LOG.info(warmupStartEvent.toString());
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessWarmupStart);
healthCheckExecutor.schedule(() -> {
LOG.info("[HealthCheckRegisterLosslessPolicy] warmup for instance {} finished", instance);
Event warmupEndEvent = new Event(clientId, instance, EVENT_LOSSLESS_WARMUP_END);
EVENT_LOG.info(warmupEndEvent.toString());
+ LosslessEventUtils.reportEvent(extensions, instance.getNamespace(), instance.getService(), instance.getHost(),
+ instance.getPort(), FlowEventConstants.EventName.LosslessWarmupEnd);
}, warmupInterval, TimeUnit.MILLISECONDS);
} else {
LOG.info("[HealthCheckRegisterLosslessPolicy] no warmup for instance {}", instance);
diff --git a/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java
index f10d27d39..33abeddce 100644
--- a/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java
+++ b/polaris-plugins/polaris-plugins-observability/event-logger/src/main/java/com/tencent/polaris/plugins/event/logger/LoggerEventReporter.java
@@ -95,6 +95,11 @@ public String convertMessage(FlowEvent event) {
eventType = event.getEventType().name();
}
+ String eventName = "";
+ if (event.getEventName() != null) {
+ eventName = event.getEventName().name();
+ }
+
String currentStatus = "";
if (event.getCurrentStatus() != null) {
currentStatus = event.getCurrentStatus().name();
@@ -109,11 +114,11 @@ public String convertMessage(FlowEvent event) {
if (event.getResourceType() != null) {
resourceType = event.getResourceType().name();
}
- return eventType + "|" + formattedDateTime + "|" + event.getClientId() + "|"
+ return eventType + "|" + eventName + "|" + formattedDateTime + "|" + event.getClientId() + "|"
+ event.getClientIp() + "|" + event.getNamespace() + "|" + event.getService() + "|"
+ event.getApiProtocol() + "|" + event.getApiPath() + "|" + event.getApiMethod() + "|"
+ event.getHost() + "|" + event.getPort() + "|" + event.getSourceNamespace() + "|"
- + event.getSourceService() + "|" + event.getLabels() + "|" + currentStatus + "|" + previousStatus + "|"
- + resourceType + "|" + event.getRuleName() + "|" + event.getReason();
+ + event.getSourceService() + "|" + event.getLabels() + "|" + currentStatus + "|"
+ + previousStatus + "|" + resourceType + "|" + event.getRuleName() + "|" + event.getReason();
}
}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml
new file mode 100644
index 000000000..1b3fbf539
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/pom.xml
@@ -0,0 +1,37 @@
+
+
+ 4.0.0
+
+ polaris-plugins-observability
+ com.tencent.polaris
+ ${revision}
+ ../pom.xml
+
+
+ event-pushgateway
+ Polaris Plugins Observability Push Gateway Event Reporter
+ Polaris Plugins Observability Push Gateway Event Reporter JAR
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+
+
+
+ com.google.code.gson
+ gson
+ ${gson.version}
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ provided
+
+
+
\ No newline at end of file
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java
new file mode 100644
index 000000000..d302bca25
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporter.java
@@ -0,0 +1,231 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.tencent.polaris.api.config.global.EventReporterConfig;
+import com.tencent.polaris.api.config.plugin.DefaultPlugins;
+import com.tencent.polaris.api.config.plugin.PluginConfigProvider;
+import com.tencent.polaris.api.config.verify.Verifier;
+import com.tencent.polaris.api.exception.PolarisException;
+import com.tencent.polaris.api.plugin.PluginType;
+import com.tencent.polaris.api.plugin.common.InitContext;
+import com.tencent.polaris.api.plugin.common.PluginTypes;
+import com.tencent.polaris.api.plugin.compose.Extensions;
+import com.tencent.polaris.api.plugin.event.EventReporter;
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+import com.tencent.polaris.api.utils.CollectionUtils;
+import com.tencent.polaris.api.utils.StringUtils;
+import com.tencent.polaris.api.utils.ThreadPoolUtils;
+import com.tencent.polaris.client.util.NamedThreadFactory;
+import com.tencent.polaris.logging.LoggerFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Polaris push gateway event reporter.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventReporter implements EventReporter, PluginConfigProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PushGatewayEventReporter.class);
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private BlockingQueue eventQueue;
+
+ private volatile boolean init = true;
+
+ private PushGatewayEventReporterConfig config;
+
+ private URI eventUri;
+
+ private final ScheduledExecutorService eventExecutors = new ScheduledThreadPoolExecutor(1,
+ new NamedThreadFactory("event-pushgateway"));
+
+ @Override
+ public boolean isEnabled() {
+ return config.isEnable();
+ }
+
+ @Override
+ public boolean reportEvent(FlowEvent flowEvent) {
+ if (eventUri == null) {
+ LOG.warn("build event request url fail, can not sent event.");
+ return false;
+ }
+ // 如果满了就抛出异常
+ try {
+ eventQueue.add(flowEvent);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add push gateway event to event queue: {}", flowEvent);
+ }
+ } catch (Throwable throwable) {
+ LOG.warn("Event queue is full. Log this event and drop it. event={}, error={}.",
+ flowEvent, throwable.getMessage(), throwable);
+ }
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return DefaultPlugins.PUSH_GATEWAY_EVENT_REPORTER_TYPE;
+ }
+
+ @Override
+ public Class extends Verifier> getPluginConfigClazz() {
+ return PushGatewayEventReporterConfig.class;
+ }
+
+ @Override
+ public PluginType getType() {
+ return PluginTypes.EVENT_REPORTER.getBaseType();
+ }
+
+ @Override
+ public void init(InitContext ctx) throws PolarisException {
+ EventReporterConfig eventReporterConfig = ctx.getConfig().getGlobal().getEventReporter();
+ if (eventReporterConfig != null && CollectionUtils.isNotEmpty(eventReporterConfig.getReporters())) {
+ for (String reporter : eventReporterConfig.getReporters()) {
+ if (StringUtils.equals(getName(), reporter)) {
+ this.config = ctx.getConfig().getGlobal().getEventReporter()
+ .getPluginConfig(getName(), PushGatewayEventReporterConfig.class);
+ if (config.isEnable()) {
+ init = false;
+ }
+ return;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void postContextInit(Extensions ctx) throws PolarisException {
+ if (!init) {
+ synchronized (this) {
+ if (!init) {
+ init = true;
+ try {
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+ eventQueue = new LinkedBlockingQueue<>(config.getEventQueueSize());
+
+ // 分割主机名和端口号
+ if (StringUtils.isBlank(config.getAddress())) {
+ throw new RuntimeException("PushGateway address is empty.");
+ }
+ String[] parts = config.getAddress().split(":");
+ if (parts.length != 2) {
+ throw new RuntimeException(String.format("PushGateway address %s format error.", config.getAddress()));
+ }
+ String host = parts[0];
+ int port = Integer.parseInt(parts[1]);
+ eventUri = new URIBuilder()
+ .setHost(host)
+ .setPort(port)
+ .setScheme("http")
+ .setPath("/polaris/client/events")
+ .build();
+ LOG.info("PushGateway event reporter init with uri: {}", eventUri);
+
+ eventExecutors.scheduleWithFixedDelay(new PushGatewayEventTask(), 1000, 1000, TimeUnit.MILLISECONDS);
+ LOG.info("PushGateway event reporter starts reporting task.");
+ } catch (URISyntaxException e) {
+ LOG.error("Build event request url fail.", e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{eventExecutors});
+ }
+
+ class PushGatewayEventTask implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ // 每次把eventQueue发空结束
+ while (CollectionUtils.isNotEmpty(eventQueue)) {
+ List eventDataList = new ArrayList<>();
+ PushGatewayEventRequest request = new PushGatewayEventRequest();
+
+ eventQueue.drainTo(eventDataList, config.getMaxBatchSize());
+ request.setBatch(eventDataList);
+
+ postPushGatewayEvent(request);
+ }
+ } catch (Throwable e) {
+ LOG.warn("Push gateway event reporter task fail.", e);
+ }
+ }
+
+ private void postPushGatewayEvent(PushGatewayEventRequest request) {
+ StringEntity postBody = null;
+ RequestConfig config = RequestConfig.custom().setConnectTimeout(2000).setConnectionRequestTimeout(10000).setSocketTimeout(10000).build();
+ try (CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build()) {
+ HttpPost httpPost = new HttpPost(eventUri);
+ postBody = new StringEntity(mapper.writeValueAsString(request));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("postPushGatewayEvent body:{}", postBody);
+ }
+ httpPost.setEntity(postBody);
+ httpPost.setHeader("Content-Type", "application/json");
+ HttpResponse httpResponse;
+
+ httpResponse = httpClient.execute(httpPost);
+
+ if (200 != httpResponse.getStatusLine().getStatusCode()) {
+ String resultString = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
+ throw new RuntimeException("Report push gateway event failed. Response = [" + resultString + "].");
+ } else {
+ if (LOG.isDebugEnabled()) {
+ String resultString = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
+ LOG.info("Report push gateway event success. Response is : {}", resultString);
+ } else {
+ LOG.info("Report push gateway event success.");
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Report push gateway event failed, postBody:{}.", postBody, e);
+ }
+ }
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java
new file mode 100644
index 000000000..94ea9d519
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventReporterConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.tencent.polaris.api.config.verify.Verifier;
+import com.tencent.polaris.api.utils.StringUtils;
+import com.tencent.polaris.factory.util.ConfigUtils;
+
+/**
+ * Polaris push gateway event reporter config.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventReporterConfig implements Verifier {
+
+ @JsonProperty
+ private Boolean enable;
+
+ @JsonProperty
+ private String address;
+
+ @JsonProperty
+ private Integer eventQueueSize = 1000;
+
+ @JsonProperty
+ private Integer maxBatchSize = 100;
+
+ @Override
+ public void verify() {
+ ConfigUtils.validateNull(enable, "global.eventReporter.plugin.pushgateway.enable");
+ if (!enable) {
+ return;
+ }
+ ConfigUtils.validateString(address, "global.eventReporter.plugin.pushgateway.address");
+ }
+
+ @Override
+ public void setDefault(Object defaultObject) {
+ if (defaultObject instanceof PushGatewayEventReporterConfig) {
+ PushGatewayEventReporterConfig pushGatewayEventReporterConfig = (PushGatewayEventReporterConfig) defaultObject;
+ if (null == enable) {
+ setEnable(pushGatewayEventReporterConfig.isEnable());
+ }
+ if (StringUtils.isBlank(address)) {
+ setAddress(pushGatewayEventReporterConfig.getAddress());
+ }
+ }
+ }
+
+ public boolean isEnable() {
+ if (null == enable) {
+ enable = false;
+ }
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public Integer getEventQueueSize() {
+ return eventQueueSize;
+ }
+
+ public void setEventQueueSize(Integer eventQueueSize) {
+ this.eventQueueSize = eventQueueSize;
+ }
+
+ public Integer getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ public void setMaxBatchSize(Integer maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java
new file mode 100644
index 000000000..7a5a34ab1
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequest.java
@@ -0,0 +1,48 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Polaris push gateway event request.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventRequest {
+
+ private List batch = new ArrayList<>();
+
+ public List getBatch() {
+ return batch;
+ }
+
+ public void setBatch(List batch) {
+ this.batch = batch;
+ }
+
+ @Override
+ public String toString() {
+ return "PushGatewayEventRequest{" +
+ "batch=" + batch +
+ '}';
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java
new file mode 100644
index 000000000..0678d93e1
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+/**
+ * Polaris push gateway event response.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventResponse {
+
+ private String requestId;
+
+ private int code;
+
+ private String info;
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getInfo() {
+ return info;
+ }
+
+ public void setInfo(String info) {
+ this.info = info;
+ }
+
+ @Override
+ public String toString() {
+ return "PushGatewayEventResponse{" +
+ "requestId='" + requestId + '\'' +
+ ", code=" + code +
+ ", info='" + info + '\'' +
+ '}';
+ }
+}
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter
new file mode 100644
index 000000000..55a65137a
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/main/resources/META-INF/services/com.tencent.polaris.api.plugin.event.EventReporter
@@ -0,0 +1 @@
+com.tencent.polaris.plugins.event.pushgateway.PushGatewayEventReporter
diff --git a/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java
new file mode 100644
index 000000000..b8cfb0500
--- /dev/null
+++ b/polaris-plugins/polaris-plugins-observability/event-pushgateway/src/test/java/com/tencent/polaris/plugins/event/pushgateway/PushGatewayEventRequestTest.java
@@ -0,0 +1,66 @@
+/*
+ * Tencent is pleased to support the open source community by making polaris-java available.
+ *
+ * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.tencent.polaris.plugins.event.pushgateway;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.tencent.polaris.api.plugin.event.FlowEvent;
+import com.tencent.polaris.api.pojo.ServiceEventKey;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+
+import static com.tencent.polaris.api.plugin.event.FlowEventConstants.EventName.LosslessOnlineStart;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link PushGatewayEventRequest}.
+ *
+ * @author Haotian Zhang
+ */
+public class PushGatewayEventRequestTest {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Test
+ public void test() throws JsonProcessingException {
+ FlowEvent flowEvent = new FlowEvent.Builder()
+ .withEventType(ServiceEventKey.EventType.LOSSLESS)
+ .withEventName(LosslessOnlineStart)
+ .withTimestamp(LocalDateTime.of(2025, 1, 13, 11, 58, 42))
+ .withClientId("test-client")
+ .withClientIp("1.2.3.4")
+ .withNamespace("test-namespace")
+ .withService("test-service")
+ .withHost("1.2.3.4")
+ .withPort(8080)
+ .build();
+
+ PushGatewayEventRequest request = new PushGatewayEventRequest();
+ request.getBatch().add(flowEvent);
+
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+ assertThat(mapper.writeValueAsString(request)).isEqualTo("{\"batch\":[{\"currentStatus\":null,\"previousStatus\":null,\"resourceType\":null,\"ruleName\":\"\",\"additionalParams\":{},\"event_type\":\"LOSSLESS\",\"event_name\":\"LosslessOnlineStart\",\"event_time\":\"2025-01-13 11:58:42:0000\",\"client_id\":\"test-client\",\"client_ip\":\"1.2.3.4\",\"namespace\":\"test-namespace\",\"service\":\"test-service\",\"api_protocol\":\"\",\"api_path\":\"\",\"api_method\":\"\",\"host\":\"1.2.3.4\",\"port\":\"8080\",\"source_namespace\":\"\",\"source_service\":\"\",\"labels\":\"\",\"reason\":\"\"}]}");
+ }
+}
\ No newline at end of file
diff --git a/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java b/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java
index e111343a5..bf985c879 100644
--- a/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java
+++ b/polaris-plugins/polaris-plugins-observability/event-tsf/src/main/java/com/tencent/polaris/plugins/event/tsf/TsfEventReporter.java
@@ -57,6 +57,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -115,7 +116,7 @@ private boolean reportV1Event(FlowEvent flowEvent) {
}
TsfEventData eventData = new TsfEventData();
- eventData.setOccurTime(flowEvent.getTimestamp().getEpochSecond());
+ eventData.setOccurTime(flowEvent.getTimestamp().atZone(ZoneId.systemDefault()).toInstant().getEpochSecond());
eventData.setEventName(TsfEventDataUtils.convertEventName(flowEvent));
Byte status = TsfEventDataUtils.convertStatus(flowEvent);
if (status == null || status == -1) {
diff --git a/polaris-plugins/polaris-plugins-observability/pom.xml b/polaris-plugins/polaris-plugins-observability/pom.xml
index 565759f03..0b57b24b6 100644
--- a/polaris-plugins/polaris-plugins-observability/pom.xml
+++ b/polaris-plugins/polaris-plugins-observability/pom.xml
@@ -21,6 +21,7 @@
trace-otel
event-logger
event-tsf
+ event-pushgateway
diff --git a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java
index 91f59031a..d5f12477e 100644
--- a/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java
+++ b/polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/utils/RateLimiterEventUtils.java
@@ -27,7 +27,7 @@
import com.tencent.polaris.client.flow.BaseFlow;
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto;
-import java.time.Instant;
+import java.time.LocalDateTime;
import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.RULE_DETAIL_KEY;
import static com.tencent.polaris.api.plugin.event.tsf.TsfEventDataConstants.RULE_ID_KEY;
@@ -43,9 +43,14 @@ public static void reportEvent(Extensions extensions, ServiceKey serviceKey, Rat
if (extensions == null) {
return;
}
+
+ FlowEventConstants.Status currentFlowEventStatus = parseFlowEventStatus(currentCode);
+ FlowEventConstants.Status previousFlowEventStatus = parseFlowEventStatus(previousCode);
+
FlowEvent.Builder flowEventBuilder = new FlowEvent.Builder()
.withEventType(ServiceEventKey.EventType.RATE_LIMITING)
- .withTimestamp(Instant.now())
+ .withEventName(parseFlowEventName(currentFlowEventStatus, previousFlowEventStatus))
+ .withTimestamp(LocalDateTime.now())
.withClientId(extensions.getValueContext().getClientId())
.withClientIp(extensions.getValueContext().getHost())
.withNamespace(serviceKey.getNamespace())
@@ -55,11 +60,10 @@ public static void reportEvent(Extensions extensions, ServiceKey serviceKey, Rat
.withSourceNamespace(sourceNamespace)
.withSourceService(sourceService)
.withLabels(labels)
- .withCurrentStatus(parseFlowEventStatus(currentCode))
- .withPreviousStatus(parseFlowEventStatus(previousCode))
+ .withCurrentStatus(currentFlowEventStatus)
+ .withPreviousStatus(previousFlowEventStatus)
.withResourceType(parseFlowEventResourceType(rule.getResource()))
- .withRuleName(rule.getName().getValue())
- .withReason(reason);
+ .withRuleName(rule.getName().getValue());
if (StringUtils.isNotBlank(reason)) {
flowEventBuilder.withReason(reason);
}
@@ -92,6 +96,16 @@ private static FlowEventConstants.Status parseFlowEventStatus(QuotaResult.Code c
}
}
+ private static FlowEventConstants.EventName parseFlowEventName(FlowEventConstants.Status currentStatus, FlowEventConstants.Status previousStatus) {
+ if (currentStatus == FlowEventConstants.Status.LIMITED && previousStatus == FlowEventConstants.Status.UNLIMITED) {
+ return FlowEventConstants.EventName.RateLimitStart;
+ } else if (currentStatus == FlowEventConstants.Status.UNLIMITED && previousStatus == FlowEventConstants.Status.LIMITED) {
+ return FlowEventConstants.EventName.RateLimitEnd;
+ } else {
+ return FlowEventConstants.EventName.UNKNOWN;
+ }
+ }
+
private static FlowEventConstants.ResourceType parseFlowEventResourceType(RateLimitProto.Rule.Resource resource) {
switch (resource) {
case QPS:
diff --git a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml
index 10f673240..eaaa113b2 100644
--- a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml
+++ b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml
@@ -118,6 +118,11 @@
event-tsf
${project.version}
+
+ com.tencent.polaris
+ event-pushgateway
+ ${project.version}
+
com.tencent.polaris
diff --git a/pom.xml b/pom.xml
index 334c3af2a..e5320c38f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
- 2.0.0.1
+ 2.0.1.0-SNAPSHOT
${maven.build.timestamp}
false