@@ -557,7 +557,7 @@ def on_message_processed(*_):
557
557
558
558
processed_count = 0
559
559
560
- timestamp = 1000
560
+ timestamp_ms = int ( time . time () * 1000 )
561
561
user_id = "abc123"
562
562
value_in = {"user" : user_id }
563
563
expected_message_count = 1
@@ -589,7 +589,7 @@ def on_message_processed(*_):
589
589
590
590
with app .get_producer () as producer :
591
591
msg = app_topic_in .serialize (
592
- key = "some_key" , value = value_in , timestamp_ms = timestamp
592
+ key = "some_key" , value = value_in , timestamp_ms = timestamp_ms
593
593
)
594
594
producer .produce (
595
595
app_topic_in .name , key = msg .key , value = msg .value , timestamp = msg .timestamp
@@ -615,7 +615,7 @@ def on_message_processed(*_):
615
615
# as original one
616
616
assert row .value == {
617
617
"user" : user_id ,
618
- "groupby_timestamp" : timestamp ,
618
+ "groupby_timestamp" : timestamp_ms ,
619
619
}
620
620
621
621
@pytest .mark .parametrize ("processing_guarantee" , ["exactly-once" , "at-least-once" ])
@@ -643,7 +643,10 @@ def on_message_processed(*_):
643
643
644
644
processed_count = 0
645
645
646
- timestamp = 1000
646
+ window_duration_ms = 1000
647
+ timestamp_ms = int (time .time () * 1000 )
648
+ # use a "window-friendly" timestamp for easier testing
649
+ timestamp_ms = timestamp_ms - (timestamp_ms % window_duration_ms )
647
650
user_id = "abc123"
648
651
value_in = {"user" : user_id }
649
652
expected_message_count = 1
@@ -672,12 +675,12 @@ def on_message_processed(*_):
672
675
sdf ["groupby_timestamp" ] = sdf .apply (
673
676
lambda value , key , timestamp_ , headers : timestamp_ , metadata = True
674
677
)
675
- sdf = sdf .tumbling_window (duration_ms = 1000 ).count ().current ()
678
+ sdf = sdf .tumbling_window (duration_ms = window_duration_ms ).count ().current ()
676
679
sdf = sdf .to_topic (app_topic_out )
677
680
678
681
with app .get_producer () as producer :
679
682
msg = app_topic_in .serialize (
680
- key = "some_key" , value = value_in , timestamp_ms = timestamp
683
+ key = "some_key" , value = value_in , timestamp_ms = timestamp_ms
681
684
)
682
685
producer .produce (
683
686
app_topic_in .name , key = msg .key , value = msg .value , timestamp = msg .timestamp
@@ -700,7 +703,11 @@ def on_message_processed(*_):
700
703
# Check that "user_id" is now used as a message key
701
704
assert row .key .decode () == user_id
702
705
# Check that window is calculated based on the original timestamp
703
- assert row .value == {"start" : 1000 , "end" : 2000 , "value" : 1 }
706
+ assert row .value == {
707
+ "start" : timestamp_ms ,
708
+ "end" : timestamp_ms + window_duration_ms ,
709
+ "value" : 1 ,
710
+ }
704
711
705
712
706
713
class TestAppExactlyOnce :
0 commit comments