
Commit b12a53b

feat: Flinksql new tutorial stream stream join (#1611)

* Merge rebase commits from master
* Fixed tutorial harness file
* Change command in validation from make_file to execute
* Ugh - When will the bleeding stop?? corrected file name
* Adding some missing files
* Missed one from last time
* Make consumer sync
* Address review comments add pause for validation step
* Move sleep outside of stdout block

1 parent 8fe8009 commit b12a53b

77 files changed: +1405, -9 lines changed


.gitignore

Lines changed: 2 additions & 1 deletion

@@ -30,4 +30,5 @@ harness_runner/harness_runner.pyc
 # Ignore Gradle build output directory
 build
 
-**/tutorial-steps/dev/outputs/
+
+**/tutorial-steps/dev/outputs/

.semaphore/semaphore.yml

Lines changed: 3 additions & 0 deletions

@@ -490,3 +490,6 @@ blocks:
       - name: Flink SQL test for aggregating count
         commands:
           - make -C _includes/tutorials/aggregating-count/flinksql/code tutorial
+      - name: Flink SQL test for joins
+        commands:
+          - make -C _includes/tutorials/joining-stream-stream/flinksql/code tutorial
_data/harnesses/joining-stream-stream/flinksql.yml

Lines changed: 173 additions & 0 deletions

@@ -0,0 +1,173 @@
+dev:
+  steps:
+    - title: Prerequisites
+      content:
+        - action: skip
+          render:
+            file: shared/markup/dev/docker-prerequisite.adoc
+
+    - title: Initialize the project
+      content:
+        - action: execute
+          file: tutorial-steps/dev/init.sh
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/dev/init.adoc
+
+    - title: Get Confluent Platform
+      content:
+        - action: make_file
+          file: docker-compose.yml
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/dev/make-docker-compose.adoc
+        - action: execute_async
+          file: tutorial-steps/dev/docker-compose-up.sh
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/dev/start-compose.adoc
+        - action: execute
+          file: tutorial-steps/dev/wait-for-containers.sh
+          render:
+            skip: true
+
+    - title: Write the program interactively using the CLI
+      content:
+        - action: docker_flinksql_cli_session
+          container: flink-sql-client
+          docker_bootup_file: tutorial-steps/dev/start-cli.sh
+          column_width: 20
+          render:
+            file: tutorials/aggregating-count/flinksql/markup/dev/start-cli.adoc
+          stdin:
+            - file: tutorial-steps/dev/create-orders-table.sql
+              render:
+                file: tutorials/joining-stream-stream/flinksql/markup/dev/create-orders-table.adoc
+
+            - file: tutorial-steps/dev/create-shipments-table.sql
+              render:
+                file: tutorials/joining-stream-stream/flinksql/markup/dev/create-shipments-table.adoc
+
+            - file: tutorial-steps/dev/populate-orders.sql
+              render:
+                file: tutorials/joining-stream-stream/flinksql/markup/dev/populate-orders.adoc
+
+            - file: tutorial-steps/dev/populate-shipments.sql
+              render:
+                file: tutorials/joining-stream-stream/flinksql/markup/dev/populate-shipments.adoc
+
+            - file: tutorial-steps/dev/create-join-results-table.sql
+              render:
+                file: tutorials/joining-stream-stream/flinksql/markup/dev/transient-join.adoc
+
+            - file: tutorial-steps/dev/populate-shipped-orders-table.sql
+              render:
+                file: tutorials/joining-stream-stream/flinksql/markup/dev/populate-shipped-orders-table.adoc
+
+          stdout:
+            directory: tutorial-steps/dev/outputs
+
+        - name: wait for table execution to complete
+          action: sleep
+          ms: 10000
+          render:
+            skip: true
+
+    - title: Validate output
+      content:
+        - action: execute
+          file: tutorial-steps/dev/validate-shipped-orders.sh
+          stdout: tutorial-steps/dev/outputs/actual-shipped-orders.txt
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/dev/validate-shipped-orders-table.adoc
+
+test:
+  steps:
+    - title: Decide what testing tools to use
+      content:
+        - action: skip
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/test-architecture.adoc
+
+    - title: Create the test skeleton
+      content:
+        - action: execute
+          file: tutorial-steps/test/make-test-dirs.sh
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/make-test-dirs.adoc
+
+        - action: make_file
+          file: build.gradle
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/make-build-gradle.adoc
+
+        - action: execute
+          file: tutorial-steps/test/gradle-wrapper.sh
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/make-gradle-wrapper.adoc
+
+    - title: Create SQL resources
+      content:
+        - action: make_file
+          file: src/test/resources/create-orders.sql.template
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-create-orders.sql.template.adoc
+
+        - action: make_file
+          file: src/test/resources/create-shipments.sql.template
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-create-shipments.sql.template.adoc
+
+        - action: make_file
+          file: src/test/resources/populate-orders.sql
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-populate-orders.sql.adoc
+
+        - action: make_file
+          file: src/test/resources/populate-shipments.sql
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-populate-shipments.sql.adoc
+
+        - action: make_file
+          file: src/test/resources/create-shipped-orders.sql.template
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-shipped-orders.sql.adoc
+
+        - action: make_file
+          file: src/test/resources/populate-shipped-orders-table.sql
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-populate-shipped-orders-table.sql.adoc
+
+        - action: make_file
+          file: src/test/resources/query-join-order-shipments.sql
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-query-join-order-shipments.sql.adoc
+
+        - action: make_file
+          file: src/test/resources/expected-shipped-orders.txt
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/create-resource-expected-shipped-orders.txt.adoc
+
+    - title: Write a test
+      content:
+        - action: make_file
+          file: src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/make-test-base.adoc
+
+        - action: make_file
+          file: src/test/java/io/confluent/developer/FlinkSqlIntervalJoinTest.java
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/make-test.adoc
+
+    - title: Invoke the test
+      content:
+        - action: execute
+          file: tutorial-steps/test/invoke-test.sh
+          render:
+            file: tutorials/joining-stream-stream/flinksql/markup/test/invoke-test.adoc
+
+ccloud:
+  steps:
+    - title: Run your app to Confluent Cloud
+      content:
+        - action: skip
+          render:
+            file: shared/markup/ccloud/try-ccloud.adoc
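
For context, the join this harness drives is an interval join (the test class added below is FlinkSqlIntervalJoinTest). A minimal Java sketch of that pattern using Flink's TableEnvironment follows; the table names, columns, time bounds, and Kafka address are illustrative assumptions, not the contents of the tutorial's tutorial-steps/dev/*.sql files:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IntervalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical orders table over Kafka; the WATERMARK declaration gives
        // it an event-time attribute, which interval joins require.
        tableEnv.executeSql(
            "CREATE TABLE orders (id INT, ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
            ") WITH ('connector' = 'kafka', 'topic' = 'orders'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'scan.startup.mode' = 'earliest-offset', 'format' = 'json')");

        // Hypothetical shipments table, same idea.
        tableEnv.executeSql(
            "CREATE TABLE shipments (id INT, order_id INT, ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
            ") WITH ('connector' = 'kafka', 'topic' = 'shipments'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'scan.startup.mode' = 'earliest-offset', 'format' = 'json')");

        // The interval join itself: match each order with a shipment for the
        // same key that occurs within seven days of the order.
        tableEnv.executeSql(
            "SELECT o.id, o.ts AS order_ts, s.ts AS ship_ts " +
            "FROM orders o, shipments s " +
            "WHERE o.id = s.order_id " +
            "AND s.ts BETWEEN o.ts AND o.ts + INTERVAL '7' DAY").print();
    }
}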

_data/tutorials.yaml

Lines changed: 12 additions & 6 deletions

@@ -107,6 +107,7 @@ joining-stream-stream:
   status:
     ksql: enabled
     kstreams: enabled
+    flinksql: enabled
 fk-joins:
   title: How to join a table and a table with a foreign key
   meta-description: join two tables with different primary keys
@@ -1529,14 +1530,19 @@ survey-responses:
     confluent: disabled
 versioned-ktables:
   title: Versioned KTables for temporal join accuracy
-  meta-description: Ensure proper stream-table temporal join semantics using a versioned state store to back your KTable
+  meta-description: Ensure proper stream-table temporal join semantics using a versioned
+    state store to back your KTable
   canonical: confluent
   slug: /versioned-ktables
   question: How can you ensure proper temporal semantics in stream-table joins?
-  introduction: It used to be when Kafka Streams executes a stream-table join the stream side event would join the the latest available
-    record with the same key on the table side. But sometimes it's important for the stream event to match up with a table record by timestamp
-    as well as key, think of a stream of stock transactions and a table of stock prices - it's essential the transaction joins with the stock price
-    at the time of the transaction, not the latest price. A versioned state store tracks multiple record versions for the same key,
-    rather than the single latest record per key, as is the case for standard non-versioned stores.
+  introduction: It used to be when Kafka Streams executes a stream-table join the
+    stream side event would join the the latest available record with the same key
+    on the table side. But sometimes it's important for the stream event to match
+    up with a table record by timestamp as well as key, think of a stream of stock
+    transactions and a table of stock prices - it's essential the transaction joins
+    with the stock price at the time of the transaction, not the latest price. A
+    versioned state store tracks multiple record versions for the same key, rather
+    than the single latest record per key, as is the case for standard non-versioned
+    stores.
   status:
     kstreams: enabled
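
The versioned-KTable behavior that introduction describes maps to a small amount of Kafka Streams code. A minimal sketch, assuming Kafka Streams 3.5+ and hypothetical topic names (stock-transactions, stock-prices) and value types:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class VersionedJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Back the prices KTable with a versioned store that retains an hour of
        // history per key, so joins can look up the price as of an event's timestamp.
        KTable<String, Double> prices = builder.table(
            "stock-prices",
            Materialized.<String, Double>as(
                    Stores.persistentVersionedKeyValueStore("prices-store", Duration.ofHours(1)))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Double()));

        KStream<String, Double> transactions = builder.stream(
            "stock-transactions", Consumed.with(Serdes.String(), Serdes.Double()));

        // With a versioned table, each transaction joins the price whose timestamp
        // is at or before the transaction's timestamp, not simply the latest price.
        transactions.join(prices, (shares, price) -> shares * price)
            .to("transaction-values", Produced.with(Serdes.String(), Serdes.Double()));
    }
}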

_includes/tutorials/aggregating-count/flinksql/markup/test/make-test.adoc

Lines changed: 1 addition & 1 deletion

@@ -4,4 +4,4 @@ Next, create the test implementation at `src/test/java/io/confluent/developer/Fl
 <pre class="snippet"><code class="java">{% include_raw tutorials/aggregating-count/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlAggregatingCountTest.java %}</code></pre>
 +++++
 
-The test itself it straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
+The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
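
That pattern — execute the SQL resources, query the output table, compare against a file of expected rows — looks roughly like the following. This is a sketch, not the commit's AbstractFlinkKafkaTest or FlinkSqlIntervalJoinTest; the loadResource helper, file names, and ordering are assumptions:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class FlinkSqlJoinTestSketch {

    // Hypothetical helper: read one SQL statement from src/test/resources.
    private String loadResource(String name) throws IOException {
        return new String(Files.readAllBytes(Paths.get("src/test/resources", name)));
    }

    @Test
    public void testJoinProducesExpectedRows() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL first, then the inserts that populate the tables. In a real test
        // the INSERT jobs run asynchronously and must be awaited before querying.
        for (String sql : List.of("create-orders.sql", "create-shipments.sql",
                "populate-orders.sql", "populate-shipments.sql")) {
            tableEnv.executeSql(loadResource(sql));
        }

        // Run the join query and collect its rows; over an unbounded source the
        // query would need a LIMIT or a bounded scan in order to terminate.
        List<String> actual = new ArrayList<>();
        try (CloseableIterator<Row> rows =
                 tableEnv.executeSql(loadResource("query-join-order-shipments.sql")).collect()) {
            rows.forEachRemaining(row -> actual.add(row.toString()));
        }

        List<String> expected = List.of(loadResource("expected-shipped-orders.txt").trim().split("\n"));
        assertEquals(expected, actual);
    }
}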
_includes/tutorials/joining-stream-stream/flinksql/code/.gitignore

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+extensions/
+tutorial-steps/dev/outputs/
+tutorial-steps/test/outputs/
+tutorial-steps/prod/outputs/
_includes/tutorials/joining-stream-stream/flinksql/code/Makefile

Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@
+STEPS_DIR := tutorial-steps
+DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs
+TEST_OUTPUTS_DIR := $(STEPS_DIR)/test/outputs
+PROD_OUTPUTS_DIR := $(STEPS_DIR)/prod/outputs
+TEMP_DIR := $(shell mktemp -d)
+SEQUENCE := "dev, test, prod, ccloud"
+
+tutorial:
+	rm -r $(DEV_OUTPUTS_DIR) || true
+	rm -r $(TEST_OUTPUTS_DIR) || true
+	mkdir $(DEV_OUTPUTS_DIR)
+	mkdir $(TEST_OUTPUTS_DIR)
+	harness-runner ../../../../../_data/harnesses/joining-stream-stream/flinksql.yml $(TEMP_DIR) $(SEQUENCE)
+	bash -c "diff --strip-trailing-cr <(cat $(STEPS_DIR)/dev/expected-print-output-topic.log | sort) <(cat $(DEV_OUTPUTS_DIR)/actual-shipped-orders.txt | sort)"
+
+reset
_includes/tutorials/joining-stream-stream/flinksql/code/build.gradle

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+buildscript {
+    repositories {
+        mavenCentral()
+    }
+}
+
+plugins {
+    id "java"
+}
+
+sourceCompatibility = JavaVersion.VERSION_11
+targetCompatibility = JavaVersion.VERSION_11
+version = "0.0.1"
+
+repositories {
+    mavenCentral()
+}
+
+dependencies {
+    testImplementation "com.google.guava:guava:31.1-jre"
+    testImplementation "junit:junit:4.13.2"
+    testImplementation 'org.testcontainers:testcontainers:1.17.6'
+    testImplementation 'org.testcontainers:kafka:1.17.6'
+    testImplementation "org.apache.flink:flink-sql-connector-kafka:1.17.1"
+    testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.17.1"
+    testImplementation "org.apache.flink:flink-test-utils:1.17.1"
+    testImplementation "org.apache.flink:flink-test-utils-junit:1.17.1"
+    testImplementation 'org.apache.flink:flink-json:1.17.1'
+    testImplementation "org.apache.flink:flink-table-api-java-bridge:1.17.0"
+    testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1"
+    testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1:tests"
+    testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.17.1"
+}
_includes/tutorials/joining-stream-stream/flinksql/code/docker-compose.yml

Lines changed: 68 additions & 0 deletions

@@ -0,0 +1,68 @@
+---
+version: '2'
+
+services:
+  broker:
+    image: confluentinc/cp-kafka:7.4.1
+    hostname: broker
+    container_name: broker
+    ports:
+      - 9092:9092
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_NODE_ID: 1
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
+      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+      # This is done for running a single broker in combined mode for local development only
+      # For multi-node deployments you should generate using the following
+      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
+      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
+      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
+
+  flink-sql-client:
+    image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
+    hostname: flink-sql-client
+    container_name: flink-sql-client
+    depends_on:
+      - flink-jobmanager
+    environment:
+      FLINK_JOBMANAGER_HOST: flink-jobmanager
+    volumes:
+      - ./settings/:/settings
+
+  flink-jobmanager:
+    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
+    hostname: flink-jobmanager
+    container_name: flink-jobmanager
+    ports:
+      - "9081:9081"
+    command: jobmanager
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: flink-jobmanager
+        rest.bind-port: 9081
+
+  flink-taskmanager:
+    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
+    hostname: flink-taskmanager
+    container_name: flink-taskmanager
+    depends_on:
+      - flink-jobmanager
+    command: taskmanager
+    scale: 1
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: flink-jobmanager
+        taskmanager.numberOfTaskSlots: 10
