Skip to content

Commit 5644639

Browse files
committed
[FLINK-33139] Add Prometheus Sink Table API
1 parent 233e2ff commit 5644639

27 files changed

+1425
-104
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ This change added tests and can be verified as follows:
3232
*(example:)*
3333
- *Added integration tests for end-to-end deployment*
3434
- *Added unit tests*
35-
- *Manually verified by running the Kinesis connector on a local Flink cluster.*
35+
- *Manually verified by running the Prometheus connector on a local Flink cluster.*
3636

3737
## Significant changes
3838
*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*

docs/data/prometheus.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
version: 1.1-SNAPSHOT
20+
variants:
21+
- maven: flink-connector-prometheus
22+
- maven: flink-connector-prometheus-request-signer-amp

flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ private AwsCredentialsProvider getCredentialsProvider() {
9898
return credentialsProvider;
9999
}
100100

101+
@VisibleForTesting
102+
String getAwsRegion() {
103+
return awsRegion;
104+
}
105+
101106
/**
102107
* Add the additional Http request headers required by Amazon Managed Prometheus:
103108
* 'x-amz-content-sha256', 'Host', 'X-Amz-Date', 'x-amz-security-token' and 'Authorization`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.apache.flink.connector.prometheus.sink.aws;
2+
3+
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
4+
import org.apache.flink.connector.prometheus.table.PrometheusConfig;
5+
import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory;
6+
7+
import java.util.regex.Matcher;
8+
import java.util.regex.Pattern;
9+
10+
public class AmazonManagedPrometheusWriteRequestSignerFactory
11+
implements PrometheusDynamicRequestSignerFactory {
12+
@Override
13+
public String requestSignerIdentifer() {
14+
return "amazon-managed-prometheus";
15+
}
16+
17+
@Override
18+
public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) {
19+
return new AmazonManagedPrometheusWriteRequestSigner(
20+
config.getRemoteWriteEndpointUrl(),
21+
getAwsRegionFromEndpointUrl(config.getRemoteWriteEndpointUrl()));
22+
}
23+
24+
private String getAwsRegionFromEndpointUrl(String endpointUrl) {
25+
String regex = "aps-workspaces\\.([^.]+)\\.amazonaws\\.com";
26+
Pattern pattern = Pattern.compile(regex);
27+
Matcher matcher = pattern.matcher(endpointUrl);
28+
29+
if (matcher.find()) {
30+
return matcher.group(1);
31+
} else {
32+
throw new IllegalArgumentException(
33+
"Failed to extract AWS region from endpoint URL: " + endpointUrl);
34+
}
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSignerFactory
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.apache.flink.connector.prometheus.sink.aws;
2+
3+
import org.apache.flink.configuration.Configuration;
4+
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
5+
import org.apache.flink.connector.prometheus.table.PrometheusConfig;
6+
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.apache.flink.connector.prometheus.table.PrometheusConnectorOption.METRIC_REMOTE_WRITE_URL;
11+
import static org.junit.jupiter.api.Assertions.assertEquals;
12+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
13+
import static org.junit.jupiter.api.Assertions.assertThrows;
14+
15+
class AmazonManagedPrometheusWriteRequestSignerFactoryTest {
16+
private AmazonManagedPrometheusWriteRequestSignerFactory factory;
17+
18+
@BeforeEach
19+
public void setup() {
20+
factory = new AmazonManagedPrometheusWriteRequestSignerFactory();
21+
}
22+
23+
@Test
24+
void testIdentifier() {
25+
assertEquals(factory.requestSignerIdentifer(), "amazon-managed-prometheus");
26+
}
27+
28+
@Test
29+
void testCreateRequestSigner() {
30+
Configuration config = new Configuration();
31+
config.set(
32+
METRIC_REMOTE_WRITE_URL,
33+
"https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-abc/api/v1/remote_write");
34+
35+
PrometheusRequestSigner requestSigner =
36+
factory.getRequestSigner(new PrometheusConfig(config));
37+
38+
assertInstanceOf(AmazonManagedPrometheusWriteRequestSigner.class, requestSigner);
39+
assertEquals(
40+
((AmazonManagedPrometheusWriteRequestSigner) requestSigner).getAwsRegion(),
41+
"us-east-1");
42+
}
43+
44+
@Test
45+
void testCreateRequestSignerFailsWithInvalidRegion() {
46+
Configuration config = new Configuration();
47+
config.set(
48+
METRIC_REMOTE_WRITE_URL,
49+
"https://aps-workspaces.amazonaws.com/workspaces/ws-abc/api/v1/remote_write");
50+
51+
assertThrows(
52+
IllegalArgumentException.class,
53+
() -> factory.getRequestSigner(new PrometheusConfig(config)));
54+
}
55+
}

flink-connector-prometheus/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,31 @@ under the License.
7777
<artifactId>httpcore5</artifactId>
7878
</dependency>
7979

80+
<!--Table API dependencies-->
81+
<dependency>
82+
<groupId>org.apache.flink</groupId>
83+
<artifactId>flink-table-common</artifactId>
84+
<version>${flink.version}</version>
85+
<type>test-jar</type>
86+
<scope>test</scope>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>org.apache.flink</groupId>
91+
<artifactId>flink-table-api-java-bridge</artifactId>
92+
<version>${flink.version}</version>
93+
<scope>provided</scope>
94+
<optional>true</optional>
95+
</dependency>
96+
97+
<dependency>
98+
<groupId>org.apache.flink</groupId>
99+
<artifactId>flink-table-runtime</artifactId>
100+
<version>${flink.version}</version>
101+
<scope>provided</scope>
102+
<optional>true</optional>
103+
</dependency>
104+
80105
<!-- test -->
81106
<dependency>
82107
<groupId>org.apache.flink</groupId>

flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
3939
@PublicEvolving
40-
public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
40+
public class PrometheusSink<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {
4141
private final String prometheusRemoteWriteUrl;
4242
private final PrometheusAsyncHttpClientBuilder clientBuilder;
4343
private final PrometheusRequestSigner requestSigner;
@@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti
4949

5050
@SuppressWarnings("checkstyle:RegexpSingleline")
5151
protected PrometheusSink(
52-
ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
52+
ElementConverter<IN, Types.TimeSeries> elementConverter,
5353
int maxInFlightRequests,
5454
int maxBufferedRequests,
5555
int maxBatchSizeInSamples,
56-
int maxRecordSizeInSamples,
56+
long maxRecordSizeInSamples,
5757
long maxTimeInBufferMS,
5858
String prometheusRemoteWriteUrl,
5959
PrometheusAsyncHttpClientBuilder clientBuilder,
@@ -102,16 +102,16 @@ protected PrometheusSink(
102102
}
103103

104104
@Override
105-
public StatefulSinkWriter<PrometheusTimeSeries, BufferedRequestState<Types.TimeSeries>>
106-
createWriter(InitContext initContext) {
105+
public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> createWriter(
106+
InitContext initContext) {
107107
SinkMetricsCallback metricsCallback =
108108
new SinkMetricsCallback(
109109
SinkMetrics.registerSinkMetrics(
110110
initContext.metricGroup().addGroup(metricGroupName)));
111111
CloseableHttpAsyncClient asyncHttpClient =
112112
clientBuilder.buildAndStartClient(metricsCallback);
113113

114-
return new PrometheusSinkWriter(
114+
return new PrometheusSinkWriter<>(
115115
getElementConverter(),
116116
initContext,
117117
getMaxInFlightRequests(),
@@ -128,17 +128,16 @@ protected PrometheusSink(
128128
}
129129

130130
@Override
131-
public StatefulSinkWriter<PrometheusTimeSeries, BufferedRequestState<Types.TimeSeries>>
132-
restoreWriter(
133-
InitContext initContext,
134-
Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
131+
public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> restoreWriter(
132+
InitContext initContext,
133+
Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
135134
SinkMetricsCallback metricsCallback =
136135
new SinkMetricsCallback(
137136
SinkMetrics.registerSinkMetrics(
138137
initContext.metricGroup().addGroup(metricGroupName)));
139138
CloseableHttpAsyncClient asyncHttpClient =
140139
clientBuilder.buildAndStartClient(metricsCallback);
141-
return new PrometheusSinkWriter(
140+
return new PrometheusSinkWriter<>(
142141
getElementConverter(),
143142
initContext,
144143
getMaxInFlightRequests(),
@@ -155,8 +154,8 @@ protected PrometheusSink(
155154
recoveredState);
156155
}
157156

158-
public static PrometheusSinkBuilder builder() {
159-
return new PrometheusSinkBuilder();
157+
public static <IN> PrometheusSinkBuilder<IN> builder() {
158+
return new PrometheusSinkBuilder<>();
160159
}
161160

162161
@Override

0 commit comments

Comments
 (0)