diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 9ca301ba2..8ba993726 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -27,9 +27,9 @@ use std::cell::RefCell; use std::fmt::{self, Debug}; use crate::order::PartialOrder; -use crate::progress::Antichain; use crate::progress::Timestamp; use crate::progress::ChangeBatch; +use crate::progress::operate::PortConnectivity; use crate::scheduling::Activations; use crate::dataflow::channels::pullers::counter::ConsumedGuard; @@ -238,7 +238,7 @@ pub struct InputCapability { /// Output capability buffers, for use in minting capabilities. internal: CapabilityUpdates, /// Timestamp summaries for each output. - summaries: Rc>>>, + summaries: Rc>>, /// A drop guard that updates the consumed capability this InputCapability refers to on drop consumed_guard: ConsumedGuard, } @@ -246,18 +246,20 @@ pub struct InputCapability { impl CapabilityTrait for InputCapability { fn time(&self) -> &T { self.time() } fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { - let borrow = self.summaries.borrow(); - self.internal.borrow().iter().enumerate().any(|(index, rc)| { - // To be valid, the output buffer must match and the timestamp summary needs to be the default. - Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default() - }) + let summaries_borrow = self.summaries.borrow(); + let internal_borrow = self.internal.borrow(); + // To be valid, the output buffer must match and the timestamp summary needs to be the default. + let result = summaries_borrow.iter_ports().any(|(port, path)| { + Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default() + }); + result } } impl InputCapability { /// Creates a new capability reference at `time` while incrementing (and keeping a reference to) /// the provided [`ChangeBatch`]. - pub(crate) fn new(internal: CapabilityUpdates, summaries: Rc>>>, guard: ConsumedGuard) -> Self { + pub(crate) fn new(internal: CapabilityUpdates, summaries: Rc>>, guard: ConsumedGuard) -> Self { InputCapability { internal, summaries, @@ -281,10 +283,15 @@ impl InputCapability { /// Delays capability for a specific output port. pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; - if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { - Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port])) - } else { - panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time()); + if let Some(path) = self.summaries.borrow().get(output_port) { + if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { + Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port])) + } else { + panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time()); + } + } + else { + panic!("Attempted to delay a capability for a disconnected output"); } } @@ -305,11 +312,16 @@ impl InputCapability { pub fn retain_for_output(self, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; let self_time = self.time().clone(); - if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { - Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port])) + if let Some(path) = self.summaries.borrow().get(output_port) { + if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { + Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port])) + } + else { + panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time); + } } else { - panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time); + panic!("Attempted to retain a capability for a disconnected output"); } } } diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index e008b3029..8fdd4f708 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -113,7 +113,7 @@ impl ConnectLoop for StreamCore { let summary = handle.summary; let mut output = handle.output; - let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]); + let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]); builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 4d20adc6e..c5f974432 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -7,10 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto}; use crate::scheduling::{Schedule, Activator}; -use crate::progress::frontier::Antichain; use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; use crate::progress::Source; - +use crate::progress::operate::Connectivity; use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::{Scope, ScopeParent, StreamCore}; @@ -205,7 +204,7 @@ impl Operate for Operator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity<::Summary>, Rc>>) { self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); (Vec::new(), Rc::clone(&self.shared_progress)) } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 2c7b26df6..7663a1d69 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -7,11 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder}; use crate::scheduling::{Schedule, ActivateOnDrop}; -use crate::progress::frontier::Antichain; use crate::progress::{Operate, operate::SharedProgress, Timestamp}; use crate::progress::Source; use crate::progress::ChangeBatch; - +use crate::progress::operate::Connectivity; use crate::dataflow::channels::pushers::{Counter, Tee}; use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession}; @@ -134,7 +133,7 @@ impl Operate for UnorderedOperator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity<::Summary>, Rc>>) { let mut borrow = self.internal.borrow_mut(); for (time, count) in borrow.drain() { self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 43266683e..4659383a9 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations}; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; - +use crate::progress::operate::{Connectivity, PortConnectivity}; use crate::Container; use crate::dataflow::{StreamCore, Scope}; use crate::dataflow::channels::pushers::Tee; @@ -60,7 +60,7 @@ pub struct OperatorBuilder { global: usize, address: Rc<[usize]>, // path to the operator (ending with index). shape: OperatorShape, - summary: Vec::Summary>>>, + summary: Connectivity<::Summary>, } impl OperatorBuilder { @@ -105,17 +105,19 @@ impl OperatorBuilder { /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller - where - P: ParallelizationContract { - let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs]; + where + P: ParallelizationContract + { + let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default()))); self.new_input_connection(stream, pact, connection) } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: I) -> P::Puller where - P: ParallelizationContract { - + P: ParallelizationContract, + I: IntoIterator::Summary>)>, + { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging); @@ -123,8 +125,9 @@ impl OperatorBuilder { stream.connect_to(target, sender, channel_id); self.shape.inputs += 1; - assert_eq!(self.shape.outputs, connection.len()); - self.summary.push(connection); + let connectivity: PortConnectivity<_> = connection.into_iter().collect(); + assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs)); + self.summary.push(connectivity); receiver } @@ -132,21 +135,23 @@ impl OperatorBuilder { /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. pub fn new_output(&mut self) -> (Tee, StreamCore) { - let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs]; + let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, StreamCore) { - + pub fn new_output_connection(&mut self, connection: I) -> (Tee, StreamCore) + where + I: IntoIterator::Summary>)>, + { + let new_output = self.shape.outputs; + self.shape.outputs += 1; let (targets, registrar) = Tee::::new(); - let source = Source::new(self.index, self.shape.outputs); + let source = Source::new(self.index, new_output); let stream = StreamCore::new(source, registrar, self.scope.clone()); - self.shape.outputs += 1; - assert_eq!(self.shape.inputs, connection.len()); - for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) { - summary.push(entry); + for (input, entry) in connection { + self.summary[input].add_port(new_output, entry); } (targets, stream) @@ -188,7 +193,7 @@ where logic: L, shared_progress: Rc>>, activations: Rc>, - summary: Vec>>, + summary: Connectivity, } impl Schedule for OperatorCore @@ -213,7 +218,7 @@ where fn outputs(&self) -> usize { self.shape.outputs } // announce internal topology as fully connected, and hold all default capabilities. - fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity, Rc>>) { // Request the operator to be scheduled at least once. self.activations.borrow_mut().activate(&self.address[..]); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 1df5dc0cc..66957ddf5 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability; use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; - +use crate::progress::operate::PortConnectivity; use crate::logging::TimelyLogger as Logger; use super::builder_raw::OperatorBuilder as OperatorBuilderRaw; @@ -33,7 +33,7 @@ pub struct OperatorBuilder { consumed: Vec>>>, internal: Rc>>>>>, /// For each input, a shared list of summaries to each output. - summaries: Vec::Summary>>>>>, + summaries: Vec::Summary>>>>, produced: Vec>>>, logging: Option, } @@ -64,7 +64,7 @@ impl OperatorBuilder { where P: ParallelizationContract { - let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect(); + let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default()))); self.new_input_connection(stream, pact, connection) } @@ -76,17 +76,18 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore - where - P: ParallelizationContract { - + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: I) -> InputHandleCore + where + P: ParallelizationContract, + I: IntoIterator::Summary>)> + Clone, + { let puller = self.builder.new_input_connection(stream, pact, connection.clone()); let input = PullCounter::new(puller); self.frontier.push(MutableAntichain::new()); self.consumed.push(Rc::clone(input.consumed())); - let shared_summary = Rc::new(RefCell::new(connection)); + let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect())); self.summaries.push(Rc::clone(&shared_summary)); new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone()) @@ -94,7 +95,7 @@ impl OperatorBuilder { /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { - let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect(); + let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } @@ -106,14 +107,14 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection( - &mut self, - connection: Vec::Summary>> - ) -> ( + pub fn new_output_connection(&mut self, connection: I) -> ( OutputWrapper>, StreamCore - ) { - + ) + where + I: IntoIterator::Summary>)> + Clone, + { + let new_output = self.shape().outputs(); let (tee, stream) = self.builder.new_output_connection(connection.clone()); let internal = Rc::new(RefCell::new(ChangeBatch::new())); @@ -122,8 +123,8 @@ impl OperatorBuilder { let mut buffer = PushBuffer::new(PushCounter::new(tee)); self.produced.push(Rc::clone(buffer.inner().produced())); - for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { - summary.borrow_mut().push(connection.clone()); + for (input, entry) in connection { + self.summaries[input].borrow_mut().add_port(new_output, entry); } (OutputWrapper::new(buffer, internal), stream) diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index b8b220e39..8412ca8e7 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -6,10 +6,10 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::progress::Antichain; use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; +use crate::progress::operate::PortConnectivity; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::channels::pushers::Counter as PushCounter; use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; @@ -30,7 +30,7 @@ pub struct InputHandleCore>> { /// /// Each timestamp received through this input may only produce output timestamps /// greater or equal to the input timestamp subjected to at least one of these summaries. - summaries: Rc>>>, + summaries: Rc>>, logging: Option, } @@ -149,7 +149,7 @@ pub fn _access_pull_counter>>( pub fn new_input_handle>>( pull_counter: PullCounter, internal: Rc>>>>>, - summaries: Rc>>>, + summaries: Rc>>, logging: Option ) -> InputHandleCore { InputHandleCore { diff --git a/timely/src/logging.rs b/timely/src/logging.rs index c70904b31..ad20e4664 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; use crate::Container; use crate::container::CapacityContainerBuilder; use crate::dataflow::operators::capture::{Event, EventPusher}; +use crate::progress::operate::Connectivity; /// Logs events as a timely stream, with progress statements. pub struct BatchLogger where P: EventPusher { @@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent { /// Worker-unique identifier for the operator. pub id: usize, /// Timestamp action summaries for (input, output) pairs. - pub summary: Vec>>, + pub summary: Connectivity, } #[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 28fd10ae8..1b3848251 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -44,7 +44,7 @@ pub trait Operate : Schedule { /// /// The default behavior is to indicate that timestamps on any input can emerge unchanged on /// any output, and no initial capabilities are held. - fn get_internal_summary(&mut self) -> (Vec>>, Rc>>); + fn get_internal_summary(&mut self) -> (Connectivity, Rc>>); /// Signals that external frontiers have been set. /// @@ -58,6 +58,55 @@ pub trait Operate : Schedule { fn notify_me(&self) -> bool { true } } +/// Operator internal connectivity, from inputs to outputs. +pub type Connectivity = Vec>; +/// Internal connectivity from one port to any number of opposing ports. +#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)] +pub struct PortConnectivity { + tree: std::collections::BTreeMap>, +} + +impl Default for PortConnectivity { + fn default() -> Self { + Self { tree: std::collections::BTreeMap::new() } + } +} + +impl PortConnectivity { + /// Inserts an element by reference, ensuring that the index exists. + pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder { + self.tree.entry(index).or_default().insert(element) + } + /// Inserts an element by reference, ensuring that the index exists. + pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone { + self.tree.entry(index).or_default().insert_ref(element) + } + /// Introduces a summary for `port`. Panics if a summary already exists. + pub fn add_port(&mut self, port: usize, summary: Antichain) { + if !summary.is_empty() { + let prior = self.tree.insert(port, summary); + assert!(prior.is_none()); + } + else { + assert!(self.tree.remove(&port).is_none()); + } + } + /// Borrowing iterator of port identifiers and antichains. + pub fn iter_ports(&self) -> impl Iterator)> { + self.tree.iter().map(|(o,p)| (*o, p)) + } + /// Returns the associated path summary, if it exists. + pub fn get(&self, index: usize) -> Option<&Antichain> { + self.tree.get(&index) + } +} + +impl FromIterator<(usize, Antichain)> for PortConnectivity { + fn from_iter(iter: T) -> Self where T: IntoIterator)> { + Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() } + } +} + /// Progress information shared between parent and child. #[derive(Debug)] pub struct SharedProgress { diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 64d197ab6..65450d835 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -17,9 +17,9 @@ //! let mut builder = Builder::::new(); //! //! // Each node with one input connected to one output. -//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); -//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); -//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); +//! builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +//! builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +//! builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]); //! //! // Connect nodes in sequence, looping around to the first from the last. //! builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -79,8 +79,8 @@ use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; use crate::progress::{Location, Port}; - -use crate::progress::frontier::{Antichain, MutableAntichain}; +use crate::progress::operate::{Connectivity, PortConnectivity}; +use crate::progress::frontier::MutableAntichain; use crate::progress::timestamp::PathSummary; @@ -113,9 +113,9 @@ use crate::progress::timestamp::PathSummary; /// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. -/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); -/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); -/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); +/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]); /// /// // Connect nodes in sequence, looping around to the first from the last. /// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -132,7 +132,7 @@ pub struct Builder { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - pub nodes: Vec>>>, + pub nodes: Vec>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. @@ -156,11 +156,11 @@ impl Builder { /// Add links internal to operators. /// /// This method overwrites any existing summary, instead of anything more sophisticated. - pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec>>) { + pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity) { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); - for x in summary.iter() { debug_assert_eq!(outputs, x.len()); } + debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs))); while self.nodes.len() <= index { self.nodes.push(Vec::new()); @@ -195,7 +195,7 @@ impl Builder { /// default summaries (a serious liveness issue). /// /// The optional logger information is baked into the resulting tracker. - pub fn build(self, logger: Option>) -> (Tracker, Vec>>) { + pub fn build(self, logger: Option>) -> (Tracker, Connectivity) { if !self.is_acyclic() { println!("Cycle detected without timestamp increment"); @@ -224,9 +224,9 @@ impl Builder { /// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. - /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); - /// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); - /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]); + /// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); + /// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); + /// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); /// /// // Connect nodes in sequence, looping around to the first from the last. /// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -253,8 +253,8 @@ impl Builder { /// /// // Two inputs and outputs, only one of which advances. /// builder.add_node(0, 2, 2, vec![ - /// vec![Antichain::from_elem(0),Antichain::new(),], - /// vec![Antichain::new(),Antichain::from_elem(1),], + /// [(0,Antichain::from_elem(0)),(1,Antichain::new())].into_iter().collect(), + /// [(0,Antichain::new()),(1,Antichain::from_elem(1))].into_iter().collect(), /// ]); /// /// // Connect each output to the opposite input. @@ -285,7 +285,7 @@ impl Builder { for (input, outputs) in summary.iter().enumerate() { let target = Location::new_target(index, input); in_degree.entry(target).or_insert(0); - for (output, summaries) in outputs.iter().enumerate() { + for (output, summaries) in outputs.iter_ports() { let source = Location::new_source(index, output); for summary in summaries.elements().iter() { if summary == &Default::default() { @@ -322,7 +322,7 @@ impl Builder { } }, Port::Target(port) => { - for (output, summaries) in self.nodes[node][port].iter().enumerate() { + for (output, summaries) in self.nodes[node][port].iter_ports() { let source = Location::new_source(node, output); for summary in summaries.elements().iter() { if summary == &Default::default() { @@ -361,7 +361,7 @@ pub struct Tracker { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - nodes: Vec>>>, + nodes: Vec>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. @@ -433,7 +433,7 @@ pub struct PortInformation { /// Current implications of active pointstamps across the dataflow. pub implications: MutableAntichain, /// Path summaries to each of the scope outputs. - pub output_summaries: Vec>, + pub output_summaries: PortConnectivity, } impl PortInformation { @@ -442,7 +442,7 @@ impl PortInformation { PortInformation { pointstamps: MutableAntichain::new(), implications: MutableAntichain::new(), - output_summaries: Vec::new(), + output_summaries: PortConnectivity::default(), } } @@ -503,7 +503,7 @@ impl Tracker { /// output port. /// /// If the optional logger is provided, it will be used to log various tracker events. - pub fn allocate_from(builder: Builder, logger: Option>) -> (Self, Vec>>) { + pub fn allocate_from(builder: Builder, logger: Option>) -> (Self, Connectivity) { // Allocate buffer space for each input and input port. let mut per_operator = @@ -514,7 +514,7 @@ impl Tracker { .collect::>(); // Summary of scope inputs to scope outputs. - let mut builder_summary = vec![vec![]; builder.shape[0].1]; + let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1]; // Compile summaries from each location to each scope output. let output_summaries = summarize_outputs::(&builder.nodes, &builder.edges); @@ -598,7 +598,7 @@ impl Tracker { for (time, diff) in changes { self.total_counts += diff; - for (output, summaries) in operator.output_summaries.iter().enumerate() { + for (output, summaries) in operator.output_summaries.iter_ports() { let output_changes = &mut self.output_changes[output]; summaries .elements() @@ -617,7 +617,7 @@ impl Tracker { for (time, diff) in changes { self.total_counts += diff; - for (output, summaries) in operator.output_summaries.iter().enumerate() { + for (output, summaries) in operator.output_summaries.iter_ports() { let output_changes = &mut self.output_changes[output]; summaries .elements() @@ -658,7 +658,7 @@ impl Tracker { for (time, diff) in changes { let nodes = &self.nodes[location.node][port_index]; - for (output_port, summaries) in nodes.iter().enumerate() { + for (output_port, summaries) in nodes.iter_ports() { let source = Location { node: location.node, port: Port::Source(output_port) }; for summary in summaries.elements().iter() { if let Some(new_time) = summary.results_in(&time) { @@ -732,9 +732,9 @@ impl Tracker { /// Graph locations may be missing from the output, in which case they have no /// paths to scope outputs. fn summarize_outputs( - nodes: &[Vec>>], + nodes: &[Connectivity], edges: &[Vec>], - ) -> HashMap>> + ) -> HashMap> { // A reverse edge map, to allow us to walk back up the dataflow graph. let mut reverse = HashMap::new(); @@ -749,7 +749,20 @@ fn summarize_outputs( } } - let mut results: HashMap>> = HashMap::new(); + // A reverse map from operator outputs to inputs, along their internal summaries. + let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new(); + for (node, connectivity) in nodes.iter().enumerate() { + for (input, outputs) in connectivity.iter().enumerate() { + for (output, summary) in outputs.iter_ports() { + reverse_internal + .entry(Location::new_source(node, output)) + .or_default() + .push((input, summary)); + } + } + } + + let mut results: HashMap> = HashMap::new(); let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new(); let outputs = @@ -766,58 +779,36 @@ fn summarize_outputs( // Loop until we stop discovering novel reachability paths. while let Some((location, output, summary)) = worklist.pop_front() { - match location.port { // This is an output port of an operator, or a scope input. // We want to crawl up the operator, to its inputs. - Port::Source(output_port) => { - - // Consider each input port of the associated operator. - for (input_port, summaries) in nodes[location.node].iter().enumerate() { - - // Determine the current path summaries from the input port. - let location = Location { node: location.node, port: Port::Target(input_port) }; - let antichains = results - .entry(location) - .and_modify(|antichains| antichains.reserve(output)) - .or_insert_with(|| Vec::with_capacity(output)); - - while antichains.len() <= output { antichains.push(Antichain::new()); } - - // Combine each operator-internal summary to the output with `summary`. - for operator_summary in summaries[output_port].elements().iter() { - if let Some(combined) = operator_summary.followed_by(&summary) { - if antichains[output].insert(combined.clone()) { - worklist.push_back((location, output, combined)); + Port::Source(_output_port) => { + if let Some(inputs) = reverse_internal.get(&location) { + for (input_port, operator_summary) in inputs.iter() { + let new_location = Location::new_target(location.node, *input_port); + for op_summary in operator_summary.elements().iter() { + if let Some(combined) = op_summary.followed_by(&summary) { + if results.entry(new_location).or_default().insert_ref(output, &combined) { + worklist.push_back((new_location, output, combined)); + } } } } } - - }, + } // This is an input port of an operator, or a scope output. // We want to walk back the edges leading to it. Port::Target(_port) => { - // Each target should have (at most) one source. if let Some(&source) = reverse.get(&location) { - let antichains = results - .entry(source) - .and_modify(|antichains| antichains.reserve(output)) - .or_insert_with(|| Vec::with_capacity(output)); - - while antichains.len() <= output { antichains.push(Antichain::new()); } - - if antichains[output].insert(summary.clone()) { - worklist.push_back((source, output, summary.clone())); + if results.entry(source).or_default().insert_ref(output, &summary) { + worklist.push_back((source, output, summary)); } } - }, } - } results diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index e3082087e..090d36265 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -16,10 +16,10 @@ use crate::logging::TimelySummaryLogger as SummaryLogger; use crate::scheduling::Schedule; use crate::scheduling::activate::Activations; -use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter}; +use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter}; use crate::progress::{Timestamp, Operate, operate::SharedProgress}; use crate::progress::{Location, Port, Source, Target}; - +use crate::progress::operate::{Connectivity, PortConnectivity}; use crate::progress::ChangeBatch; use crate::progress::broadcast::Progcaster; use crate::progress::reachability; @@ -168,7 +168,7 @@ where let mut builder = reachability::Builder::new(); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. - let summary = (0..outputs).map(|_| (0..inputs).map(|_| Antichain::new()).collect()).collect(); + let summary = (0..outputs).map(|_| PortConnectivity::default()).collect(); builder.add_node(0, outputs, inputs, summary); for (index, child) in self.children.iter().enumerate().skip(1) { builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); @@ -270,7 +270,7 @@ where progcaster: Progcaster, shared_progress: Rc>>, - scope_summary: Vec>>, + scope_summary: Connectivity, progress_mode: ProgressMode, } @@ -546,7 +546,7 @@ where // produces connectivity summaries from inputs to outputs, and reports initial internal // capabilities on each of the outputs (projecting capabilities from contained scopes). - fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity, Rc>>) { // double-check that child 0 (the outside world) is correctly shaped. assert_eq!(self.children[0].outputs, self.inputs()); @@ -555,12 +555,12 @@ where // Note that we need to have `self.inputs()` elements in the summary // with each element containing `self.outputs()` antichains regardless // of how long `self.scope_summary` is - let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()]; + let mut internal_summary = vec![PortConnectivity::default(); self.inputs()]; for (input_idx, input) in self.scope_summary.iter().enumerate() { - for (output_idx, output) in input.iter().enumerate() { - let antichain = &mut internal_summary[input_idx][output_idx]; - antichain.reserve(output.elements().len()); - antichain.extend(output.elements().iter().cloned().map(TInner::summarize)); + for (output_idx, output) in input.iter_ports() { + for outer in output.elements().iter().cloned().map(TInner::summarize) { + internal_summary[input_idx].insert(output_idx, outer); + } } } @@ -570,8 +570,8 @@ where "the internal summary should have as many elements as there are inputs", ); debug_assert!( - internal_summary.iter().all(|summary| summary.len() == self.outputs()), - "each element of the internal summary should have as many elements as there are outputs", + internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())), + "each element of the internal summary should only reference valid outputs", ); // Each child has expressed initial capabilities (their `shared_progress.internals`). @@ -614,7 +614,7 @@ struct PerOperatorState { shared_progress: Rc>>, - internal_summary: Vec>>, // cached result from get_internal_summary. + internal_summary: Connectivity, // cached result from get_internal_summary. logging: Option, } @@ -671,8 +671,8 @@ impl PerOperatorState { inputs, ); assert!( - !internal_summary.iter().any(|x| x.len() != outputs), - "operator summary had too few outputs", + internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)), + "operator summary references invalid output port", ); PerOperatorState { diff --git a/timely/tests/shape_scaling.rs b/timely/tests/shape_scaling.rs new file mode 100644 index 000000000..ba2268fb9 --- /dev/null +++ b/timely/tests/shape_scaling.rs @@ -0,0 +1,79 @@ +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::Input; +use timely::dataflow::InputHandle; +use timely::Config; + +#[test] fn operator_scaling_1() { operator_scaling(1); } +#[test] fn operator_scaling_10() { operator_scaling(10); } +#[test] fn operator_scaling_100() { operator_scaling(100); } +#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_1000() { operator_scaling(1000); } +#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_10000() { operator_scaling(10000); } +#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_100000() { operator_scaling(100000); } + +fn operator_scaling(scale: u64) { + timely::execute(Config::thread(), move |worker| { + let mut input = InputHandle::new(); + worker.dataflow::(|scope| { + use timely::dataflow::operators::Partition; + let parts = + scope + .input_from(&mut input) + .partition(scale, |()| (0, ())); + + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + let mut builder = OperatorBuilder::new("OpScaling".to_owned(), scope.clone()); + let mut handles = Vec::with_capacity(parts.len()); + let mut outputs = Vec::with_capacity(parts.len()); + for (index, part) in parts.into_iter().enumerate() { + use timely::container::CapacityContainerBuilder; + let (output, stream) = builder.new_output_connection::>,_>([]); + use timely::progress::Antichain; + let connectivity = [(index, Antichain::from_elem(Default::default()))]; + handles.push((builder.new_input_connection(&part, Pipeline, connectivity), output)); + outputs.push(stream); + } + + builder.build(move |_| { + move |_frontiers| { + for (input, output) in handles.iter_mut() { + let mut output = output.activate(); + input.for_each(|time, data| { + let mut output = output.session_with_builder(&time); + for datum in data.drain(..) { + output.give(datum); + } + }); + } + } + }); + }); + }) + .unwrap(); +} + +#[test] fn subgraph_scaling_1() { subgraph_scaling(1); } +#[test] fn subgraph_scaling_10() { subgraph_scaling(10); } +#[test] fn subgraph_scaling_100() { subgraph_scaling(100); } +#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_1000() { subgraph_scaling(1000); } +#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_10000() { subgraph_scaling(10000); } +#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_100000() { subgraph_scaling(100000); } + +fn subgraph_scaling(scale: u64) { + timely::execute(Config::thread(), move |worker| { + let mut input = InputHandle::new(); + worker.dataflow::(|scope| { + use timely::dataflow::operators::Partition; + let parts = + scope + .input_from(&mut input) + .partition(scale, |()| (0, ())); + + use timely::dataflow::Scope; + let _outputs = scope.region(|inner| { + use timely::dataflow::operators::{Enter, Leave}; + parts.into_iter().map(|part| part.enter(inner).leave()).collect::>() + }); + }); + }) + .unwrap(); +}