@@ -590,8 +590,7 @@ mod tests {
590
590
use futures:: stream;
591
591
use indoc:: indoc;
592
592
use pretty_assertions:: assert_eq;
593
- use tokio:: { sync:: mpsc, sync:: oneshot:: error:: TryRecvError , time} ;
594
- use tokio_stream:: wrappers:: UnboundedReceiverStream ;
593
+ use tokio:: { sync:: oneshot:: error:: TryRecvError , time} ;
595
594
use vector_common:: finalization:: { BatchNotifier , BatchStatus } ;
596
595
use vector_core:: { event:: StatisticKind , samples} ;
597
596
@@ -601,7 +600,10 @@ mod tests {
601
600
event:: metric:: { Metric , MetricValue } ,
602
601
http:: HttpClient ,
603
602
sinks:: prometheus:: { distribution_to_agg_histogram, distribution_to_ddsketch} ,
604
- test_util:: { next_addr, random_string, trace_init} ,
603
+ test_util:: {
604
+ components:: { run_and_assert_sink_compliance, SINK_TAGS } ,
605
+ next_addr, random_string, trace_init,
606
+ } ,
605
607
tls:: MaybeTlsSettings ,
606
608
} ;
607
609
@@ -686,21 +688,22 @@ mod tests {
686
688
suppress_timestamp,
687
689
..Default :: default ( )
688
690
} ;
689
- let ( sink, _) = config. build ( SinkContext :: new_test ( ) ) . await . unwrap ( ) ;
690
- let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
691
- let input_events = UnboundedReceiverStream :: new ( rx) ;
692
-
693
- let input_events = input_events. map ( Into :: into) ;
694
- let sink_handle = tokio:: spawn ( async move { sink. run ( input_events) . await . unwrap ( ) } ) ;
695
691
696
692
// Set up acknowledgement notification
697
693
let mut receiver = BatchNotifier :: apply_to ( & mut events[ ..] ) ;
698
-
699
694
assert_eq ! ( receiver. try_recv( ) , Err ( TryRecvError :: Empty ) ) ;
700
695
701
- for event in events {
702
- tx. send ( event) . expect ( "Failed to send event." ) ;
703
- }
696
+ let ( sink, _) = config. build ( SinkContext :: new_test ( ) ) . await . unwrap ( ) ;
697
+ let ( _, delayed_event) = create_metric_gauge ( Some ( "delayed" . to_string ( ) ) , 123.4 ) ;
698
+ let sink_handle = tokio:: spawn ( run_and_assert_sink_compliance (
699
+ sink,
700
+ stream:: iter ( events) . chain ( stream:: once ( async move {
701
+ // Wait a bit to have time to scrape metrics
702
+ time:: sleep ( time:: Duration :: from_millis ( 500 ) ) . await ;
703
+ delayed_event
704
+ } ) ) ,
705
+ & SINK_TAGS ,
706
+ ) ) ;
704
707
705
708
time:: sleep ( time:: Duration :: from_millis ( 100 ) ) . await ;
706
709
@@ -725,7 +728,6 @@ mod tests {
725
728
. expect ( "Reading body failed" ) ;
726
729
let result = String :: from_utf8 ( bytes. to_vec ( ) ) . unwrap ( ) ;
727
730
728
- drop ( tx) ;
729
731
sink_handle. await . unwrap ( ) ;
730
732
731
733
result
@@ -1072,12 +1074,20 @@ mod integration_tests {
1072
1074
#![ allow( clippy:: dbg_macro) ] // tests
1073
1075
1074
1076
use chrono:: Utc ;
1077
+ use futures:: { future:: ready, stream} ;
1075
1078
use serde_json:: Value ;
1076
1079
use tokio:: { sync:: mpsc, time} ;
1077
1080
use tokio_stream:: wrappers:: UnboundedReceiverStream ;
1078
1081
1079
1082
use super :: * ;
1080
- use crate :: { config:: ProxyConfig , http:: HttpClient , test_util:: trace_init} ;
1083
+ use crate :: {
1084
+ config:: ProxyConfig ,
1085
+ http:: HttpClient ,
1086
+ test_util:: {
1087
+ components:: { run_and_assert_sink_compliance, SINK_TAGS } ,
1088
+ trace_init,
1089
+ } ,
1090
+ } ;
1081
1091
1082
1092
fn sink_exporter_address ( ) -> String {
1083
1093
std:: env:: var ( "SINK_EXPORTER_ADDRESS" ) . unwrap_or_else ( |_| "127.0.0.1:9101" . into ( ) )
@@ -1145,17 +1155,19 @@ mod integration_tests {
1145
1155
..Default :: default ( )
1146
1156
} ;
1147
1157
let ( sink, _) = config. build ( SinkContext :: new_test ( ) ) . await . unwrap ( ) ;
1148
- let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
1149
- let input_events = UnboundedReceiverStream :: new ( rx) ;
1150
-
1151
- let input_events = input_events. map ( Into :: into) ;
1152
- let sink_handle = tokio:: spawn ( async move { sink. run ( input_events) . await . unwrap ( ) } ) ;
1153
-
1154
1158
let ( name, event) = tests:: create_metric_gauge ( None , 123.4 ) ;
1155
- tx. send ( event) . expect ( "Failed to send." ) ;
1156
-
1157
- // Wait a bit for the prometheus server to scrape the metrics
1158
- time:: sleep ( time:: Duration :: from_secs ( 2 ) ) . await ;
1159
+ let ( _, delayed_event) = tests:: create_metric_gauge ( Some ( "delayed" . to_string ( ) ) , 123.4 ) ;
1160
+
1161
+ run_and_assert_sink_compliance (
1162
+ sink,
1163
+ stream:: once ( ready ( event) ) . chain ( stream:: once ( async move {
1164
+ // Wait a bit for the prometheus server to scrape the metrics
1165
+ time:: sleep ( time:: Duration :: from_secs ( 2 ) ) . await ;
1166
+ delayed_event
1167
+ } ) ) ,
1168
+ & SINK_TAGS ,
1169
+ )
1170
+ . await ;
1159
1171
1160
1172
// Now try to download them from prometheus
1161
1173
let result = prometheus_query ( & name) . await ;
@@ -1172,9 +1184,6 @@ mod integration_tests {
1172
1184
) ;
1173
1185
assert ! ( data[ "value" ] [ 0 ] . as_f64( ) . unwrap( ) >= start as f64 ) ;
1174
1186
assert_eq ! ( data[ "value" ] [ 1 ] , Value :: String ( "123.4" . into( ) ) ) ;
1175
-
1176
- drop ( tx) ;
1177
- sink_handle. await . unwrap ( ) ;
1178
1187
}
1179
1188
1180
1189
async fn reset_on_flush_period ( ) {
0 commit comments