Skip to content

Commit 3cabc3c

Browse files
committed
[FLINK-33139] Add Prometheus Sink Table API
1 parent 233e2ff commit 3cabc3c

File tree

34 files changed

+2123
-357
lines changed

34 files changed

+2123
-357
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ This change added tests and can be verified as follows:
3232
*(example:)*
3333
- *Added integration tests for end-to-end deployment*
3434
- *Added unit tests*
35-
- *Manually verified by running the Kinesis connector on a local Flink cluster.*
35+
- *Manually verified by running the Prometheus connector on a local Flink cluster.*
3636

3737
## Significant changes
3838
*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*

docs/data/prometheus.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
version: 1.1-SNAPSHOT
20+
variants:
21+
- maven: flink-connector-prometheus
22+
- maven: flink-connector-prometheus-request-signer-amp

flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ private AwsCredentialsProvider getCredentialsProvider() {
9898
return credentialsProvider;
9999
}
100100

101+
@VisibleForTesting
102+
String getAwsRegion() {
103+
return awsRegion;
104+
}
105+
101106
/**
102107
* Add the additional Http request headers required by Amazon Managed Prometheus:
103108
* 'x-amz-content-sha256', 'Host', 'X-Amz-Date', 'x-amz-security-token' and 'Authorization`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.apache.flink.connector.prometheus.sink.aws;
2+
3+
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
4+
import org.apache.flink.connector.prometheus.table.PrometheusConfig;
5+
import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory;
6+
7+
import java.util.regex.Matcher;
8+
import java.util.regex.Pattern;
9+
10+
public class AmazonManagedPrometheusWriteRequestSignerFactory
11+
implements PrometheusDynamicRequestSignerFactory {
12+
@Override
13+
public String requestSignerIdentifer() {
14+
return "amazon-managed-prometheus";
15+
}
16+
17+
@Override
18+
public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) {
19+
return new AmazonManagedPrometheusWriteRequestSigner(
20+
config.getRemoteWriteEndpointUrl(),
21+
getAwsRegionFromEndpointUrl(config.getRemoteWriteEndpointUrl()));
22+
}
23+
24+
private String getAwsRegionFromEndpointUrl(String endpointUrl) {
25+
String regex = "aps[a-zA-Z0-9-]*\\.([^.]+)\\.";
26+
Pattern pattern = Pattern.compile(regex);
27+
Matcher matcher = pattern.matcher(endpointUrl);
28+
29+
if (matcher.find()) {
30+
return matcher.group(1);
31+
} else {
32+
throw new IllegalArgumentException(
33+
"Failed to extract AWS region from endpoint URL: " + endpointUrl);
34+
}
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSignerFactory
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package org.apache.flink.connector.prometheus.sink.aws;
2+
3+
import org.apache.flink.configuration.Configuration;
4+
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
5+
import org.apache.flink.connector.prometheus.table.PrometheusConfig;
6+
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.params.ParameterizedTest;
10+
import org.junit.jupiter.params.provider.Arguments;
11+
import org.junit.jupiter.params.provider.MethodSource;
12+
13+
import java.util.stream.Stream;
14+
15+
import static org.apache.flink.connector.prometheus.table.PrometheusConnectorOption.METRIC_REMOTE_WRITE_URL;
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
18+
import static org.junit.jupiter.api.Assertions.assertThrows;
19+
20+
class AmazonManagedPrometheusWriteRequestSignerFactoryTest {
21+
private AmazonManagedPrometheusWriteRequestSignerFactory factory;
22+
23+
@BeforeEach
24+
public void setup() {
25+
factory = new AmazonManagedPrometheusWriteRequestSignerFactory();
26+
}
27+
28+
@Test
29+
void testIdentifier() {
30+
assertEquals(factory.requestSignerIdentifer(), "amazon-managed-prometheus");
31+
}
32+
33+
@ParameterizedTest
34+
@MethodSource("provideAMPEndpoint")
35+
void testCreateRequestSigner(final String endpoint, final String expectedRegion) {
36+
Configuration config = new Configuration();
37+
config.set(METRIC_REMOTE_WRITE_URL, endpoint);
38+
39+
PrometheusRequestSigner requestSigner =
40+
factory.getRequestSigner(new PrometheusConfig(config));
41+
42+
assertInstanceOf(AmazonManagedPrometheusWriteRequestSigner.class, requestSigner);
43+
assertEquals(
44+
((AmazonManagedPrometheusWriteRequestSigner) requestSigner).getAwsRegion(),
45+
expectedRegion);
46+
}
47+
48+
private static Stream<Arguments> provideAMPEndpoint() {
49+
// Extracted from https://docs.aws.amazon.com/general/latest/gr/prometheus-service.html
50+
return Stream.of(
51+
Arguments.of("https://aps.us-east-2.amazonaws.com", "us-east-2"),
52+
Arguments.of("https://aps-workspaces.us-east-2.amazonaws.com", "us-east-2"),
53+
Arguments.of("https://aps-workspaces-fips.us-east-2.amazonaws.com", "us-east-2"),
54+
Arguments.of("https://aps-workspaces-fips.us-east-2.api.aws", "us-east-2"),
55+
Arguments.of("https://aps-workspaces.us-east-2.api.aws", "us-east-2"),
56+
Arguments.of("https://aps-fips.us-east-2.amazonaws.com", "us-east-2"),
57+
Arguments.of("https://aps.us-east-2.api.aws", "us-east-2"),
58+
Arguments.of("https://aps-fips.us-east-2.api.aws", "us-east-2"),
59+
Arguments.of("https://aps.us-east-1.amazonaws.com", "us-east-1"),
60+
Arguments.of("https://aps-workspaces.us-east-1.amazonaws.com", "us-east-1"),
61+
Arguments.of("https://aps-workspaces-fips.us-east-1.amazonaws.com", "us-east-1"),
62+
Arguments.of("https://aps-workspaces-fips.us-east-1.api.aws", "us-east-1"),
63+
Arguments.of("https://aps-workspaces.us-east-1.api.aws", "us-east-1"),
64+
Arguments.of("https://aps-fips.us-east-1.amazonaws.com", "us-east-1"),
65+
Arguments.of("https://aps.us-east-1.api.aws", "us-east-1"),
66+
Arguments.of("https://aps-fips.us-east-1.api.aws", "us-east-1"),
67+
Arguments.of("https://aps.us-west-2.amazonaws.com", "us-west-2"),
68+
Arguments.of("https://aps-workspaces.us-west-2.amazonaws.com", "us-west-2"),
69+
Arguments.of("https://aps-workspaces-fips.us-west-2.amazonaws.com", "us-west-2"),
70+
Arguments.of("https://aps-workspaces-fips.us-west-2.api.aws", "us-west-2"),
71+
Arguments.of("https://aps-workspaces.us-west-2.api.aws", "us-west-2"),
72+
Arguments.of("https://aps-fips.us-west-2.amazonaws.com", "us-west-2"),
73+
Arguments.of("https://aps.us-west-2.api.aws", "us-west-2"),
74+
Arguments.of("https://aps-fips.us-west-2.api.aws", "us-west-2"),
75+
Arguments.of("https://aps.ap-south-1.amazonaws.com", "ap-south-1"),
76+
Arguments.of("https://aps-workspaces.ap-south-1.amazonaws.com", "ap-south-1"),
77+
Arguments.of("https://aps-workspaces.ap-south-1.api.aws", "ap-south-1"),
78+
Arguments.of("https://aps.ap-south-1.api.aws", "ap-south-1"),
79+
Arguments.of("https://aps.ap-northeast-2.amazonaws.com", "ap-northeast-2"),
80+
Arguments.of(
81+
"https://aps-workspaces.ap-northeast-2.amazonaws.com", "ap-northeast-2"),
82+
Arguments.of("https://aps-workspaces.ap-northeast-2.api.aws", "ap-northeast-2"),
83+
Arguments.of("https://aps.ap-northeast-2.api.aws ", "ap-northeast-2"),
84+
Arguments.of("https://aps.ap-southeast-1.amazonaws.com", "ap-southeast-1"),
85+
Arguments.of(
86+
"https://aps-workspaces.ap-southeast-1.amazonaws.com", "ap-southeast-1"),
87+
Arguments.of("https://aps-workspaces.ap-southeast-1.api.aws", "ap-southeast-1"),
88+
Arguments.of("https://aps.ap-southeast-1.api.aws", "ap-southeast-1"),
89+
Arguments.of("https://aps.ap-southeast-2.amazonaws.com", "ap-southeast-2"),
90+
Arguments.of(
91+
"https://aps-workspaces.ap-southeast-2.amazonaws.com", "ap-southeast-2"),
92+
Arguments.of("https://aps-workspaces.ap-southeast-2.api.aws", "ap-southeast-2"),
93+
Arguments.of("https://aps.ap-southeast-2.api.aws", "ap-southeast-2"),
94+
Arguments.of("https://aps.ap-northeast-1.amazonaws.com", "ap-northeast-1"),
95+
Arguments.of(
96+
"https://aps-workspaces.ap-northeast-1.amazonaws.com", "ap-northeast-1"),
97+
Arguments.of("https://aps-workspaces.ap-northeast-1.api.aws", "ap-northeast-1"),
98+
Arguments.of("https://aps.ap-northeast-1.api.aws", "ap-northeast-1"),
99+
Arguments.of("https://aps.eu-central-1.amazonaws.com", "eu-central-1"),
100+
Arguments.of("https://aps-workspaces.eu-central-1.amazonaws.com", "eu-central-1"),
101+
Arguments.of("https://aps-workspaces.eu-central-1.api.aws", "eu-central-1"),
102+
Arguments.of("https://aps.eu-central-1.api.aws", "eu-central-1"),
103+
Arguments.of("https://aps.eu-west-1.amazonaws.com", "eu-west-1"),
104+
Arguments.of("https://aps-workspaces.eu-west-1.amazonaws.com", "eu-west-1"),
105+
Arguments.of("https://aps-workspaces.eu-west-1.api.aws", "eu-west-1"),
106+
Arguments.of("https://aps.eu-west-1.api.aws", "eu-west-1"),
107+
Arguments.of("https://aps.eu-west-2.amazonaws.com", "eu-west-2"),
108+
Arguments.of("https://aps-workspaces.eu-west-2.amazonaws.com", "eu-west-2"),
109+
Arguments.of("https://aps-workspaces.eu-west-2.api.aws", "eu-west-2"),
110+
Arguments.of("https://aps.eu-west-2.api.aws", "eu-west-2"),
111+
Arguments.of("https://aps.eu-west-3.amazonaws.com", "eu-west-3"),
112+
Arguments.of("https://aps-workspaces.eu-west-3.amazonaws.com", "eu-west-3"),
113+
Arguments.of("https://aps-workspaces.eu-west-3.api.aws", "eu-west-3"),
114+
Arguments.of("https://aps.eu-west-3.api.aws", "eu-west-3"),
115+
Arguments.of("https://aps.eu-north-1.amazonaws.com", "eu-north-1"),
116+
Arguments.of("https://aps-workspaces.eu-north-1.amazonaws.com", "eu-north-1"),
117+
Arguments.of("https://aps-workspaces.eu-north-1.api.aws", "eu-north-1"),
118+
Arguments.of("https://aps.eu-north-1.api.aws", "eu-north-1"),
119+
Arguments.of("https://aps.sa-east-1.amazonaws.com", "sa-east-1"),
120+
Arguments.of("https://aps-workspaces.sa-east-1.amazonaws.com", "sa-east-1"),
121+
Arguments.of("https://aps-workspaces.sa-east-1.api.aws", "sa-east-1"),
122+
Arguments.of("https://aps.sa-east-1.api.aws", "sa-east-1"));
123+
}
124+
125+
@Test
126+
void testCreateRequestSignerFailsWithInvalidRegion() {
127+
Configuration config = new Configuration();
128+
config.set(METRIC_REMOTE_WRITE_URL, "https://invalid-endpoint");
129+
130+
assertThrows(
131+
IllegalArgumentException.class,
132+
() -> factory.getRequestSigner(new PrometheusConfig(config)));
133+
}
134+
}

flink-connector-prometheus/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,31 @@ under the License.
7777
<artifactId>httpcore5</artifactId>
7878
</dependency>
7979

80+
<!--Table API dependencies-->
81+
<dependency>
82+
<groupId>org.apache.flink</groupId>
83+
<artifactId>flink-table-common</artifactId>
84+
<version>${flink.version}</version>
85+
<type>test-jar</type>
86+
<scope>test</scope>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>org.apache.flink</groupId>
91+
<artifactId>flink-table-api-java-bridge</artifactId>
92+
<version>${flink.version}</version>
93+
<scope>provided</scope>
94+
<optional>true</optional>
95+
</dependency>
96+
97+
<dependency>
98+
<groupId>org.apache.flink</groupId>
99+
<artifactId>flink-table-runtime</artifactId>
100+
<version>${flink.version}</version>
101+
<scope>provided</scope>
102+
<optional>true</optional>
103+
</dependency>
104+
80105
<!-- test -->
81106
<dependency>
82107
<groupId>org.apache.flink</groupId>

0 commit comments

Comments
 (0)