Skip to content

Make std::time::Instant optional #663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn main() {
let probe = ProbeHandle::new();

// Register timely worker logging.
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| println!("LOG1: {:?}", x))
}
Expand All @@ -28,7 +28,7 @@ fn main() {
// Register timely progress logging.
// Less generally useful: intended for debugging advanced custom operators or timely
// internals.
worker.log_register().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
worker.log_register().unwrap().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
Expand All @@ -50,7 +50,7 @@ fn main() {
}
);

worker.log_register().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
worker.log_register().unwrap().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| {
println!("REACHABILITY: {:?}", x);
Expand All @@ -61,7 +61,7 @@ fn main() {
}
);

worker.log_register().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
worker.log_register().unwrap().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
if let Some(data) = data {
data.iter().for_each(|(_, x)| {
println!("SUMMARY: {:?}", x);
Expand All @@ -81,7 +81,7 @@ fn main() {
});

// Register timely worker logging.
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| println!("LOG2: {:?}", x))
}
Expand All @@ -100,7 +100,7 @@ fn main() {

// Register user-level logging.
type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
worker.log_register().insert::<MyBuilder,_>("input", |time, data|
worker.log_register().unwrap().insert::<MyBuilder,_>("input", |time, data|
if let Some(data) = data {
for element in data.iter() {
println!("Round tick at: {:?}", element.0);
Expand All @@ -111,7 +111,7 @@ fn main() {
}
);

let input_logger = worker.log_register().get::<MyBuilder>("input").expect("Input logger absent");
let input_logger = worker.log_register().unwrap().get::<MyBuilder>("input").expect("Input logger absent");

let timer = std::time::Instant::now();

Expand Down
2 changes: 1 addition & 1 deletion timely/examples/threadless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fn main() {

// create a naked single-threaded worker.
let allocator = timely::communication::allocator::Thread::default();
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);

// create input and probe handles.
let mut input = InputHandle::new();
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
fn peek_identifier(&self) -> usize {
self.parent.peek_identifier()
}
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>> {
self.parent.log_register()
}
}
Expand Down Expand Up @@ -135,8 +135,8 @@ where
let path = self.addr_for_child(index);

let type_name = std::any::type_name::<T2>();
let progress_logging = self.log_register().get(&format!("timely/progress/{type_name}"));
let summary_logging = self.log_register().get(&format!("timely/summary/{type_name}"));
let progress_logging = self.logger_for(&format!("timely/progress/{type_name}"));
let summary_logging = self.logger_for(&format!("timely/summary/{type_name}"));

let subscope = RefCell::new(SubgraphBuilder::new_from(path, identifier, self.logging(), summary_logging, name));
let result = {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ where
F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
{
let alloc = crate::communication::allocator::thread::Thread::default();
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now()));
let result = func(&mut worker);
while worker.has_dataflows() {
worker.step_or_park(None);
Expand Down Expand Up @@ -320,7 +320,7 @@ where
T: Send+'static,
F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
initialize_from(builders, others, move |allocator| {
let mut worker = Worker::new(worker_config.clone(), allocator);
let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now()));
let result = func(&mut worker);
while worker.has_dataflows() {
worker.step_or_park(None);
Expand Down
9 changes: 4 additions & 5 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use std::collections::BinaryHeap;
use std::cmp::Reverse;

use crate::logging::{TimelyLogger as Logger, TimelyProgressEventBuilder};
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelySummaryLogger as SummaryLogger;

use crate::scheduling::Schedule;
Expand Down Expand Up @@ -182,10 +182,9 @@
// The `None` argument is optional logging infrastructure.
let type_name = std::any::type_name::<TInner>();
let reachability_logging =
worker.log_register()
.get::<reachability::logging::TrackerEventBuilder<TInner>>(&format!("timely/reachability/{type_name}"))
.map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
let progress_logging = worker.log_register().get::<TimelyProgressEventBuilder<TInner>>(&format!("timely/progress/{type_name}"));
worker.logger_for(&format!("timely/reachability/{type_name}"))
.map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}"));
let (tracker, scope_summary) = builder.build(reachability_logging);

let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging);
Expand Down Expand Up @@ -785,7 +784,7 @@
for (time, diff) in internal.iter() {
if *diff > 0 {
let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
let internal = child_state.sources[output].implications.less_equal(time);

Check warning on line 787 in timely/src/progress/subgraph.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`internal` shadows a previous, unrelated binding
if !consumed && !internal {
println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
panic!("Progress error; internal {:?}", self.name);
Expand Down
37 changes: 22 additions & 15 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ pub struct Activations {
rx: Receiver<Vec<usize>>,

// Delayed activations.
timer: Instant,
timer: Option<Instant>,
queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
}

impl Activations {

/// Creates a new activation tracker.
pub fn new(timer: Instant) -> Self {
pub fn new(timer: Option<Instant>) -> Self {
let (tx, rx) = crossbeam_channel::unbounded();
Self {
clean: 0,
Expand All @@ -77,13 +77,18 @@ impl Activations {

/// Schedules a future activation for the task addressed by `path`.
pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
if delay == Duration::new(0, 0) {
self.activate(path);
}
if let Some(timer) = self.timer {
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
if delay == Duration::new(0, 0) {
self.activate(path);
}
else {
let moment = timer.elapsed() + delay;
self.queue.push(Reverse((moment, path.to_vec())));
}
}
else {
let moment = self.timer.elapsed() + delay;
self.queue.push(Reverse((moment, path.to_vec())));
self.activate(path);
}
}

Expand All @@ -96,11 +101,13 @@ impl Activations {
}

// Drain timer-based activations.
if !self.queue.is_empty() {
let now = self.timer.elapsed();
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
let Reverse((_time, path)) = self.queue.pop().unwrap();
self.activate(&path[..]);
if let Some(timer) = self.timer {
if !self.queue.is_empty() {
let now = timer.elapsed();
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
let Reverse((_time, path)) = self.queue.pop().unwrap();
self.activate(&path[..]);
}
}
}

Expand Down Expand Up @@ -173,12 +180,12 @@ impl Activations {
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
if !self.bounds.is_empty() {
if !self.bounds.is_empty() || self.timer.is_none() {
Some(Duration::new(0,0))
}
else {
self.queue.peek().map(|Reverse((t,_a))| {
let elapsed = self.timer.elapsed();
let elapsed = self.timer.unwrap().elapsed();
if t < &elapsed { Duration::new(0,0) }
else { *t - elapsed }
})
Expand Down
52 changes: 30 additions & 22 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,23 +201,33 @@ pub trait AsWorker : Scheduler {
/// The next worker-unique identifier to be allocated.
fn peek_identifier(&self) -> usize;
/// Provides access to named logging streams.
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry>;
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>>;
/// Acquires a logger by name, if the log register exists and the name is registered.
///
/// For a more precise understanding of why a result is `None` one can use the direct functions.
fn logger_for<CB: timely_container::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
self.log_register().and_then(|l| l.get(name))
}
/// Provides access to the timely logging stream.
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely").map(Into::into) }
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.logger_for("timely").map(Into::into) }
}

/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
/// and has a list of dataflows that it manages.
pub struct Worker<A: Allocate> {
config: Config,
timer: Instant,
/// An optional instant from which the start of the computation should be reckoned.
///
/// If this is set to none, system time-based functionality will be unavailable or work badly.
/// For example, logging will be unavailable, and activation after a delay will be unavailable.
timer: Option<Instant>,
paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
allocator: Rc<RefCell<A>>,
identifiers: Rc<RefCell<usize>>,
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
dataflow_counter: Rc<RefCell<usize>>,
logging: Rc<RefCell<crate::logging_core::Registry>>,
logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,

activations: Rc<RefCell<Activations>>,
active_dataflows: Vec<usize>,
Expand Down Expand Up @@ -255,7 +265,7 @@ impl<A: Allocate> AsWorker for Worker<A> {

fn new_identifier(&mut self) -> usize { self.new_identifier() }
fn peek_identifier(&self) -> usize { self.peek_identifier() }
fn log_register(&self) -> RefMut<crate::logging_core::Registry> {
fn log_register(&self) -> Option<RefMut<crate::logging_core::Registry>> {
self.log_register()
}
}
Expand All @@ -268,8 +278,7 @@ impl<A: Allocate> Scheduler for Worker<A> {

impl<A: Allocate> Worker<A> {
/// Allocates a new `Worker` bound to a channel allocator.
pub fn new(config: Config, c: A) -> Worker<A> {
let now = Instant::now();
pub fn new(config: Config, c: A, now: Option<std::time::Instant>) -> Worker<A> {
Worker {
config,
timer: now,
Expand All @@ -278,7 +287,7 @@ impl<A: Allocate> Worker<A> {
identifiers: Default::default(),
dataflows: Default::default(),
dataflow_counter: Default::default(),
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now))),
logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))),
activations: Rc::new(RefCell::new(Activations::new(now))),
active_dataflows: Default::default(),
temp_channel_ids: Default::default(),
Expand Down Expand Up @@ -414,7 +423,7 @@ impl<A: Allocate> Worker<A> {
}

// Clean up, indicate if dataflows remain.
self.logging.borrow_mut().flush();
self.logging.as_ref().map(|l| l.borrow_mut().flush());
self.allocator.borrow_mut().release();
!self.dataflows.borrow().is_empty()
}
Expand Down Expand Up @@ -485,7 +494,7 @@ impl<A: Allocate> Worker<A> {
///
/// let index = worker.index();
/// let peers = worker.peers();
/// let timer = worker.timer();
/// let timer = worker.timer().unwrap();
///
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
///
Expand All @@ -500,7 +509,7 @@ impl<A: Allocate> Worker<A> {
///
/// let index = worker.index();
/// let peers = worker.peers();
/// let timer = worker.timer();
/// let timer = worker.timer().unwrap();
///
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
///
Expand All @@ -516,13 +525,13 @@ impl<A: Allocate> Worker<A> {
///
/// let index = worker.index();
/// let peers = worker.peers();
/// let timer = worker.timer();
/// let timer = worker.timer().unwrap();
///
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
///
/// });
/// ```
pub fn timer(&self) -> Instant { self.timer }
pub fn timer(&self) -> Option<Instant> { self.timer }

/// Allocate a new worker-unique identifier.
///
Expand All @@ -546,13 +555,14 @@ impl<A: Allocate> Worker<A> {
/// timely::execute_from_args(::std::env::args(), |worker| {
///
/// worker.log_register()
/// .unwrap()
/// .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
/// println!("{:?}\t{:?}", time, data)
/// );
/// });
/// ```
pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
self.logging.borrow_mut()
pub fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>> {
self.logging.as_ref().map(|l| l.borrow_mut())
}

/// Construct a new dataflow.
Expand All @@ -575,8 +585,7 @@ impl<A: Allocate> Worker<A> {
T: Refines<()>,
F: FnOnce(&mut Child<Self, T>)->R,
{
let logging = self.logging.borrow_mut().get("timely").map(Into::into);
self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child))
self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child))
}

/// Construct a new dataflow with a (purely cosmetic) name.
Expand All @@ -599,8 +608,7 @@ impl<A: Allocate> Worker<A> {
T: Refines<()>,
F: FnOnce(&mut Child<Self, T>)->R,
{
let logging = self.logging.borrow_mut().get("timely").map(Into::into);
self.dataflow_core(name, logging, Box::new(()), |_, child| func(child))
self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child))
}

/// Construct a new dataflow with specific configurations.
Expand Down Expand Up @@ -639,8 +647,8 @@ impl<A: Allocate> Worker<A> {
let identifier = self.new_identifier();

let type_name = std::any::type_name::<T>();
let progress_logging = self.logging.borrow_mut().get(&format!("timely/progress/{type_name}"));
let summary_logging = self.logging.borrow_mut().get(&format!("timely/summary/{type_name}"));
let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name));
let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name));
let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name);
let subscope = RefCell::new(subscope);

Expand Down Expand Up @@ -735,7 +743,7 @@ impl<A: Allocate> Clone for Worker<A> {
identifiers: Rc::clone(&self.identifiers),
dataflows: Rc::clone(&self.dataflows),
dataflow_counter: Rc::clone(&self.dataflow_counter),
logging: Rc::clone(&self.logging),
logging: self.logging.clone(),
activations: Rc::clone(&self.activations),
active_dataflows: Vec::new(),
temp_channel_ids: Rc::clone(&self.temp_channel_ids),
Expand Down
Loading