Skip to content

Commit 3f041d7

Browse files
authored
chore: Changes for removing CTAS (#1643)
* changes for removing CTAS * changes for failing test * updates for removing CTAS * increase sleep to wait for topic creation * add semicolon to end statement * add await to create table statements * remove space * fix expected output with types
1 parent f22bb95 commit 3f041d7

23 files changed

+123
-60
lines changed

_data/harnesses/hopping-windows/flinksql.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,16 @@ dev:
5555
render:
5656
file: tutorials/hopping-windows/flinksql/markup/dev/create-temperature-by-10min-window.adoc
5757

58+
- file: tutorial-steps/dev/populate-temperature-by-10min-window.sql
59+
render:
60+
file: tutorials/hopping-windows/flinksql/markup/dev/populate-temperature-by-10min-window.adoc
61+
5862
stdout:
5963
directory: tutorial-steps/dev/outputs
6064

6165
- name: wait for table execution to complete
6266
action: sleep
63-
ms: 10000
67+
ms: 30000
6468
render:
6569
skip: true
6670

@@ -114,6 +118,11 @@ test:
114118
render:
115119
file: tutorials/hopping-windows/flinksql/markup/test/create-resource-create-temperature-by-10min-window.sql.template.adoc
116120

121+
- action: make_file
122+
file: src/test/resources/populate-temperature-by-10min-window.sql
123+
render:
124+
file: tutorials/hopping-windows/flinksql/markup/test/populate-resource-temperature-by-10min-window.sql.adoc
125+
117126
- action: make_file
118127
file: src/test/resources/query-temperature-by-10min-window.sql
119128
render:

_data/harnesses/tumbling-windows/flinksql.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,16 @@ dev:
5555
render:
5656
file: tutorials/tumbling-windows/flinksql/markup/dev/create-ratings-by-6hr-window.adoc
5757

58+
- file: tutorial-steps/dev/populate-ratings-by-6hr-window.sql
59+
render:
60+
file: tutorials/tumbling-windows/flinksql/markup/dev/populate-ratings-by-6hr-window.adoc
61+
5862
stdout:
5963
directory: tutorial-steps/dev/outputs
6064

6165
- name: wait for table execution to complete
6266
action: sleep
63-
ms: 10000
67+
ms: 30000
6468
render:
6569
skip: true
6670

@@ -114,6 +118,11 @@ test:
114118
render:
115119
file: tutorials/tumbling-windows/flinksql/markup/test/create-resource-create-ratings-by-6hr-window.sql.template.adoc
116120

121+
- action: make_file
122+
file: src/test/resources/populate-ratings-by-6hr-window.sql
123+
render:
124+
file: tutorials/tumbling-windows/flinksql/markup/test/create-resource-populate-ratings-by-6hr-window.sql.adoc
125+
117126
- action: make_file
118127
file: src/test/resources/query-ratings-by-6hr-window.sql
119128
render:

_includes/tutorials/hopping-windows/flinksql/code/settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
* in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html
88
*/
99

10-
rootProject.name = 'aggregating-count-flinksql'
10+
rootProject.name = 'hopping-windows-flinksql'

_includes/tutorials/hopping-windows/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlHoppingWindowTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ public void testHoppingWindows() throws Exception {
1717
Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await();
1818
streamTableEnv.executeSql(getResourceFileContents("populate-temperature-readings.sql"));
1919
streamTableEnv.executeSql(getResourceFileContents("create-temperature-by-10min-window.sql.template",
20-
Optional.of(kafkaPort), Optional.of(schemaRegistryPort)));
20+
Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await();
21+
streamTableEnv.executeSql(getResourceFileContents("populate-temperature-by-10min-window.sql"));
2122

2223
TableResult tableResult = streamTableEnv.executeSql(getResourceFileContents("query-temperature-by-10min-window.sql"));
2324

Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
CREATE TABLE temperature_by_10min_window
2-
WITH (
1+
CREATE TABLE temperature_by_10min_window (
2+
sensor_id INT,
3+
avg_temperature DOUBLE,
4+
window_start TIMESTAMP(3),
5+
window_end TIMESTAMP(3)
6+
) WITH (
37
'connector' = 'kafka',
48
'topic' = 'temperature-by-10min-window',
59
'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
@@ -10,10 +14,4 @@ WITH (
1014
'value.format' = 'avro-confluent',
1115
'value.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT',
1216
'value.fields-include' = 'ALL'
13-
) AS
14-
SELECT sensor_id,
15-
AVG(temperature) AS avg_temperature,
16-
window_start,
17-
window_end
18-
FROM TABLE(HOP(TABLE temperature_readings, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
19-
GROUP BY sensor_id, window_start, window_end;
17+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
INSERT INTO temperature_by_10min_window
2+
SELECT sensor_id,
3+
AVG(temperature) AS avg_temperature,
4+
window_start,
5+
window_end
6+
FROM TABLE(HOP(TABLE temperature_readings, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
7+
GROUP BY sensor_id, window_start, window_end;
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
CREATE TABLE temperature_by_10min_window
2-
WITH (
1+
CREATE TABLE temperature_by_10min_window (
2+
sensor_id INT,
3+
avg_temperature DOUBLE,
4+
window_start TIMESTAMP(3),
5+
window_end TIMESTAMP(3)
6+
) WITH (
37
'connector' = 'kafka',
48
'topic' = 'temperature-by-10min-window',
59
'properties.bootstrap.servers' = 'broker:9092',
@@ -10,10 +14,4 @@ WITH (
1014
'value.format' = 'avro-confluent',
1115
'value.avro-confluent.url' = 'http://schema-registry:8081',
1216
'value.fields-include' = 'ALL'
13-
) AS
14-
SELECT sensor_id,
15-
AVG(temperature) AS avg_temperature,
16-
window_start,
17-
window_end
18-
FROM TABLE(HOP(TABLE temperature_readings, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
19-
GROUP BY sensor_id, window_start, window_end;
17+
);
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
{"sensor_id":{"int":0},"avg_temperature":{"double":55.0},"window_start":1673748600000,"window_end":1673749200000}
2-
{"sensor_id":{"int":0},"avg_temperature":{"double":52.5},"window_start":1673748900000,"window_end":1673749500000}
3-
{"sensor_id":{"int":0},"avg_temperature":{"double":47.5},"window_start":1673749200000,"window_end":1673749800000}
4-
{"sensor_id":{"int":0},"avg_temperature":{"double":42.5},"window_start":1673749500000,"window_end":1673750100000}
5-
{"sensor_id":{"int":0},"avg_temperature":{"double":42.5},"window_start":1673749800000,"window_end":1673750400000}
6-
{"sensor_id":{"int":0},"avg_temperature":{"double":47.5},"window_start":1673750100000,"window_end":1673750700000}
7-
{"sensor_id":{"int":0},"avg_temperature":{"double":52.5},"window_start":1673750400000,"window_end":1673751000000}
1+
{"sensor_id":{"int":0},"avg_temperature":{"double":55.0},"window_start":{"long":1673748600000},"window_end":{"long":1673749200000}}
2+
{"sensor_id":{"int":0},"avg_temperature":{"double":52.5},"window_start":{"long":1673748900000},"window_end":{"long":1673749500000}}
3+
{"sensor_id":{"int":0},"avg_temperature":{"double":47.5},"window_start":{"long":1673749200000},"window_end":{"long":1673749800000}}
4+
{"sensor_id":{"int":0},"avg_temperature":{"double":42.5},"window_start":{"long":1673749500000},"window_end":{"long":1673750100000}}
5+
{"sensor_id":{"int":0},"avg_temperature":{"double":42.5},"window_start":{"long":1673749800000},"window_end":{"long":1673750400000}}
6+
{"sensor_id":{"int":0},"avg_temperature":{"double":47.5},"window_start":{"long":1673750100000},"window_end":{"long":1673750700000}}
7+
{"sensor_id":{"int":0},"avg_temperature":{"double":52.5},"window_start":{"long":1673750400000},"window_end":{"long":1673751000000}}
88
Processed a total of 7 messages
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
INSERT INTO temperature_by_10min_window
2+
SELECT sensor_id,
3+
AVG(temperature) AS avg_temperature,
4+
window_start,
5+
window_end
6+
FROM TABLE(HOP(TABLE temperature_readings, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
7+
GROUP BY sensor_id, window_start, window_end;

_includes/tutorials/hopping-windows/flinksql/markup/dev/create-temperature-by-10min-window.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Since the output of our transient query looks right, the next step is to make it persistent with the following `CREATE TABLE AS SELECT` statement. Go ahead and execute the following statement in your Flink SQL session:
1+
Since the output of our transient query looks right, the next step is to make it persistent with the following `CREATE TABLE` statement. Go ahead and execute the following statement in your Flink SQL session:
22

33
+++++
44
<pre class="snippet"><code class="sql">{% include_raw tutorials/hopping-windows/flinksql/code/tutorial-steps/dev/create-temperature-by-10min-window.sql %}</code></pre>

0 commit comments

Comments
 (0)