diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index c57b3b7..b7067ed 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -32,7 +32,7 @@ This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment* - *Added unit tests* -- *Manually verified by running the Kinesis connector on a local Flink cluster.* +- *Manually verified by running the Prometheus connector on a local Flink cluster.* ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* diff --git a/docs/data/prometheus.yml b/docs/data/prometheus.yml new file mode 100644 index 0000000..c3c93ee --- /dev/null +++ b/docs/data/prometheus.yml @@ -0,0 +1,22 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +version: 1.1-SNAPSHOT +variants: + - maven: flink-connector-prometheus + - maven: flink-connector-prometheus-request-signer-amp diff --git a/flink-connector-prometheus-request-signer-amp/pom.xml b/flink-connector-prometheus-request-signer-amp/pom.xml index 65f62ac..fe72d7c 100644 --- a/flink-connector-prometheus-request-signer-amp/pom.xml +++ b/flink-connector-prometheus-request-signer-amp/pom.xml @@ -54,6 +54,12 @@ under the License. provided + + org.apache.flink + flink-connector-aws-base + 5.0.0-1.20 + + software.amazon.awssdk auth diff --git a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java index 5922b0e..af6f2bf 100644 --- a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java +++ b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java @@ -59,12 +59,19 @@ public class AmazonManagedPrometheusWriteRequestSigner implements PrometheusRequ * @param awsRegion Region of the AMP workspace */ public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) { + this(remoteWriteUrl, awsRegion, DefaultCredentialsProvider.create()); + } + + public AmazonManagedPrometheusWriteRequestSigner( + String remoteWriteUrl, String awsRegion, AwsCredentialsProvider credentialsProvider) { Preconditions.checkArgument( StringUtils.isNotBlank(awsRegion), "awsRegion cannot be null or empty"); Preconditions.checkArgument( StringUtils.isNotBlank(remoteWriteUrl), "remoteWriteUrl cannot be null or empty"); this.awsRegion = awsRegion; + this.credentialsProvider = credentialsProvider; + try { this.remoteWriteUrl = new URL(remoteWriteUrl); } catch (MalformedURLException e) { diff --git a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java new file mode 100644 index 0000000..b110c33 --- /dev/null +++ b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.aws; + +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.table.PrometheusConfig; +import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory; + +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +import java.util.Properties; + +public class AmazonManagedPrometheusWriteRequestSignerFactory + implements PrometheusDynamicRequestSignerFactory { + public AmazonManagedPrometheusWriteRequestSignerFactory() {} + + @Override + public String requestSignerIdentifer() { + return "amazon-managed-prometheus"; + } + + @Override + public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) { + Properties properties = config.toProperties(); + AWSGeneralUtil.validateAwsConfiguration(properties); + + final AwsCredentialsProvider credentialsProvider = + AWSGeneralUtil.getCredentialsProvider(properties); + final String awsRegion = AWSGeneralUtil.getRegion(properties).toString(); + final String remoteWriteUrl = config.getRemoteWriteEndpointUrl(); + + return new AmazonManagedPrometheusWriteRequestSigner( + remoteWriteUrl, awsRegion, credentialsProvider); + } +} diff --git a/flink-connector-prometheus-request-signer-amp/src/main/resources/META-INF/services/org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory b/flink-connector-prometheus-request-signer-amp/src/main/resources/META-INF/services/org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory new file mode 100644 index 0000000..fd33d6f --- /dev/null +++ b/flink-connector-prometheus-request-signer-amp/src/main/resources/META-INF/services/org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSignerFactory \ No newline at end of file diff --git a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactoryTest.java b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactoryTest.java new file mode 100644 index 0000000..6bb8a4b --- /dev/null +++ b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactoryTest.java @@ -0,0 +1,68 @@ +package org.apache.flink.connector.prometheus.sink.aws; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.table.PrometheusConfig; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.connector.prometheus.table.PrometheusConnectorOption.METRIC_REMOTE_WRITE_URL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class AmazonManagedPrometheusWriteRequestSignerFactoryTest { + private AmazonManagedPrometheusWriteRequestSignerFactory factory; + + @BeforeEach + public void setup() { + factory = new AmazonManagedPrometheusWriteRequestSignerFactory(); + } + + @Test + void testIdentifier() { + assertEquals(factory.requestSignerIdentifer(), "amazon-managed-prometheus"); + } + + @Test + void testCreateRequestSigner() { + final String endpoint = + "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc/api/v1/remote_write"; + final String region = "us-east-1"; + Configuration config = new Configuration(); + config.set(METRIC_REMOTE_WRITE_URL, endpoint); + config.setString(AWS_REGION, region); + + PrometheusRequestSigner requestSigner = + factory.getRequestSigner(new PrometheusConfig(config)); + + assertInstanceOf(AmazonManagedPrometheusWriteRequestSigner.class, requestSigner); + } + + @Test + void testCreateRequestSignerFailsWithInvalidRegion() { + final String endpoint = + "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc/api/v1/remote_write"; + final String region = "invalid-region"; + Configuration config = new Configuration(); + config.set(METRIC_REMOTE_WRITE_URL, endpoint); + config.setString(AWS_REGION, region); + + assertThrows( + IllegalArgumentException.class, + () -> factory.getRequestSigner(new PrometheusConfig(config))); + } + + @Test + void testCreateRequestSignerFailsWithInvalidURL() { + Configuration config = new Configuration(); + config.set(METRIC_REMOTE_WRITE_URL, "invalid-endpoint"); + config.setString(AWS_REGION, "us-east-1"); + + assertThrows( + IllegalArgumentException.class, + () -> factory.getRequestSigner(new PrometheusConfig(config))); + } +} diff --git a/flink-connector-prometheus/pom.xml b/flink-connector-prometheus/pom.xml index 90b101b..4f3aabb 100644 --- a/flink-connector-prometheus/pom.xml +++ b/flink-connector-prometheus/pom.xml @@ -77,6 +77,31 @@ under the License. httpcore5 + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + true + + org.apache.flink diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java index 78edd7c..da8890e 100644 --- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java +++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java @@ -37,11 +37,12 @@ /** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */ @PublicEvolving -public class PrometheusSink extends AsyncSinkBase { +public class PrometheusSink extends AsyncSinkBase { private final String prometheusRemoteWriteUrl; private final PrometheusAsyncHttpClientBuilder clientBuilder; private final PrometheusRequestSigner requestSigner; private final int maxBatchSizeInSamples; + private final int maxRecordSizeInSamples; private final String httpUserAgent; private final PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; @@ -49,7 +50,7 @@ public class PrometheusSink extends AsyncSinkBase elementConverter, + ElementConverter elementConverter, int maxInFlightRequests, int maxBufferedRequests, int maxBatchSizeInSamples, @@ -93,6 +94,7 @@ protected PrometheusSink( Preconditions.checkArgument( StringUtils.isNotBlank(metricGroupName), "Missing metric group name"); this.maxBatchSizeInSamples = maxBatchSizeInSamples; + this.maxRecordSizeInSamples = maxRecordSizeInSamples; this.requestSigner = requestSigner; this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl; this.clientBuilder = clientBuilder; @@ -102,8 +104,8 @@ protected PrometheusSink( } @Override - public StatefulSinkWriter> - createWriter(InitContext initContext) { + public StatefulSinkWriter> createWriter( + InitContext initContext) { SinkMetricsCallback metricsCallback = new SinkMetricsCallback( SinkMetrics.registerSinkMetrics( @@ -111,13 +113,13 @@ protected PrometheusSink( CloseableHttpAsyncClient asyncHttpClient = clientBuilder.buildAndStartClient(metricsCallback); - return new PrometheusSinkWriter( + return new PrometheusSinkWriter<>( getElementConverter(), initContext, getMaxInFlightRequests(), getMaxBufferedRequests(), maxBatchSizeInSamples, - getMaxRecordSizeInBytes(), + maxRecordSizeInSamples, getMaxTimeInBufferMS(), prometheusRemoteWriteUrl, asyncHttpClient, @@ -128,23 +130,22 @@ protected PrometheusSink( } @Override - public StatefulSinkWriter> - restoreWriter( - InitContext initContext, - Collection> recoveredState) { + public StatefulSinkWriter> restoreWriter( + InitContext initContext, + Collection> recoveredState) { SinkMetricsCallback metricsCallback = new SinkMetricsCallback( SinkMetrics.registerSinkMetrics( initContext.metricGroup().addGroup(metricGroupName))); CloseableHttpAsyncClient asyncHttpClient = clientBuilder.buildAndStartClient(metricsCallback); - return new PrometheusSinkWriter( + return new PrometheusSinkWriter<>( getElementConverter(), initContext, getMaxInFlightRequests(), getMaxBufferedRequests(), maxBatchSizeInSamples, - getMaxRecordSizeInBytes(), + maxRecordSizeInSamples, getMaxTimeInBufferMS(), prometheusRemoteWriteUrl, asyncHttpClient, @@ -155,8 +156,8 @@ protected PrometheusSink( recoveredState); } - public static PrometheusSinkBuilder builder() { - return new PrometheusSinkBuilder(); + public static PrometheusSinkBuilder builder() { + return new PrometheusSinkBuilder<>(); } @Override diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java index 345c61c..3fb46bc 100644 --- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java +++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.prometheus.sink; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.connector.base.sink.AsyncSinkBase; import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; import org.apache.flink.connector.prometheus.sink.prometheus.Types; @@ -30,9 +30,8 @@ /** Builder for Sink implementation. */ @PublicEvolving -public class PrometheusSinkBuilder - extends AsyncSinkBaseBuilder< - PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> { +public class PrometheusSinkBuilder + extends AsyncSinkBaseBuilder> { private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class); // Max batch size, in number of samples @@ -56,22 +55,31 @@ public class PrometheusSinkBuilder private String httpUserAgent = null; private PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig = null; + private ElementConverter elementConverter; private String metricGroupName = null; - @Override - public AsyncSinkBase build() { + public PrometheusSinkBuilder setElementConverter( + ElementConverter elementConverter) { + this.elementConverter = elementConverter; + return this; + } + @Override + public PrometheusSink build() { int actualMaxBatchSizeInSamples = Optional.ofNullable(maxBatchSizeInSamples) .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES); + int actualMaxBufferedRequests = - Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS); + Optional.ofNullable(super.getMaxBufferedRequests()) + .orElse(DEFAULT_MAX_BUFFERED_REQUESTS); + long actualMaxTimeInBufferMS = - Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS); + Optional.ofNullable(super.getMaxTimeInBufferMS()) + .orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS); int actualMaxRecordSizeInSamples = Optional.ofNullable(maxRecordSizeInSamples).orElse(actualMaxBatchSizeInSamples); - int actualSocketTimeoutMs = Optional.ofNullable(socketTimeoutMs) .orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS); @@ -116,8 +124,9 @@ public AsyncSinkBase build() { actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(), actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetryableError()); - return new PrometheusSink( - new PrometheusTimeSeriesConverter(), + return new PrometheusSink<>( + Optional.ofNullable(elementConverter) + .orElse(new PrometheusTimeSeriesBaseConverter<>()), MAX_IN_FLIGHT_REQUESTS, actualMaxBufferedRequests, actualMaxBatchSizeInSamples, @@ -132,50 +141,50 @@ public AsyncSinkBase build() { actualMetricGroupName); } - public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) { + public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) { this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl; return this; } - public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) { + public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) { this.requestSigner = requestSigner; return this; } - public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) { + public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) { this.maxBatchSizeInSamples = maxBatchSizeInSamples; return this; } - public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) { + public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) { this.maxRecordSizeInSamples = maxRecordSizeInSamples; return this; } - public PrometheusSinkBuilder setRetryConfiguration( + public PrometheusSinkBuilder setRetryConfiguration( PrometheusSinkConfiguration.RetryConfiguration retryConfiguration) { this.retryConfiguration = retryConfiguration; return this; } - public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) { + public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) { this.socketTimeoutMs = socketTimeoutMs; return this; } - public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) { + public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) { this.httpUserAgent = httpUserAgent; return this; } - public PrometheusSinkBuilder setErrorHandlingBehaviorConfiguration( + public PrometheusSinkBuilder setErrorHandlingBehaviorConfiguration( PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig) { this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig; return this; } - public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) { + public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) { this.metricGroupName = metricGroupName; return this; } @@ -184,20 +193,20 @@ public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) { /** Not supported. Use setMaxBatchSizeInSamples(int) instead */ @Override - public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) { + public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) { throw new UnsupportedOperationException("maxBatchSize is not supported by this sink"); } /** Not supported. Use setMaxBatchSizeInSamples(int) instead */ @Override - public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) { + public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) { throw new UnsupportedOperationException( "maxBatchSizeInBytes is not supported by this sink"); } /** Not supported. Use setMaxRecordSizeInSamples(int) instead */ @Override - public PrometheusSinkBuilder setMaxRecordSizeInBytes(long maxRecordSizeInBytes) { + public PrometheusSinkBuilder setMaxRecordSizeInBytes(long maxRecordSizeInBytes) { throw new UnsupportedOperationException( "maxRecordSizeInBytes is not supported by this sink"); } diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java index f06f451..89ae7fc 100644 --- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java +++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java @@ -41,7 +41,7 @@ import java.util.function.Consumer; /** - * Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries. + * Writer, taking care of batching the {@link IN} and handling retries. * *

The batching of this sink is in terms of Samples, not bytes. The goal is adaptively increase * the number of Samples in each batch, a WriteRequest sent to Prometheus, to a configurable number. @@ -59,7 +59,7 @@ * maxBatchSizeInBytes. */ @Internal -public class PrometheusSinkWriter extends AsyncSinkWriter { +public class PrometheusSinkWriter extends AsyncSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkWriter.class); private final SinkMetricsCallback metricsCallback; @@ -69,7 +69,7 @@ public class PrometheusSinkWriter extends AsyncSinkWriter elementConverter, + ElementConverter elementConverter, Sink.InitContext context, int maxInFlightRequests, int maxBufferedRequests, @@ -101,7 +101,7 @@ public PrometheusSinkWriter( } public PrometheusSinkWriter( - ElementConverter elementConverter, + ElementConverter elementConverter, Sink.InitContext context, int maxInFlightRequests, int maxBufferedRequests, diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java index 80fa6ac..de84421 100644 --- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java +++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.prometheus.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -114,6 +115,7 @@ public int hashCode() { private final Label[] labels; private final Sample[] samples; private final String metricName; + private static final String METRIC_NAME_LABEL_NAME = "__name__"; public PrometheusTimeSeries(String metricName, Label[] labels, Sample[] samples) { this.metricName = metricName; @@ -142,6 +144,35 @@ public static Builder builderFrom(PrometheusTimeSeries other) { Arrays.asList(other.labels), Arrays.asList(other.samples), other.metricName); } + public Types.TimeSeries toTimeSeries() { + + Types.TimeSeries.Builder builder = + Types.TimeSeries.newBuilder() + .addLabels( + Types.Label.newBuilder() + .setName(METRIC_NAME_LABEL_NAME) + .setValue(getMetricName()) + .build()); + + for (PrometheusTimeSeries.Label label : getLabels()) { + builder.addLabels( + Types.Label.newBuilder() + .setName(label.getName()) + .setValue(label.getValue()) + .build()); + } + + for (PrometheusTimeSeries.Sample sample : getSamples()) { + builder.addSamples( + Types.Sample.newBuilder() + .setValue(sample.getValue()) + .setTimestamp(sample.getTimestamp()) + .build()); + } + + return builder.build(); + } + /** Builder for sink input pojo instance. */ public static final class Builder { private List

The application expects a single configuration parameter, with the RemoteWrite endpoint URL: + * --prometheusRemoteWriteUrl <URL> + * + *

The application generates rowData internally and sinks to Prometheus. + */ +public class TableAPIExample { + private static final Logger LOGGER = LoggerFactory.getLogger(TableAPIExample.class); + + public static void main(String[] args) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + ParameterTool applicationParameters = ParameterTool.fromArgs(args); + + // Prometheus remote-write URL + String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl"); + LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl); + + tEnv.createTable( + "PrometheusSinkTable", + TableDescriptor.forConnector("prometheus") + .schema( + Schema.newBuilder() + .column("test_metric_name", DataTypes.STRING()) + .column("test_label_name", DataTypes.STRING()) + .column("test_sample_key", DataTypes.DOUBLE()) + .column("test_sample_ts_key", DataTypes.TIMESTAMP()) + .build()) + .option("metric.name", "test_metric_name") + .option("metric.label.keys", "test_label_name") + .option("metric.sample.key", "test_sample_key") + .option("metric.sample.timestamp", "test_sample_ts_key") + .option("sink.batch.max-size", "2") + .option("metric.endpoint-url", prometheusRemoteWriteUrl) + // Uncomment the following line to enable the + // AmazonManagedPrometheusWriteRequestSigner + // for signing requests when writing to Amazon Managed Prometheus + // .option("metric.request-signer", "amazon-managed-prometheus") + // .option("aws.region", "us-east-1") + // .option("aws.credentials.provider", "BASIC") + // .option("aws.credentials.provider.basic.accesskeyid", "accesskey") + // .option("aws.credentials.provider.basic.secretkey", "secretkey") + .build()); + + tEnv.fromValues( + row("test_metric_name_1", "label_1", 1.0d, Instant.now()), + row("test_metric_name_1", "label_2", 2.0d, Instant.now()), + row("test_metric_name_1", "label_3", 3.0d, Instant.now())) + .insertInto("PrometheusSinkTable") + .execute(); + } +} diff --git a/flink-sql-connector-prometheus/pom.xml b/flink-sql-connector-prometheus/pom.xml new file mode 100644 index 0000000..1cf6d69 --- /dev/null +++ b/flink-sql-connector-prometheus/pom.xml @@ -0,0 +1,112 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connector-prometheus-parent + 1.1-SNAPSHOT + ../pom.xml + + + flink-sql-connector-prometheus + Flink : Connectors : Prometheus : SQL + jar + + + + org.apache.flink + flink-connector-prometheus + ${project.version} + + + + org.apache.flink + flink-connector-prometheus-request-signer-amp + ${project.version} + + + + org.apache.flink + flink-test-utils + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + org.apache.flink:flink-connector-aws-base + org.apache.flink:flink-connector-prometheus + org.apache.flink:flink-connector-prometheus-request-signer-amp + org.apache.httpcomponents.client5:* + org.apache.httpcomponents.core5:* + com.google.protobuf:* + software.amazon.awssdk:* + + + + + org.apache.hc.client5 + org.apache.flink.connector.prometheus.shaded.org.apache.hc.client5 + + + org.apache.hc.core5 + org.apache.flink.connector.prometheus.shaded.org.apache.hc.core5 + + + com.google.protobuf + org.apache.flink.connector.prometheus.shaded.com.google.protobuf + + + software.amazon + org.apache.flink.connector.prometheus.shaded.software.amazon + + + + + org.apache.flink:flink-connector-prometheus:* + + profile + + + + + + + + + + diff --git a/flink-sql-connector-prometheus/src/main/resources/META-INF/NOTICE b/flink-sql-connector-prometheus/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..928ae07 --- /dev/null +++ b/flink-sql-connector-prometheus/src/main/resources/META-INF/NOTICE @@ -0,0 +1,30 @@ +flink-sql-connector-dynamodb + +Copyright 2022-2024 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- com.google.protobuf:protobuf-java:3.22.2 +- org.apache.httpcomponents.client5:httpclient5:5.3.1 +- org.apache.httpcomponents.core5:httpcore5-h2:5.2.4 +- org.apache.httpcomponents.core5:httpcore5:5.3 +- software.amazon.awssdk:annotations:2.25.69 +- software.amazon.awssdk:auth:2.25.69 +- software.amazon.awssdk:checksums-spi:2.25.69 +- software.amazon.awssdk:checksums:2.25.69 +- software.amazon.awssdk:endpoints-spi:2.25.69 +- software.amazon.awssdk:http-auth-aws:2.25.69 +- software.amazon.awssdk:http-auth-spi:2.25.69 +- software.amazon.awssdk:http-auth:2.25.69 +- software.amazon.awssdk:http-client-spi:2.25.69 +- software.amazon.awssdk:identity-spi:2.25.69 +- software.amazon.awssdk:json-utils:2.25.69 +- software.amazon.awssdk:metrics-spi:2.25.69 +- software.amazon.awssdk:profiles:2.25.69 +- software.amazon.awssdk:regions:2.25.69 +- software.amazon.awssdk:sdk-core:2.25.69 +- software.amazon.awssdk:third-party-jackson-core:2.25.69 +- software.amazon.awssdk:utils:2.25.69 diff --git a/flink-sql-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/PackagingITCase.java b/flink-sql-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/PackagingITCase.java new file mode 100644 index 0000000..1513987 --- /dev/null +++ b/flink-sql-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/PackagingITCase.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus; + +import org.apache.flink.packaging.PackagingTestUtils; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.test.resources.ResourceTestUtils; + +import org.junit.jupiter.api.Test; + +import java.nio.file.Path; +import java.util.Arrays; + +class PackagingITCase { + + @Test + void testPackaging() throws Exception { + final Path jar = + ResourceTestUtils.getResource(".*/flink-sql-connector-prometheus[^/]*\\.jar"); + + PackagingTestUtils.assertJarContainsOnlyFilesMatching( + jar, + Arrays.asList( + "org/apache/flink/", + "META-INF/", + "mozilla/", + "google/protobuf/", + "mime.types", + "VersionInfo.java")); + PackagingTestUtils.assertJarContainsServiceEntry(jar, Factory.class); + } +} diff --git a/pom.xml b/pom.xml index 0d1dc14..a93daf2 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ under the License. flink-connector-prometheus flink-connector-prometheus-request-signer-amp + flink-sql-connector-prometheus @@ -156,6 +157,19 @@ under the License. test + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test +