Skip to content

Commit bab2041

Browse files
authored
Task: Create Flink SQL aggregate count (#1539)
* Tutorial for count in flink sql * Updates for comments * another update for missing changes from min/max clone
1 parent c0bbedd commit bab2041

File tree

66 files changed

+1229
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1229
-3
lines changed

.semaphore/semaphore.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,3 +499,6 @@ blocks:
499499
- name: Flink SQL aggregation MIN/MAX tests
500500
commands:
501501
- make -C _includes/tutorials/aggregating-minmax/flinksql/code tutorial
502+
- name: Flink SQL test for aggregating count
503+
commands:
504+
- make -C _includes/tutorials/aggregating-count/flinksql/code tutorial

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ GEM
6161
rexml
6262
kramdown-parser-gfm (1.1.0)
6363
kramdown (~> 2.0)
64-
liquid (4.0.3)
64+
liquid (4.0.4)
6565
listen (3.7.1)
6666
rb-fsevent (~> 0.10, >= 0.10.3)
6767
rb-inotify (~> 0.9, >= 0.9.10)
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
dev:
2+
steps:
3+
- title: Prerequisites
4+
content:
5+
- action: skip
6+
render:
7+
file: shared/markup/dev/docker-prerequisite.adoc
8+
9+
- title: Initialize the project
10+
content:
11+
- action: execute
12+
file: tutorial-steps/dev/init.sh
13+
render:
14+
file: tutorials/aggregating-count/flinksql/markup/dev/init.adoc
15+
16+
- title: Get Confluent Platform
17+
content:
18+
- action: make_file
19+
file: docker-compose.yml
20+
render:
21+
file: tutorials/aggregating-count/flinksql/markup/dev/make-docker-compose.adoc
22+
23+
- action: execute_async
24+
file: tutorial-steps/dev/docker-compose-up.sh
25+
render:
26+
file: tutorials/aggregating-count/flinksql/markup/dev/start-compose.adoc
27+
28+
- action: execute
29+
file: tutorial-steps/dev/wait-for-containers.sh
30+
render:
31+
skip: true
32+
33+
- title: Write the program interactively using the CLI
34+
content:
35+
- action: docker_flinksql_cli_session
36+
container: flink-sql-client
37+
docker_bootup_file: tutorial-steps/dev/start-cli.sh
38+
column_width: 20
39+
render:
40+
file: tutorials/aggregating-count/flinksql/markup/dev/start-cli.adoc
41+
stdin:
42+
- file: tutorial-steps/dev/create-movie-ticket-sales.sql
43+
render:
44+
file: tutorials/aggregating-count/flinksql/markup/dev/create-movie-ticket-sales.adoc
45+
46+
- file: tutorial-steps/dev/populate-movie-ticket-sales.sql
47+
render:
48+
file: tutorials/aggregating-count/flinksql/markup/dev/populate-movie-ticket-sales.adoc
49+
50+
- file: tutorial-steps/dev/transient-query.sql
51+
render:
52+
file: tutorials/aggregating-count/flinksql/markup/dev/transient-query.adoc
53+
54+
- file: tutorial-steps/dev/create-movie-sales-by-title.sql
55+
render:
56+
file: tutorials/aggregating-count/flinksql/markup/dev/create-movie-sales-by-title.adoc
57+
58+
- file: tutorial-steps/dev/populate-movie-sales-by-title.sql
59+
render:
60+
file: tutorials/aggregating-count/flinksql/markup/dev/populate-movie-sales-by-title.adoc
61+
62+
63+
stdout:
64+
directory: tutorial-steps/dev/outputs
65+
66+
- title: Validate output
67+
content:
68+
- action: execute
69+
file: tutorial-steps/dev/validate-movie-sales-by-title.sh
70+
stdout: tutorial-steps/dev/outputs/validate-movie-sales-by-title.log
71+
render:
72+
file: tutorials/aggregating-count/flinksql/markup/dev/validate-movie-sales-by-title.adoc
73+
74+
test:
75+
steps:
76+
- title: Decide what testing tools to use
77+
content:
78+
- action: skip
79+
render:
80+
file: tutorials/aggregating-count/flinksql/markup/test/test-architecture.adoc
81+
82+
- title: Create the test skeleton
83+
content:
84+
- action: execute
85+
file: tutorial-steps/test/make-test-dirs.sh
86+
render:
87+
file: tutorials/aggregating-count/flinksql/markup/test/make-test-dirs.adoc
88+
89+
- action: make_file
90+
file: build.gradle
91+
render:
92+
file: tutorials/aggregating-count/flinksql/markup/test/make-build-gradle.adoc
93+
94+
- action: execute
95+
file: tutorial-steps/test/gradle-wrapper.sh
96+
render:
97+
file: tutorials/aggregating-count/flinksql/markup/test/make-gradle-wrapper.adoc
98+
99+
- title: Create SQL resources
100+
content:
101+
- action: make_file
102+
file: src/test/resources/create-movie-sales.sql.template
103+
render:
104+
file: tutorials/aggregating-count/flinksql/markup/test/create-resource-create-movie-sales.sql.template.adoc
105+
106+
- action: make_file
107+
file: src/test/resources/populate-movie-sales.sql
108+
render:
109+
file: tutorials/aggregating-count/flinksql/markup/test/create-resource-populate-movie-sales.sql.adoc
110+
111+
- action: make_file
112+
file: src/test/resources/create-movie-sales-by-title.sql.template
113+
render:
114+
file: tutorials/aggregating-count/flinksql/markup/test/create-resource-create-movie-sales-by-title.sql.template.adoc
115+
116+
- action: make_file
117+
file: src/test/resources/populate-movie-sales-by-title.sql
118+
render:
119+
file: tutorials/aggregating-count/flinksql/markup/test/create-resource-populate-movie-sales-by-title.sql.adoc
120+
121+
- action: make_file
122+
file: src/test/resources/query-movie-sales-by-title.sql
123+
render:
124+
file: tutorials/aggregating-count/flinksql/markup/test/create-resource-query-movie-sales-by-title.sql.adoc
125+
126+
- action: make_file
127+
file: src/test/resources/expected-movie-sales-by-title.txt
128+
render:
129+
file: tutorials/aggregating-count/flinksql/markup/test/create-resource-expected-movie-sales-by-title.txt.adoc
130+
131+
- title: Write a test
132+
content:
133+
- action: make_file
134+
file: src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java
135+
render:
136+
file: tutorials/aggregating-count/flinksql/markup/test/make-test-base.adoc
137+
138+
- action: make_file
139+
file: src/test/java/io/confluent/developer/FlinkSqlAggregatingCountTest.java
140+
render:
141+
file: tutorials/aggregating-count/flinksql/markup/test/make-test.adoc
142+
143+
- title: Invoke the test
144+
content:
145+
- action: execute
146+
file: tutorial-steps/test/invoke-test.sh
147+
render:
148+
file: tutorials/aggregating-count/flinksql/markup/test/invoke-test.adoc
149+
150+
ccloud:
151+
steps:
152+
- title: Run your app to Confluent Cloud
153+
content:
154+
- action: skip
155+
render:
156+
file: shared/markup/ccloud/try-ccloud.adoc

_data/tutorials.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,14 @@ aggregating-count:
205205
question: How can you count the number of events in a Kafka topic based on some
206206
criteria?
207207
introduction: Suppose you have a topic with events that represent ticket sales for
208-
movies. In this tutorial, you'll see an example of 'groupby count' in Kafka Streams
209-
and ksqlDB. We'll write a program that calculates the total number of tickets
208+
movies. In this tutorial, you'll see an example of 'groupby count' in Kafka Streams,
209+
ksqlDB, and Flink SQL. We'll write a program that calculates the total number of tickets
210210
sold per movie.
211211
status:
212212
ksql: enabled
213213
kstreams: enabled
214214
confluent: enabled
215+
flinksql: enabled
215216
aggregating-minmax:
216217
title: How to find the min/max in a stream of events
217218
meta-description: find the minimum or maximum value of a field in a stream of events
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
tutorial-steps/dev/outputs/
2+
3+
# Ignore Gradle project-specific cache directory
4+
.gradle
5+
6+
# Ignore Gradle build output directory
7+
build
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
STEPS_DIR := tutorial-steps
2+
DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs
3+
TEMP_DIR := $(shell mktemp -d)
4+
SEQUENCE := "dev, test, ccloud"
5+
6+
tutorial:
7+
rm -r $(DEV_OUTPUTS_DIR) || true
8+
mkdir $(DEV_OUTPUTS_DIR)
9+
harness-runner ../../../../../_data/harnesses/aggregating-count/flinksql.yml $(TEMP_DIR) $(SEQUENCE)
10+
diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-movie-sales-by-title.log $(DEV_OUTPUTS_DIR)/validate-movie-sales-by-title.log
11+
reset
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
buildscript {
2+
repositories {
3+
mavenCentral()
4+
}
5+
}
6+
7+
plugins {
8+
id "java"
9+
}
10+
11+
sourceCompatibility = JavaVersion.VERSION_11
12+
targetCompatibility = JavaVersion.VERSION_11
13+
version = "0.0.1"
14+
15+
repositories {
16+
mavenCentral()
17+
}
18+
19+
dependencies {
20+
testImplementation "com.google.guava:guava:31.1-jre"
21+
testImplementation "junit:junit:4.13.2"
22+
testImplementation 'org.testcontainers:testcontainers:1.17.6'
23+
testImplementation 'org.testcontainers:kafka:1.17.6'
24+
testImplementation "org.apache.flink:flink-sql-connector-kafka:1.16.1"
25+
testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.16.1"
26+
testImplementation "org.apache.flink:flink-test-utils:1.16.1"
27+
testImplementation "org.apache.flink:flink-test-utils-junit:1.16.1"
28+
testImplementation "org.apache.flink:flink-table-api-java-bridge:1.16.1"
29+
testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.1"
30+
testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.1:tests"
31+
testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.16.1"
32+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
---
2+
version: '2'
3+
4+
services:
5+
zookeeper:
6+
image: confluentinc/cp-zookeeper:7.3.0
7+
hostname: zookeeper
8+
container_name: zookeeper
9+
ports:
10+
- "2181:2181"
11+
environment:
12+
ZOOKEEPER_CLIENT_PORT: 2181
13+
ZOOKEEPER_TICK_TIME: 2000
14+
15+
broker:
16+
image: confluentinc/cp-kafka:7.3.0
17+
hostname: broker
18+
container_name: broker
19+
depends_on:
20+
- zookeeper
21+
ports:
22+
- "29092:29092"
23+
environment:
24+
KAFKA_BROKER_ID: 1
25+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
26+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
27+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
28+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
29+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
30+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
31+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
32+
33+
schema-registry:
34+
image: confluentinc/cp-schema-registry:7.3.0
35+
hostname: schema-registry
36+
container_name: schema-registry
37+
depends_on:
38+
- broker
39+
ports:
40+
- "8081:8081"
41+
environment:
42+
SCHEMA_REGISTRY_HOST_NAME: schema-registry
43+
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
44+
45+
flink-sql-client:
46+
image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
47+
hostname: flink-sql-client
48+
container_name: flink-sql-client
49+
depends_on:
50+
- flink-jobmanager
51+
environment:
52+
FLINK_JOBMANAGER_HOST: flink-jobmanager
53+
volumes:
54+
- ./settings/:/settings
55+
56+
flink-jobmanager:
57+
image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
58+
hostname: flink-jobmanager
59+
container_name: flink-jobmanager
60+
ports:
61+
- "9081:9081"
62+
command: jobmanager
63+
environment:
64+
- |
65+
FLINK_PROPERTIES=
66+
jobmanager.rpc.address: flink-jobmanager
67+
rest.bind-port: 9081
68+
69+
flink-taskmanager:
70+
image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
71+
hostname: flink-taskmanager
72+
container_name: flink-taskmanager
73+
depends_on:
74+
- flink-jobmanager
75+
command: taskmanager
76+
scale: 1
77+
environment:
78+
- |
79+
FLINK_PROPERTIES=
80+
jobmanager.rpc.address: flink-jobmanager
81+
taskmanager.numberOfTaskSlots: 10
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
distributionBase=GRADLE_USER_HOME
2+
distributionPath=wrapper/dists
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
4+
zipStoreBase=GRADLE_USER_HOME
5+
zipStorePath=wrapper/dists

0 commit comments

Comments
 (0)