Skip to content

Commit bd4c8a7

Browse files
Move opinions about reachability logging into TrackerLogger (#629)
1 parent 90bd8aa commit bd4c8a7

File tree

1 file changed

+38
-36
lines changed

1 file changed

+38
-36
lines changed

timely/src/progress/reachability.rs

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -573,22 +573,16 @@ impl<T:Timestamp> Tracker<T> {
573573
let target_changes =
574574
self.target_changes
575575
.iter()
576-
.map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff))
577-
.collect::<Vec<_>>();
576+
.map(|((target, time), diff)| (target.node, target.port, time, *diff));
578577

579-
if !target_changes.is_empty() {
580-
logger.log_target_updates(target_changes);
581-
}
578+
logger.log_target_updates(target_changes);
582579

583580
let source_changes =
584581
self.source_changes
585582
.iter()
586-
.map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff))
587-
.collect::<Vec<_>>();
583+
.map(|((source, time), diff)| (source.node, source.port, time, *diff));
588584

589-
if !source_changes.is_empty() {
590-
logger.log_source_updates(source_changes);
591-
}
585+
logger.log_source_updates(source_changes);
592586
}
593587

594588
// Step 1: Drain `self.input_changes` and determine actual frontier changes.
@@ -853,22 +847,34 @@ pub mod logging {
853847
}
854848

855849
/// Log source update events with additional identifying information.
856-
pub fn log_source_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
857-
self.logger.log({
858-
SourceUpdate {
859-
tracker_id: self.identifier,
860-
updates,
861-
}
862-
})
850+
pub fn log_source_updates<'a, I>(&mut self, updates: I)
851+
where
852+
I: IntoIterator<Item = (usize, usize, &'a T, i64)>
853+
{
854+
let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
855+
if !updates.is_empty() {
856+
self.logger.log({
857+
SourceUpdate {
858+
tracker_id: self.identifier,
859+
updates
860+
}
861+
});
862+
}
863863
}
864864
/// Log target update events with additional identifying information.
865-
pub fn log_target_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
866-
self.logger.log({
867-
TargetUpdate {
868-
tracker_id: self.identifier,
869-
updates,
870-
}
871-
})
865+
pub fn log_target_updates<'a, I>(&mut self, updates: I)
866+
where
867+
I: IntoIterator<Item = (usize, usize, &'a T, i64)>
868+
{
869+
let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
870+
if !updates.is_empty() {
871+
self.logger.log({
872+
TargetUpdate {
873+
tracker_id: self.identifier,
874+
updates
875+
}
876+
});
877+
}
872878
}
873879
}
874880

@@ -929,25 +935,21 @@ impl<T: Timestamp> Drop for Tracker<T> {
929935
.flat_map(|(port, target)| {
930936
target.pointstamps
931937
.updates()
932-
.map(move |(time, diff)| (index, port, time.clone(), -diff))
933-
})
934-
.collect::<Vec<_>>();
935-
if !target_changes.is_empty() {
936-
logger.log_target_updates(target_changes);
937-
}
938+
.map(move |(time, diff)| (index, port, time, -diff))
939+
});
940+
941+
logger.log_target_updates(target_changes);
938942

939943
let source_changes = per_operator.sources
940944
.iter_mut()
941945
.enumerate()
942946
.flat_map(|(port, source)| {
943947
source.pointstamps
944948
.updates()
945-
.map(move |(time, diff)| (index, port, time.clone(), -diff))
946-
})
947-
.collect::<Vec<_>>();
948-
if !source_changes.is_empty() {
949-
logger.log_source_updates(source_changes);
950-
}
949+
.map(move |(time, diff)| (index, port, time, -diff))
950+
});
951+
952+
logger.log_source_updates(source_changes);
951953
}
952954
}
953955
}

0 commit comments

Comments
 (0)