Flink JSON data generator #126

Merged: 2 commits, merged on Jul 2, 2025

3 changes: 2 additions & 1 deletion README.md
@@ -40,7 +40,8 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service for Apache Flink
- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls\
- [**Custom Metrics**](./java/CustomMetrics) - Creating and publishing custom application metrics


#### Utilities
- [**Flink Data Generator (JSON)**](java/FlinkDataGenerator) - How to use a Flink application as a data generator, for functional and load testing.

### Python Examples

144 changes: 144 additions & 0 deletions java/FlinkDataGenerator/README.md
@@ -0,0 +1,144 @@
# Flink JSON Data Generator to Kinesis or Kafka

This example demonstrates how you can use Apache Flink as a data generator for load testing.

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Flink connectors: DataGen, Kafka Sink, Kinesis Sink

The application generates random stock prices at a fixed rate.
Depending on the runtime configuration, it sends the generated records, as JSON, to a Kinesis Data Stream,
to an MSK/Kafka topic, or to both.

The application can easily scale to generate high throughput. For example, with 3 KPU you can generate more than 64,000 records per second.
See [Using the data generator for load testing](#using-the-data-generator-for-load-testing).
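
For orientation, below is a minimal sketch of how such a rate-limited generator is typically assembled with the Flink 1.20 DataGen connector (the actual wiring lives in `DataGeneratorJob`; variable names are illustrative, and we assume `StockPriceGeneratorFunction` implements `GeneratorFunction<Long, StockPrice>`):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

long recordsPerSecond = 1000; // from the `DataGen` `records.per.second` property
DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
        new StockPriceGeneratorFunction(),               // GeneratorFunction<Long, StockPrice>
        Long.MAX_VALUE,                                  // effectively unbounded
        RateLimiterStrategy.perSecond(recordsPerSecond), // total rate across all subtasks
        TypeInformation.of(StockPrice.class));

DataStream<StockPrice> stockPrices = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "data-generator");
```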

It can easily be modified to generate different types of records by changing the implementation of the record class
[StockPrice](src/main/java/com/amazonaws/services/msf/domain/StockPrice.java) and of the function generating the data, [StockPriceGeneratorFunction](src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java).

### Prerequisites

The data generator application must be able to write to the Kinesis Stream or the Kafka topic.
* Kafka/MSK
  * The Managed Flink application must have VPC networking.
  * Routing and security must allow the application to reach the Kafka cluster.
  * Any Kafka/MSK authentication must be added to the application (as provided, the application writes unauthenticated).
  * Kafka ACLs or IAM must allow the application to write to the topic.
* Kinesis Data Stream
  * The Managed Flink application IAM Role must have permissions to write to the stream (see the example policy after this list).
  * Ensure the Kinesis Stream has sufficient capacity for the generated throughput.
  * If the application has VPC networking, you must also create a VPC Endpoint for Kinesis so the application can reach the Stream.
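
For the Kinesis case, the following is an illustrative IAM policy statement granting the minimum write permissions (the account ID, Region, and stream name are placeholders):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleOutputStream"
    }
  ]
}
```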



### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the application reads the runtime configuration from the Runtime Properties;
when running locally, it reads it from the [`src/main/resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file.
All parameters are case-sensitive.

The presence of the configuration group `KafkaSink` enables the Kafka sink.
Likewise, `KinesisSink` enables the Kinesis sink. An example configuration file is shown below.


| Group ID      | Key                   | Description                                                                                                                                                                                                                                                |
|---------------|-----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `DataGen`     | `records.per.second`  | Number of records per second generated across all subtasks.                                                                                                                                                                                                 |
| `KinesisSink` | `stream.arn`          | ARN of the Kinesis Stream.                                                                                                                                                                                                                                   |
| `KinesisSink` | `aws.region`          | Region of the Kinesis Stream.                                                                                                                                                                                                                                |
| `KinesisSink` | (any other parameter) | Any other parameter in this group is passed to the Kinesis sink connector as part of the KinesisClientProperties. See the [connector documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink). |
| `KafkaSink`   | `bootstrap.servers`   | Kafka bootstrap servers.                                                                                                                                                                                                                                     |
| `KafkaSink`   | `topic`               | Name of the Kafka topic.                                                                                                                                                                                                                                     |
| `KafkaSink`   | (any other parameter) | Any other parameter in this group is passed to the Kafka sink connector as part of the KafkaProducerConfig. See the [connector documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#kafka-sink).      |


> Renaming the `KafkaSink` or `KinesisSink` group to something different, for example `KinesisSink-DISABLE`, prevents
> the generator from creating that particular sink.
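
For illustration, a local `flink-application-properties-dev.json` enabling all three groups could look like the following (the ARN, Region, brokers, and topic name are placeholders):

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.second": "1000"
    }
  },
  {
    "PropertyGroupId": "KinesisSink",
    "PropertyMap": {
      "stream.arn": "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "KafkaSink",
    "PropertyMap": {
      "bootstrap.servers": "broker-1:9092,broker-2:9092",
      "topic": "stock-prices"
    }
  }
]
```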

### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or installation.

See [Running examples locally](../running-examples-locally.md) for details.

---

## Data Generation

This example generates random stock price records similar to the following:

```json
{
"event_time": "2024-01-15T10:30:45.123",
"ticker": "AAPL",
"price": 150.25
}
```

The data generation can easily be customized to match your specific records by modifying two components (see the sketch after this list):

* The class [StockPrice](src/main/java/com/amazonaws/services/msf/domain/StockPrice.java), representing the record.
  You can use [Jackson annotations](https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations) to customize the generated JSON.
* The class [StockPriceGeneratorFunction](src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java),
  which contains the logic for generating each record.
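
As an illustration, a hypothetical customization that replaces the stock price with a sensor reading could look like the following (the class and field names are invented for this sketch):

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.util.concurrent.ThreadLocalRandom;

// In StockPrice's place (its own file): the record class.
public class SensorReading {
    @JsonProperty("sensor_id") // Jackson annotation controlling the JSON field name
    private String sensorId;
    private double temperature;

    public SensorReading() {} // Jackson requires a no-arg constructor

    public SensorReading(String sensorId, double temperature) {
        this.sensorId = sensorId;
        this.temperature = temperature;
    }

    public String getSensorId() { return sensorId; }
    public double getTemperature() { return temperature; }
}

// In StockPriceGeneratorFunction's place (its own file):
// the generator function, invoked once per generated record.
public class SensorReadingGeneratorFunction implements GeneratorFunction<Long, SensorReading> {
    private static final String[] SENSORS = {"sensor-1", "sensor-2", "sensor-3"};

    @Override
    public SensorReading map(Long index) {
        String sensorId = SENSORS[(int) (index % SENSORS.length)];
        double temperature = 15.0 + ThreadLocalRandom.current().nextDouble() * 10.0;
        return new SensorReading(sensorId, temperature);
    }
}
```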

### Partitioning

Records published in Kinesis or Kafka are partitioned by the `ticker` field.

If you customize the data object, you also need to modify the `PartitionKeyGenerator<T>` and the `SerializationSchema<T>`
that extract the key, in the Kinesis and Kafka sinks respectively, as sketched below.
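
Below is a sketch of that adaptation, reusing the hypothetical `SensorReading` from the previous sketch (builder method names as in the connector versions declared in the pom; `streamArn` and `topic` are placeholder variables):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.formats.json.JsonSerializationSchema;

// Kinesis: the partition key is extracted by a PartitionKeyGenerator.
PartitionKeyGenerator<SensorReading> partitionKeyGenerator = reading -> reading.getSensorId();

KinesisStreamsSink<SensorReading> kinesisSink = KinesisStreamsSink.<SensorReading>builder()
        .setStreamArn(streamArn) // placeholder
        .setSerializationSchema(new JsonSerializationSchema<>())
        .setPartitionKeyGenerator(partitionKeyGenerator)
        .build();

// Kafka: the message key is produced by the key SerializationSchema.
KafkaRecordSerializationSchema<SensorReading> kafkaSchema =
        KafkaRecordSerializationSchema.<SensorReading>builder()
                .setTopic(topic) // placeholder
                .setKeySerializationSchema(
                        reading -> reading.getSensorId().getBytes(StandardCharsets.UTF_8))
                .setValueSerializationSchema(new JsonSerializationSchema<>())
                .build();
```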

## Using the data generator for load testing

This application can be used to load test other applications.

Make sure the data generator application has sufficient resources to generate the desired throughput.

Also, make sure the Kafka/MSK cluster or the Kinesis Stream has sufficient capacity to ingest the generated throughput.

⚠️ If the destination system or the data generator Flink application is underprovisioned, the generated throughput may be lower than expected.

For reference, the following configuration allows generating ~64,000 records/sec to either Kinesis or Kafka:
* `Parallelism = 3`, `Parallelism-per-KPU = 1` (`3 KPU`)
* `DataGen` `records.per.second` = `64000`

> We recommend overprovisioning the data generator to ensure the required throughput can be achieved.
> Use the provided [CloudWatch dashboard](#cloudwatch-dashboard) to monitor the generator.

### Monitoring the data generator

The application exposes 3 custom metrics to CloudWatch (the sketch below shows how such metrics can be registered):
* `generatedRecordCount`: count of generated records, per subtask
* `generatedRecordRatePerParallelism`: generated records per second, per subtask
* `taskParallelism`: parallelism of the data generator

> ⚠️ Flink custom metrics are not global: each subtask maintains its own metrics.
> Also, for each metric, Amazon Managed Service for Apache Flink exports to CloudWatch 4 datapoints per minute, per subtask.
> With that in mind, to calculate the total generated records and the total rate across the entire application, apply the following math:
> - Total generatedRecordCount = `SUM(generatedRecordCount) / 4`, over 1 minute
> - Total generatedRecordsPerSec = `AVG(generatedRecordRatePerParallelism) * AVG(taskParallelism)`, over 1 minute
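
For reference, the following is a minimal sketch of how per-subtask metrics like these can be registered in a `RichMapFunction`, using the Dropwizard meter wrapper already declared in the pom (the operator name and placement are illustrative, not the example's actual implementation):

```java
import com.codahale.metrics.Meter;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;

// Hypothetical pass-through operator registering the three metrics.
public class RecordCountingMap<T> extends RichMapFunction<T, T> {
    private transient Counter generatedRecordCount;
    private transient org.apache.flink.metrics.Meter generatedRecordRatePerParallelism;

    @Override
    public void open(OpenContext openContext) {
        generatedRecordCount = getRuntimeContext().getMetricGroup()
                .counter("generatedRecordCount");
        generatedRecordRatePerParallelism = getRuntimeContext().getMetricGroup()
                .meter("generatedRecordRatePerParallelism", new DropwizardMeterWrapper(new Meter()));
        // Constant gauge exposing this task's parallelism
        getRuntimeContext().getMetricGroup().gauge("taskParallelism",
                () -> getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
    }

    @Override
    public T map(T record) {
        generatedRecordCount.inc();          // counted per subtask
        generatedRecordRatePerParallelism.markEvent(); // rate per subtask
        return record;
    }
}
```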

#### CloudWatch Dashboard

The provided CloudFormation template [dashboard-cfn.yaml](tools/dashboard-cfn.yaml) can be used to create a CloudWatch Dashboard
to monitor the data generator.

![Flink Data Generator dashboard](images/dashboard.png)

When creating the CloudFormation stack you need to provide:
* The name of the Managed Flink application
* The Region
* The name of the Kinesis Stream, if publishing to Kinesis
* The name of the MSK cluster and topic, if publishing to MSK

> Note: the dashboard assumes an MSK cluster with up to 6 brokers.
> If your cluster has more than 6 brokers, you need to adjust the *Kafka output* widget.

### Known limitations and possible extensions

* Only JSON serialization is supported.
* Data generation is stateless: the logic generating each record is not aware of previously generated records.
* Fixed record rate only; no ramp-up or ramp-down.
Binary file added java/FlinkDataGenerator/images/dashboard.png
185 changes: 185 additions & 0 deletions java/FlinkDataGenerator/pom.xml
@@ -0,0 +1,185 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>flink-data-generator</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<buildDirectory>${project.basedir}/target</buildDirectory>
<jar.finalName>${project.name}-${project.version}</jar.finalName>
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<flink.version>1.20.0</flink.version>
<aws.connector.version>5.0.0-1.20</aws.connector.version>
<kafka.connector.version>3.3.0-1.20</kafka.connector.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
</properties>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesisanalytics-runtime</artifactId>
<version>${kda.runtime.version}</version>
<scope>provided</scope>
</dependency>

<!-- DataGen connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Kinesis connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${aws.connector.version}</version>
</dependency>

<!-- Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${kafka.connector.version}</version>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<directory>${buildDirectory}</directory>
<finalName>${jar.finalName}</finalName>

<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.amazonaws.services.msf.DataGeneratorJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>