[FLINK-33139] Add Prometheus Sink Table API #22
Conversation
Left some early review comments
Many thanks for your efforts here, I've added a few comments.
Force-pushed from cebd639 to 5644639.
Thanks @leekeiabstraction @foxus for the review. I have addressed the comments and updated the PR; please take a look.
Thanks for the updates, loving the work to migrate the AWS-specific Prometheus behaviour out. I've added additional comments on the updates.
Thanks for the review. I have updated the PR to address the comments; please take another look.
LGTM - thank you for the timely updates!
Thanks for the PR
I am not sure I fully understand the Table API interface and whether it matches the use case this connector is supposed to be used for. I left some specific comments about the API. Would you please clarify?
Also, I am concerned about breaking the public interface of the DataStream API. Is that required?
Force-pushed from 63061ee to 006471b.
Thanks Lorenzo for the review. I have addressed the comments on the API use case and example, and I hope it is clearer now. I have also found a way to support the Table API without breaking changes. I would appreciate it if you could take another look at the updated PR, thank you.
Hi guys...
Excuse the noob question: I did the git clone and the build, and the project folder has a couple of directories with target subfolders. Which jar from which subdirectory do I copy to my Flink cluster's lib folder for the above to work?
Hi @georgelza.
You can use a request signer to provide authentication/credentials; see the Flink docs for more info: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/prometheus/#request-signer
You should not need to build it; the jar is available on Maven: https://mvnrepository.com/artifact/org.apache.flink/flink-connector-prometheus/1.0.0-1.20
Thanks, I will pull the jar from Maven using the above link. I just want to double-check: does the jar that's available include the CREATE TABLE ... (connector=prometheus...) functionality as per above? I'm asking because I built it as per the instructions, and even though the jar file was in the correct place, it seems Flink was not able to find the Prometheus connector functionality.
@darenwkt No luck. Does it make sense to take this off here instead... `Flink SQL> CREATE TABLE hive_catalog.prometheus.PromTable (`
`Flink SQL> INSERT INTO hive_catalog.prometheus.PromTable VALUES ('10221', '106', '1031', 60, TIMESTAMP '2025-05-15 08:21:00.000');` fails with: Available factory identifiers are: blackhole
@georgelza the released version DOES NOT support SQL. The support for SQL is still under development.
Very nice refactoring.
I have two suggestions, but nothing blocking:
- It would be good to validate that the table columns and the configuration are consistent at sink initialization rather than in the converter. Making the job fail on initialization when there is a misconfiguration is better than making it fail (and restart) during processing.
- It would be nice to add an example of SQL, in the form of a fake test similar to DataStreamExample. It would be SQL embedded in Java but still meaningful for an end user (a rough sketch of the shape follows below).
Also, I am unsure how the initialization of the request-signer works in SQL. Maybe another example class for that would be helpful
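For illustration, a minimal sketch of what such an embedded-SQL example could look like, assuming a plain Table API program (the class name, endpoint URL, columns, and values here are placeholders, not the PR's actual example):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableApiExampleSketch {
    public static void main(String[] args) throws Exception {
        // Pure Table API program: no DataStream scaffolding needed.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE PromTable ("
                        + " `my_metric_name` STRING,"
                        + " `my_label_1` STRING,"
                        + " `sample_value` DOUBLE,"
                        + " `sample_ts` TIMESTAMP(3)"
                        + ") WITH ("
                        + " 'connector' = 'prometheus',"
                        + " 'metric.endpoint-url' = 'http://localhost:9090/api/v1/write',"
                        + " 'metric.name' = 'my_metric_name',"
                        + " 'metric.label.keys' = '[my_label_1]',"
                        + " 'metric.sample.key' = 'sample_value',"
                        + " 'metric.sample.timestamp' = 'sample_ts')");
        // Write a single sample; await() lets the INSERT job finish before exiting.
        tEnv.executeSql(
                        "INSERT INTO PromTable VALUES"
                                + " ('cpu_usage', 'host_1', 0.42,"
                                + " TIMESTAMP '2025-06-01 00:00:00.000')")
                .await();
    }
}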
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AmazonManagedPrometheusWriteRequestSignerFactory
I'd be interested to see how this is used to set up the request signer declaratively in SQL
The request signer is now a plugin (loaded through the Java Service Provider Interface (SPI)) and can be specified by the user in the table config (metric.request-signer), for example:
CREATE TABLE PromTable (
`my_metric_name` BIGINT,
`my_label_1` BIGINT,
`my_label_2` BIGINT,
`sample_value` BIGINT,
`sample_ts` TIMESTAMP(3)
)
WITH (
'connector' = 'prometheus',
'aws.region' = 'us-east-1',
'metric.request-signer' = 'amazon-managed-prometheus',
'metric.endpoint-url' = 'https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc',
'metric.name' = 'my_metric_name',
'metric.label.keys' = '[my_label_1,my_label_2]',
'metric.sample.key' = 'sample_value',
'metric.sample.timestamp' = 'sample_ts',
'sink.batch.max-size' = '2',
'sink.flush-buffer.size' = '1'
);
Please explain:
'sink.batch.max-size' = '2',
'sink.flush-buffer.size' = '1'
I have an idea what the intent is, just want to confirm...
Hi @georgelza,
'sink.batch.max-size' determines the maximum number of records in a batch before it is flushed to the sink. I would recommend checking the AsyncSink FLIP for more details on how it works: https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
Another helpful resource is to check similar connectors and how these configs are used there: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kinesis/#sink-batch-max-size
When this PR is merged and released, we should have equivalent docs on what each of these configs means as well.
builder.withMetricName(fieldValue.getStringValue());
} else if (fieldName.equals(prometheusConfig.getMetricSampleKey())) {
    sampleValue = fieldValue.getDoubleValue();
} else if (prometheusConfig.getLabelKeys().contains(fieldName)) {
LGTM
Just thinking out loud: the Label name is the field name and the value is the content.
No issue, as label names must follow [a-zA-Z_]([a-zA-Z0-9_])*, which is a subset of possible column names.
This limits the usage a bit compared to DataStream because all metrics represented by a table must have the same set of Label names.
But I do not think anything different is possible with a table abstraction
I may require further clarification on the question here, but my understanding is that this approach allows setting any label name actually, for example, the following label names are all valid:
'metric.label.keys' = '[my_label_1, my_label_2, LABEL@123, 123%label, label(123)]'
Labels in Prometheus are {key, value} pairs; you are adding the fieldName as the key and the content of the field as the value:
builder.addLabel(fieldName, fieldValue.getStringValue())
Which is fine. But it means all records will have the same set of label keys, because in a table abstraction all records have the same fields by definition. Label values will differ, being the content of the fields.
As I mentioned, it's limited compared to DataStream but acceptable
"LABEL@123", "123%label" and "label(123)" would be illegal Label's key (names) for Prometheus BTW (see specs). They would be rejected by Prometheus. But the behaviour is not be inconsistent with the DataStream version where illegal label names (which are data in that case, not configurations) are rejected at runtime when you try writing. I am not suggesting a change.
Thanks for the clarification. Yes, I agree the behaviour does differ from the DataStream API in that the label keys/names are fixed per table.
Thanks Lorenzo, I have also added TableAPIExample as well; feel free to take a look. It seems the Table API groupBy is not the same as the keyBy operation, so there might be some limitation around parallelism > 1.
} else if (fieldName.equals(prometheusConfig.getMetricSampleTimestamp())) {
    sampleTimestamp = fieldValue.getLongValue();
} else {
    throw new IllegalArgumentException(
It would be really nice to move this validation to the builder or the sink constructor. This would cause the job to fail when created, instead of failing when the first record is processed (putting the job in a fail-and-restart loop).
A more user-friendly behaviour, considering this is a misconfiguration rather than a runtime problem.
Yes, this is a good suggestion, and I can confirm that the validations are already in place; please take a look at:
PrometheusDynamicSinkFactory.validateMetricConfigKeys()
PrometheusDynamicSinkFactory.requiredOptions()
They are both called in the sink constructor, which aligns with your suggestion.
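For readers following along, the fail-fast idea looks roughly like this (a sketch only: the method name mirrors the validateMetricConfigKeys mentioned above, but the body is illustrative, not the PR's actual code):

import org.apache.flink.table.api.ValidationException;

import java.util.List;

final class MetricConfigValidationSketch {
    // Called at factory/constructor time so a misconfiguration fails the job on
    // creation instead of on the first processed record.
    static void validateMetricConfigKeys(
            List<String> tableColumns, List<String> configuredKeys) {
        for (String key : configuredKeys) {
            if (!tableColumns.contains(key)) {
                throw new ValidationException(
                        "Configured metric key '" + key
                                + "' does not match any table column " + tableColumns);
            }
        }
    }
}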
Approved because my notes in my last review are not blockers
Hi Daren
I actually think the way you are defining the labels is very elegant.
G
Definitely keen to get access to the jar that implements this sink; I have a blog just about ready that implements an IoT -> Kafka -> Flink -> Prometheus -> Grafana flow...
G
Thanks @darenwkt for the PR!
I'm not a big fan of the way the PR is currently implemented, but I can see the reasons motivating it (non-generic interfaces). I've mainly put in larger comments, but will progress on essential "formatting" comments (e.g. @PublicEvolving etc.) after we agree on the below:
Main concerns:
1/ There is currently tight coupling between PrometheusSinkBase and PrometheusSink, which is a code smell of a wrongly directed dependency! Given this is a relatively new connector, shall we consider instead taking the hit of a breaking public-interface change and bumping the major version, so that there is only one PrometheusSink with the generic type?
2/ The current type handling for RowData -> String on metric labels is a little too open, in my opinion. There is a fixed set of Flink SQL data types: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#list-of-data-types We should consider fixing the list of types available for these fields (e.g. CHAR, VARCHAR, etc.) and rejecting funky types (RAW, MULTISET, etc.). A sketch of this restriction follows below.
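A sketch of what restriction 2/ could look like using Flink's LogicalTypeRoot (class and method names here are illustrative, not the PR's actual code):

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.EnumSet;
import java.util.Set;

final class LabelTypeCheckSketch {
    // STRING is shorthand for VARCHAR(2147483647), so VARCHAR covers it.
    private static final Set<LogicalTypeRoot> ALLOWED =
            EnumSet.of(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR);

    static void requireCharacterType(String column, LogicalType type) {
        if (!ALLOWED.contains(type.getTypeRoot())) {
            throw new IllegalArgumentException(
                    "Column '" + column + "' has unsupported type " + type
                            + "; only CHAR/VARCHAR/STRING are allowed for the metric name and labels");
        }
    }
}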
Force-pushed from 6deca3f to 9de55ca.
Thanks @hlteoh37 for the review! I have addressed the comments and would appreciate it if you could take another look, thank you.
Yes, +1 to this, I have updated this in the latest revision.
This is a good callout; we should only support (CHAR, VARCHAR, STRING) for the metric name and labels. I have updated this in the latest revision.
Hi Darren
Are sink.batch.max-size and sink.flush-buffer.size part of the current build?
I noticed metric.name needs to map to a column in the table; I will argue that this is not desirable... you might have multiple tables that will push to the Prometheus store, and based on the table being used, the user will want to define a text string as the name of the metrics contained...
G
Hi @georgelza,
Can I clarify your question, please? Do you mean whether the current Table API connector implementation allows the user to configure their own...
In the DataStream API connector, we allow PrometheusSink to submit to a different MetricName according to what is set in the TimeSeries. Hence, the Table API connector is consistent in this regard as well: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/prometheus/#populating-a-prometheustimeseries
@darenwkt W.r.t. sink.batch.max-size and sink.flush-buffer.size: using them, I ran into the inherent rule that the one has to be bigger than the other, but when I followed that I had some other errors... so eventually, for the blog, I simply removed both, got it working, and it works nicely :) It took some figuring out how the connector works and how to use Prometheus remote write; also, the time of the Prometheus server and the time of my metrics can't be too far in the past (historic data) or too far in the future (time offset between the Prometheus server and the data source TZ). I wrote a blog using it that's scheduled to come out tomorrow, actually.
Purpose of the change
https://issues.apache.org/jira/browse/FLINK-33139 Implements Table API for Prometheus Sink
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
This change added tests and can be verified as follows:
Added unit tests for RowDataElementConverter and DynamicSinkTableFactory
Manually verified by running the Prometheus Sink TableAPI to submit data to Amazon Managed Prometheus

In the example below, a table is created where the metric name and the labels are STRING, and there are two (String/String) label pairs.
Next, a row of sample data is inserted into the table where metricName="sample_name" and the label pairs are ("my_label_1":"sample_label_1", "my_label_2":"sample_label_2").
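The verification screenshot is not reproduced here, so the following is an approximate reconstruction of that example as SQL embedded in Java (the endpoint URL, region, sample value, and timestamp are placeholders; column and option names follow the SQL example earlier in this thread):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ManualVerificationSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE PromTable ("
                        + " `metricName` STRING,"
                        + " `my_label_1` STRING,"
                        + " `my_label_2` STRING,"
                        + " `sample_value` DOUBLE,"
                        + " `sample_ts` TIMESTAMP(3)"
                        + ") WITH ("
                        + " 'connector' = 'prometheus',"
                        + " 'aws.region' = 'us-east-1',"
                        + " 'metric.request-signer' = 'amazon-managed-prometheus',"
                        + " 'metric.endpoint-url' = 'https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc',"
                        + " 'metric.name' = 'metricName',"
                        + " 'metric.label.keys' = '[my_label_1,my_label_2]',"
                        + " 'metric.sample.key' = 'sample_value',"
                        + " 'metric.sample.timestamp' = 'sample_ts')");
        // One row -> one sample: metric "sample_name" with the two label pairs above.
        tEnv.executeSql(
                        "INSERT INTO PromTable VALUES"
                                + " ('sample_name', 'sample_label_1', 'sample_label_2',"
                                + " 1.0, TIMESTAMP '2025-04-28 18:36:13.000')")
                .await();
    }
}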
DataStream API
PrometheusSink has been changed to support a generic InputType; this is because, in the Table API, the input type needs to be generic so that it can support the RowData input type.
However, existing PrometheusSink usage is still supported, with the default elementConverter remaining the same (converting PrometheusTimeSeries -> Types.TimeSeries). In addition, users can also define their own elementConverter to convert their InputType -> Types.TimeSeries.
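For example, a user mapping a hypothetical domain event into the documented PrometheusTimeSeries shape (the builder methods appear in the converter code above and in the linked populating-a-prometheustimeseries docs; everything else here is illustrative):

import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;

final class EventToTimeSeriesSketch {
    // Hypothetical event fields: metric name, one label value, sample value, timestamp.
    static PrometheusTimeSeries toTimeSeries(
            String metric, String host, double value, long timestampMillis) {
        return PrometheusTimeSeries.builder()
                .withMetricName(metric)
                .addLabel("host", host)
                .addSample(value, timestampMillis)
                .build();
    }
}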
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
Public API has been changed (Public API is any class annotated with @Public(Evolving))