Skip to content

Commit c847a3a

Browse files
committed
[FLINK-33139] Add Prometheus Sink Table API
1 parent 233e2ff commit c847a3a

File tree

35 files changed

+1835
-117
lines changed

35 files changed

+1835
-117
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ This change added tests and can be verified as follows:
3232
*(example:)*
3333
- *Added integration tests for end-to-end deployment*
3434
- *Added unit tests*
35-
- *Manually verified by running the Kinesis connector on a local Flink cluster.*
35+
- *Manually verified by running the Prometheus connector on a local Flink cluster.*
3636

3737
## Significant changes
3838
*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*

docs/data/prometheus.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
version: 1.1-SNAPSHOT
20+
variants:
21+
- maven: flink-connector-prometheus
22+
- maven: flink-connector-prometheus-request-signer-amp

flink-connector-prometheus-request-signer-amp/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ under the License.
5454
<scope>provided</scope>
5555
</dependency>
5656

57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-connector-aws-base</artifactId>
60+
<version>5.0.0-1.20</version>
61+
</dependency>
62+
5763
<dependency>
5864
<groupId>software.amazon.awssdk</groupId>
5965
<artifactId>auth</artifactId>

flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,19 @@ public class AmazonManagedPrometheusWriteRequestSigner implements PrometheusRequ
5959
* @param awsRegion Region of the AMP workspace
6060
*/
6161
public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) {
62+
this(remoteWriteUrl, awsRegion, DefaultCredentialsProvider.create());
63+
}
64+
65+
public AmazonManagedPrometheusWriteRequestSigner(
66+
String remoteWriteUrl, String awsRegion, AwsCredentialsProvider credentialsProvider) {
6267
Preconditions.checkArgument(
6368
StringUtils.isNotBlank(awsRegion), "awsRegion cannot be null or empty");
6469
Preconditions.checkArgument(
6570
StringUtils.isNotBlank(remoteWriteUrl), "remoteWriteUrl cannot be null or empty");
6671

6772
this.awsRegion = awsRegion;
73+
this.credentialsProvider = credentialsProvider;
74+
6875
try {
6976
this.remoteWriteUrl = new URL(remoteWriteUrl);
7077
} catch (MalformedURLException e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.prometheus.sink.aws;
19+
20+
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
21+
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
22+
import org.apache.flink.connector.prometheus.table.PrometheusConfig;
23+
import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory;
24+
25+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
26+
27+
import java.util.Properties;
28+
29+
public class AmazonManagedPrometheusWriteRequestSignerFactory
30+
implements PrometheusDynamicRequestSignerFactory {
31+
public AmazonManagedPrometheusWriteRequestSignerFactory() {}
32+
33+
@Override
34+
public String requestSignerIdentifer() {
35+
return "amazon-managed-prometheus";
36+
}
37+
38+
@Override
39+
public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) {
40+
Properties properties = config.toProperties();
41+
AWSGeneralUtil.validateAwsConfiguration(properties);
42+
43+
final AwsCredentialsProvider credentialsProvider =
44+
AWSGeneralUtil.getCredentialsProvider(properties);
45+
final String awsRegion = AWSGeneralUtil.getRegion(properties).toString();
46+
final String remoteWriteUrl = config.getRemoteWriteEndpointUrl();
47+
48+
return new AmazonManagedPrometheusWriteRequestSigner(
49+
remoteWriteUrl, awsRegion, credentialsProvider);
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSignerFactory
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package org.apache.flink.connector.prometheus.sink.aws;
2+
3+
import org.apache.flink.configuration.Configuration;
4+
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
5+
import org.apache.flink.connector.prometheus.table.PrometheusConfig;
6+
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
11+
import static org.apache.flink.connector.prometheus.table.PrometheusConnectorOption.METRIC_REMOTE_WRITE_URL;
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
14+
import static org.junit.jupiter.api.Assertions.assertThrows;
15+
16+
class AmazonManagedPrometheusWriteRequestSignerFactoryTest {
17+
private AmazonManagedPrometheusWriteRequestSignerFactory factory;
18+
19+
@BeforeEach
20+
public void setup() {
21+
factory = new AmazonManagedPrometheusWriteRequestSignerFactory();
22+
}
23+
24+
@Test
25+
void testIdentifier() {
26+
assertEquals(factory.requestSignerIdentifer(), "amazon-managed-prometheus");
27+
}
28+
29+
@Test
30+
void testCreateRequestSigner() {
31+
final String endpoint =
32+
"https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc/api/v1/remote_write";
33+
final String region = "us-east-1";
34+
Configuration config = new Configuration();
35+
config.set(METRIC_REMOTE_WRITE_URL, endpoint);
36+
config.setString(AWS_REGION, region);
37+
38+
PrometheusRequestSigner requestSigner =
39+
factory.getRequestSigner(new PrometheusConfig(config));
40+
41+
assertInstanceOf(AmazonManagedPrometheusWriteRequestSigner.class, requestSigner);
42+
}
43+
44+
@Test
45+
void testCreateRequestSignerFailsWithInvalidRegion() {
46+
final String endpoint =
47+
"https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc/api/v1/remote_write";
48+
final String region = "invalid-region";
49+
Configuration config = new Configuration();
50+
config.set(METRIC_REMOTE_WRITE_URL, endpoint);
51+
config.setString(AWS_REGION, region);
52+
53+
assertThrows(
54+
IllegalArgumentException.class,
55+
() -> factory.getRequestSigner(new PrometheusConfig(config)));
56+
}
57+
58+
@Test
59+
void testCreateRequestSignerFailsWithInvalidURL() {
60+
Configuration config = new Configuration();
61+
config.set(METRIC_REMOTE_WRITE_URL, "invalid-endpoint");
62+
config.setString(AWS_REGION, "us-east-1");
63+
64+
assertThrows(
65+
IllegalArgumentException.class,
66+
() -> factory.getRequestSigner(new PrometheusConfig(config)));
67+
}
68+
}

flink-connector-prometheus/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,31 @@ under the License.
7777
<artifactId>httpcore5</artifactId>
7878
</dependency>
7979

80+
<!--Table API dependencies-->
81+
<dependency>
82+
<groupId>org.apache.flink</groupId>
83+
<artifactId>flink-table-common</artifactId>
84+
<version>${flink.version}</version>
85+
<type>test-jar</type>
86+
<scope>test</scope>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>org.apache.flink</groupId>
91+
<artifactId>flink-table-api-java-bridge</artifactId>
92+
<version>${flink.version}</version>
93+
<scope>provided</scope>
94+
<optional>true</optional>
95+
</dependency>
96+
97+
<dependency>
98+
<groupId>org.apache.flink</groupId>
99+
<artifactId>flink-table-runtime</artifactId>
100+
<version>${flink.version}</version>
101+
<scope>provided</scope>
102+
<optional>true</optional>
103+
</dependency>
104+
80105
<!-- test -->
81106
<dependency>
82107
<groupId>org.apache.flink</groupId>

flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,20 @@
3737

3838
/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
3939
@PublicEvolving
40-
public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
40+
public class PrometheusSink<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {
4141
private final String prometheusRemoteWriteUrl;
4242
private final PrometheusAsyncHttpClientBuilder clientBuilder;
4343
private final PrometheusRequestSigner requestSigner;
4444
private final int maxBatchSizeInSamples;
45+
private final int maxRecordSizeInSamples;
4546
private final String httpUserAgent;
4647
private final PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
4748
errorHandlingBehaviorConfig;
4849
private final String metricGroupName;
4950

5051
@SuppressWarnings("checkstyle:RegexpSingleline")
5152
protected PrometheusSink(
52-
ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
53+
ElementConverter<IN, Types.TimeSeries> elementConverter,
5354
int maxInFlightRequests,
5455
int maxBufferedRequests,
5556
int maxBatchSizeInSamples,
@@ -93,6 +94,7 @@ protected PrometheusSink(
9394
Preconditions.checkArgument(
9495
StringUtils.isNotBlank(metricGroupName), "Missing metric group name");
9596
this.maxBatchSizeInSamples = maxBatchSizeInSamples;
97+
this.maxRecordSizeInSamples = maxRecordSizeInSamples;
9698
this.requestSigner = requestSigner;
9799
this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
98100
this.clientBuilder = clientBuilder;
@@ -102,22 +104,22 @@ protected PrometheusSink(
102104
}
103105

104106
@Override
105-
public StatefulSinkWriter<PrometheusTimeSeries, BufferedRequestState<Types.TimeSeries>>
106-
createWriter(InitContext initContext) {
107+
public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> createWriter(
108+
InitContext initContext) {
107109
SinkMetricsCallback metricsCallback =
108110
new SinkMetricsCallback(
109111
SinkMetrics.registerSinkMetrics(
110112
initContext.metricGroup().addGroup(metricGroupName)));
111113
CloseableHttpAsyncClient asyncHttpClient =
112114
clientBuilder.buildAndStartClient(metricsCallback);
113115

114-
return new PrometheusSinkWriter(
116+
return new PrometheusSinkWriter<>(
115117
getElementConverter(),
116118
initContext,
117119
getMaxInFlightRequests(),
118120
getMaxBufferedRequests(),
119121
maxBatchSizeInSamples,
120-
getMaxRecordSizeInBytes(),
122+
maxRecordSizeInSamples,
121123
getMaxTimeInBufferMS(),
122124
prometheusRemoteWriteUrl,
123125
asyncHttpClient,
@@ -128,23 +130,22 @@ protected PrometheusSink(
128130
}
129131

130132
@Override
131-
public StatefulSinkWriter<PrometheusTimeSeries, BufferedRequestState<Types.TimeSeries>>
132-
restoreWriter(
133-
InitContext initContext,
134-
Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
133+
public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> restoreWriter(
134+
InitContext initContext,
135+
Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
135136
SinkMetricsCallback metricsCallback =
136137
new SinkMetricsCallback(
137138
SinkMetrics.registerSinkMetrics(
138139
initContext.metricGroup().addGroup(metricGroupName)));
139140
CloseableHttpAsyncClient asyncHttpClient =
140141
clientBuilder.buildAndStartClient(metricsCallback);
141-
return new PrometheusSinkWriter(
142+
return new PrometheusSinkWriter<>(
142143
getElementConverter(),
143144
initContext,
144145
getMaxInFlightRequests(),
145146
getMaxBufferedRequests(),
146147
maxBatchSizeInSamples,
147-
getMaxRecordSizeInBytes(),
148+
maxRecordSizeInSamples,
148149
getMaxTimeInBufferMS(),
149150
prometheusRemoteWriteUrl,
150151
asyncHttpClient,
@@ -155,8 +156,8 @@ protected PrometheusSink(
155156
recoveredState);
156157
}
157158

158-
public static PrometheusSinkBuilder builder() {
159-
return new PrometheusSinkBuilder();
159+
public static <IN> PrometheusSinkBuilder<IN> builder() {
160+
return new PrometheusSinkBuilder<>();
160161
}
161162

162163
@Override

0 commit comments

Comments
 (0)