Description
The Finding Distinct Events KT can return inconsistent results because the test data has old timestamps (early 2020):
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.1', 'https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html', '2020-01-17T14:50:43+00:00');
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.12', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2020-01-17T14:53:44+00:01');
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.13', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2020-01-17T14:56:45+00:03');
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.1', 'https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html', '2020-01-17T14:50:43+00:00');
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.12', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2020-01-17T14:53:44+00:01');
INSERT INTO CLICKS (IP_ADDRESS, URL, TIMESTAMP) VALUES ('10.0.0.13', 'https://www.confluent.io/hub/confluentinc/kafka-connect-datagen', '2020-01-17T14:56:45+00:03');
These old timestamps can cause results to be deleted by the Kafka topic retention policy almost immediately after they are produced:
broker | [2021-01-28 06:28:51,398] INFO [Log partition=DETECTED_CLICKS-0, dir=/var/lib/kafka/data] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log)
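The retention breach in the log line above can be checked with a little arithmetic. The following is a minimal Python sketch, assuming the output records carry the event timestamps from the test data and that the broker time in the log line is UTC; the helper name `is_past_retention` is made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# Retention value reported in the broker log line (604800000 ms).
RETENTION = timedelta(milliseconds=604_800_000)


def is_past_retention(record_ts: str, now: datetime) -> bool:
    """Return True if a record with this ISO-8601 timestamp is older than RETENTION at `now`."""
    age = now - datetime.fromisoformat(record_ts)
    return age > RETENTION


# Broker wall-clock time taken from the log line; assumed to be UTC.
broker_now = datetime(2021, 1, 28, 6, 28, 51, tzinfo=timezone.utc)

print(RETENTION.days)  # 7
print(is_past_retention("2020-01-17T14:50:43+00:00", broker_now))  # True
```

The test record is more than a year older than the broker clock, so it is well past the 7-day retention as soon as it is written.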
The test passes for this KT, because it's not using real topics with retention policies.
A couple of ways to address this:
- Increase timestamps in test records above to a more recent date (would require ongoing maintenance to keep current).
- Set longer or infinite retention for topics in the test cluster. For example, add the following to the docker-compose config for the broker:
KAFKA_LOG_RETENTION_MS: -1
In this case, the internal windowed changelog retention in the CTAS would also need to be increased. For example:
CREATE TABLE DETECTED_CLICKS AS
    SELECT
        IP_ADDRESS AS KEY1,
        URL AS KEY2,
        TIMESTAMP AS KEY3,
        AS_VALUE(IP_ADDRESS) AS IP_ADDRESS,
        AS_VALUE(URL) AS URL,
        AS_VALUE(TIMESTAMP) AS TIMESTAMP
    FROM CLICKS WINDOW TUMBLING (SIZE 2 MINUTES, RETENTION 3650 DAYS)
    GROUP BY IP_ADDRESS, URL, TIMESTAMP
    HAVING COUNT(IP_ADDRESS) = 1;
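As a rough check on how long a 3650-day window retention would cover the 2020 test data, here is a small Python sketch. It assumes the changelog records carry the event timestamps from the test data and that retention is measured against wall-clock time; it ignores any extra retention the system may add on top, and the variable names are illustrative only.

```python
from datetime import datetime, timedelta

# Earliest event timestamp in the test data.
earliest_event = datetime.fromisoformat("2020-01-17T14:50:43+00:00")

# Window retention from the CTAS example (RETENTION 3650 DAYS).
window_retention = timedelta(days=3650)

# Approximate point after which the oldest test record would exceed the retention.
expiry = earliest_event + window_retention
print(expiry.date().isoformat())  # 2030-01-14
```

Under these assumptions the longer window retention postpones the problem to early 2030 rather than removing it, which is a similar maintenance trade-off to refreshing the timestamps.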