Skip to content

Commit 7c7da4c

Browse files
authored
chore: convert splitting and cumulating windows Flink SQL tutorials to use INSERT INTO (#1644)
1 parent 3f041d7 commit 7c7da4c

27 files changed

+121
-50
lines changed

_data/harnesses/cumulating-windows/flinksql.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ dev:
5555
render:
5656
file: tutorials/cumulating-windows/flinksql/markup/dev/create-revenue-per-hour-cumulating.adoc
5757

58+
- file: tutorial-steps/dev/populate-revenue-per-hour-cumulating.sql
59+
render:
60+
file: tutorials/cumulating-windows/flinksql/markup/dev/populate-revenue-per-hour-cumulating.adoc
61+
5862
stdout:
5963
directory: tutorial-steps/dev/outputs
6064

@@ -114,6 +118,11 @@ test:
114118
render:
115119
file: tutorials/cumulating-windows/flinksql/markup/test/create-resource-create-revenue-per-hour-cumulating.sql.template.adoc
116120

121+
- action: make_file
122+
file: src/test/resources/populate-revenue-per-hour-cumulating.sql
123+
render:
124+
file: tutorials/cumulating-windows/flinksql/markup/test/create-resource-populate-revenue-per-hour-cumulating.sql.adoc
125+
117126
- action: make_file
118127
file: src/test/resources/query-revenue-per-hour-cumulating.sql
119128
render:

_data/harnesses/splitting/flinksql.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,26 @@ dev:
5959
render:
6060
file: tutorials/splitting/flinksql/markup/dev/create-acting-events-drama.adoc
6161

62+
- file: tutorial-steps/dev/populate-acting-events-drama.sql
63+
render:
64+
file: tutorials/splitting/flinksql/markup/dev/populate-acting-events-drama.adoc
65+
6266
- file: tutorial-steps/dev/create-acting-events-fantasy.sql
6367
render:
6468
file: tutorials/splitting/flinksql/markup/dev/create-acting-events-fantasy.adoc
6569

70+
- file: tutorial-steps/dev/populate-acting-events-fantasy.sql
71+
render:
72+
file: tutorials/splitting/flinksql/markup/dev/populate-acting-events-fantasy.adoc
73+
6674
- file: tutorial-steps/dev/create-acting-events-other.sql
6775
render:
6876
file: tutorials/splitting/flinksql/markup/dev/create-acting-events-other.adoc
6977

78+
- file: tutorial-steps/dev/populate-acting-events-other.sql
79+
render:
80+
file: tutorials/splitting/flinksql/markup/dev/populate-acting-events-other.adoc
81+
7082
stdout:
7183
directory: tutorial-steps/dev/outputs
7284

@@ -120,6 +132,11 @@ test:
120132
render:
121133
file: tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events-drama.sql.template.adoc
122134

135+
- action: make_file
136+
file: src/test/resources/populate-acting-events-drama.sql
137+
render:
138+
file: tutorials/splitting/flinksql/markup/test/create-resource-populate-acting-events-drama.sql.adoc
139+
123140
- action: make_file
124141
file: src/test/resources/query-acting-events-drama.sql
125142
render:

_includes/tutorials/cumulating-windows/flinksql/code/settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
* in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html
88
*/
99

10-
rootProject.name = 'aggregating-count-flinksql'
10+
rootProject.name = 'cumulating-windows-flinksql'

_includes/tutorials/cumulating-windows/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlCumulatingWindowTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public void testCumulatingWindows() throws Exception {
1717
Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await();
1818
streamTableEnv.executeSql(getResourceFileContents("populate-orders.sql"));
1919
streamTableEnv.executeSql(getResourceFileContents("create-revenue-per-hour-cumulating.sql.template",
20+
Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await();
21+
streamTableEnv.executeSql(getResourceFileContents("populate-revenue-per-hour-cumulating.sql",
2022
Optional.of(kafkaPort), Optional.of(schemaRegistryPort)));
2123

2224
TableResult tableResult = streamTableEnv.executeSql(getResourceFileContents("query-revenue-per-hour-cumulating.sql"));
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
CREATE TABLE revenue_per_hour_cumulating
2-
WITH (
1+
CREATE TABLE revenue_per_hour_cumulating (
2+
revenue DOUBLE,
3+
window_start TIMESTAMP(3),
4+
window_end TIMESTAMP(3)
5+
) WITH (
36
'connector' = 'kafka',
47
'topic' = 'revenue-per-hour-cumulating',
58
'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
@@ -10,9 +13,4 @@ WITH (
1013
'value.format' = 'avro-confluent',
1114
'value.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT',
1215
'value.fields-include' = 'ALL'
13-
) AS
14-
SELECT ROUND(SUM(quantity * unit_price), 2) AS revenue,
15-
window_start,
16-
window_end
17-
FROM TABLE(CUMULATE(TABLE orders, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
18-
GROUP BY window_start, window_end;
16+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
INSERT INTO revenue_per_hour_cumulating
2+
SELECT ROUND(SUM(quantity * unit_price), 2) AS revenue,
3+
window_start,
4+
window_end
5+
FROM TABLE(CUMULATE(TABLE orders, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
6+
GROUP BY window_start, window_end;
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
CREATE TABLE revenue_per_hour_cumulating
2-
WITH (
1+
CREATE TABLE revenue_per_hour_cumulating (
2+
revenue DOUBLE,
3+
window_start TIMESTAMP(3),
4+
window_end TIMESTAMP(3)
5+
) WITH (
36
'connector' = 'kafka',
47
'topic' = 'revenue-per-hour-cumulating',
58
'properties.bootstrap.servers' = 'broker:9092',
@@ -10,9 +13,4 @@ WITH (
1013
'value.format' = 'avro-confluent',
1114
'value.avro-confluent.url' = 'http://schema-registry:8081',
1215
'value.fields-include' = 'ALL'
13-
) AS
14-
SELECT ROUND(SUM(quantity * unit_price), 2) AS revenue,
15-
window_start,
16-
window_end
17-
FROM TABLE(CUMULATE(TABLE orders, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
18-
GROUP BY window_start, window_end;
16+
);
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
{"revenue":{"double":13.96},"window_start":1673748000000,"window_end":1673748300000}
2-
{"revenue":{"double":33.94},"window_start":1673748000000,"window_end":1673748600000}
3-
{"revenue":{"double":9.99},"window_start":1673748600000,"window_end":1673749200000}
4-
{"revenue":{"double":26.67},"window_start":1673749200000,"window_end":1673749500000}
5-
{"revenue":{"double":61.57},"window_start":1673749200000,"window_end":1673749800000}
1+
{"revenue":{"double":13.96},"window_start":{"long":1673748000000},"window_end":{"long":1673748300000}}
2+
{"revenue":{"double":33.94},"window_start":{"long":1673748000000},"window_end":{"long":1673748600000}}
3+
{"revenue":{"double":9.99},"window_start":{"long":1673748600000},"window_end":{"long":1673749200000}}
4+
{"revenue":{"double":26.67},"window_start":{"long":1673749200000},"window_end":{"long":1673749500000}}
5+
{"revenue":{"double":61.57},"window_start":{"long":1673749200000},"window_end":{"long":1673749800000}}
66
Processed a total of 5 messages
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
INSERT INTO revenue_per_hour_cumulating
2+
SELECT ROUND(SUM(quantity * unit_price), 2) AS revenue,
3+
window_start,
4+
window_end
5+
FROM TABLE(CUMULATE(TABLE orders, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
6+
GROUP BY window_start, window_end;

_includes/tutorials/cumulating-windows/flinksql/markup/dev/create-revenue-per-hour-cumulating.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Since the output of our transient query looks right, the next step is to make it persistent with the following `CREATE TABLE AS SELECT` statement. Go ahead and execute the following statement in your Flink SQL session:
1+
Since the output of our transient query looks right, the next step is to make it persistent with the following statements. Go ahead and execute them in your Flink SQL session:
22

33
+++++
44
<pre class="snippet"><code class="sql">{% include_raw tutorials/cumulating-windows/flinksql/code/tutorial-steps/dev/create-revenue-per-hour-cumulating.sql %}</code></pre>

0 commit comments

Comments
 (0)