
[FLINK-33139] Add Prometheus Sink Table API #22


Open: wants to merge 1 commit into main from FLINK-33139

Conversation


@darenwkt darenwkt commented Apr 2, 2025

Purpose of the change

Implements the Table API for the Prometheus Sink: https://issues.apache.org/jira/browse/FLINK-33139

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

This change added tests and can be verified as follows:

  • Added unit tests for RowDataElementConverter and DynamicSinkTableFactory

  • Manually verified by running the Prometheus Sink TableAPI to submit data to Amazon Managed Prometheus
    (screenshot: 2025-03-31 22:00:00)

    • SQL code run for the test:

In the example below, a table is created where the metric name and label columns are STRING, and there are two label (String/String) pairs.

Flink SQL> CREATE TABLE PromTable (
>   `my_metric_name` STRING,
>   `my_label_1` STRING,
>   `my_label_2` STRING,
>   `sample_value` BIGINT,
>   `sample_ts` TIMESTAMP(3)
> )
> WITH (
>   'connector' = 'prometheus',
>   'aws.region' = 'us-east-1',
>   'metric.request-signer' = 'amazon-managed-prometheus',
>   'metric.endpoint-url' = 'https://aps-workspaces.us-east-1.amazonaws.com/workspaces/redacted/api/v1/remote_write',
>   'metric.name' = 'my_metric_name',
>   'metric.label.keys' = '[my_label_1,my_label_2]',
>   'metric.sample.key' = 'sample_value',
>   'metric.sample.timestamp' = 'sample_ts'
> );
[INFO] Execute statement succeed.

Next, a row of sample data is inserted into the table, where the metric name is "sample_name" and the label pairs are ("my_label_1": "sample_label_1", "my_label_2": "sample_label_2").

Flink SQL> INSERT INTO PromTable VALUES ('sample_name', 'sample_label_1', 'sample_label_2', 123, TIMESTAMP '2025-04-30 10:00:00.000');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 596c86fb5485ddd3c1a3594e3c837d02

DataStream API

PrometheusSink has been changed to support a generic InputType. This is because, in the Table API, the input type needs to be generic so that it can support the RowData input type.

However, the existing PrometheusSink usage is still supported: the default elementConverter remains the same and performs the PrometheusTimeSeries -> Types.TimeSeries conversion. In addition, users can define their own elementConverter to convert their InputType -> Types.TimeSeries.
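
For illustration, a minimal sketch of what a custom elementConverter could look like. The MyMetric POJO and class names below are invented for this example, and the protobuf package name is an assumption; this is not code from this PR:

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;

/** Invented input POJO used only for this sketch. */
class MyMetric {
    String name;
    double value;
    long timestampMillis;
}

/** Hypothetical converter mapping an application-specific POJO to a remote-write TimeSeries. */
public class MyMetricElementConverter implements ElementConverter<MyMetric, Types.TimeSeries> {

    @Override
    public Types.TimeSeries apply(MyMetric element, SinkWriter.Context context) {
        return Types.TimeSeries.newBuilder()
                // "__name__" is the reserved Prometheus label that carries the metric name
                .addLabels(Types.Label.newBuilder()
                        .setName("__name__")
                        .setValue(element.name)
                        .build())
                .addSamples(Types.Sample.newBuilder()
                        .setValue(element.value)
                        .setTimestamp(element.timestampMillis)
                        .build())
                .build();
    }
}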

  • Manually verified that existing Prometheus Sink DataStream API is still working
(screenshot: 2025-04-02 09:38:23)

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • [yes] Dependencies have been added or upgraded
  • [no] Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • [no] Serializers have been changed
  • [yes] New feature has been introduced
    • If yes, how is this documented? (JavaDocs)


@leekeiabstraction leekeiabstraction left a comment


Left some early review comments


@foxus foxus left a comment


Many thanks for your efforts here, I've added a few comments.

@darenwkt darenwkt force-pushed the FLINK-33139 branch 2 times, most recently from cebd639 to 5644639 on April 23, 2025 06:29
@darenwkt darenwkt requested a review from foxus April 23, 2025 07:00
@darenwkt
Author

Thanks @leekeiabstraction @foxus for the review. I have addressed the comments and updated the PR; please take a look.


@foxus foxus left a comment


Thanks for the updates, loving the work to migrate the AWS specific Prometheus behaviour out. I've added additional comments on the updates.

@darenwkt
Author

Thanks for the updates, loving the work to migrate the AWS specific Prometheus behaviour out. I've added additional comments on the updates.

Thanks for the review, have updated PR to address them, please take another look


@foxus foxus left a comment


LGTM - thank you for the timely updates!

Contributor

@nicusX nicusX left a comment


Thanks for the PR.
I am not sure I fully understand the Table API interface and whether it matches the use case this connector is supposed to be used for. I left some specific comments about the API. Would you please clarify?

Also, I am concerned about breaking the public interface of the DataStream API. Is that required?

@darenwkt darenwkt force-pushed the FLINK-33139 branch 3 times, most recently from 63061ee to 006471b on May 3, 2025 12:31
@darenwkt
Author

darenwkt commented May 3, 2025

Thanks for the PR. I am not sure I fully understand the Table API interface and whether it matches the use case this connector is supposed to be used for. I left some specific comments about the API. Would you please clarify?

Also, I am concerned about breaking the public interface of the DataStream API. Is that required?

Thanks Lorenzo for the review. I have addressed the comments on the API use case and example, and I hope it is clearer now.

I have also found a way to support the Table API without breaking changes. I'd appreciate it if you could take another look at the updated PR, thank you.

@georgelza

Hi guys,
is there a jar file available for this connector? It looks like exactly what I've been looking for.
Curious: looking at the CREATE TABLE, I don't see any mention of username/password credentials... just asking.
G

@georgelza

Excuse the noob question: I did the git clone and the build, and the project folder has a couple of directories with target sub-folders. Which jar from which sub-directory do I copy to my Flink cluster's lib folder for the above to work?

@darenwkt
Author

Hi @georgelza.

Hi guys, is there a jar file available for this connector? It looks like exactly what I've been looking for. Curious: looking at the CREATE TABLE, I don't see any mention of username/password credentials... just asking. G

You can use a request signer to provide authentication/credentials; see the Flink docs for more info: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/prometheus/#request-signer

Excuse the noob question: I did the git clone and the build, and the project folder has a couple of directories with target sub-folders. Which jar from which sub-directory do I copy to my Flink cluster's lib folder for the above to work?

You should not need to build it; the jar is available on Maven: https://mvnrepository.com/artifact/org.apache.flink/flink-connector-prometheus/1.0.0-1.20

@georgelza


Thanks, I will pull the jar from Maven using the above link. Just want to double check: does the jar that's available include the CREATE TABLE ... (connector=prometheus...) functionality as per the above? I'm asking because I built as per the instructions, and even though the jar file was in the correct place, it seems Flink was not able to find the Prometheus connector functionality.
Would appreciate it if you can ping me privately to discuss further: georgelza@gmail.com
G

@georgelza

@darenwkt No luck. Does it make sense to take this off here rather than continuing in the PR thread?

Flink SQL> CREATE TABLE hive_catalog.prometheus.PromTable (

sensorid STRING,
siteId STRING,
deviceId STRING,
measurement BIGINT,
ts TIMESTAMP(3)
)
WITH (
'connector' = 'prometheus',
'metric.endpoint-url' = 'prometheus:9090/api/v1/write',
'metric.name' = 'sensorid',
'metric.label.keys' = '[siteId,deviceId]',
'metric.sample.key' = 'measurement',
'metric.sample.timestamp' = 'ts'
);
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO hive_catalog.prometheus.PromTable VALUES ('10221', '106', '1031', 60, TIMESTAMP '2025-05-15 08:21:00.000');
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'prometheus' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
paimon
print
python-input-format
upsert-kafka

@nicusX
Contributor

nicusX commented May 17, 2025

@georgelza the released version DOES NOT support SQL. SQL support is still under development.
Please do not use the PR discussion as a chat.

Contributor

@nicusX nicusX left a comment


Very nice refactoring.
I have two suggestions, but nothing blocking:

  1. It would be good to validate whether the table columns and configuration are consistent at sink initialization rather than in the converter. Making the job fail on initialization when there is a misconfiguration is better than making it fail (and restart) during processing.
  2. It would be nice to add an SQL example, in the form of a fake test similar to DataStreamExample. It would be SQL embedded in Java, but still meaningful for an end user.

Also, I am unsure how the initialization of the request signer works in SQL. Maybe another example class for that would be helpful.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AmazonManagedPrometheusWriteRequestSignerFactory
Contributor


I'd be interested to see how this is used to set up the request signer declaratively in SQL

Author

@darenwkt darenwkt Jun 2, 2025


The request signer is now a plugin (loaded through the Java Service Provider Interface (SPI)) and can be specified by the user in the table config (metric.request-signer), for example:

CREATE TABLE PromTable (
  `my_metric_name` STRING,
  `my_label_1` STRING,
  `my_label_2` STRING,
  `sample_value` BIGINT,
  `sample_ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'prometheus',
  'aws.region' = 'us-east-1',
  'metric.request-signer' = 'amazon-managed-prometheus',
  'metric.endpoint-url' = 'https://aps-workspaces.us-east-1.amazonaws.com/workspaces/abc',
  'metric.name' = 'my_metric_name',
  'metric.label.keys' = '[my_label_1,my_label_2]',
  'metric.sample.key' = 'sample_value',
  'metric.sample.timestamp' = 'sample_ts',
  'sink.batch.max-size' = '2',
  'sink.flush-buffer.size' = '1'
);
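
For readers unfamiliar with the mechanism, here is a generic, deliberately simplified illustration of Java SPI discovery; the interface and class names are invented for this example and are not this connector's actual API:

import java.util.ServiceLoader;

public class SpiLookupExample {

    /** A provider interface that plugin jars would implement (invented for this sketch). */
    public interface SignerProvider {
        String identifier();
    }

    /** One concrete provider that a plugin jar could ship. */
    public static class AmazonSignerProvider implements SignerProvider {
        @Override
        public String identifier() {
            return "amazon-managed-prometheus";
        }
    }

    public static void main(String[] args) {
        // ServiceLoader discovers implementations listed in a
        // META-INF/services/<fully qualified interface name> resource in each jar on the
        // classpath; a connector can then pick the provider whose identifier matches the
        // value configured in 'metric.request-signer'.
        for (SignerProvider provider : ServiceLoader.load(SignerProvider.class)) {
            System.out.println("Discovered request signer: " + provider.identifier());
        }
    }
}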

@georgelza

Please explain:
'sink.batch.max-size' = '2',
'sink.flush-buffer.size' = '1'
I have an idea what the intent is, just want to confirm.

Author


Hi @georgelza,

'sink.batch.max-size' determines the maximum number of records in a batch before it is flushed to the sink. I would recommend checking the AsyncSink FLIP for more details on how it works: https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink

Another helpful resource is to check similar connectors and how these configs are used there: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kinesis/#sink-batch-max-size

When this PR is merged and released, we should have equivalent docs on what each of these configs means as well.
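
For illustration (the values here are arbitrary, and assuming these options follow the usual AsyncSink-based table connector semantics, as in the Kinesis connector linked above): with 'sink.batch.max-size' = '20' the writer flushes a remote-write request once 20 time series have been buffered, while 'sink.flush-buffer.size' is a threshold on the writer's buffer size that also triggers a flush when exceeded.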

builder.withMetricName(fieldValue.getStringValue());
} else if (fieldName.equals(prometheusConfig.getMetricSampleKey())) {
sampleValue = fieldValue.getDoubleValue();
} else if (prometheusConfig.getLabelKeys().contains(fieldName)) {
Contributor


LGTM
Just thinking out loud: the Label name is the field name and the value is the content.
No issue, as label names must follow [a-zA-Z_]([a-zA-Z0-9_])*, which is a subset of possible column names.
This limits the usage a bit compared to DataStream, because all metrics represented by a table must have the same set of label names.
But I do not think anything different is possible with a table abstraction.

Author


I may need further clarification on the question here, but my understanding is that this approach actually allows setting any label name; for example, the following label names are all accepted:

  'metric.label.keys' = '[my_label_1, my_label_2, LABEL@123, 123%label, label(123)]'

Contributor


Labels in Prometheus are {key, value} pairs;
you are adding the fieldName as the key and the content of the field as the value:

builder.addLabel(fieldName, fieldValue.getStringValue())

Which is fine. But it means all records will have the same set of label keys, because in a table abstraction all records have the same fields by definition. Label values will differ, being the content of the fields.
As I mentioned, it's limited compared to DataStream, but acceptable.
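
For a concrete illustration using the schema and INSERT from the PR description: the inserted row maps to a single time series with metric name sample_name and the fixed label key set {my_label_1, my_label_2}, whose values for that row are "sample_label_1" and "sample_label_2", plus one sample (value 123 at the given timestamp). A second table with different label columns would produce series with a different, but again fixed, set of label keys.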

Contributor


"LABEL@123", "123%label" and "label(123)" would be illegal Label's key (names) for Prometheus BTW (see specs). They would be rejected by Prometheus. But the behaviour is not be inconsistent with the DataStream version where illegal label names (which are data in that case, not configurations) are rejected at runtime when you try writing. I am not suggesting a change.

Author


Thanks for the clarification. Yes, I agree, the behaviour does differ from the DataStream API in that the label keys/names are fixed per table.

Author


Thanks Lorenzo, I have added TableAPIExample as well, feel free to take a look. It seems the Table API groupBy is not the same as the keyBy operation, so there might be some limitations around parallelism > 1.
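
A rough sketch of the general shape such an embedded-SQL example can take (this is not the PR's actual TableAPIExample class; the endpoint URL and option values are placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PrometheusTableSqlSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a table backed by the Prometheus sink connector (options as in the PR description).
        tableEnv.executeSql(
                "CREATE TABLE PromTable ("
                        + "  `my_metric_name` STRING,"
                        + "  `my_label_1` STRING,"
                        + "  `my_label_2` STRING,"
                        + "  `sample_value` BIGINT,"
                        + "  `sample_ts` TIMESTAMP(3)"
                        + ") WITH ("
                        + "  'connector' = 'prometheus',"
                        + "  'metric.endpoint-url' = 'http://localhost:9090/api/v1/write',"
                        + "  'metric.name' = 'my_metric_name',"
                        + "  'metric.label.keys' = '[my_label_1,my_label_2]',"
                        + "  'metric.sample.key' = 'sample_value',"
                        + "  'metric.sample.timestamp' = 'sample_ts'"
                        + ")");

        // Write one sample row; a real example would read from a source table instead.
        tableEnv.executeSql(
                        "INSERT INTO PromTable VALUES ('sample_name', 'sample_label_1', "
                                + "'sample_label_2', 123, TIMESTAMP '2025-04-30 10:00:00.000')")
                .await();
    }
}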

} else if (fieldName.equals(prometheusConfig.getMetricSampleTimestamp())) {
sampleTimestamp = fieldValue.getLongValue();
} else {
throw new IllegalArgumentException(
Contributor


It would be really nice to move this validation to the builder or the sink constructor. This would cause the job to fail when created, instead of failing when the first record is processed (putting the job in a fail-and-restart loop).
That is more user-friendly behaviour, considering this is a misconfiguration rather than a runtime problem.

Author


Yes, this is a good suggestion, and I can confirm that the validations are already in place; please take a look at:

  • PrometheusDynamicSinkFactory.validateMetricConfigKeys()
  • PrometheusDynamicSinkFactory.requiredOptions()

They are both called in the sink constructor, which aligns with your suggestion.

Contributor

@nicusX nicusX left a comment


Approved, because my notes in my last review are not blockers.

@georgelza

georgelza commented Jun 2, 2025 via email

@georgelza

georgelza commented Jun 2, 2025 via email

Contributor

@hlteoh37 hlteoh37 left a comment


Thanks @darenwkt for the PR!
I'm not a big fan of the way the PR is currently implemented, but I can see the reasons motivating it (non-generic interfaces). I've mainly put in larger comments, but will progress to essential "formatting" comments (e.g. @PublicEvolving etc.) after we agree on the below:

Main concerns:
1/ There is currently tight coupling between PrometheusSinkBase and PrometheusSink, which is a code smell of a wrongly directed dependency! Given this is a relatively new connector, shall we consider instead taking the hit of making a breaking public interface change, and bumping the major version, to include only one PrometheusSink with the generic type?

2/ The current type handling for RowData -> String on metric labels is a little bit too open, in my opinion. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#list-of-data-types There is a fixed set of Flink SQL data types. We should consider fixing the list of available types for these fields (e.g. CHAR, VARCHAR, etc.) and rejecting funky types (RAW, MULTISET, etc.).

@darenwkt darenwkt force-pushed the FLINK-33139 branch 2 times, most recently from 6deca3f to 9de55ca on June 4, 2025 07:44
@darenwkt
Author

darenwkt commented Jun 4, 2025

Thanks @hlteoh37 for the review! I have addressed the comments and would appreciate it if you could take another look, thank you.

1/ There is currently tight coupling between PrometheusSinkBase and PrometheusSink, which is a code smell of a wrongly directed dependency! Given this is a relatively new connector, shall we consider instead taking the hit of making a breaking public interface change, and bumping the major version, to include only one PrometheusSink with the generic type?

Yes, +1 to this; I have updated this in the latest revision.

2/ The current type handling for RowData -> String on metric labels is a little bit too open, in my opinion. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#list-of-data-types There is a fixed set of Flink SQL data types. We should consider fixing the list of available types for these fields (e.g. CHAR, VARCHAR, etc.) and rejecting funky types (RAW, MULTISET, etc.).

This is a good callout; we should only support (CHAR, VARCHAR, STRING) for the metric name and labels. I have updated this in the latest revision.
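
For illustration, a minimal sketch (an assumption, not the PR's actual validation code) of how such a restriction can be expressed with Flink's logical type roots; note that STRING is VARCHAR(2147483647), so checking CHAR/VARCHAR covers all three:

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public final class LabelTypeValidationSketch {

    private LabelTypeValidationSketch() {}

    /** Throws if a column configured as the metric name or as a label is not CHAR/VARCHAR/STRING. */
    public static void validateCharacterColumn(String columnName, DataType dataType) {
        LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
        boolean isCharacterType =
                typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR;
        if (!isCharacterType) {
            throw new IllegalArgumentException(
                    String.format(
                            "Column '%s' is used as metric name/label and must be CHAR, VARCHAR or STRING, but was %s",
                            columnName, typeRoot));
        }
    }
}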

@darenwkt darenwkt requested a review from hlteoh37 June 4, 2025 07:49
@georgelza

georgelza commented Jun 4, 2025 via email

@darenwkt
Author

darenwkt commented Jun 8, 2025

Hi @georgelza ,

Are sink.batch.max-size and sink.flush-buffer.size part of the current build?

Can I clarify your question, please? Do you mean whether the current Table API connector implementation allows users to configure their own sink.batch.max-size and sink.flush-buffer.size?

I noticed metric.name needs to map to a column in the table; I will argue
that this is not desirable... you might have multiple tables that will
push to the Prometheus store, and based on the table being used the user
will want to define a text string as the name of the metrics contained...

In the DataStream API connector, we allow PrometheusSink to submit to different metric names according to what is set in the TimeSeries. Hence, the Table API connector is consistent in this regard as well: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/prometheus/#populating-a-prometheustimeseries

@georgelza

@darenwkt
Thanks for the responses.

W.r.t. sink.batch.max-size and sink.flush-buffer.size: using them, I ran into the inherent rule that the one has to be bigger than the other, but when I followed that I had some other errors... so eventually, for the blog, I simply removed both.

Got it working, and it works nicely :) It took some figuring out how the connector works, how to use Prometheus remote write, and also that the time of the Prometheus server and the time of my metrics can't be too far in the past (historic data) or too far in the future (time offset between the Prometheus server and the data source TZ).

I wrote a blog using it that's scheduled to come out tomorrow, actually.
Will be watching this one; I think it will have some good legs on it.
Thanks for the work, guys, appreciated.
