diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index c57b3b7..b7067ed 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -32,7 +32,7 @@ This change added tests and can be verified as follows:
*(example:)*
- *Added integration tests for end-to-end deployment*
- *Added unit tests*
-- *Manually verified by running the Kinesis connector on a local Flink cluster.*
+- *Manually verified by running the Prometheus connector on a local Flink cluster.*
## Significant changes
*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*
diff --git a/docs/data/prometheus.yml b/docs/data/prometheus.yml
new file mode 100644
index 0000000..c3c93ee
--- /dev/null
+++ b/docs/data/prometheus.yml
@@ -0,0 +1,22 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: 1.1-SNAPSHOT
+variants:
+ - maven: flink-connector-prometheus
+ - maven: flink-connector-prometheus-request-signer-amp
diff --git a/flink-connector-prometheus-request-signer-amp/pom.xml b/flink-connector-prometheus-request-signer-amp/pom.xml
index 65f62ac..fe72d7c 100644
--- a/flink-connector-prometheus-request-signer-amp/pom.xml
+++ b/flink-connector-prometheus-request-signer-amp/pom.xml
@@ -54,6 +54,12 @@ under the License.
provided
+
+ org.apache.flink
+ flink-connector-aws-base
+ 5.0.0-1.20
+
+
software.amazon.awssdkauth
diff --git a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
index 5922b0e..af6f2bf 100644
--- a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
+++ b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
@@ -59,12 +59,19 @@ public class AmazonManagedPrometheusWriteRequestSigner implements PrometheusRequ
* @param awsRegion Region of the AMP workspace
*/
public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) {
+ this(remoteWriteUrl, awsRegion, DefaultCredentialsProvider.create());
+ }
+
+ public AmazonManagedPrometheusWriteRequestSigner(
+ String remoteWriteUrl, String awsRegion, AwsCredentialsProvider credentialsProvider) {
Preconditions.checkArgument(
StringUtils.isNotBlank(awsRegion), "awsRegion cannot be null or empty");
Preconditions.checkArgument(
StringUtils.isNotBlank(remoteWriteUrl), "remoteWriteUrl cannot be null or empty");
this.awsRegion = awsRegion;
+ this.credentialsProvider = credentialsProvider;
+
try {
this.remoteWriteUrl = new URL(remoteWriteUrl);
} catch (MalformedURLException e) {
diff --git a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java
new file mode 100644
index 0000000..b110c33
--- /dev/null
+++ b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.table.PrometheusConfig;
+import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+
+import java.util.Properties;
+
+public class AmazonManagedPrometheusWriteRequestSignerFactory
+ implements PrometheusDynamicRequestSignerFactory {
+ public AmazonManagedPrometheusWriteRequestSignerFactory() {}
+
+ @Override
+ public String requestSignerIdentifer() {
+ return "amazon-managed-prometheus";
+ }
+
+ @Override
+ public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) {
+ Properties properties = config.toProperties();
+ AWSGeneralUtil.validateAwsConfiguration(properties);
+
+ final AwsCredentialsProvider credentialsProvider =
+ AWSGeneralUtil.getCredentialsProvider(properties);
+ final String awsRegion = AWSGeneralUtil.getRegion(properties).toString();
+ final String remoteWriteUrl = config.getRemoteWriteEndpointUrl();
+
+ return new AmazonManagedPrometheusWriteRequestSigner(
+ remoteWriteUrl, awsRegion, credentialsProvider);
+ }
+}
diff --git a/flink-connector-prometheus-request-signer-amp/src/main/resources/META-INF/services/org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory b/flink-connector-prometheus-request-signer-amp/src/main/resources/META-INF/services/org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory
new file mode 100644
index 0000000..fd33d6f
--- /dev/null
+++ b/flink-connector-prometheus-request-signer-amp/src/main/resources/META-INF/services/org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSignerFactory
\ No newline at end of file
diff --git a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactoryTest.java b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactoryTest.java
new file mode 100644
index 0000000..6bb8a4b
--- /dev/null
+++ b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactoryTest.java
@@ -0,0 +1,68 @@
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.table.PrometheusConfig;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.prometheus.table.PrometheusConnectorOption.METRIC_REMOTE_WRITE_URL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class AmazonManagedPrometheusWriteRequestSignerFactoryTest {
+ private AmazonManagedPrometheusWriteRequestSignerFactory factory;
+
+ @BeforeEach
+ public void setup() {
+ factory = new AmazonManagedPrometheusWriteRequestSignerFactory();
+ }
+
+ @Test
+ void testIdentifier() {
+ assertEquals(factory.requestSignerIdentifer(), "amazon-managed-prometheus");
+ }
+
+ @Test
+ void testCreateRequestSigner() {
+ final String endpoint =
+ "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc/api/v1/remote_write";
+ final String region = "us-east-1";
+ Configuration config = new Configuration();
+ config.set(METRIC_REMOTE_WRITE_URL, endpoint);
+ config.setString(AWS_REGION, region);
+
+ PrometheusRequestSigner requestSigner =
+ factory.getRequestSigner(new PrometheusConfig(config));
+
+ assertInstanceOf(AmazonManagedPrometheusWriteRequestSigner.class, requestSigner);
+ }
+
+ @Test
+ void testCreateRequestSignerFailsWithInvalidRegion() {
+ final String endpoint =
+ "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc/api/v1/remote_write";
+ final String region = "invalid-region";
+ Configuration config = new Configuration();
+ config.set(METRIC_REMOTE_WRITE_URL, endpoint);
+ config.setString(AWS_REGION, region);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> factory.getRequestSigner(new PrometheusConfig(config)));
+ }
+
+ @Test
+ void testCreateRequestSignerFailsWithInvalidURL() {
+ Configuration config = new Configuration();
+ config.set(METRIC_REMOTE_WRITE_URL, "invalid-endpoint");
+ config.setString(AWS_REGION, "us-east-1");
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> factory.getRequestSigner(new PrometheusConfig(config)));
+ }
+}
diff --git a/flink-connector-prometheus/pom.xml b/flink-connector-prometheus/pom.xml
index 90b101b..4f3aabb 100644
--- a/flink-connector-prometheus/pom.xml
+++ b/flink-connector-prometheus/pom.xml
@@ -77,6 +77,31 @@ under the License.
httpcore5
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ test-jar
+ test
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+ provided
+ true
+
+
+
+ org.apache.flink
+ flink-table-runtime
+ ${flink.version}
+ provided
+ true
+
+
org.apache.flink
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
index 78edd7c..da8890e 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
@@ -37,11 +37,12 @@
/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
@PublicEvolving
-public class PrometheusSink extends AsyncSinkBase {
+public class PrometheusSink extends AsyncSinkBase {
private final String prometheusRemoteWriteUrl;
private final PrometheusAsyncHttpClientBuilder clientBuilder;
private final PrometheusRequestSigner requestSigner;
private final int maxBatchSizeInSamples;
+ private final int maxRecordSizeInSamples;
private final String httpUserAgent;
private final PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
errorHandlingBehaviorConfig;
@@ -49,7 +50,7 @@ public class PrometheusSink extends AsyncSinkBase elementConverter,
+ ElementConverter elementConverter,
int maxInFlightRequests,
int maxBufferedRequests,
int maxBatchSizeInSamples,
@@ -93,6 +94,7 @@ protected PrometheusSink(
Preconditions.checkArgument(
StringUtils.isNotBlank(metricGroupName), "Missing metric group name");
this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+ this.maxRecordSizeInSamples = maxRecordSizeInSamples;
this.requestSigner = requestSigner;
this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
this.clientBuilder = clientBuilder;
@@ -102,8 +104,8 @@ protected PrometheusSink(
}
@Override
- public StatefulSinkWriter>
- createWriter(InitContext initContext) {
+ public StatefulSinkWriter> createWriter(
+ InitContext initContext) {
SinkMetricsCallback metricsCallback =
new SinkMetricsCallback(
SinkMetrics.registerSinkMetrics(
@@ -111,13 +113,13 @@ protected PrometheusSink(
CloseableHttpAsyncClient asyncHttpClient =
clientBuilder.buildAndStartClient(metricsCallback);
- return new PrometheusSinkWriter(
+ return new PrometheusSinkWriter<>(
getElementConverter(),
initContext,
getMaxInFlightRequests(),
getMaxBufferedRequests(),
maxBatchSizeInSamples,
- getMaxRecordSizeInBytes(),
+ maxRecordSizeInSamples,
getMaxTimeInBufferMS(),
prometheusRemoteWriteUrl,
asyncHttpClient,
@@ -128,23 +130,22 @@ protected PrometheusSink(
}
@Override
- public StatefulSinkWriter>
- restoreWriter(
- InitContext initContext,
- Collection> recoveredState) {
+ public StatefulSinkWriter> restoreWriter(
+ InitContext initContext,
+ Collection> recoveredState) {
SinkMetricsCallback metricsCallback =
new SinkMetricsCallback(
SinkMetrics.registerSinkMetrics(
initContext.metricGroup().addGroup(metricGroupName)));
CloseableHttpAsyncClient asyncHttpClient =
clientBuilder.buildAndStartClient(metricsCallback);
- return new PrometheusSinkWriter(
+ return new PrometheusSinkWriter<>(
getElementConverter(),
initContext,
getMaxInFlightRequests(),
getMaxBufferedRequests(),
maxBatchSizeInSamples,
- getMaxRecordSizeInBytes(),
+ maxRecordSizeInSamples,
getMaxTimeInBufferMS(),
prometheusRemoteWriteUrl,
asyncHttpClient,
@@ -155,8 +156,8 @@ protected PrometheusSink(
recoveredState);
}
- public static PrometheusSinkBuilder builder() {
- return new PrometheusSinkBuilder();
+ public static PrometheusSinkBuilder builder() {
+ return new PrometheusSinkBuilder<>();
}
@Override
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
index 345c61c..3fb46bc 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
@@ -18,8 +18,8 @@
package org.apache.flink.connector.prometheus.sink;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;
@@ -30,9 +30,8 @@
/** Builder for Sink implementation. */
@PublicEvolving
-public class PrometheusSinkBuilder
- extends AsyncSinkBaseBuilder<
- PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> {
+public class PrometheusSinkBuilder
+ extends AsyncSinkBaseBuilder> {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class);
// Max batch size, in number of samples
@@ -56,22 +55,31 @@ public class PrometheusSinkBuilder
private String httpUserAgent = null;
private PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
errorHandlingBehaviorConfig = null;
+ private ElementConverter elementConverter;
private String metricGroupName = null;
- @Override
- public AsyncSinkBase build() {
+ public PrometheusSinkBuilder setElementConverter(
+ ElementConverter elementConverter) {
+ this.elementConverter = elementConverter;
+ return this;
+ }
+ @Override
+ public PrometheusSink build() {
int actualMaxBatchSizeInSamples =
Optional.ofNullable(maxBatchSizeInSamples)
.orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES);
+
int actualMaxBufferedRequests =
- Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS);
+ Optional.ofNullable(super.getMaxBufferedRequests())
+ .orElse(DEFAULT_MAX_BUFFERED_REQUESTS);
+
long actualMaxTimeInBufferMS =
- Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
+ Optional.ofNullable(super.getMaxTimeInBufferMS())
+ .orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
int actualMaxRecordSizeInSamples =
Optional.ofNullable(maxRecordSizeInSamples).orElse(actualMaxBatchSizeInSamples);
-
int actualSocketTimeoutMs =
Optional.ofNullable(socketTimeoutMs)
.orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS);
@@ -116,8 +124,9 @@ public AsyncSinkBase build() {
actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetryableError());
- return new PrometheusSink(
- new PrometheusTimeSeriesConverter(),
+ return new PrometheusSink<>(
+ Optional.ofNullable(elementConverter)
+ .orElse(new PrometheusTimeSeriesBaseConverter<>()),
MAX_IN_FLIGHT_REQUESTS,
actualMaxBufferedRequests,
actualMaxBatchSizeInSamples,
@@ -132,50 +141,50 @@ public AsyncSinkBase build() {
actualMetricGroupName);
}
- public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) {
+ public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) {
this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
return this;
}
- public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) {
+ public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) {
this.requestSigner = requestSigner;
return this;
}
- public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) {
+ public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) {
this.maxBatchSizeInSamples = maxBatchSizeInSamples;
return this;
}
- public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) {
+ public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) {
this.maxRecordSizeInSamples = maxRecordSizeInSamples;
return this;
}
- public PrometheusSinkBuilder setRetryConfiguration(
+ public PrometheusSinkBuilder setRetryConfiguration(
PrometheusSinkConfiguration.RetryConfiguration retryConfiguration) {
this.retryConfiguration = retryConfiguration;
return this;
}
- public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) {
+ public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) {
this.socketTimeoutMs = socketTimeoutMs;
return this;
}
- public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) {
+ public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) {
this.httpUserAgent = httpUserAgent;
return this;
}
- public PrometheusSinkBuilder setErrorHandlingBehaviorConfiguration(
+ public PrometheusSinkBuilder setErrorHandlingBehaviorConfiguration(
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
errorHandlingBehaviorConfig) {
this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
return this;
}
- public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) {
+ public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) {
this.metricGroupName = metricGroupName;
return this;
}
@@ -184,20 +193,20 @@ public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) {
/** Not supported. Use setMaxBatchSizeInSamples(int) instead */
@Override
- public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) {
+ public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) {
throw new UnsupportedOperationException("maxBatchSize is not supported by this sink");
}
/** Not supported. Use setMaxBatchSizeInSamples(int) instead */
@Override
- public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
+ public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
throw new UnsupportedOperationException(
"maxBatchSizeInBytes is not supported by this sink");
}
/** Not supported. Use setMaxRecordSizeInSamples(int) instead */
@Override
- public PrometheusSinkBuilder setMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
+ public PrometheusSinkBuilder setMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
throw new UnsupportedOperationException(
"maxRecordSizeInBytes is not supported by this sink");
}
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java
index f06f451..89ae7fc 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java
@@ -41,7 +41,7 @@
import java.util.function.Consumer;
/**
- * Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries.
+ * Writer, taking care of batching the {@link IN} and handling retries.
*
*
The batching of this sink is in terms of Samples, not bytes. The goal is adaptively increase
* the number of Samples in each batch, a WriteRequest sent to Prometheus, to a configurable number.
@@ -59,7 +59,7 @@
* maxBatchSizeInBytes.
*/
@Internal
-public class PrometheusSinkWriter extends AsyncSinkWriter {
+public class PrometheusSinkWriter extends AsyncSinkWriter {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkWriter.class);
private final SinkMetricsCallback metricsCallback;
@@ -69,7 +69,7 @@ public class PrometheusSinkWriter extends AsyncSinkWriter elementConverter,
+ ElementConverter elementConverter,
Sink.InitContext context,
int maxInFlightRequests,
int maxBufferedRequests,
@@ -101,7 +101,7 @@ public PrometheusSinkWriter(
}
public PrometheusSinkWriter(
- ElementConverter elementConverter,
+ ElementConverter elementConverter,
Sink.InitContext context,
int maxInFlightRequests,
int maxBufferedRequests,
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java
index 80fa6ac..de84421 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.prometheus.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -114,6 +115,7 @@ public int hashCode() {
private final Label[] labels;
private final Sample[] samples;
private final String metricName;
+ private static final String METRIC_NAME_LABEL_NAME = "__name__";
public PrometheusTimeSeries(String metricName, Label[] labels, Sample[] samples) {
this.metricName = metricName;
@@ -142,6 +144,35 @@ public static Builder builderFrom(PrometheusTimeSeries other) {
Arrays.asList(other.labels), Arrays.asList(other.samples), other.metricName);
}
+ public Types.TimeSeries toTimeSeries() {
+
+ Types.TimeSeries.Builder builder =
+ Types.TimeSeries.newBuilder()
+ .addLabels(
+ Types.Label.newBuilder()
+ .setName(METRIC_NAME_LABEL_NAME)
+ .setValue(getMetricName())
+ .build());
+
+ for (PrometheusTimeSeries.Label label : getLabels()) {
+ builder.addLabels(
+ Types.Label.newBuilder()
+ .setName(label.getName())
+ .setValue(label.getValue())
+ .build());
+ }
+
+ for (PrometheusTimeSeries.Sample sample : getSamples()) {
+ builder.addSamples(
+ Types.Sample.newBuilder()
+ .setValue(sample.getValue())
+ .setTimestamp(sample.getTimestamp())
+ .build());
+ }
+
+ return builder.build();
+ }
+
/** Builder for sink input pojo instance. */
public static final class Builder {
private List