Skip to content

Commit 399fb8a

Browse files
authored
Feat add finding distinct flinksql (#1626)
* initial commit for flinksql deduplication * Completed init, cli, and validate sections * Completed tutorial * Update harness file * Updates per comments * Missed one comment
1 parent 548459a commit 399fb8a

File tree

65 files changed

+1191
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1191
-1
lines changed

.semaphore/semaphore.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,4 +499,6 @@ blocks:
499499
- name: Flink SQL test for filtering
500500
commands:
501501
- make -C _includes/tutorials/filtering/flinksql/code tutorial
502-
502+
- name: Flink SQL test for deduplication
503+
commands:
504+
- make -C _includes/tutorials/finding-distinct/flinksql/code tutorial
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# "dev" sequence of the finding-distinct Flink SQL tutorial harness.
# Each entry under `steps` has a title and a list of `content` actions;
# `render` names the markup shown for that action (or skips rendering).
dev:
  steps:
    - title: 'Prerequisites'
      content:
        - action: skip
          render:
            file: shared/markup/dev/docker-prerequisite.adoc

    - title: 'Initialize the project'
      content:
        - action: execute
          file: tutorial-steps/dev/init.sh
          render:
            file: tutorials/finding-distinct/flinksql/markup/dev/init.adoc

    - title: 'Get Confluent Platform'
      content:
        - action: make_file
          file: docker-compose.yml
          render:
            file: tutorials/finding-distinct/flinksql/markup/dev/make-docker-compose.adoc
        - action: execute_async
          file: tutorial-steps/dev/docker-compose-up.sh
          render:
            file: tutorials/finding-distinct/flinksql/markup/dev/start-compose.adoc
        # Not rendered: only blocks the run until the containers are ready.
        - action: execute
          file: tutorial-steps/dev/wait-for-containers.sh
          render:
            skip: true

    - title: 'Write the program interactively using the CLI'
      content:
        - action: docker_flinksql_cli_session
          container: flink-sql-client
          docker_bootup_file: tutorial-steps/dev/start-cli.sh
          column_width: 20
          render:
            # NOTE(review): this is the only render path that points at the
            # aggregating-count tutorial; every other path in this harness is
            # under finding-distinct. Confirm the cross-tutorial reuse is
            # intentional.
            file: tutorials/aggregating-count/flinksql/markup/dev/start-cli.adoc
          # SQL files listed in the order they appear in the tutorial.
          stdin:
            - file: tutorial-steps/dev/create-clicks-table.sql
              render:
                file: tutorials/finding-distinct/flinksql/markup/dev/create-clicks-table.adoc

            - file: tutorial-steps/dev/populate-clicks.sql
              render:
                file: tutorials/finding-distinct/flinksql/markup/dev/populate-clicks.adoc

            - file: tutorial-steps/dev/deduplicate-click-events.sql
              render:
                file: tutorials/finding-distinct/flinksql/markup/dev/transient-deduplication.adoc

            - file: tutorial-steps/dev/create-deduplicated-clicks-table.sql
              render:
                file: tutorials/finding-distinct/flinksql/markup/dev/create-deduplicated-clicks-table.adoc

            - file: tutorial-steps/dev/populate-deduplicated-clicks-table.sql
              render:
                file: tutorials/finding-distinct/flinksql/markup/dev/populate-deduplicated-clicks-table.adoc

          stdout:
            directory: tutorial-steps/dev/outputs

        # Unrendered pause (10000 ms) before the validation step below.
        - name: wait for table execution to complete
          action: sleep
          ms: 10000
          render:
            skip: true

    - title: 'Validate output'
      content:
        # The script's stdout is captured to the file named in `stdout`.
        - action: execute
          file: tutorial-steps/dev/validate-deduplicated-clicks.sh
          stdout: tutorial-steps/dev/outputs/actual-deduplicated-clicks.txt
          render:
            file: tutorials/finding-distinct/flinksql/markup/dev/validate-deduplicated-clicks-table.adoc
76+
77+
# "test" sequence: scaffolds a Gradle project, writes the SQL resources and
# Java test classes into it, then invokes the test.
test:
  steps:
    - title: 'Decide what testing tools to use'
      content:
        - action: skip
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/test-architecture.adoc

    - title: 'Create the test skeleton'
      content:
        - action: execute
          file: tutorial-steps/test/make-test-dirs.sh
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/make-test-dirs.adoc

        - action: make_file
          file: build.gradle
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/make-build-gradle.adoc

        - action: execute
          file: tutorial-steps/test/gradle-wrapper.sh
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/make-gradle-wrapper.adoc

    - title: 'Create SQL resources'
      content:
        - action: make_file
          file: src/test/resources/create-clicks-table.sql.template
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/create-resource-create-clicks-table.sql.template.adoc

        - action: make_file
          file: src/test/resources/populate-clicks.sql
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/create-resource-populate-clicks.sql.adoc

        - action: make_file
          file: src/test/resources/create-deduplicated-clicks-table.sql.template
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/create-resource-deduplicated-clicks-table.sql.adoc

        - action: make_file
          file: src/test/resources/populate-deduplicated-clicks-table.sql
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/create-resource-populate-deduplicated-clicks-table.sql.adoc

        - action: make_file
          file: src/test/resources/query-deduplicated-clicks.sql
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/create-resource-query-deduplicated-clicks.sql.adoc

        - action: make_file
          file: src/test/resources/expected-deduplicated-clicks.txt
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/create-resource-expected-deduplicated-clicks.txt.adoc

    - title: 'Write a test'
      content:
        - action: make_file
          file: src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/make-test-base.adoc

        - action: make_file
          file: src/test/java/io/confluent/developer/FlinkSqlFindingDistinctTest.java
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/make-test.adoc

    - title: 'Invoke the test'
      content:
        - action: execute
          file: tutorial-steps/test/invoke-test.sh
          render:
            file: tutorials/finding-distinct/flinksql/markup/test/invoke-test.adoc
152+
153+
# "ccloud" sequence: a single step that only renders shared markup
# (the action is `skip`).
ccloud:
  steps:
    - title: 'Run your app to Confluent Cloud'
      content:
        - action: skip
          render:
            file: shared/markup/ccloud/try-ccloud.adoc

_data/tutorials.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ finding-distinct:
289289
kstreams: enabled
290290
kafka: disabled
291291
confluent: enabled
292+
flinksql: enabled
292293
window-final-result:
293294
title: Emit a final result from a time window
294295
meta-description: emit a final result from a time window
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
extensions/

# Per-sequence output directories under tutorial-steps/.
tutorial-steps/dev/outputs/
tutorial-steps/test/outputs/
tutorial-steps/prod/outputs/
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Runs the finding-distinct Flink SQL tutorial through harness-runner and
# compares the captured dev output with the expected log.

STEPS_DIR := tutorial-steps
DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs
TEST_OUTPUTS_DIR := $(STEPS_DIR)/test/outputs
PROD_OUTPUTS_DIR := $(STEPS_DIR)/prod/outputs
# Scratch directory handed to harness-runner; created when the Makefile is parsed.
TEMP_DIR := $(shell mktemp -d)
SEQUENCE := "dev, test, prod, ccloud"

# `tutorial` is an action, not a file: without .PHONY a stray file or directory
# named "tutorial" would make the target look up to date and skip the run.
.PHONY: tutorial

tutorial:
	# Start from empty output directories. `rm -rf` tolerates a missing
	# directory but still fails the build on a real error, unlike the
	# previous `rm -r ... || true`, which hid every failure.
	rm -rf $(DEV_OUTPUTS_DIR) $(TEST_OUTPUTS_DIR)
	mkdir -p $(DEV_OUTPUTS_DIR) $(TEST_OUTPUTS_DIR)
	harness-runner ../../../../../_data/harnesses/finding-distinct/flinksql.yml $(TEMP_DIR) $(SEQUENCE)
	# Order-insensitive comparison: both sides are sorted before diffing.
	# bash is required for the <( ... ) process substitution.
	bash -c "diff --strip-trailing-cr <(cat $(STEPS_DIR)/dev/expected-print-output-topic.log | sort) <(cat $(DEV_OUTPUTS_DIR)/actual-deduplicated-clicks.txt | sort)"

	reset
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Gradle build for the tutorial's Java tests: plugins, Java language level,
// project version and the repository used to resolve dependencies.
buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id "java"
    id "idea"
}

// Set the language level through the `java` extension. Assigning
// sourceCompatibility/targetCompatibility at the top level goes through the
// Java plugin convention, which is deprecated in the Gradle 8.x line.
java {
    sourceCompatibility = JavaVersion.VERSION_11
    targetCompatibility = JavaVersion.VERSION_11
}

version = "0.0.1"

repositories {
    mavenCentral()
}
19+
20+
dependencies {
    // One version per dependency family so the artifacts cannot drift apart.
    // flink-table-api-java-bridge was previously pinned to 1.17.0 while every
    // other Flink artifact used 1.17.1.
    def flinkVersion = "1.17.1"
    def testcontainersVersion = "1.17.6"

    testImplementation "com.google.guava:guava:31.1-jre"
    testImplementation "junit:junit:4.13.2"

    testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}"
    testImplementation "org.testcontainers:kafka:${testcontainersVersion}"

    testImplementation "org.apache.flink:flink-sql-connector-kafka:${flinkVersion}"
    testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:${flinkVersion}"
    testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
    testImplementation "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
    testImplementation "org.apache.flink:flink-json:${flinkVersion}"
    testImplementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
    testImplementation "org.apache.flink:flink-table-planner_2.12:${flinkVersion}"
    // Same artifact with the `tests` classifier.
    testImplementation "org.apache.flink:flink-table-planner_2.12:${flinkVersion}:tests"
    testImplementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
---
# Local stack for the tutorial: a single Kafka broker in KRaft combined mode
# (broker + controller roles) plus a Flink job manager, task manager and
# SQL client.
version: '2'

services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      # Quoted like the Flink port mapping below; Compose recommends string
      # port mappings so YAML never re-types an "N:N" value.
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # The fixed CLUSTER_ID below is for running a single broker in combined
      # mode for local development only.
      # For multi-node deployments, replace CLUSTER_ID with a unique base64
      # UUID generated with "bin/kafka-storage.sh random-uuid".
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  # NOTE(review): the Flink images below are tagged 1.16.0, while the Gradle
  # test dependencies in this commit use Flink 1.17.1 - confirm the version
  # difference is intended.
  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings

  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - "9081:9081"
    command: jobmanager
    environment:
      # One multi-line variable: FLINK_PROPERTIES holds the config lines
      # that follow the "=".
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081

  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Gradle wrapper settings: which distribution to download (distributionUrl)
# and where the downloaded zip and its unpacked contents are stored.
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

0 commit comments

Comments
 (0)