diff --git a/README.md b/README.md index b9dc92d..64807ea 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,8 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service f - [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls\ - [**Custom Metrics**](./java/CustomMetrics) - Creating and publishing custom application metrics - +#### Utilities +- [**Flink Data Generator (JSON)**](java/FlinkDataGenerator) - How to use a Flink application as a data generator, for functional and load testing. ### Python Examples diff --git a/java/FlinkDataGenerator/README.md b/java/FlinkDataGenerator/README.md new file mode 100644 index 0000000..84376f9 --- /dev/null +++ b/java/FlinkDataGenerator/README.md @@ -0,0 +1,144 @@ +# Flink JSON Data Generator to Kinesis or Kafka + +This example demonstrates how you can use an Apache Flink application as a data generator for load testing. + +* Flink version: 1.20 +* Flink API: DataStream API +* Language: Java (11) +* Flink connectors: DataGen, Kafka Sink, Kinesis Sink + +The application generates random stock prices at a fixed rate. +Depending on the runtime configuration, it sends the generated records, as JSON, either to a Kinesis Data Stream +or to an MSK/Kafka topic (or both). + +The application can easily scale to generate high throughput. For example, with 3 KPU you can generate more than 64,000 records per second. +See [Using the data generator for load testing](#using-the-data-generator-for-load-testing). + +It can easily be modified to generate different types of records by changing the implementation of the record class +[StockPrice](src/main/java/com/amazonaws/services/msf/domain/StockPrice.java) and of the function generating the data, [StockPriceGeneratorFunction](src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java). + +### Prerequisites + +The data generator application must be able to write to the Kinesis Stream or the Kafka topic: +* Kafka/MSK + * The Managed Flink application must have VPC networking. + * Routing and security must allow the application to reach the Kafka cluster. + * Any Kafka/MSK authentication must be added to the application (this example writes unauthenticated). + * Kafka ACLs or IAM must allow the application to write to the topic. +* Kinesis Data Stream + * The Managed Flink application IAM Role must have permissions to write to the stream. + * Ensure the Kinesis Stream has sufficient capacity for the generated throughput. + * If the application has VPC networking, you must also create a VPC Endpoint for Kinesis, so the application can write to the Stream. + + + +### Runtime configuration + +The application reads the runtime configuration from the Runtime Properties, when running on Amazon Managed Service for Apache Flink, +or, when running locally, from the [`src/main/resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file. +All parameters are case-sensitive. + +The presence of the configuration group `KafkaSink` enables the Kafka sink. +Likewise, `KinesisSink` enables the Kinesis sink. + +| Group ID | Key | Description | |---------------|-----------------------|-------------| | `DataGen` | `records.per.second` | Number of records per second, generated across all subtasks. | | `KinesisSink` | `stream.arn` | ARN of the Kinesis Stream | | `KinesisSink` | `aws.region` | Region of the Kinesis Stream | | `KinesisSink` | (any other parameter) | Any other parameter in this group is passed to the Kinesis sink connector as KinesisClientProperties. See [documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink) | | `KafkaSink` | `bootstrap.servers` | Kafka bootstrap servers. | | `KafkaSink` | `topic` | Name of the Kafka topic. | | `KafkaSink` | (any other parameter) | Any other parameter in this group is passed to the Kafka sink connector as KafkaProducerConfig. See [documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#kafka-sink) | + +> Renaming the `KafkaSink` or `KinesisSink` group to something different, for example `KinesisSink-DISABLE`, prevents +> the generator from creating that particular sink.
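+For example, the following runtime configuration (the stream ARN and region are placeholder values) enables only the Kinesis sink; the Kafka group is renamed so it is ignored:
+```json
+[
+  {
+    "PropertyGroupId": "DataGen",
+    "PropertyMap": { "records.per.second": "1000" }
+  },
+  {
+    "PropertyGroupId": "KinesisSink",
+    "PropertyMap": {
+      "stream.arn": "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleStream",
+      "aws.region": "us-east-1"
+    }
+  },
+  {
+    "PropertyGroupId": "KafkaSink-DISABLE",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:9092",
+      "topic": "stock-prices"
+    }
+  }
+]
+```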
+### Running in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](../running-examples-locally.md) for details. + +--- + +## Data Generation + +This example generates random stock price records similar to the following: + +```json +{ + "event_time": "2024-01-15T10:30:45.123", + "ticker": "AAPL", + "price": 150.25 +} +``` + +The data generation can easily be customized to match your specific records by modifying two components (see the sketch after this list): + +* The class [StockPrice](src/main/java/com/amazonaws/services/msf/domain/StockPrice.java), which represents the record. + You can use [Jackson annotations](https://github.com/FasterXML/jackson-annotations/wiki/Jackson-Annotations) to customize the generated JSON. +* The class [StockPriceGeneratorFunction](src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java), which + contains the logic for generating each record.
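+As a sketch, a generator function for a hypothetical custom record type might look like the following (`SensorReading` is illustrative and not part of this example; it would be a POJO with the three fields shown):
+```java
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+
+import java.time.Instant;
+import java.util.Random;
+
+// Hypothetical: generates a SensorReading instead of a StockPrice
+public class SensorReadingGeneratorFunction implements GeneratorFunction<Long, SensorReading> {
+    private static final Random RANDOM = new Random();
+
+    @Override
+    public SensorReading map(Long index) {
+        return new SensorReading(
+                Instant.now().toString(),          // event time
+                "sensor-" + RANDOM.nextInt(100),   // id, also usable as partition key
+                RANDOM.nextDouble() * 40.0);       // measurement value
+    }
+}
+```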
+### Partitioning + +Records published to Kinesis or Kafka are partitioned by the `ticker` field. + +If you customize the data object, you also need to modify the `PartitionKeyGenerator` and the `SerializationSchema` +that extract the key in the Kinesis and Kafka sinks, respectively, as shown in the snippet below.
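+For reference, these are the two key extractors this example wires into the sinks (see [DataGeneratorJob](src/main/java/com/amazonaws/services/msf/DataGeneratorJob.java)); adapt them to your record type:
+```java
+// Kinesis: partition key extracted from the record
+PartitionKeyGenerator<StockPrice> partitionKeyGenerator =
+        (record) -> String.valueOf(record.getTicker());
+
+// Kafka: record key serialized from the record
+SerializationSchema<StockPrice> keySerializationSchema =
+        (stockPrice) -> stockPrice.getTicker().getBytes();
+```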
+## Using the data generator for load testing + +This application can be used to load test other applications. + +Make sure the data generator application has sufficient resources to generate the desired throughput. + +Also, make sure the Kafka/MSK cluster or the Kinesis Stream has sufficient capacity to ingest the generated throughput. + +⚠️ If the destination system or the data generator Flink application is underprovisioned, the generated throughput may be lower than expected. + +For reference, the following configuration allows generating ~64,000 records/sec to either Kinesis or Kafka: +* `Parallelism = 3`, `Parallelism-per-KPU = 1` (`3 KPU`) +* `DataGen` `records.per.second` = `64000` + +> We recommend overprovisioning the data generator to ensure the required throughput can be achieved. +> Use the provided [CloudWatch dashboard](#cloudwatch-dashboard) to monitor the generator. + +### Monitoring the data generator + +The application exposes 3 custom metrics to CloudWatch: +* `generatedRecordCount`: count of generated records, per subtask +* `generatedRecordRatePerParallelism`: generated records per second, per subtask +* `taskParallelism`: parallelism of the data generator + +> ⚠️ Flink custom metrics are not global. Each subtask maintains its own metrics. +> Also, for each metric, Amazon Managed Service for Apache Flink exports to CloudWatch 4 datapoints per minute, per subtask. +> To calculate the total generated records and rate across the entire application, you therefore need to apply +> the following math: +> - Total generatedRecordCount = `SUM(generatedRecordCount) / 4`, over 1 minute +> - Total generatedRecordsPerSec = `AVG(generatedRecordRatePerParallelism) * AVG(taskParallelism)`, over 1 minute
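+> For example, with illustrative numbers: if `taskParallelism` is 3 and each subtask generates ~21,333 records/sec, the total rate is
+> `AVG(generatedRecordRatePerParallelism) * AVG(taskParallelism) ≈ 21,333 * 3 ≈ 64,000` records/sec.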
+#### CloudWatch Dashboard + +The provided CloudFormation template [dashboard-cfn.yaml](tools/dashboard-cfn.yaml) can be used to create a CloudWatch Dashboard +to monitor the data generator. + +![Flink Data Generator dashboard](images/dashboard.png) + +When creating the CloudFormation stack you need to provide: +* The name of the Managed Flink application +* The Region +* The name of the Kinesis Stream, if publishing to Kinesis +* The name of the MSK cluster and topic, if publishing to MSK + +> Note: the dashboard assumes an MSK cluster with up to 6 brokers. +> If you have a cluster with more than 6 brokers, you need to adjust the *Kafka output* widget. + +### Known limitations and possible extensions + +* Only JSON serialization is supported. +* Data generation is stateless. The logic generating each record does not know about other records previously generated. +* Fixed record rate only. No ramp-up or ramp-down (see the sketch below for a possible extension).
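+As a possible extension, a ramp-up could be implemented with a custom `RateLimiterStrategy` passed to the `DataGeneratorSource` in place of `RateLimiterStrategy.perSecond(recordsPerSecond)`. The following is only a sketch, not part of this example; verify the `RateLimiter` contract and serialization requirements for your Flink version:
+```java
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+// Sketch: linearly ramps the rate up to the target over rampUpSeconds, then holds it
+public class RampUpRateLimiterStrategy implements RateLimiterStrategy {
+    private final double targetRatePerSecond; // across all subtasks
+    private final long rampUpSeconds;
+
+    public RampUpRateLimiterStrategy(double targetRatePerSecond, long rampUpSeconds) {
+        this.targetRatePerSecond = targetRatePerSecond;
+        this.rampUpSeconds = rampUpSeconds;
+    }
+
+    @Override
+    public RateLimiter createRateLimiter(int parallelism) {
+        double perSubtaskTarget = targetRatePerSecond / parallelism;
+        long startMillis = System.currentTimeMillis();
+        return new RateLimiter() {
+            @Override
+            public CompletionStage<Void> acquire() {
+                long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
+                double fraction = Math.min(1.0, (double) elapsedSeconds / rampUpSeconds);
+                double currentRate = Math.max(1.0, perSubtaskTarget * fraction);
+                try {
+                    // Naive pacing: sleep one inter-record interval before allowing the next record
+                    Thread.sleep((long) (1000.0 / currentRate));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
+        };
+    }
+}
+```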
Must be positive."); + + + return new DataGeneratorSource( + generatorFunction, + Long.MAX_VALUE, // Generate (practically) unlimited records + RateLimiterStrategy.perSecond(recordsPerSecond), // Configurable rate + typeInformation // Explicit type information + ); + } + + /** + * Create a Kinesis Sink + * + * @param outputProperties Properties from the "KinesisSink" property group + * @param serializationSchema Serialization schema + * @param partitionKeyGenerator Partition key generator + * @param The type of data to sink + * @return an instance of KinesisStreamsSink + */ + private static KinesisStreamsSink createKinesisSink(Properties outputProperties, final SerializationSchema serializationSchema, final PartitionKeyGenerator partitionKeyGenerator + ) { + final String outputStreamArn = outputProperties.getProperty("stream.arn"); + return KinesisStreamsSink.builder() + .setStreamArn(outputStreamArn) + .setKinesisClientProperties(outputProperties) + .setSerializationSchema(serializationSchema) + .setPartitionKeyGenerator(partitionKeyGenerator) + .build(); + } + + /** + * Create a KafkaSink + * + * @param kafkaProperties Properties from the "KafkaSink" property group + * @param recordSerializationSchema Record serialization schema + * @param The type of data to sink + * @return an instance of KafkaSink + */ + private static KafkaSink createKafkaSink(Properties kafkaProperties, KafkaRecordSerializationSchema recordSerializationSchema) { + return KafkaSink.builder() + .setBootstrapServers(kafkaProperties.getProperty("bootstrap.servers")) + .setKafkaProducerConfig(kafkaProperties) + .setRecordSerializer(recordSerializationSchema) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build(); + } + + public static void main(String[] args) throws Exception { + // Set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Allows Flink to reuse objects across forwarded operators, as opposed to do a deep copy + // (this is safe because record objects are never mutated or passed by reference) + env.getConfig().enableObjectReuse(); + + LOG.info("Starting Flink Data Generator Job with conditional sinks"); + + // Load application properties + final Map applicationProperties = loadApplicationProperties(env); + LOG.info("Application properties: {}", applicationProperties); + + // Create a DataGeneratorSource that generates Stock objects using the generic method + DataGeneratorSource source = createDataGeneratorSource( + applicationProperties.get("DataGen"), + new StockPriceGeneratorFunction(), + TypeInformation.of(StockPrice.class) + ); + + // Create the data stream from the source + DataStream stockPricesStream = env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + "Data Generator" + ).uid("data generator"); + + // Add a passthrough operator exposing basic metrics + var outputStream = stockPricesStream.map(new MetricEmitterNoOpMap<>()).uid("metric-emitter"); + + + // Check if at least one sink is configured + Properties kinesisProperties = applicationProperties.get("KinesisSink"); + Properties kafkaProperties = applicationProperties.get("KafkaSink"); + boolean hasKinesisSink = kinesisProperties != null; + boolean hasKafkaSink = kafkaProperties != null; + + if (!hasKinesisSink && !hasKafkaSink) { + throw new IllegalArgumentException( + "At least one sink must be configured. 
Please provide either 'KinesisSink' or 'KafkaSink' configuration group."); + } + + // Create Kinesis sink with JSON serialization (only if configured) + if (hasKinesisSink) { + PartitionKeyGenerator<StockPrice> partitionKeyGenerator = (record) -> String.valueOf(record.getTicker()); + KinesisStreamsSink<StockPrice> kinesisSink = createKinesisSink( + kinesisProperties, + // Serialize the Kinesis record as JSON + new JsonSerializationSchema<>(), + // Shard by `ticker` + partitionKeyGenerator + ); + outputStream.sinkTo(kinesisSink).uid("kinesis-sink").disableChaining(); + LOG.info("Kinesis sink configured"); + } + + // Create Kafka sink with JSON serialization (only if configured) + if (hasKafkaSink) { + String kafkaTopic = Preconditions.checkNotNull(StringUtils.trimToNull(kafkaProperties.getProperty("topic")), "Kafka topic not defined"); + SerializationSchema<StockPrice> valueSerializationSchema = new JsonSerializationSchema<>(); + SerializationSchema<StockPrice> keySerializationSchema = (stockPrice) -> stockPrice.getTicker().getBytes(); + KafkaRecordSerializationSchema<StockPrice> kafkaRecordSerializationSchema = + KafkaRecordSerializationSchema.<StockPrice>builder() + .setTopic(kafkaTopic) + // Serialize the Kafka record value (payload) as JSON + .setValueSerializationSchema(valueSerializationSchema) + // Partition by `ticker` + .setKeySerializationSchema(keySerializationSchema) + .build(); + + KafkaSink<StockPrice> kafkaSink = createKafkaSink(kafkaProperties, kafkaRecordSerializationSchema); + outputStream.sinkTo(kafkaSink).uid("kafka-sink").disableChaining(); + LOG.info("Kafka sink configured"); + } + + // Execute the job + env.execute("Flink Data Generator Job"); + } +}
diff --git a/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/MetricEmitterNoOpMap.java b/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/MetricEmitterNoOpMap.java new file mode 100644 index 0000000..c441b3b --- /dev/null +++ b/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/MetricEmitterNoOpMap.java @@ -0,0 +1,52 @@ +package com.amazonaws.services.msf; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; + +/** + * No-op Map function exposing 3 custom metrics: generatedRecordCount, generatedRecordRatePerParallelism, and taskParallelism. + * Note that each subtask emits its own metrics.
+ */ +public class MetricEmitterNoOpMap<T> extends RichMapFunction<T, T> { + private transient Counter recordCounter; + private transient int taskParallelism = 0; + private transient Meter recordMeter; + + @Override + public void open(OpenContext openContext) throws Exception { + this.recordCounter = getRuntimeContext() + .getMetricGroup() + .addGroup("kinesisAnalytics") // Automatically export metric to CloudWatch + .counter("generatedRecordCount"); + + this.recordMeter = getRuntimeContext() + .getMetricGroup() + .addGroup("kinesisAnalytics") // Automatically export metric to CloudWatch + .meter("generatedRecordRatePerParallelism", new DropwizardMeterWrapper(new com.codahale.metrics.Meter())); + + getRuntimeContext() + .getMetricGroup() + .addGroup("kinesisAnalytics") // Automatically export metric to CloudWatch + .gauge("taskParallelism", new Gauge<Integer>() { + @Override + public Integer getValue() { + return taskParallelism; + } + }); + + // Capture the task parallelism + taskParallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); + } + + @Override + public T map(T record) throws Exception { + recordCounter.inc(); + recordMeter.markEvent(); + + return record; + } +}
diff --git a/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java new file mode 100644 index 0000000..f159036 --- /dev/null +++ b/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java @@ -0,0 +1,70 @@ +package com.amazonaws.services.msf.domain; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class StockPrice { + // This annotation, as well as the associated jackson2 import, is needed to correctly map the JSON input key to the + // appropriate POJO property name, to ensure event_time isn't missed in serialization and deserialization + @JsonProperty("event_time") + private String eventTime; + private String ticker; + private float price; + + public StockPrice() {} + + public StockPrice(String eventTime, String ticker, float price) { + this.eventTime = eventTime; + this.ticker = ticker; + this.price = price; + } + + public String getEventTime() { + return eventTime; + } + + public void setEventTime(String eventTime) { + this.eventTime = eventTime; + } + + public String getTicker() { + return ticker; + } + + public void setTicker(String ticker) { + this.ticker = ticker; + } + + public float getPrice() { + return price; + } + + public void setPrice(float price) { + this.price = price; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StockPrice stock = (StockPrice) o; + return Float.compare(stock.price, price) == 0 && + Objects.equals(eventTime, stock.eventTime) && + Objects.equals(ticker, stock.ticker); + } + + @Override + public int hashCode() { + return Objects.hash(eventTime, ticker, price); + } + + @Override + public String toString() { + return "Stock{" + + "event_time='" + eventTime + '\'' + + ", ticker='" + ticker + '\'' + + ", price=" + price + + '}'; + } +}
diff --git a/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java b/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..cb32db2 --- /dev/null +++
b/java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java @@ -0,0 +1,59 @@ +package com.amazonaws.services.msf.domain; + +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Random; + +/** + * Generator function that creates random StockPrice objects. + * Implements GeneratorFunction to work with DataGeneratorSource. + * + * Modify this class to generate a different record type. + */ +public class StockPriceGeneratorFunction implements GeneratorFunction<Long, StockPrice> { + // Stock tickers to randomly choose from (same as the Python data generator) + private static final String[] TICKERS = { + "AAPL", + "MSFT", + "AMZN", + "GOOGL", + "META", + "NVDA", + "TSLA", + "INTC", + "ADBE", + "NFLX", + "PYPL", + "CSCO", + "PEP", + "AVGO", + "AMD", + "COST", + "QCOM", + "AMGN", + "SBUX", + "BKNG" + }; + + // Random number generator + private static final Random RANDOM = new Random(); + + // Date formatter for ISO format timestamps + private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; + + @Override + public StockPrice map(Long value) throws Exception { + // Generate current timestamp in ISO format + String eventTime = LocalDateTime.now().format(ISO_FORMATTER); + + // Randomly select a ticker + String ticker = TICKERS[RANDOM.nextInt(TICKERS.length)]; + + // Generate random price between 0 and 100, rounded to 2 decimal places + float price = Math.round(RANDOM.nextFloat() * 100 * 100.0f) / 100.0f; + + return new StockPrice(eventTime, ticker, price); + } +}
diff --git a/java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json b/java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..03a4ae1 --- /dev/null +++ b/java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,22 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "1000" + } + }, + { + "PropertyGroupId": "KinesisSink", + "PropertyMap": { + "stream.arn": "arn:aws:kinesis:eu-west-1::stream/FlinkDataGeneratorTestStream", + "aws.region": "eu-west-1" + } + }, + { + "PropertyGroupId": "KafkaSink-DISABLE", + "PropertyMap": { + "bootstrap.servers": "localhost:9092", + "topic": "stock-prices" + } + } +]
diff --git a/java/FlinkDataGenerator/src/main/resources/log4j2.properties b/java/FlinkDataGenerator/src/main/resources/log4j2.properties new file mode 100644 index 0000000..9503413 --- /dev/null +++ b/java/FlinkDataGenerator/src/main/resources/log4j2.properties @@ -0,0 +1,16 @@ +# Set to debug or trace if log4j initialization is failing +status = warn + +# Name of the configuration +name = ConsoleLogConfigDemo + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# Root logger level +rootLogger.level = info +# Root logger referring to console appender +rootLogger.appenderRef.stdout.ref = consoleLogger
diff --git a/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/DataGeneratorJobTest.java b/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/DataGeneratorJobTest.java new file mode 100644 index 0000000..61cd802 --- /dev/null +++ b/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/DataGeneratorJobTest.java @@ -0,0
+1,150 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.msf.domain.StockPrice; +import com.amazonaws.services.msf.domain.StockPriceGeneratorFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.junit.Test; +import static org.junit.Assert.*; +import java.util.Properties; +import java.util.HashMap; +import java.util.Map; +import java.lang.reflect.Method; + +public class DataGeneratorJobTest { + + @Test + public void testCreateDataGeneratorSource() throws Exception { + // Use reflection to test the private createDataGeneratorSource method + Method createDataGeneratorSourceMethod = DataGeneratorJob.class.getDeclaredMethod( + "createDataGeneratorSource", Properties.class, GeneratorFunction.class, TypeInformation.class); + createDataGeneratorSourceMethod.setAccessible(true); + + // Test with valid configuration + Properties dataGenProps = new Properties(); + dataGenProps.setProperty("records.per.second", "15"); + + StockPriceGeneratorFunction generatorFunction = new StockPriceGeneratorFunction(); + TypeInformation<StockPrice> typeInfo = TypeInformation.of(StockPrice.class); + + DataGeneratorSource<StockPrice> source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke( + null, dataGenProps, generatorFunction, typeInfo); + + assertNotNull("DataGeneratorSource should not be null", source); + + // Test with null properties (should use default rate) + source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke( + null, null, generatorFunction, typeInfo); + + assertNotNull("DataGeneratorSource should not be null with null properties", source); + + // Test with empty properties (should use default rate) + Properties emptyProps = new Properties(); + source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke( + null, emptyProps, generatorFunction, typeInfo); + + assertNotNull("DataGeneratorSource should not be null with empty properties", source); + } + + @Test + public void testCreateKafkaSink() throws Exception { + // Use reflection to test the private createKafkaSink method + Method createKafkaSinkMethod = DataGeneratorJob.class.getDeclaredMethod( + "createKafkaSink", Properties.class, KafkaRecordSerializationSchema.class); + createKafkaSinkMethod.setAccessible(true); + + // Test with valid Kafka properties + Properties kafkaProps = new Properties(); + kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); + kafkaProps.setProperty("topic", "test-topic"); + + // Create a mock KafkaRecordSerializationSchema + KafkaRecordSerializationSchema<StockPrice> recordSerializationSchema = + KafkaRecordSerializationSchema.<StockPrice>builder() + .setTopic("test-topic") + .setKeySerializationSchema(stock -> stock.getTicker().getBytes()) + .setValueSerializationSchema(new org.apache.flink.formats.json.JsonSerializationSchema<>()) + .build(); + + KafkaSink<StockPrice> kafkaSink = (KafkaSink<StockPrice>) createKafkaSinkMethod.invoke( + null, kafkaProps, recordSerializationSchema); + + assertNotNull("KafkaSink should not be null", kafkaSink); + } + + @Test + public void testKafkaPartitioningKey() { + // Test that ticker symbol can be used as Kafka partition key + StockPrice stock1 = new StockPrice("2024-01-15T10:30:45", "AAPL", 150.25f); + StockPrice stock2 = new StockPrice("2024-01-15T10:30:46", "MSFT", 200.50f); + + // Test
that ticker can be converted to bytes for Kafka key + byte[] key1 = stock1.getTicker().getBytes(); + byte[] key2 = stock2.getTicker().getBytes(); + + assertNotNull("Kafka key should not be null", key1); + assertNotNull("Kafka key should not be null", key2); + assertTrue("Kafka key should not be empty", key1.length > 0); + assertTrue("Kafka key should not be empty", key2.length > 0); + + // Test that different tickers produce different keys + assertFalse("Different tickers should produce different keys", + java.util.Arrays.equals(key1, key2)); + + // Test that same ticker produces same key + StockPrice stock3 = new StockPrice("2024-01-15T10:30:47", "AAPL", 175.50f); + byte[] key3 = stock3.getTicker().getBytes(); + assertTrue("Same ticker should produce same key", + java.util.Arrays.equals(key1, key3)); + } + + @Test + public void testConditionalSinkValidation() { + // Test that the application validates sink configuration properly + Map<String, Properties> appProperties = new HashMap<>(); + + // Test with no sinks configured - should be invalid + boolean hasKinesis = appProperties.get("KinesisSink") != null; + boolean hasKafka = appProperties.get("KafkaSink") != null; + assertFalse("Should not have Kinesis sink when not configured", hasKinesis); + assertFalse("Should not have Kafka sink when not configured", hasKafka); + assertTrue("Should require at least one sink", !hasKinesis && !hasKafka); + + // Test with only Kinesis configured - should be valid + Properties kinesisProps = new Properties(); + kinesisProps.setProperty("stream.arn", "test-arn"); + kinesisProps.setProperty("aws.region", "us-east-1"); + appProperties.put("KinesisSink", kinesisProps); + + hasKinesis = appProperties.get("KinesisSink") != null; + hasKafka = appProperties.get("KafkaSink") != null; + assertTrue("Should have Kinesis sink when configured", hasKinesis); + assertFalse("Should not have Kafka sink when not configured", hasKafka); + assertTrue("Should be valid with one sink", hasKinesis || hasKafka); + + // Test with only Kafka configured - should be valid + appProperties.clear(); + Properties kafkaProps = new Properties(); + kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); + kafkaProps.setProperty("topic", "test-topic"); + appProperties.put("KafkaSink", kafkaProps); + + hasKinesis = appProperties.get("KinesisSink") != null; + hasKafka = appProperties.get("KafkaSink") != null; + assertFalse("Should not have Kinesis sink when not configured", hasKinesis); + assertTrue("Should have Kafka sink when configured", hasKafka); + assertTrue("Should be valid with one sink", hasKinesis || hasKafka); + + // Test with both configured - should be valid + appProperties.put("KinesisSink", kinesisProps); + + hasKinesis = appProperties.get("KinesisSink") != null; + hasKafka = appProperties.get("KafkaSink") != null; + assertTrue("Should have Kinesis sink when configured", hasKinesis); + assertTrue("Should have Kafka sink when configured", hasKafka); + assertTrue("Should be valid with both sinks", hasKinesis || hasKafka); + } +}
diff --git a/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunctionTest.java b/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunctionTest.java new file mode 100644 index 0000000..a1dc83c --- /dev/null +++ b/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunctionTest.java @@ -0,0 +1,77 @@ +package com.amazonaws.services.msf.domain; + +import org.junit.Test; +import static
org.junit.Assert.*; +import java.util.Arrays; +import java.util.List; + +public class StockPriceGeneratorFunctionTest { + private static final String[] TICKERS = { + "AAPL", + "MSFT", + "AMZN", + "GOOGL", + "META", + "NVDA", + "TSLA", + "INTC", + "ADBE", + "NFLX", + "PYPL", + "CSCO", + "PEP", + "AVGO", + "AMD", + "COST", + "QCOM", + "AMGN", + "SBUX", + "BKNG" + }; + + @Test + public void testStockGeneratorFunction() throws Exception { + StockPriceGeneratorFunction generator = new StockPriceGeneratorFunction(); + + // Generate a stock record + StockPrice stock = generator.map(1L); + + // Verify the stock is not null + assertNotNull(stock); + + // Verify event_time is not null and not empty + assertNotNull(stock.getEventTime()); + assertFalse(stock.getEventTime().isEmpty()); + + // Verify ticker is one of the expected values + List<String> expectedTickers = Arrays.asList(TICKERS); + assertTrue("Ticker should be one of the expected values", + expectedTickers.contains(stock.getTicker())); + + // Verify price is within expected range (0 to 100) + assertTrue("Price should be >= 0", stock.getPrice() >= 0); + assertTrue("Price should be <= 100", stock.getPrice() <= 100); + + // Verify price has at most 2 decimal places + String priceStr = String.valueOf(stock.getPrice()); + int decimalIndex = priceStr.indexOf('.'); + if (decimalIndex != -1) { + int decimalPlaces = priceStr.length() - decimalIndex - 1; + assertTrue("Price should have at most 2 decimal places", decimalPlaces <= 2); + } + } + + @Test + public void testMultipleGenerations() throws Exception { + StockPriceGeneratorFunction generator = new StockPriceGeneratorFunction(); + + // Generate multiple records to ensure randomness + for (int i = 0; i < 10; i++) { + StockPrice stock = generator.map((long) i); + assertNotNull(stock); + assertNotNull(stock.getEventTime()); + assertNotNull(stock.getTicker()); + assertTrue(stock.getPrice() >= 0 && stock.getPrice() <= 100); + } + } +}
diff --git a/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceTest.java b/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceTest.java new file mode 100644 index 0000000..32f6c13 --- /dev/null +++ b/java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceTest.java @@ -0,0 +1,64 @@ +package com.amazonaws.services.msf.domain; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class StockPriceTest { + + @Test + public void testStockCreation() { + StockPrice stock = new StockPrice("2024-01-15T10:30:45", "AAPL", 150.25f); + + assertEquals("2024-01-15T10:30:45", stock.getEventTime()); + assertEquals("AAPL", stock.getTicker()); + assertEquals(150.25f, stock.getPrice(), 0.001); + } + + @Test + public void testStockToString() { + StockPrice stock = new StockPrice("2024-01-15T10:30:45", "AAPL", 150.25f); + String expected = "Stock{event_time='2024-01-15T10:30:45', ticker='AAPL', price=150.25}"; + assertEquals(expected, stock.toString()); + } + + @Test + public void testStockSetters() { + StockPrice stock = new StockPrice(); + stock.setEventTime("2024-01-15T10:30:45"); + stock.setTicker("MSFT"); + stock.setPrice(200.50f); + + assertEquals("2024-01-15T10:30:45", stock.getEventTime()); + assertEquals("MSFT", stock.getTicker()); + assertEquals(200.50f, stock.getPrice(), 0.001); + } + + @Test + public void testStockHashCodeForPartitioning() { + // Create test stock objects + StockPrice stock1 = new StockPrice("2024-01-15T10:30:45", "AAPL", 150.25f); + StockPrice stock2 = new
StockPrice("2024-01-15T10:30:46", "MSFT", 200.50f); + StockPrice stock3 = new StockPrice("2024-01-15T10:30:45", "AAPL", 150.25f); // Same as stock1 + + // Test that hashCode is consistent for equal objects + assertEquals("Equal stock objects should have same hashCode", + stock1.hashCode(), stock3.hashCode()); + + // Test that equals works correctly + assertEquals("Same stock objects should be equal", stock1, stock3); + assertNotEquals("Different stock objects should not be equal", stock1, stock2); + + // Test that different stocks likely have different hashCodes + assertNotEquals("Different stock objects should likely have different hashCodes", + stock1.hashCode(), stock2.hashCode()); + + // Test that hashCode can be used as partition key (should not throw exception) + String partitionKey1 = String.valueOf(stock1.hashCode()); + String partitionKey2 = String.valueOf(stock2.hashCode()); + + assertNotNull("Partition key should not be null", partitionKey1); + assertNotNull("Partition key should not be null", partitionKey2); + assertFalse("Partition key should not be empty", partitionKey1.isEmpty()); + assertFalse("Partition key should not be empty", partitionKey2.isEmpty()); + } +} diff --git a/java/FlinkDataGenerator/src/test/resources/flink-application-properties-kafka-only.json b/java/FlinkDataGenerator/src/test/resources/flink-application-properties-kafka-only.json new file mode 100644 index 0000000..477b711 --- /dev/null +++ b/java/FlinkDataGenerator/src/test/resources/flink-application-properties-kafka-only.json @@ -0,0 +1,15 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "5" + } + }, + { + "PropertyGroupId": "KafkaSink", + "PropertyMap": { + "bootstrap.servers": "localhost:9092", + "topic": "test-topic" + } + } +] diff --git a/java/FlinkDataGenerator/src/test/resources/flink-application-properties-kinesis-only.json b/java/FlinkDataGenerator/src/test/resources/flink-application-properties-kinesis-only.json new file mode 100644 index 0000000..7cea8f4 --- /dev/null +++ b/java/FlinkDataGenerator/src/test/resources/flink-application-properties-kinesis-only.json @@ -0,0 +1,15 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "5" + } + }, + { + "PropertyGroupId": "KinesisSink", + "PropertyMap": { + "stream.arn": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", + "aws.region": "us-east-1" + } + } +] diff --git a/java/FlinkDataGenerator/src/test/resources/flink-application-properties-no-sinks.json b/java/FlinkDataGenerator/src/test/resources/flink-application-properties-no-sinks.json new file mode 100644 index 0000000..d44e034 --- /dev/null +++ b/java/FlinkDataGenerator/src/test/resources/flink-application-properties-no-sinks.json @@ -0,0 +1,8 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "5" + } + } +] diff --git a/java/FlinkDataGenerator/tools/dashboard-cfn.yaml b/java/FlinkDataGenerator/tools/dashboard-cfn.yaml new file mode 100644 index 0000000..8097940 --- /dev/null +++ b/java/FlinkDataGenerator/tools/dashboard-cfn.yaml @@ -0,0 +1,578 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: 'CloudWatch Dashboard for Flink Data Generator with conditional Kinesis and Kafka widgets' + +Metadata: + AWS::CloudFormation::Interface: + ParameterGroups: + - Label: + default: "Dashboard Configuration" + Parameters: + - DashboardName + - Application + - Region + - Label: + default: "Kinesis Configuration" + Parameters: + - StreamName + - Label: + default: "MSK Configuration" + 
Parameters: + - ClusterName + - Topic + ParameterLabels: + DashboardName: + default: "Dashboard Name" + Application: + default: "Application Name" + Region: + default: "AWS Region" + StreamName: + default: "Kinesis Stream Name" + ClusterName: + default: "MSK Cluster Name" + Topic: + default: "Kafka Topic Name" + +Parameters: + DashboardName: + Type: String + Default: FlinkDataGenerator + Description: Name of the CloudWatch Dashboard + + Application: + Type: String + Default: FlinkDataGenerator + Description: Name of the Flink application + + Region: + Type: String + Default: eu-west-1 + Description: AWS region where resources are deployed + AllowedValues: + - us-east-1 + - us-east-2 + - us-west-1 + - us-west-2 + - eu-west-1 + - eu-west-2 + - eu-west-3 + - eu-central-1 + - ap-northeast-1 + - ap-northeast-2 + - ap-southeast-1 + - ap-southeast-2 + - ap-south-1 + - sa-east-1 + + StreamName: + Type: String + Default: '' + Description: Name of the Kinesis Data Stream (leave empty to skip Kinesis widget) + + ClusterName: + Type: String + Default: '' + Description: Name of the MSK (Kafka) cluster (leave empty to skip Kafka widget) + + Topic: + Type: String + Default: '' + Description: Name of the Kafka topic (leave empty to skip Kafka widget) + +Conditions: + HasKafkaConfig: !And + - !Not [!Equals [!Ref ClusterName, '']] + - !Not [!Equals [!Ref Topic, '']] + + HasKinesisConfig: !Not [!Equals [!Ref StreamName, '']] + + HasBothSinks: !And + - !Condition HasKafkaConfig + - !Condition HasKinesisConfig + + HasOnlyKafka: !And + - !Condition HasKafkaConfig + - !Not [!Condition HasKinesisConfig] + + HasOnlyKinesis: !And + - !Condition HasKinesisConfig + - !Not [!Condition HasKafkaConfig] + +Resources: + # Dashboard with both Kafka and Kinesis widgets + DashboardWithBothSinks: + Type: AWS::CloudWatch::Dashboard + Condition: HasBothSinks + Properties: + DashboardName: !Ref DashboardName + DashboardBody: !Sub | + { + "widgets": [ + { + "type": "metric", + "x": 0, + "y": 0, + "width": 8, + "height": 6, + "properties": { + "metrics": [ + [{"expression": "m1 / 4", "label": "globalGeneratedRecordCount", "id": "e1", "region": "${Region}"}], + [{"expression": "m2 * m3", "label": "globalGeneratedRecordsPerSec", "id": "e2", "yAxis": "right", "region": "${Region}"}], + ["AWS/KinesisAnalytics", "generatedRecordCount", "Application", "${Application}", {"stat": "Sum", "id": "m1", "visible": false, "region": "${Region}"}], + [".", "generatedRecordRatePerParallelism", ".", ".", {"id": "m2", "visible": false, "region": "${Region}"}], + [".", "taskParallelism", ".", ".", {"id": "m3", "visible": false, "region": "${Region}"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Average", + "period": 60, + "title": "Data Generator", + "yAxis": { + "left": {"label": "Record count", "showUnits": false}, + "right": {"label": "Record/sec", "showUnits": false} + } + } + }, + { + "type": "metric", + "x": 8, + "y": 0, + "width": 8, + "height": 6, + "properties": { + "metrics": [ + [{"expression": "m1+m2+m3+m4+m5+m6", "label": "Total bytesInPerSec", "id": "e1", "yAxis": "right", "region": "${Region}"}], + ["AWS/Kafka", "BytesInPerSec", "Cluster Name", "${ClusterName}", "Broker ID", "1", "Topic", "${Topic}", {"yAxis": "right", "region": "${Region}", "id": "m1", "visible": false}], + ["...", "2", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m2", "visible": false}], + ["...", "3", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m3", "visible": false}], + ["...", "4", ".", ".", 
{"yAxis": "right", "region": "${Region}", "id": "m4", "visible": false}], + ["...", "5", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m5", "visible": false}], + ["...", "6", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m6", "visible": false}], + [{"expression": "m7+m8+m9+m10+m11+m12", "label": "Total MessagesInPerSec", "id": "e2", "region": "${Region}", "color": "#aec7e8"}], + ["AWS/Kafka", "MessagesInPerSec", "Cluster Name", "${ClusterName}", "Broker ID", "1", "Topic", "${Topic}", {"region": "${Region}", "id": "m7", "visible": false}], + ["...", "2", ".", ".", {"region": "${Region}", "id": "m8", "visible": false}], + ["...", "3", ".", ".", {"region": "${Region}", "id": "m9", "visible": false}], + ["...", "4", ".", ".", {"region": "${Region}", "id": "m10", "visible": false}], + ["...", "5", ".", ".", {"region": "${Region}", "id": "m11", "visible": false}], + ["...", "6", ".", ".", {"region": "${Region}", "id": "m12", "visible": false}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "period": 60, + "yAxis": { + "left": {"label": "Messages/Sec", "showUnits": false, "min": 0}, + "right": {"label": "Bytes/Sec", "showUnits": false, "min": 0} + }, + "title": "Kafka output", + "stat": "Average" + } + }, + { + "type": "metric", + "x": 16, + "y": 0, + "width": 8, + "height": 6, + "properties": { + "metrics": [ + [{"expression": "m3 / 60", "label": "PublishedRecordsPerSec", "id": "e1", "yAxis": "right"}], + ["AWS/Kinesis", "PutRecords.ThrottledRecords", "StreamName", "${StreamName}", {"region": "${Region}", "id": "m1"}], + [".", "WriteProvisionedThroughputExceeded", ".", ".", {"id": "m2", "region": "${Region}", "stat": "Average"}], + [".", "IncomingRecords", ".", ".", {"id": "m3", "region": "${Region}", "yAxis": "right", "stat": "Sum", "visible": false}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Maximum", + "period": 60, + "yAxis": { + "left": {"label": "Throttling", "showUnits": false, "min": 0}, + "right": {"label": "Records/sec", "showUnits": false, "min": 0} + }, + "title": "Kinesis Data Stream output" + } + }, + { + "type": "metric", + "x": 0, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "metrics": [ + ["AWS/KinesisAnalytics", "containerMemoryUtilization", "Application", "${Application}", {"region": "${Region}", "label": "containerMemoryUtilization", "id": "m1"}], + [".", "containerCPUUtilization", ".", ".", {"yAxis": "right", "region": "${Region}", "label": "containerCPUUtilization", "id": "m2"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "title": "Generator resource utilization", + "stat": "Average", + "period": 60, + "yAxis": { + "left": {"label": "Mem (%)", "showUnits": false, "min": 0, "max": 100}, + "right": {"label": "CPU (%)", "showUnits": false, "min": 0, "max": 100} + }, + "annotations": { + "horizontal": [{"label": "Threshold", "value": 90}] + } + } + }, + { + "type": "metric", + "x": 8, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "view": "timeSeries", + "stacked": false, + "metrics": [ + ["AWS/KinesisAnalytics", "busyTimeMsPerSecond", "Application", "${Application}", {"region": "${Region}"}], + [".", "backPressuredTimeMsPerSecond", ".", ".", {"region": "${Region}"}], + [".", "idleTimeMsPerSecond", ".", ".", {"region": "${Region}"}] + ], + "region": "${Region}", + "title": "Generator application busy-ness", + "yAxis": { + "left": {"label": "1/1000", "showUnits": false, "min": 0, "max": 1000}, + "right": {"label": ""} + }, + 
"period": 300, + "liveData": true + } + }, + { + "type": "metric", + "x": 16, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "metrics": [ + ["AWS/KinesisAnalytics", "uptime", "Application", "${Application}", {"region": "${Region}"}], + [".", "fullRestarts", ".", ".", {"yAxis": "right", "region": "${Region}"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Maximum", + "period": 60, + "title": "Generator uptime and restart" + } + } + ] + } + # Dashboard with only Kafka widget + DashboardWithKafkaOnly: + Type: AWS::CloudWatch::Dashboard + Condition: HasOnlyKafka + Properties: + DashboardName: !Ref DashboardName + DashboardBody: !Sub | + { + "widgets": [ + { + "type": "metric", + "x": 0, + "y": 0, + "width": 12, + "height": 6, + "properties": { + "metrics": [ + [{"expression": "m1 / 4", "label": "globalGeneratedRecordCount", "id": "e1", "region": "${Region}"}], + [{"expression": "m2 * m3", "label": "globalGeneratedRecordsPerSec", "id": "e2", "yAxis": "right", "region": "${Region}"}], + ["AWS/KinesisAnalytics", "generatedRecordCount", "Application", "${Application}", {"stat": "Sum", "id": "m1", "visible": false, "region": "${Region}"}], + [".", "generatedRecordRatePerParallelism", ".", ".", {"id": "m2", "visible": false, "region": "${Region}"}], + [".", "taskParallelism", ".", ".", {"id": "m3", "visible": false, "region": "${Region}"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Average", + "period": 60, + "title": "Data Generator", + "yAxis": { + "left": {"label": "Record count", "showUnits": false}, + "right": {"label": "Record/sec", "showUnits": false} + } + } + }, + { + "type": "metric", + "x": 12, + "y": 0, + "width": 12, + "height": 6, + "properties": { + "metrics": [ + [{"expression": "m1+m2+m3+m4+m5+m6", "label": "Total bytesInPerSec", "id": "e1", "yAxis": "right", "region": "${Region}"}], + ["AWS/Kafka", "BytesInPerSec", "Cluster Name", "${ClusterName}", "Broker ID", "1", "Topic", "${Topic}", {"yAxis": "right", "region": "${Region}", "id": "m1", "visible": false}], + ["...", "2", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m2", "visible": false}], + ["...", "3", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m3", "visible": false}], + ["...", "4", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m4", "visible": false}], + ["...", "5", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m5", "visible": false}], + ["...", "6", ".", ".", {"yAxis": "right", "region": "${Region}", "id": "m6", "visible": false}], + [{"expression": "m7+m8+m9+m10+m11+m12", "label": "Total MessagesInPerSec", "id": "e2", "region": "${Region}", "color": "#aec7e8"}], + ["AWS/Kafka", "MessagesInPerSec", "Cluster Name", "${ClusterName}", "Broker ID", "1", "Topic", "${Topic}", {"region": "${Region}", "id": "m7", "visible": false}], + ["...", "2", ".", ".", {"region": "${Region}", "id": "m8", "visible": false}], + ["...", "3", ".", ".", {"region": "${Region}", "id": "m9", "visible": false}], + ["...", "4", ".", ".", {"region": "${Region}", "id": "m10", "visible": false}], + ["...", "5", ".", ".", {"region": "${Region}", "id": "m11", "visible": false}], + ["...", "6", ".", ".", {"region": "${Region}", "id": "m12", "visible": false}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "period": 60, + "yAxis": { + "left": {"label": "Messages/Sec", "showUnits": false, "min": 0}, + "right": {"label": "Bytes/Sec", "showUnits": false, "min": 0} + }, + "title": "Kafka 
output", + "stat": "Average" + } + }, + { + "type": "metric", + "x": 0, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "metrics": [ + ["AWS/KinesisAnalytics", "containerMemoryUtilization", "Application", "${Application}", {"region": "${Region}", "label": "containerMemoryUtilization", "id": "m1"}], + [".", "containerCPUUtilization", ".", ".", {"yAxis": "right", "region": "${Region}", "label": "containerCPUUtilization", "id": "m2"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "title": "Generator resource utilization", + "stat": "Average", + "period": 60, + "yAxis": { + "left": {"label": "Mem (%)", "showUnits": false, "min": 0, "max": 100}, + "right": {"label": "CPU (%)", "showUnits": false, "min": 0, "max": 100} + }, + "annotations": { + "horizontal": [{"label": "Threshold", "value": 90}] + } + } + }, + { + "type": "metric", + "x": 8, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "view": "timeSeries", + "stacked": false, + "metrics": [ + ["AWS/KinesisAnalytics", "busyTimeMsPerSecond", "Application", "${Application}", {"region": "${Region}"}], + [".", "backPressuredTimeMsPerSecond", ".", ".", {"region": "${Region}"}], + [".", "idleTimeMsPerSecond", ".", ".", {"region": "${Region}"}] + ], + "region": "${Region}", + "title": "Generator application busy-ness", + "yAxis": { + "left": {"label": "1/1000", "showUnits": false, "min": 0, "max": 1000}, + "right": {"label": ""} + }, + "period": 300, + "liveData": true + } + }, + { + "type": "metric", + "x": 16, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "metrics": [ + ["AWS/KinesisAnalytics", "uptime", "Application", "${Application}", {"region": "${Region}"}], + [".", "fullRestarts", ".", ".", {"yAxis": "right", "region": "${Region}"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Maximum", + "period": 60, + "title": "Generator uptime and restart" + } + } + ] + } + # Dashboard with only Kinesis widget + DashboardWithKinesisOnly: + Type: AWS::CloudWatch::Dashboard + Condition: HasOnlyKinesis + Properties: + DashboardName: !Ref DashboardName + DashboardBody: !Sub | + { + "widgets": [ + { + "type": "metric", + "x": 0, + "y": 0, + "width": 12, + "height": 6, + "properties": { + "metrics": [ + [{"expression": "m1 / 4", "label": "globalGeneratedRecordCount", "id": "e1", "region": "${Region}"}], + [{"expression": "m2 * m3", "label": "globalGeneratedRecordsPerSec", "id": "e2", "yAxis": "right", "region": "${Region}"}], + ["AWS/KinesisAnalytics", "generatedRecordCount", "Application", "${Application}", {"stat": "Sum", "id": "m1", "visible": false, "region": "${Region}"}], + [".", "generatedRecordRatePerParallelism", ".", ".", {"id": "m2", "visible": false, "region": "${Region}"}], + [".", "taskParallelism", ".", ".", {"id": "m3", "visible": false, "region": "${Region}"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Average", + "period": 60, + "title": "Data Generator", + "yAxis": { + "left": {"label": "Record count", "showUnits": false}, + "right": {"label": "Record/sec", "showUnits": false} + } + } + }, + { + "type": "metric", + "x": 12, + "y": 0, + "width": 12, + "height": 6, + "properties": { + "metrics": [ + [{"expression": "m3 / 60", "label": "PublishedRecordsPerSec", "id": "e1", "yAxis": "right"}], + ["AWS/Kinesis", "PutRecords.ThrottledRecords", "StreamName", "${StreamName}", {"region": "${Region}", "id": "m1"}], + [".", "WriteProvisionedThroughputExceeded", ".", ".", {"id": "m2", "region": "${Region}", 
"stat": "Average"}], + [".", "IncomingRecords", ".", ".", {"id": "m3", "region": "${Region}", "yAxis": "right", "stat": "Sum", "visible": false}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Maximum", + "period": 60, + "yAxis": { + "left": {"label": "Throttling", "showUnits": false, "min": 0}, + "right": {"label": "Records/sec", "showUnits": false, "min": 0} + }, + "title": "Kinesis Data Stream output" + } + }, + { + "type": "metric", + "x": 0, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "metrics": [ + ["AWS/KinesisAnalytics", "containerMemoryUtilization", "Application", "${Application}", {"region": "${Region}", "label": "containerMemoryUtilization", "id": "m1"}], + [".", "containerCPUUtilization", ".", ".", {"yAxis": "right", "region": "${Region}", "label": "containerCPUUtilization", "id": "m2"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "title": "Generator resource utilization", + "stat": "Average", + "period": 60, + "yAxis": { + "left": {"label": "Mem (%)", "showUnits": false, "min": 0, "max": 100}, + "right": {"label": "CPU (%)", "showUnits": false, "min": 0, "max": 100} + }, + "annotations": { + "horizontal": [{"label": "Threshold", "value": 90}] + } + } + }, + { + "type": "metric", + "x": 8, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "view": "timeSeries", + "stacked": false, + "metrics": [ + ["AWS/KinesisAnalytics", "busyTimeMsPerSecond", "Application", "${Application}", {"region": "${Region}"}], + [".", "backPressuredTimeMsPerSecond", ".", ".", {"region": "${Region}"}], + [".", "idleTimeMsPerSecond", ".", ".", {"region": "${Region}"}] + ], + "region": "${Region}", + "title": "Generator application busy-ness", + "yAxis": { + "left": {"label": "1/1000", "showUnits": false, "min": 0, "max": 1000}, + "right": {"label": ""} + }, + "period": 300, + "liveData": true + } + }, + { + "type": "metric", + "x": 16, + "y": 6, + "width": 8, + "height": 5, + "properties": { + "metrics": [ + ["AWS/KinesisAnalytics", "uptime", "Application", "${Application}", {"region": "${Region}"}], + [".", "fullRestarts", ".", ".", {"yAxis": "right", "region": "${Region}"}] + ], + "view": "timeSeries", + "stacked": false, + "region": "${Region}", + "stat": "Maximum", + "period": 60, + "title": "Generator uptime and restart" + } + } + ] + } +Outputs: + DashboardName: + Description: Name of the created CloudWatch Dashboard + Value: !If + - HasBothSinks + - !Ref DashboardWithBothSinks + - !If + - HasOnlyKafka + - !Ref DashboardWithKafkaOnly + - !If + - HasOnlyKinesis + - !Ref DashboardWithKinesisOnly + - 'No dashboard created - missing required parameters' + + DashboardURL: + Description: URL to access the CloudWatch Dashboard + Value: !If + - HasBothSinks + - !Sub 'https://${Region}.console.aws.amazon.com/cloudwatch/home?region=${Region}#dashboards:name=${DashboardName}' + - !If + - HasOnlyKafka + - !Sub 'https://${Region}.console.aws.amazon.com/cloudwatch/home?region=${Region}#dashboards:name=${DashboardName}' + - !If + - HasOnlyKinesis + - !Sub 'https://${Region}.console.aws.amazon.com/cloudwatch/home?region=${Region}#dashboards:name=${DashboardName}' + - 'No dashboard URL - missing required parameters' + + ConfigurationSummary: + Description: Summary of the dashboard configuration + Value: !If + - HasBothSinks + - 'Dashboard created with both Kafka and Kinesis widgets' + - !If + - HasOnlyKafka + - 'Dashboard created with Kafka widget only' + - !If + - HasOnlyKinesis + - 'Dashboard created with Kinesis widget 
only' + - 'No dashboard created - ClusterName+Topic or StreamName required' diff --git a/java/pom.xml b/java/pom.xml index 877b596..344a44d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -43,5 +43,6 @@ S3AvroSink S3AvroSource FlinkCDC/FlinkCDCSQLServerSource + FlinkDataGenerator \ No newline at end of file