7
7
// the Business Source License, use of this software will be governed
8
8
// by the Apache License, Version 2.0.
9
9
10
+ use rand:: Rng ;
10
11
use std:: collections:: { HashMap , HashSet , VecDeque } ;
11
12
use std:: convert:: TryInto ;
12
13
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -369,13 +370,17 @@ impl DataPlaneInfo {
369
370
let refresh = self . refresh_metadata_info . clone ( ) ;
370
371
let id = self . source_id . clone ( ) ;
371
372
let topic = self . topic_name . clone ( ) ;
373
+ let worker_id = self . worker_id ;
374
+ let worker_count = self . worker_count ;
372
375
thread:: spawn ( move || {
373
376
metadata_fetch (
374
377
timestamping_stopped,
375
378
consumer,
376
379
refresh,
377
380
& id,
378
381
& topic,
382
+ worker_id,
383
+ worker_count,
379
384
metadata_refresh_frequency,
380
385
)
381
386
} ) ;
@@ -421,8 +426,6 @@ impl DataPlaneInfo {
421
426
}
422
427
423
428
/// Returns true if this worker is responsible for this partition
424
- /// If multi-worker reading is not enabled, this worker is *always* responsible for the
425
- /// partition
426
429
/// Ex: if pid=0 and worker_id = 0, then true
427
430
/// if pid=1 and worker_id = 0, then false
428
431
fn has_partition ( & self , partition_id : i32 ) -> bool {
@@ -760,12 +763,15 @@ fn activate_source_timestamping<G>(
760
763
761
764
/// This function is responsible for refreshing the number of known partitions. It marks the source
762
765
/// has needing to be refreshed if new partitions are detected.
766
+ #[ allow( clippy:: too_many_arguments) ]
763
767
fn metadata_fetch (
764
768
timestamping_stopped : Arc < AtomicBool > ,
765
769
consumer : Arc < BaseConsumer < GlueConsumerContext > > ,
766
770
partition_count : Arc < Mutex < Option < i32 > > > ,
767
771
id : & str ,
768
772
topic : & str ,
773
+ worker_id : i32 ,
774
+ worker_count : i32 ,
769
775
wait : Duration ,
770
776
) {
771
777
debug ! (
@@ -783,6 +789,7 @@ fn metadata_fetch(
783
789
}
784
790
785
791
let mut partition_kafka_metadata: HashMap < i32 , IntGauge > = HashMap :: new ( ) ;
792
+ let mut rng = rand:: thread_rng ( ) ;
786
793
787
794
while !timestamping_stopped. load ( Ordering :: SeqCst ) {
788
795
let metadata = consumer. fetch_metadata ( Some ( & topic) , Duration :: from_secs ( 30 ) ) ;
@@ -806,37 +813,40 @@ fn metadata_fetch(
806
813
continue ;
807
814
}
808
815
new_partition_count = metadata_topic. partitions ( ) . len ( ) ;
809
- let mut refresh_data = partition_count. lock ( ) . expect ( "lock poisoned" ) ;
810
816
811
817
// Upgrade partition metrics
812
818
for p in 0 ..new_partition_count {
813
819
let pid = p. try_into ( ) . unwrap ( ) ;
814
- match consumer. fetch_watermarks ( & topic, pid, Duration :: from_secs ( 1 ) ) {
815
- Ok ( ( _, high) ) => {
816
- if let Some ( max_available_offset) =
817
- partition_kafka_metadata. get_mut ( & pid)
818
- {
819
- max_available_offset. set ( high)
820
- } else {
821
- let max_offset = MAX_AVAILABLE_OFFSET . with_label_values ( & [
822
- topic,
823
- & id,
824
- & pid. to_string ( ) ,
825
- ] ) ;
826
- max_offset. set ( high) ;
827
- partition_kafka_metadata. insert ( pid, max_offset) ;
820
+ // Only check metadata updates for partitions that the worker owns
821
+ if ( pid % worker_count) == worker_id {
822
+ match consumer. fetch_watermarks ( & topic, pid, Duration :: from_secs ( 1 ) ) {
823
+ Ok ( ( _, high) ) => {
824
+ if let Some ( max_available_offset) =
825
+ partition_kafka_metadata. get_mut ( & pid)
826
+ {
827
+ max_available_offset. set ( high)
828
+ } else {
829
+ let max_offset = MAX_AVAILABLE_OFFSET . with_label_values ( & [
830
+ topic,
831
+ & id,
832
+ & pid. to_string ( ) ,
833
+ ] ) ;
834
+ max_offset. set ( high) ;
835
+ partition_kafka_metadata. insert ( pid, max_offset) ;
836
+ }
828
837
}
838
+ Err ( e) => warn ! (
839
+ "error loading watermarks topic={} partition={} error={}" ,
840
+ topic, p, e
841
+ ) ,
829
842
}
830
- Err ( e) => warn ! (
831
- "error loading watermarks topic={} partition={} error={}" ,
832
- topic, p, e
833
- ) ,
834
843
}
835
844
}
836
845
837
846
// Kafka partition are i32, and Kafka consequently cannot support more than i32
838
847
// partitions
839
- * refresh_data = Some ( new_partition_count. try_into ( ) . unwrap ( ) ) ;
848
+ * partition_count. lock ( ) . expect ( "lock poisoned" ) =
849
+ Some ( new_partition_count. try_into ( ) . unwrap ( ) ) ;
840
850
}
841
851
Err ( e) => {
842
852
new_partition_count = 0 ;
@@ -845,7 +855,10 @@ fn metadata_fetch(
845
855
}
846
856
847
857
if new_partition_count > 0 {
848
- thread:: sleep ( wait) ;
858
+ // Add jitter to spread-out metadata requests from workers. Brokers can get overloaded
859
+ // if all workers make simultaneous metadata request calls.
860
+ let sleep_jitter = rng. gen_range ( Duration :: from_secs ( 0 ) , Duration :: from_secs ( 15 ) ) ;
861
+ thread:: sleep ( wait + sleep_jitter) ;
849
862
} else {
850
863
// If no partitions have been detected yet, sleep for a second rather than
851
864
// the specified "wait" period of time, as we know that there should at least be one
0 commit comments