From 37b737c1ab247605db6920c850d9f433019c9c0a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 26 Aug 2024 15:35:39 -0400 Subject: [PATCH 1/2] Make std::time::Instant optional --- timely/examples/logging-send.rs | 14 ++++----- timely/examples/threadless.rs | 2 +- timely/src/dataflow/scopes/child.rs | 6 ++-- timely/src/execute.rs | 4 +-- timely/src/progress/subgraph.rs | 9 ++++-- timely/src/scheduling/activate.rs | 37 ++++++++++++++---------- timely/src/worker.rs | 44 ++++++++++++++++------------- 7 files changed, 65 insertions(+), 51 deletions(-) diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 4d9c2bb00..f74106da5 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -16,7 +16,7 @@ fn main() { let probe = ProbeHandle::new(); // Register timely worker logging. - worker.log_register().insert::("timely", |time, data| + worker.log_register().unwrap().insert::("timely", |time, data| if let Some(data) = data { data.iter().for_each(|x| println!("LOG1: {:?}", x)) } @@ -28,7 +28,7 @@ fn main() { // Register timely progress logging. // Less generally useful: intended for debugging advanced custom operators or timely // internals. - worker.log_register().insert::,_>("timely/progress/usize", |time, data| + worker.log_register().unwrap().insert::,_>("timely/progress/usize", |time, data| if let Some(data) = data { data.iter().for_each(|x| { println!("PROGRESS: {:?}", x); @@ -50,7 +50,7 @@ fn main() { } ); - worker.log_register().insert::,_>("timely/reachability/usize", |time, data| + worker.log_register().unwrap().insert::,_>("timely/reachability/usize", |time, data| if let Some(data) = data { data.iter().for_each(|x| { println!("REACHABILITY: {:?}", x); @@ -61,7 +61,7 @@ fn main() { } ); - worker.log_register().insert::,_>("timely/summary/usize", |time, data| + worker.log_register().unwrap().insert::,_>("timely/summary/usize", |time, data| if let Some(data) = data { data.iter().for_each(|(_, x)| { println!("SUMMARY: {:?}", x); @@ -81,7 +81,7 @@ fn main() { }); // Register timely worker logging. - worker.log_register().insert::("timely", |time, data| + worker.log_register().unwrap().insert::("timely", |time, data| if let Some(data) = data { data.iter().for_each(|x| println!("LOG2: {:?}", x)) } @@ -100,7 +100,7 @@ fn main() { // Register user-level logging. type MyBuilder = CapacityContainerBuilder>; - worker.log_register().insert::("input", |time, data| + worker.log_register().unwrap().insert::("input", |time, data| if let Some(data) = data { for element in data.iter() { println!("Round tick at: {:?}", element.0); @@ -111,7 +111,7 @@ fn main() { } ); - let input_logger = worker.log_register().get::("input").expect("Input logger absent"); + let input_logger = worker.log_register().unwrap().get::("input").expect("Input logger absent"); let timer = std::time::Instant::now(); diff --git a/timely/examples/threadless.rs b/timely/examples/threadless.rs index 4a7c124d5..a25273c57 100644 --- a/timely/examples/threadless.rs +++ b/timely/examples/threadless.rs @@ -6,7 +6,7 @@ fn main() { // create a naked single-threaded worker. let allocator = timely::communication::allocator::Thread::default(); - let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator); + let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None); // create input and probe handles. let mut input = InputHandle::new(); diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 19dddd7e6..560e17405 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -73,7 +73,7 @@ where fn peek_identifier(&self) -> usize { self.parent.peek_identifier() } - fn log_register(&self) -> ::std::cell::RefMut { + fn log_register(&self) -> Option<::std::cell::RefMut> { self.parent.log_register() } } @@ -135,8 +135,8 @@ where let path = self.addr_for_child(index); let type_name = std::any::type_name::(); - let progress_logging = self.log_register().get(&format!("timely/progress/{type_name}")); - let summary_logging = self.log_register().get(&format!("timely/summary/{type_name}")); + let progress_logging = self.log_register().as_ref().and_then(|l| l.get(&format!("timely/progress/{type_name}"))); + let summary_logging = self.log_register().as_ref().and_then(|l| l.get(&format!("timely/summary/{type_name}"))); let subscope = RefCell::new(SubgraphBuilder::new_from(path, identifier, self.logging(), summary_logging, name)); let result = { diff --git a/timely/src/execute.rs b/timely/src/execute.rs index f1d87f745..b9673a98e 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -154,7 +154,7 @@ where F: FnOnce(&mut Worker)->T+Send+Sync+'static { let alloc = crate::communication::allocator::thread::Thread::default(); - let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc); + let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now())); let result = func(&mut worker); while worker.has_dataflows() { worker.step_or_park(None); @@ -320,7 +320,7 @@ where T: Send+'static, F: Fn(&mut Worker<::Allocator>)->T+Send+Sync+'static { initialize_from(builders, others, move |allocator| { - let mut worker = Worker::new(worker_config.clone(), allocator); + let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now())); let result = func(&mut worker); while worker.has_dataflows() { worker.step_or_park(None); diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 090d36265..f7d600150 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -183,9 +183,12 @@ where let type_name = std::any::type_name::(); let reachability_logging = worker.log_register() - .get::>(&format!("timely/reachability/{type_name}")) - .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger)); - let progress_logging = worker.log_register().get::>(&format!("timely/progress/{type_name}")); + .as_ref() + .and_then(|l| + l.get::>(&format!("timely/reachability/{type_name}")) + .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger)) + ); + let progress_logging = worker.log_register().as_ref().and_then(|l| l.get::>(&format!("timely/progress/{type_name}"))); let (tracker, scope_summary) = builder.build(reachability_logging); let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging); diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index aa3d4fcd0..2642b520b 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -48,14 +48,14 @@ pub struct Activations { rx: Receiver>, // Delayed activations. - timer: Instant, + timer: Option, queue: BinaryHeap)>>, } impl Activations { /// Creates a new activation tracker. - pub fn new(timer: Instant) -> Self { + pub fn new(timer: Option) -> Self { let (tx, rx) = crossbeam_channel::unbounded(); Self { clean: 0, @@ -77,13 +77,18 @@ impl Activations { /// Schedules a future activation for the task addressed by `path`. pub fn activate_after(&mut self, path: &[usize], delay: Duration) { - // TODO: We could have a minimum delay and immediately schedule anything less than that delay. - if delay == Duration::new(0, 0) { - self.activate(path); - } + if let Some(timer) = self.timer { + // TODO: We could have a minimum delay and immediately schedule anything less than that delay. + if delay == Duration::new(0, 0) { + self.activate(path); + } + else { + let moment = timer.elapsed() + delay; + self.queue.push(Reverse((moment, path.to_vec()))); + } + } else { - let moment = self.timer.elapsed() + delay; - self.queue.push(Reverse((moment, path.to_vec()))); + self.activate(path); } } @@ -96,11 +101,13 @@ impl Activations { } // Drain timer-based activations. - if !self.queue.is_empty() { - let now = self.timer.elapsed(); - while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) { - let Reverse((_time, path)) = self.queue.pop().unwrap(); - self.activate(&path[..]); + if let Some(timer) = self.timer { + if !self.queue.is_empty() { + let now = timer.elapsed(); + while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) { + let Reverse((_time, path)) = self.queue.pop().unwrap(); + self.activate(&path[..]); + } } } @@ -173,12 +180,12 @@ impl Activations { /// indicates the amount of time before the thread should be unparked for the /// next scheduled activation. pub fn empty_for(&self) -> Option { - if !self.bounds.is_empty() { + if !self.bounds.is_empty() || self.timer.is_none() { Some(Duration::new(0,0)) } else { self.queue.peek().map(|Reverse((t,_a))| { - let elapsed = self.timer.elapsed(); + let elapsed = self.timer.unwrap().elapsed(); if t < &elapsed { Duration::new(0,0) } else { *t - elapsed } }) diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 155115f38..771449567 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -201,23 +201,27 @@ pub trait AsWorker : Scheduler { /// The next worker-unique identifier to be allocated. fn peek_identifier(&self) -> usize; /// Provides access to named logging streams. - fn log_register(&self) -> ::std::cell::RefMut; + fn log_register(&self) -> Option<::std::cell::RefMut>; /// Provides access to the timely logging stream. - fn logging(&self) -> Option { self.log_register().get("timely").map(Into::into) } + fn logging(&self) -> Option { self.log_register().and_then(|l| l.get("timely").map(Into::into)) } } /// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`, /// and has a list of dataflows that it manages. pub struct Worker { config: Config, - timer: Instant, + /// An optional instant from which the start of the computation should be reckoned. + /// + /// If this is set to none, system time-based functionality will be unavailable or work badly. + /// For example, logging will be unavailable, and activation after a delay will be unavailable. + timer: Option, paths: Rc>>>, allocator: Rc>, identifiers: Rc>, // dataflows: Rc>>, dataflows: Rc>>, dataflow_counter: Rc>, - logging: Rc>, + logging: Option>>, activations: Rc>, active_dataflows: Vec, @@ -255,7 +259,7 @@ impl AsWorker for Worker { fn new_identifier(&mut self) -> usize { self.new_identifier() } fn peek_identifier(&self) -> usize { self.peek_identifier() } - fn log_register(&self) -> RefMut { + fn log_register(&self) -> Option> { self.log_register() } } @@ -268,8 +272,7 @@ impl Scheduler for Worker { impl Worker { /// Allocates a new `Worker` bound to a channel allocator. - pub fn new(config: Config, c: A) -> Worker { - let now = Instant::now(); + pub fn new(config: Config, c: A, now: Option) -> Worker { Worker { config, timer: now, @@ -278,7 +281,7 @@ impl Worker { identifiers: Default::default(), dataflows: Default::default(), dataflow_counter: Default::default(), - logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now))), + logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))), activations: Rc::new(RefCell::new(Activations::new(now))), active_dataflows: Default::default(), temp_channel_ids: Default::default(), @@ -414,7 +417,7 @@ impl Worker { } // Clean up, indicate if dataflows remain. - self.logging.borrow_mut().flush(); + self.logging.as_ref().map(|l| l.borrow_mut().flush()); self.allocator.borrow_mut().release(); !self.dataflows.borrow().is_empty() } @@ -485,7 +488,7 @@ impl Worker { /// /// let index = worker.index(); /// let peers = worker.peers(); - /// let timer = worker.timer(); + /// let timer = worker.timer().unwrap(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); /// @@ -500,7 +503,7 @@ impl Worker { /// /// let index = worker.index(); /// let peers = worker.peers(); - /// let timer = worker.timer(); + /// let timer = worker.timer().unwrap(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); /// @@ -516,13 +519,13 @@ impl Worker { /// /// let index = worker.index(); /// let peers = worker.peers(); - /// let timer = worker.timer(); + /// let timer = worker.timer().unwrap(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); /// /// }); /// ``` - pub fn timer(&self) -> Instant { self.timer } + pub fn timer(&self) -> Option { self.timer } /// Allocate a new worker-unique identifier. /// @@ -546,13 +549,14 @@ impl Worker { /// timely::execute_from_args(::std::env::args(), |worker| { /// /// worker.log_register() + /// .unwrap() /// .insert::("timely", |time, data| /// println!("{:?}\t{:?}", time, data) /// ); /// }); /// ``` - pub fn log_register(&self) -> ::std::cell::RefMut { - self.logging.borrow_mut() + pub fn log_register(&self) -> Option<::std::cell::RefMut> { + self.logging.as_ref().map(|l| l.borrow_mut()) } /// Construct a new dataflow. @@ -575,7 +579,7 @@ impl Worker { T: Refines<()>, F: FnOnce(&mut Child)->R, { - let logging = self.logging.borrow_mut().get("timely").map(Into::into); + let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely").map(Into::into)); self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child)) } @@ -599,7 +603,7 @@ impl Worker { T: Refines<()>, F: FnOnce(&mut Child)->R, { - let logging = self.logging.borrow_mut().get("timely").map(Into::into); + let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely").map(Into::into)); self.dataflow_core(name, logging, Box::new(()), |_, child| func(child)) } @@ -639,8 +643,8 @@ impl Worker { let identifier = self.new_identifier(); let type_name = std::any::type_name::(); - let progress_logging = self.logging.borrow_mut().get(&format!("timely/progress/{type_name}")); - let summary_logging = self.logging.borrow_mut().get(&format!("timely/summary/{type_name}")); + let progress_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get(&format!("timely/progress/{}", type_name)).map(Into::into)); + let summary_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get(&format!("timely/summary/{}", type_name)).map(Into::into)); let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name); let subscope = RefCell::new(subscope); @@ -735,7 +739,7 @@ impl Clone for Worker { identifiers: Rc::clone(&self.identifiers), dataflows: Rc::clone(&self.dataflows), dataflow_counter: Rc::clone(&self.dataflow_counter), - logging: Rc::clone(&self.logging), + logging: self.logging.clone(), activations: Rc::clone(&self.activations), active_dataflows: Vec::new(), temp_channel_ids: Rc::clone(&self.temp_channel_ids), From 7578c1ef5d2a50b38596f9ead76226a19d31c02a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 7 May 2025 12:19:33 -0400 Subject: [PATCH 2/2] Tidy various logging uses --- timely/src/dataflow/scopes/child.rs | 4 ++-- timely/src/progress/subgraph.rs | 12 ++++-------- timely/src/worker.rs | 18 +++++++++++------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 560e17405..c5ddf3091 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -135,8 +135,8 @@ where let path = self.addr_for_child(index); let type_name = std::any::type_name::(); - let progress_logging = self.log_register().as_ref().and_then(|l| l.get(&format!("timely/progress/{type_name}"))); - let summary_logging = self.log_register().as_ref().and_then(|l| l.get(&format!("timely/summary/{type_name}"))); + let progress_logging = self.logger_for(&format!("timely/progress/{type_name}")); + let summary_logging = self.logger_for(&format!("timely/summary/{type_name}")); let subscope = RefCell::new(SubgraphBuilder::new_from(path, identifier, self.logging(), summary_logging, name)); let result = { diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index f7d600150..868fd10d9 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -10,7 +10,7 @@ use std::cell::RefCell; use std::collections::BinaryHeap; use std::cmp::Reverse; -use crate::logging::{TimelyLogger as Logger, TimelyProgressEventBuilder}; +use crate::logging::TimelyLogger as Logger; use crate::logging::TimelySummaryLogger as SummaryLogger; use crate::scheduling::Schedule; @@ -182,13 +182,9 @@ where // The `None` argument is optional logging infrastructure. let type_name = std::any::type_name::(); let reachability_logging = - worker.log_register() - .as_ref() - .and_then(|l| - l.get::>(&format!("timely/reachability/{type_name}")) - .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger)) - ); - let progress_logging = worker.log_register().as_ref().and_then(|l| l.get::>(&format!("timely/progress/{type_name}"))); + worker.logger_for(&format!("timely/reachability/{type_name}")) + .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger)); + let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}")); let (tracker, scope_summary) = builder.build(reachability_logging); let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging); diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 771449567..36f66d1a6 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -202,8 +202,14 @@ pub trait AsWorker : Scheduler { fn peek_identifier(&self) -> usize; /// Provides access to named logging streams. fn log_register(&self) -> Option<::std::cell::RefMut>; + /// Acquires a logger by name, if the log register exists and the name is registered. + /// + /// For a more precise understanding of why a result is `None` one can use the direct functions. + fn logger_for(&self, name: &str) -> Option> { + self.log_register().and_then(|l| l.get(name)) + } /// Provides access to the timely logging stream. - fn logging(&self) -> Option { self.log_register().and_then(|l| l.get("timely").map(Into::into)) } + fn logging(&self) -> Option { self.logger_for("timely").map(Into::into) } } /// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`, @@ -579,8 +585,7 @@ impl Worker { T: Refines<()>, F: FnOnce(&mut Child)->R, { - let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely").map(Into::into)); - self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child)) + self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child)) } /// Construct a new dataflow with a (purely cosmetic) name. @@ -603,8 +608,7 @@ impl Worker { T: Refines<()>, F: FnOnce(&mut Child)->R, { - let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely").map(Into::into)); - self.dataflow_core(name, logging, Box::new(()), |_, child| func(child)) + self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child)) } /// Construct a new dataflow with specific configurations. @@ -643,8 +647,8 @@ impl Worker { let identifier = self.new_identifier(); let type_name = std::any::type_name::(); - let progress_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get(&format!("timely/progress/{}", type_name)).map(Into::into)); - let summary_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get(&format!("timely/summary/{}", type_name)).map(Into::into)); + let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name)); + let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name)); let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name); let subscope = RefCell::new(subscope);