Skip to content

Commit e200be8

Browse files
committed
[FLINK-33139] Add Prometheus Sink Table API
1 parent 233e2ff commit e200be8

25 files changed

+1373
-104
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ This change added tests and can be verified as follows:
3232
*(example:)*
3333
- *Added integration tests for end-to-end deployment*
3434
- *Added unit tests*
35-
- *Manually verified by running the Kinesis connector on a local Flink cluster.*
35+
- *Manually verified by running the Prometheus connector on a local Flink cluster.*
3636

3737
## Significant changes
3838
*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*

docs/data/prometheus.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
version: 1.1-SNAPSHOT
20+
variants:
21+
- maven: flink-connector-prometheus
22+
- maven: flink-connector-prometheus-request-signer-amp
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.apache.flink.connector.prometheus.sink.aws;
2+
3+
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
4+
import org.apache.flink.connector.prometheus.table.PrometheusConfig;
5+
import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory;
6+
7+
public class AmazonManagedPrometheusWriteRequestSignerFactory
8+
implements PrometheusDynamicRequestSignerFactory {
9+
@Override
10+
public String requestSignerIdentifer() {
11+
return "amazon-managed-prometheus";
12+
}
13+
14+
@Override
15+
public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) {
16+
return new AmazonManagedPrometheusWriteRequestSigner(
17+
config.getRemoteWriteEndpointUrl(), config.getRemoteWriteEndpointRegion());
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSignerFactory

flink-connector-prometheus/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,31 @@ under the License.
7777
<artifactId>httpcore5</artifactId>
7878
</dependency>
7979

80+
<!--Table API dependencies-->
81+
<dependency>
82+
<groupId>org.apache.flink</groupId>
83+
<artifactId>flink-table-common</artifactId>
84+
<version>${flink.version}</version>
85+
<type>test-jar</type>
86+
<scope>test</scope>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>org.apache.flink</groupId>
91+
<artifactId>flink-table-api-java-bridge</artifactId>
92+
<version>${flink.version}</version>
93+
<scope>provided</scope>
94+
<optional>true</optional>
95+
</dependency>
96+
97+
<dependency>
98+
<groupId>org.apache.flink</groupId>
99+
<artifactId>flink-table-runtime</artifactId>
100+
<version>${flink.version}</version>
101+
<scope>provided</scope>
102+
<optional>true</optional>
103+
</dependency>
104+
80105
<!-- test -->
81106
<dependency>
82107
<groupId>org.apache.flink</groupId>

flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
3939
@PublicEvolving
40-
public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
40+
public class PrometheusSink<InputT> extends AsyncSinkBase<InputT, Types.TimeSeries> {
4141
private final String prometheusRemoteWriteUrl;
4242
private final PrometheusAsyncHttpClientBuilder clientBuilder;
4343
private final PrometheusRequestSigner requestSigner;
@@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti
4949

5050
@SuppressWarnings("checkstyle:RegexpSingleline")
5151
protected PrometheusSink(
52-
ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
52+
ElementConverter<InputT, Types.TimeSeries> elementConverter,
5353
int maxInFlightRequests,
5454
int maxBufferedRequests,
5555
int maxBatchSizeInSamples,
56-
int maxRecordSizeInSamples,
56+
long maxRecordSizeInSamples,
5757
long maxTimeInBufferMS,
5858
String prometheusRemoteWriteUrl,
5959
PrometheusAsyncHttpClientBuilder clientBuilder,
@@ -102,16 +102,16 @@ protected PrometheusSink(
102102
}
103103

104104
@Override
105-
public StatefulSinkWriter<PrometheusTimeSeries, BufferedRequestState<Types.TimeSeries>>
106-
createWriter(InitContext initContext) {
105+
public StatefulSinkWriter<InputT, BufferedRequestState<Types.TimeSeries>> createWriter(
106+
InitContext initContext) {
107107
SinkMetricsCallback metricsCallback =
108108
new SinkMetricsCallback(
109109
SinkMetrics.registerSinkMetrics(
110110
initContext.metricGroup().addGroup(metricGroupName)));
111111
CloseableHttpAsyncClient asyncHttpClient =
112112
clientBuilder.buildAndStartClient(metricsCallback);
113113

114-
return new PrometheusSinkWriter(
114+
return new PrometheusSinkWriter<>(
115115
getElementConverter(),
116116
initContext,
117117
getMaxInFlightRequests(),
@@ -128,17 +128,16 @@ protected PrometheusSink(
128128
}
129129

130130
@Override
131-
public StatefulSinkWriter<PrometheusTimeSeries, BufferedRequestState<Types.TimeSeries>>
132-
restoreWriter(
133-
InitContext initContext,
134-
Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
131+
public StatefulSinkWriter<InputT, BufferedRequestState<Types.TimeSeries>> restoreWriter(
132+
InitContext initContext,
133+
Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
135134
SinkMetricsCallback metricsCallback =
136135
new SinkMetricsCallback(
137136
SinkMetrics.registerSinkMetrics(
138137
initContext.metricGroup().addGroup(metricGroupName)));
139138
CloseableHttpAsyncClient asyncHttpClient =
140139
clientBuilder.buildAndStartClient(metricsCallback);
141-
return new PrometheusSinkWriter(
140+
return new PrometheusSinkWriter<>(
142141
getElementConverter(),
143142
initContext,
144143
getMaxInFlightRequests(),
@@ -155,8 +154,8 @@ protected PrometheusSink(
155154
recoveredState);
156155
}
157156

158-
public static PrometheusSinkBuilder builder() {
159-
return new PrometheusSinkBuilder();
157+
public static <InputT> PrometheusSinkBuilder<InputT> builder() {
158+
return new PrometheusSinkBuilder<>();
160159
}
161160

162161
@Override

flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.flink.connector.prometheus.sink;
1919

2020
import org.apache.flink.annotation.PublicEvolving;
21-
import org.apache.flink.connector.base.sink.AsyncSinkBase;
2221
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
22+
import org.apache.flink.connector.base.sink.writer.ElementConverter;
2323
import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
2424
import org.apache.flink.connector.prometheus.sink.prometheus.Types;
2525

@@ -30,9 +30,8 @@
3030

3131
/** Builder for Sink implementation. */
3232
@PublicEvolving
33-
public class PrometheusSinkBuilder
34-
extends AsyncSinkBaseBuilder<
35-
PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> {
33+
public class PrometheusSinkBuilder<InputT>
34+
extends AsyncSinkBaseBuilder<InputT, Types.TimeSeries, PrometheusSinkBuilder<InputT>> {
3635
private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class);
3736

3837
// Max batch size, in number of samples
@@ -52,14 +51,21 @@ public class PrometheusSinkBuilder
5251
private Integer socketTimeoutMs;
5352
private PrometheusRequestSigner requestSigner = null;
5453
private Integer maxBatchSizeInSamples;
55-
private Integer maxRecordSizeInSamples;
54+
private Long maxRecordSizeInSamples;
5655
private String httpUserAgent = null;
5756
private PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
5857
errorHandlingBehaviorConfig = null;
58+
private ElementConverter<InputT, Types.TimeSeries> elementConverter;
5959
private String metricGroupName = null;
6060

61+
public PrometheusSinkBuilder<InputT> setElementConverter(
62+
ElementConverter<InputT, Types.TimeSeries> elementConverter) {
63+
this.elementConverter = elementConverter;
64+
return this;
65+
}
66+
6167
@Override
62-
public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
68+
public PrometheusSink<InputT> build() {
6369

6470
int actualMaxBatchSizeInSamples =
6571
Optional.ofNullable(maxBatchSizeInSamples)
@@ -69,8 +75,9 @@ public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
6975
long actualMaxTimeInBufferMS =
7076
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
7177

72-
int actualMaxRecordSizeInSamples =
73-
Optional.ofNullable(maxRecordSizeInSamples).orElse(actualMaxBatchSizeInSamples);
78+
long actualMaxRecordSizeInSamples =
79+
Optional.ofNullable(maxRecordSizeInSamples)
80+
.orElse((long) actualMaxBatchSizeInSamples);
7481

7582
int actualSocketTimeoutMs =
7683
Optional.ofNullable(socketTimeoutMs)
@@ -116,8 +123,8 @@ public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
116123
actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
117124
actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetryableError());
118125

119-
return new PrometheusSink(
120-
new PrometheusTimeSeriesConverter(),
126+
return new PrometheusSink<>(
127+
Optional.ofNullable(elementConverter).orElse(new PrometheusTimeSeriesConverter<>()),
121128
MAX_IN_FLIGHT_REQUESTS,
122129
actualMaxBufferedRequests,
123130
actualMaxBatchSizeInSamples,
@@ -132,50 +139,51 @@ public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
132139
actualMetricGroupName);
133140
}
134141

135-
public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) {
142+
public PrometheusSinkBuilder<InputT> setPrometheusRemoteWriteUrl(
143+
String prometheusRemoteWriteUrl) {
136144
this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
137145
return this;
138146
}
139147

140-
public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) {
148+
public PrometheusSinkBuilder<InputT> setRequestSigner(PrometheusRequestSigner requestSigner) {
141149
this.requestSigner = requestSigner;
142150
return this;
143151
}
144152

145-
public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) {
153+
public PrometheusSinkBuilder<InputT> setMaxBatchSizeInSamples(int maxBatchSizeInSamples) {
146154
this.maxBatchSizeInSamples = maxBatchSizeInSamples;
147155
return this;
148156
}
149157

150-
public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) {
158+
public PrometheusSinkBuilder<InputT> setMaxRecordSizeInSamples(long maxRecordSizeInSamples) {
151159
this.maxRecordSizeInSamples = maxRecordSizeInSamples;
152160
return this;
153161
}
154162

155-
public PrometheusSinkBuilder setRetryConfiguration(
163+
public PrometheusSinkBuilder<InputT> setRetryConfiguration(
156164
PrometheusSinkConfiguration.RetryConfiguration retryConfiguration) {
157165
this.retryConfiguration = retryConfiguration;
158166
return this;
159167
}
160168

161-
public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) {
169+
public PrometheusSinkBuilder<InputT> setSocketTimeoutMs(int socketTimeoutMs) {
162170
this.socketTimeoutMs = socketTimeoutMs;
163171
return this;
164172
}
165173

166-
public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) {
174+
public PrometheusSinkBuilder<InputT> setHttpUserAgent(String httpUserAgent) {
167175
this.httpUserAgent = httpUserAgent;
168176
return this;
169177
}
170178

171-
public PrometheusSinkBuilder setErrorHandlingBehaviorConfiguration(
179+
public PrometheusSinkBuilder<InputT> setErrorHandlingBehaviorConfiguration(
172180
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
173181
errorHandlingBehaviorConfig) {
174182
this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
175183
return this;
176184
}
177185

178-
public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) {
186+
public PrometheusSinkBuilder<InputT> setMetricGroupName(String metricGroupName) {
179187
this.metricGroupName = metricGroupName;
180188
return this;
181189
}
@@ -184,20 +192,20 @@ public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) {
184192

185193
/** Not supported. Use setMaxBatchSizeInSamples(int) instead */
186194
@Override
187-
public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) {
195+
public PrometheusSinkBuilder<InputT> setMaxBatchSize(int maxBatchSize) {
188196
throw new UnsupportedOperationException("maxBatchSize is not supported by this sink");
189197
}
190198

191199
/** Not supported. Use setMaxBatchSizeInSamples(int) instead */
192200
@Override
193-
public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
201+
public PrometheusSinkBuilder<InputT> setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
194202
throw new UnsupportedOperationException(
195203
"maxBatchSizeInBytes is not supported by this sink");
196204
}
197205

198206
/** Not supported. Use setMaxRecordSizeInSamples(int) instead */
199207
@Override
200-
public PrometheusSinkBuilder setMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
208+
public PrometheusSinkBuilder<InputT> setMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
201209
throw new UnsupportedOperationException(
202210
"maxRecordSizeInBytes is not supported by this sink");
203211
}

flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import java.util.function.Consumer;
4242

4343
/**
44-
* Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries.
44+
* Writer, taking care of batching the {@link InputT} and handling retries.
4545
*
4646
* <p>The batching of this sink is in terms of Samples, not bytes. The goal is adaptively increase
4747
* the number of Samples in each batch, a WriteRequest sent to Prometheus, to a configurable number.
@@ -59,7 +59,7 @@
5959
* maxBatchSizeInBytes.
6060
*/
6161
@Internal
62-
public class PrometheusSinkWriter extends AsyncSinkWriter<PrometheusTimeSeries, Types.TimeSeries> {
62+
public class PrometheusSinkWriter<InputT> extends AsyncSinkWriter<InputT, Types.TimeSeries> {
6363
private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkWriter.class);
6464

6565
private final SinkMetricsCallback metricsCallback;
@@ -69,7 +69,7 @@ public class PrometheusSinkWriter extends AsyncSinkWriter<PrometheusTimeSeries,
6969
errorHandlingBehaviorConfig;
7070

7171
public PrometheusSinkWriter(
72-
ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
72+
ElementConverter<InputT, Types.TimeSeries> elementConverter,
7373
Sink.InitContext context,
7474
int maxInFlightRequests,
7575
int maxBufferedRequests,
@@ -101,7 +101,7 @@ public PrometheusSinkWriter(
101101
}
102102

103103
public PrometheusSinkWriter(
104-
ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
104+
ElementConverter<InputT, Types.TimeSeries> elementConverter,
105105
Sink.InitContext context,
106106
int maxInFlightRequests,
107107
int maxBufferedRequests,

0 commit comments

Comments
 (0)