Skip to content

Commit c68146a

Browse files
authored
Remove adding hardcoded dates use rowtime instead (#1579)
1 parent 2ebbb8a commit c68146a

File tree

14 files changed

+70
-77
lines changed

14 files changed

+70
-77
lines changed

_includes/tutorials/finding-distinct/ksql/code/Makefile

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ tutorial:
1111
mkdir $(DEV_OUTPUTS_DIR)
1212
mkdir -p $(TEST_OUTPUTS_DIR)
1313
harness-runner ../../../../../_data/harnesses/finding-distinct/ksql.yml $(TEMP_DIR) $(SEQUENCE)
14-
diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-transient-window.log $(DEV_OUTPUTS_DIR)/transient-window/output-0.log
15-
diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-transient-query.log $(DEV_OUTPUTS_DIR)/transient-query/output-0.log
16-
diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-print-topic.log $(DEV_OUTPUTS_DIR)/print-topic/output-0.log
14+
1715
bash -c "diff --strip-trailing-cr <(sort $(STEPS_DIR)/test/expected-results.log) <(sort $(TEST_OUTPUTS_DIR)/test-results.log)"
1816
reset

_includes/tutorials/finding-distinct/ksql/code/src/statements.sql

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
1-
CREATE STREAM CLICKS (IP_ADDRESS STRING, URL STRING, TIMESTAMP STRING)
1+
CREATE STREAM CLICKS (IP_ADDRESS STRING, URL STRING)
22
WITH (KAFKA_TOPIC = 'CLICKS',
33
FORMAT = 'JSON',
4-
TIMESTAMP = 'TIMESTAMP',
5-
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ssXXX',
64
PARTITIONS = 1);
75

86
CREATE TABLE DETECTED_CLICKS AS
97
SELECT
108
IP_ADDRESS AS KEY1,
119
URL AS KEY2,
12-
TIMESTAMP AS KEY3,
1310
AS_VALUE(IP_ADDRESS) AS IP_ADDRESS,
1411
COUNT(IP_ADDRESS) as IP_COUNT,
1512
AS_VALUE(URL) AS URL,
16-
AS_VALUE(TIMESTAMP) AS TIMESTAMP
13+
FORMAT_TIMESTAMP(FROM_UNIXTIME(EARLIEST_BY_OFFSET(ROWTIME)), 'yyyy-MM-dd HH:mm:ss.SSS') AS TIMESTAMP
1714
FROM CLICKS WINDOW TUMBLING (SIZE 2 MINUTES, RETENTION 1000 DAYS)
18-
GROUP BY IP_ADDRESS, URL, TIMESTAMP;
15+
GROUP BY IP_ADDRESS, URL;
1916

2017
CREATE STREAM RAW_VALUES_CLICKS (IP_ADDRESS STRING, IP_COUNT BIGINT, URL STRING, TIMESTAMP STRING)
2118
WITH (KAFKA_TOPIC = 'DETECTED_CLICKS',

_includes/tutorials/finding-distinct/ksql/code/test/input.json

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,50 @@
22
"inputs": [
33
{
44
"topic": "CLICKS",
5+
"timestamp": 1685644010383,
56
"value": {
67
"IP_ADDRESS": "10.0.0.1",
7-
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html",
8-
"TIMESTAMP": "2021-01-17T14:50:43+00:00"
8+
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html"
99
}
1010
},
1111
{
1212
"topic": "CLICKS",
13+
"timestamp": 1685644295242,
1314
"value": {
1415
"IP_ADDRESS": "10.0.0.12",
15-
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
16-
"TIMESTAMP": "2021-01-17T14:53:44+00:01"
16+
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen"
1717
}
1818
},
1919
{
2020
"topic": "CLICKS",
21+
"timestamp": 1685644521624,
2122
"value": {
2223
"IP_ADDRESS": "10.0.0.13",
23-
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
24-
"TIMESTAMP": "2021-01-17T14:56:45+00:03"
24+
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen"
2525
}
2626
},
2727
{
2828
"topic": "CLICKS",
29+
"timestamp": 1685644010383,
2930
"value": {
3031
"IP_ADDRESS": "10.0.0.1",
31-
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html",
32-
"TIMESTAMP": "2021-01-17T14:50:43+00:00"
32+
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html"
3333
}
3434
},
3535
{
3636
"topic": "CLICKS",
37+
"timestamp": 1685644295242,
3738
"value": {
3839
"IP_ADDRESS": "10.0.0.12",
39-
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
40-
"TIMESTAMP": "2021-01-17T14:53:44+00:01"
40+
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen"
4141
}
4242
},
4343
{
4444
"topic": "CLICKS",
45+
"timestamp": 1685644521624,
4546
"value": {
4647
"IP_ADDRESS": "10.0.0.13",
47-
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
48-
"TIMESTAMP": "2021-01-17T14:56:45+00:03"
48+
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen"
4949
}
5050
}
5151
]

_includes/tutorials/finding-distinct/ksql/code/test/output.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,27 @@
55
"key": "10.0.0.1",
66
"value": {
77
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html",
8-
"TIMESTAMP": "2021-01-17T14:50:43+00:00"
8+
"TIMESTAMP": "2023-06-01 18:26:50.383"
99
},
10-
"timestamp": 1610895043000
10+
"timestamp": 1685644010383
1111
},
1212
{
1313
"topic": "DISTINCT_CLICKS",
1414
"key": "10.0.0.12",
1515
"value": {
1616
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
17-
"TIMESTAMP": "2021-01-17T14:53:44+00:01"
17+
"TIMESTAMP": "2023-06-01 18:31:35.242"
1818
},
19-
"timestamp": 1610895164000
19+
"timestamp": 1685644295242
2020
},
2121
{
2222
"topic": "DISTINCT_CLICKS",
2323
"key": "10.0.0.13",
2424
"value": {
2525
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
26-
"TIMESTAMP": "2021-01-17T14:56:45+00:03"
26+
"TIMESTAMP": "2023-06-01 18:35:21.624"
2727
},
28-
"timestamp": 1610895225000
28+
"timestamp": 1685644521624
2929
}
3030
]
3131
}
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
CREATE STREAM CLICKS (IP_ADDRESS VARCHAR, URL VARCHAR, TIMESTAMP VARCHAR)
1+
CREATE STREAM CLICKS (IP_ADDRESS VARCHAR, URL VARCHAR)
22
WITH (KAFKA_TOPIC = 'CLICKS',
33
FORMAT = 'JSON',
4-
TIMESTAMP = 'TIMESTAMP',
5-
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ssXXX',
64
PARTITIONS = 1);

_includes/tutorials/finding-distinct/ksql/code/tutorial-steps/dev/create-outputs.sql

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ CREATE TABLE DETECTED_CLICKS AS
22
SELECT
33
IP_ADDRESS AS KEY1,
44
URL AS KEY2,
5-
TIMESTAMP AS KEY3,
65
AS_VALUE(IP_ADDRESS) AS IP_ADDRESS,
76
COUNT(IP_ADDRESS) as IP_COUNT,
87
AS_VALUE(URL) AS URL,
9-
AS_VALUE(TIMESTAMP) AS TIMESTAMP
8+
FORMAT_TIMESTAMP(FROM_UNIXTIME(EARLIEST_BY_OFFSET(ROWTIME)), 'yyyy-MM-dd HH:mm:ss.SSS') AS TIMESTAMP
109
FROM CLICKS WINDOW TUMBLING (SIZE 2 MINUTES, RETENTION 1000 DAYS)
11-
GROUP BY IP_ADDRESS, URL, TIMESTAMP;
10+
GROUP BY IP_ADDRESS, URL;
1211

1312
CREATE STREAM RAW_VALUES_CLICKS (IP_ADDRESS VARCHAR, IP_COUNT BIGINT, URL VARCHAR, TIMESTAMP VARCHAR)
1413
WITH (KAFKA_TOPIC = 'DETECTED_CLICKS',
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
22
Value format: JSON or KAFKA_STRING
3-
rowtime: 2021/01/17 14:50:43.000 Z, key: "10.0.0.1", value: {"URL":"https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html","TIMESTAMP":"2021-01-17T14:50:43+00:00"}, partition: 0
4-
rowtime: 2021/01/17 14:52:44.000 Z, key: "10.0.0.12", value: {"URL":"https://www.confluent.io/hub/confluentinc/kafka-connect-datagen","TIMESTAMP":"2021-01-17T14:53:44+00:01"}, partition: 0
5-
rowtime: 2021/01/17 14:53:45.000 Z, key: "10.0.0.13", value: {"URL":"https://www.confluent.io/hub/confluentinc/kafka-connect-datagen","TIMESTAMP":"2021-01-17T14:56:45+00:03"}, partition: 0
6-
Topic printing ceased
3+
rowtime: 2023/05/31 17:40:03.024 Z, key: "10.0.0.1", value: {"URL":"https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html","TIMESTAMP":"2023-05-31 17:40:03.024"}, partition: 0
4+
rowtime: 2023/05/31 17:40:03.052 Z, key: "10.0.0.12", value: {"URL":"https://www.confluent.io/hub/confluentinc/kafka-connect-datagen","TIMESTAMP":"2023-05-31 17:40:03.052"}, partition: 0
5+
rowtime: 2023/05/31 17:40:03.077 Z, key: "10.0.0.13", value: {"URL":"https://www.confluent.io/hub/confluentinc/kafka-connect-datagen","TIMESTAMP":"2023-05-31 17:40:03.077"}, partition: 0
6+
Topic printing ceased
Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
+-----------------------------------------+-----------------------------------------+-----------------------------------------+
2-
|IP_ADDRESS |URL |TIMESTAMP |
3-
+-----------------------------------------+-----------------------------------------+-----------------------------------------+
4-
|10.0.0.1 |https://docs.confluent.io/current/tutoria|2021-01-17T14:50:43+00:00 |
5-
| |ls/examples/kubernetes/gke-base/docs/inde| |
6-
| |x.html | |
7-
|10.0.0.12 |https://www.confluent.io/hub/confluentinc|2021-01-17T14:53:44+00:01 |
8-
| |/kafka-connect-datagen | |
9-
|10.0.0.13 |https://www.confluent.io/hub/confluentinc|2021-01-17T14:56:45+00:03 |
10-
| |/kafka-connect-datagen | |
1+
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
2+
|IP_ADDRESS |URL |TIMESTAMP |
3+
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
4+
|10.0.0.1 |https://docs.confluent.io/current/tutorials/exampl|2023-05-31 17:40:03.024 |
5+
| |es/kubernetes/gke-base/docs/index.html | |
6+
|10.0.0.12 |https://www.confluent.io/hub/confluentinc/kafka-co|2023-05-31 17:40:03.052 |
7+
| |nnect-datagen | |
8+
|10.0.0.13 |https://www.confluent.io/hub/confluentinc/kafka-co|2023-05-31 17:40:03.077 |
9+
| |nnect-datagen | |
1110
Limit Reached
12-
Query terminated
11+
Query terminated
Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+
2-
|IP_ADDRESS |IP_COUNT |URL |TIMESTAMP |
3-
+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+
4-
|10.0.0.1 |1 |https://docs.confluent.io/current/tutoria|2021-01-17T14:50:43+00:00 |
5-
| | |ls/examples/kubernetes/gke-base/docs/inde| |
6-
| | |x.html | |
7-
|10.0.0.12 |1 |https://www.confluent.io/hub/confluentinc|2021-01-17T14:53:44+00:01 |
8-
| | |/kafka-connect-datagen | |
9-
|10.0.0.13 |1 |https://www.confluent.io/hub/confluentinc|2021-01-17T14:56:45+00:03 |
10-
| | |/kafka-connect-datagen | |
11-
|10.0.0.1 |2 |https://docs.confluent.io/current/tutoria|2021-01-17T14:50:43+00:00 |
12-
| | |ls/examples/kubernetes/gke-base/docs/inde| |
13-
| | |x.html | |
14-
|10.0.0.12 |2 |https://www.confluent.io/hub/confluentinc|2021-01-17T14:53:44+00:01 |
15-
| | |/kafka-connect-datagen | |
16-
|10.0.0.13 |2 |https://www.confluent.io/hub/confluentinc|2021-01-17T14:56:45+00:03 |
17-
| | |/kafka-connect-datagen | |
1+
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
2+
|IP_ADDRESS |IP_COUNT |URL |TIMESTAMP |
3+
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
4+
|10.0.0.1 |1 |https://docs.confluent.io/current/tut|2023-05-31 17:40:03.024 |
5+
| | |orials/examples/kubernetes/gke-base/d| |
6+
| | |ocs/index.html | |
7+
|10.0.0.12 |1 |https://www.confluent.io/hub/confluen|2023-05-31 17:40:03.052 |
8+
| | |tinc/kafka-connect-datagen | |
9+
|10.0.0.13 |1 |https://www.confluent.io/hub/confluen|2023-05-31 17:40:03.077 |
10+
| | |tinc/kafka-connect-datagen | |
11+
|10.0.0.1 |2 |https://docs.confluent.io/current/tut|2023-05-31 17:40:03.024 |
12+
| | |orials/examples/kubernetes/gke-base/d| |
13+
| | |ocs/index.html | |
14+
|10.0.0.12 |2 |https://www.confluent.io/hub/confluen|2023-05-31 17:40:03.052 |
15+
| | |tinc/kafka-connect-datagen | |
16+
|10.0.0.13 |2 |https://www.confluent.io/hub/confluen|2023-05-31 17:40:03.077 |
17+
| | |tinc/kafka-connect-datagen | |
1818
Limit Reached
19-
Query terminated
19+
Query terminated
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.1', 'https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html', '2021-01-17T14:50:43+00:00');
2-
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.12', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2021-01-17T14:53:44+00:01');
3-
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.13', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2021-01-17T14:56:45+00:03');
1+
INSERT INTO CLICKS (IP_ADDRESS, URL) VALUES ('10.0.0.1', 'https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html');
2+
INSERT INTO CLICKS (IP_ADDRESS, URL) VALUES ('10.0.0.12', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen');
3+
INSERT INTO CLICKS (IP_ADDRESS, URL) VALUES ('10.0.0.13', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen');
44

5-
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.1', 'https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html', '2021-01-17T14:50:43+00:00');
6-
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.12', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2021-01-17T14:53:44+00:01');
7-
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.13', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2021-01-17T14:56:45+00:03');
5+
INSERT INTO CLICKS (IP_ADDRESS, URL) VALUES ('10.0.0.1', 'https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html');
6+
INSERT INTO CLICKS (IP_ADDRESS, URL) VALUES ('10.0.0.12', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen');
7+
INSERT INTO CLICKS (IP_ADDRESS, URL) VALUES ('10.0.0.13', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen');

0 commit comments

Comments
 (0)