From c0b643ef180c22f3057e5b705832be41d5213380 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 24 Mar 2025 12:08:05 -0400 Subject: [PATCH 1/9] Introduce Connectivity and PortConnectivity type aliases --- timely/src/dataflow/operators/capability.rs | 6 +++--- timely/src/dataflow/operators/core/input.rs | 5 ++--- .../operators/core/unordered_input.rs | 5 ++--- .../dataflow/operators/generic/builder_raw.rs | 8 ++++---- .../dataflow/operators/generic/builder_rc.rs | 4 ++-- .../src/dataflow/operators/generic/handles.rs | 6 +++--- timely/src/logging.rs | 3 ++- timely/src/progress/operate.rs | 8 +++++++- timely/src/progress/reachability.rs | 20 +++++++++---------- timely/src/progress/subgraph.rs | 8 ++++---- 10 files changed, 39 insertions(+), 34 deletions(-) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 9ca301ba24..7779a124ec 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -27,9 +27,9 @@ use std::cell::RefCell; use std::fmt::{self, Debug}; use crate::order::PartialOrder; -use crate::progress::Antichain; use crate::progress::Timestamp; use crate::progress::ChangeBatch; +use crate::progress::operate::PortConnectivity; use crate::scheduling::Activations; use crate::dataflow::channels::pullers::counter::ConsumedGuard; @@ -238,7 +238,7 @@ pub struct InputCapability { /// Output capability buffers, for use in minting capabilities. internal: CapabilityUpdates, /// Timestamp summaries for each output. - summaries: Rc>>>, + summaries: Rc>>, /// A drop guard that updates the consumed capability this InputCapability refers to on drop consumed_guard: ConsumedGuard, } @@ -257,7 +257,7 @@ impl CapabilityTrait for InputCapability { impl InputCapability { /// Creates a new capability reference at `time` while incrementing (and keeping a reference to) /// the provided [`ChangeBatch`]. - pub(crate) fn new(internal: CapabilityUpdates, summaries: Rc>>>, guard: ConsumedGuard) -> Self { + pub(crate) fn new(internal: CapabilityUpdates, summaries: Rc>>, guard: ConsumedGuard) -> Self { InputCapability { internal, summaries, diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 4d20adc6e7..c5f9744325 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -7,10 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto}; use crate::scheduling::{Schedule, Activator}; -use crate::progress::frontier::Antichain; use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; use crate::progress::Source; - +use crate::progress::operate::Connectivity; use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::{Scope, ScopeParent, StreamCore}; @@ -205,7 +204,7 @@ impl Operate for Operator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity<::Summary>, Rc>>) { self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); (Vec::new(), Rc::clone(&self.shared_progress)) } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 2c7b26df68..7663a1d69b 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -7,11 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder}; use crate::scheduling::{Schedule, ActivateOnDrop}; -use crate::progress::frontier::Antichain; use crate::progress::{Operate, operate::SharedProgress, Timestamp}; use crate::progress::Source; use crate::progress::ChangeBatch; - +use crate::progress::operate::Connectivity; use crate::dataflow::channels::pushers::{Counter, Tee}; use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession}; @@ -134,7 +133,7 @@ impl Operate for UnorderedOperator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity<::Summary>, Rc>>) { let mut borrow = self.internal.borrow_mut(); for (time, count) in borrow.drain() { self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 43266683e6..6a0b30b168 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations}; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; - +use crate::progress::operate::Connectivity; use crate::Container; use crate::dataflow::{StreamCore, Scope}; use crate::dataflow::channels::pushers::Tee; @@ -60,7 +60,7 @@ pub struct OperatorBuilder { global: usize, address: Rc<[usize]>, // path to the operator (ending with index). shape: OperatorShape, - summary: Vec::Summary>>>, + summary: Connectivity<::Summary>, } impl OperatorBuilder { @@ -188,7 +188,7 @@ where logic: L, shared_progress: Rc>>, activations: Rc>, - summary: Vec>>, + summary: Connectivity, } impl Schedule for OperatorCore @@ -213,7 +213,7 @@ where fn outputs(&self) -> usize { self.shape.outputs } // announce internal topology as fully connected, and hold all default capabilities. - fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity, Rc>>) { // Request the operator to be scheduled at least once. self.activations.borrow_mut().activate(&self.address[..]); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 1df5dc0cca..85eab1f9dd 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability; use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; - +use crate::progress::operate::PortConnectivity; use crate::logging::TimelyLogger as Logger; use super::builder_raw::OperatorBuilder as OperatorBuilderRaw; @@ -33,7 +33,7 @@ pub struct OperatorBuilder { consumed: Vec>>>, internal: Rc>>>>>, /// For each input, a shared list of summaries to each output. - summaries: Vec::Summary>>>>>, + summaries: Vec::Summary>>>>, produced: Vec>>>, logging: Option, } diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index b8b220e395..8412ca8e7d 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -6,10 +6,10 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::progress::Antichain; use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; +use crate::progress::operate::PortConnectivity; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::channels::pushers::Counter as PushCounter; use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; @@ -30,7 +30,7 @@ pub struct InputHandleCore>> { /// /// Each timestamp received through this input may only produce output timestamps /// greater or equal to the input timestamp subjected to at least one of these summaries. - summaries: Rc>>>, + summaries: Rc>>, logging: Option, } @@ -149,7 +149,7 @@ pub fn _access_pull_counter>>( pub fn new_input_handle>>( pull_counter: PullCounter, internal: Rc>>>>>, - summaries: Rc>>>, + summaries: Rc>>, logging: Option ) -> InputHandleCore { InputHandleCore { diff --git a/timely/src/logging.rs b/timely/src/logging.rs index c70904b319..ad20e46641 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; use crate::Container; use crate::container::CapacityContainerBuilder; use crate::dataflow::operators::capture::{Event, EventPusher}; +use crate::progress::operate::Connectivity; /// Logs events as a timely stream, with progress statements. pub struct BatchLogger where P: EventPusher { @@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent { /// Worker-unique identifier for the operator. pub id: usize, /// Timestamp action summaries for (input, output) pairs. - pub summary: Vec>>, + pub summary: Connectivity, } #[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 28fd10ae8c..78a823c7c0 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -44,7 +44,7 @@ pub trait Operate : Schedule { /// /// The default behavior is to indicate that timestamps on any input can emerge unchanged on /// any output, and no initial capabilities are held. - fn get_internal_summary(&mut self) -> (Vec>>, Rc>>); + fn get_internal_summary(&mut self) -> (Connectivity, Rc>>); /// Signals that external frontiers have been set. /// @@ -58,6 +58,12 @@ pub trait Operate : Schedule { fn notify_me(&self) -> bool { true } } +/// Operator internal connectivity, from inputs to outputs. +pub type Connectivity = Vec>; +/// Internal connectivity from one port to any number of opposing ports. +pub type PortConnectivity = Vec>; + + /// Progress information shared between parent and child. #[derive(Debug)] pub struct SharedProgress { diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 64d197ab63..0a194594f2 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -79,7 +79,7 @@ use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; use crate::progress::{Location, Port}; - +use crate::progress::operate::{Connectivity, PortConnectivity}; use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::progress::timestamp::PathSummary; @@ -132,7 +132,7 @@ pub struct Builder { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - pub nodes: Vec>>>, + pub nodes: Vec>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. @@ -156,7 +156,7 @@ impl Builder { /// Add links internal to operators. /// /// This method overwrites any existing summary, instead of anything more sophisticated. - pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec>>) { + pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity) { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); @@ -195,7 +195,7 @@ impl Builder { /// default summaries (a serious liveness issue). /// /// The optional logger information is baked into the resulting tracker. - pub fn build(self, logger: Option>) -> (Tracker, Vec>>) { + pub fn build(self, logger: Option>) -> (Tracker, Connectivity) { if !self.is_acyclic() { println!("Cycle detected without timestamp increment"); @@ -361,7 +361,7 @@ pub struct Tracker { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - nodes: Vec>>>, + nodes: Vec>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. @@ -433,7 +433,7 @@ pub struct PortInformation { /// Current implications of active pointstamps across the dataflow. pub implications: MutableAntichain, /// Path summaries to each of the scope outputs. - pub output_summaries: Vec>, + pub output_summaries: PortConnectivity, } impl PortInformation { @@ -503,7 +503,7 @@ impl Tracker { /// output port. /// /// If the optional logger is provided, it will be used to log various tracker events. - pub fn allocate_from(builder: Builder, logger: Option>) -> (Self, Vec>>) { + pub fn allocate_from(builder: Builder, logger: Option>) -> (Self, Connectivity) { // Allocate buffer space for each input and input port. let mut per_operator = @@ -732,9 +732,9 @@ impl Tracker { /// Graph locations may be missing from the output, in which case they have no /// paths to scope outputs. fn summarize_outputs( - nodes: &[Vec>>], + nodes: &[Connectivity], edges: &[Vec>], - ) -> HashMap>> + ) -> HashMap> { // A reverse edge map, to allow us to walk back up the dataflow graph. let mut reverse = HashMap::new(); @@ -749,7 +749,7 @@ fn summarize_outputs( } } - let mut results: HashMap>> = HashMap::new(); + let mut results: HashMap> = HashMap::new(); let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new(); let outputs = diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index e3082087e5..291792e5fc 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -19,7 +19,7 @@ use crate::scheduling::activate::Activations; use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter}; use crate::progress::{Timestamp, Operate, operate::SharedProgress}; use crate::progress::{Location, Port, Source, Target}; - +use crate::progress::operate::Connectivity; use crate::progress::ChangeBatch; use crate::progress::broadcast::Progcaster; use crate::progress::reachability; @@ -270,7 +270,7 @@ where progcaster: Progcaster, shared_progress: Rc>>, - scope_summary: Vec>>, + scope_summary: Connectivity, progress_mode: ProgressMode, } @@ -546,7 +546,7 @@ where // produces connectivity summaries from inputs to outputs, and reports initial internal // capabilities on each of the outputs (projecting capabilities from contained scopes). - fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity, Rc>>) { // double-check that child 0 (the outside world) is correctly shaped. assert_eq!(self.children[0].outputs, self.inputs()); @@ -614,7 +614,7 @@ struct PerOperatorState { shared_progress: Rc>>, - internal_summary: Vec>>, // cached result from get_internal_summary. + internal_summary: Connectivity, // cached result from get_internal_summary. logging: Option, } From b42bbc2d2a79c404901c0bed0aff40c7e0d35cb2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 24 Mar 2025 13:51:50 -0400 Subject: [PATCH 2/9] Make PortConnectivity a struct, with sufficient methods --- .../dataflow/operators/generic/builder_raw.rs | 4 +- .../dataflow/operators/generic/builder_rc.rs | 4 +- timely/src/progress/operate.rs | 57 ++++++++++++++++- timely/src/progress/reachability.rs | 62 +++++++++---------- timely/src/progress/subgraph.rs | 16 ++--- 5 files changed, 96 insertions(+), 47 deletions(-) diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 6a0b30b168..8e5ad138f2 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -124,7 +124,7 @@ impl OperatorBuilder { self.shape.inputs += 1; assert_eq!(self.shape.outputs, connection.len()); - self.summary.push(connection); + self.summary.push(connection.into()); receiver } @@ -146,7 +146,7 @@ impl OperatorBuilder { self.shape.outputs += 1; assert_eq!(self.shape.inputs, connection.len()); for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) { - summary.push(entry); + summary.add_port(entry); } (targets, stream) diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 85eab1f9dd..153ee879af 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -86,7 +86,7 @@ impl OperatorBuilder { self.frontier.push(MutableAntichain::new()); self.consumed.push(Rc::clone(input.consumed())); - let shared_summary = Rc::new(RefCell::new(connection)); + let shared_summary = Rc::new(RefCell::new(connection.into())); self.summaries.push(Rc::clone(&shared_summary)); new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone()) @@ -123,7 +123,7 @@ impl OperatorBuilder { self.produced.push(Rc::clone(buffer.inner().produced())); for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { - summary.borrow_mut().push(connection.clone()); + summary.borrow_mut().add_port(connection.clone()); } (OutputWrapper::new(buffer, internal), stream) diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 78a823c7c0..6a39a81608 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -61,8 +61,63 @@ pub trait Operate : Schedule { /// Operator internal connectivity, from inputs to outputs. pub type Connectivity = Vec>; /// Internal connectivity from one port to any number of opposing ports. -pub type PortConnectivity = Vec>; +#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)] +pub struct PortConnectivity { + list: Vec>, +} + +impl Default for PortConnectivity { + fn default() -> Self { + Self { list: Vec::new() } + } +} + +impl PortConnectivity { + /// Introduces default summaries for `0 .. count` ports. + pub fn default_for(count: usize) -> Self { + let mut list = Vec::with_capacity(count); + for _ in 0 .. count { list.push(Default::default()) } + Self { list } + } + /// Ensures an entry exists at `index` and returns a mutable reference to it. + pub fn ensure(&mut self, index: usize) -> &mut Antichain { + while self.next_port() <= index { self.add_port(Antichain::new()); } + &mut self.list[index] + } + /// Inserts an element by reference, ensuring that the index exists. + pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone { + self.ensure(index).insert_ref(element) + } + /// Introduces a summary for the port `self.next_port()`. + pub fn add_port(&mut self, summary: Antichain) { + self.list.push(summary); + } + /// Borrowing iterator of port identifiers and antichains. + pub fn iter_ports(&self) -> impl Iterator)> { + self.list.iter().enumerate() + } + /// Owning iterator of port identifiers and antichains. + pub fn into_iter_ports(self) -> impl Iterator)> { + self.list.into_iter().enumerate() + } + /// Announces the next output port identifier. + pub fn next_port(&self) -> usize { + self.list.len() + } +} +impl From>> for PortConnectivity { + fn from(list: Vec>) -> Self { + Self { list } + } +} + +impl std::ops::Index for PortConnectivity { + type Output = Antichain; + fn index(&self, index: usize) -> &Self::Output { + &self.list[index] + } +} /// Progress information shared between parent and child. #[derive(Debug)] diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 0a194594f2..4564161d78 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -17,9 +17,9 @@ //! let mut builder = Builder::::new(); //! //! // Each node with one input connected to one output. -//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); -//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); -//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); +//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); +//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); +//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]); //! //! // Connect nodes in sequence, looping around to the first from the last. //! builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -80,7 +80,7 @@ use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; use crate::progress::{Location, Port}; use crate::progress::operate::{Connectivity, PortConnectivity}; -use crate::progress::frontier::{Antichain, MutableAntichain}; +use crate::progress::frontier::MutableAntichain; use crate::progress::timestamp::PathSummary; @@ -113,9 +113,9 @@ use crate::progress::timestamp::PathSummary; /// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. -/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); -/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); -/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); +/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); +/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); +/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]); /// /// // Connect nodes in sequence, looping around to the first from the last. /// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -160,7 +160,7 @@ impl Builder { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); - for x in summary.iter() { debug_assert_eq!(outputs, x.len()); } + for x in summary.iter() { debug_assert_eq!(outputs, x.next_port()); } while self.nodes.len() <= index { self.nodes.push(Vec::new()); @@ -224,9 +224,9 @@ impl Builder { /// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. - /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); - /// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); - /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]); + /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); + /// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); + /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); /// /// // Connect nodes in sequence, looping around to the first from the last. /// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -253,8 +253,8 @@ impl Builder { /// /// // Two inputs and outputs, only one of which advances. /// builder.add_node(0, 2, 2, vec![ - /// vec![Antichain::from_elem(0),Antichain::new(),], - /// vec![Antichain::new(),Antichain::from_elem(1),], + /// vec![Antichain::from_elem(0),Antichain::new(),].into(), + /// vec![Antichain::new(),Antichain::from_elem(1),].into(), /// ]); /// /// // Connect each output to the opposite input. @@ -285,7 +285,7 @@ impl Builder { for (input, outputs) in summary.iter().enumerate() { let target = Location::new_target(index, input); in_degree.entry(target).or_insert(0); - for (output, summaries) in outputs.iter().enumerate() { + for (output, summaries) in outputs.iter_ports() { let source = Location::new_source(index, output); for summary in summaries.elements().iter() { if summary == &Default::default() { @@ -322,7 +322,7 @@ impl Builder { } }, Port::Target(port) => { - for (output, summaries) in self.nodes[node][port].iter().enumerate() { + for (output, summaries) in self.nodes[node][port].iter_ports() { let source = Location::new_source(node, output); for summary in summaries.elements().iter() { if summary == &Default::default() { @@ -442,7 +442,7 @@ impl PortInformation { PortInformation { pointstamps: MutableAntichain::new(), implications: MutableAntichain::new(), - output_summaries: Vec::new(), + output_summaries: PortConnectivity::default(), } } @@ -514,7 +514,7 @@ impl Tracker { .collect::>(); // Summary of scope inputs to scope outputs. - let mut builder_summary = vec![vec![]; builder.shape[0].1]; + let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1]; // Compile summaries from each location to each scope output. let output_summaries = summarize_outputs::(&builder.nodes, &builder.edges); @@ -598,7 +598,7 @@ impl Tracker { for (time, diff) in changes { self.total_counts += diff; - for (output, summaries) in operator.output_summaries.iter().enumerate() { + for (output, summaries) in operator.output_summaries.iter_ports() { let output_changes = &mut self.output_changes[output]; summaries .elements() @@ -617,7 +617,7 @@ impl Tracker { for (time, diff) in changes { self.total_counts += diff; - for (output, summaries) in operator.output_summaries.iter().enumerate() { + for (output, summaries) in operator.output_summaries.iter_ports() { let output_changes = &mut self.output_changes[output]; summaries .elements() @@ -658,7 +658,7 @@ impl Tracker { for (time, diff) in changes { let nodes = &self.nodes[location.node][port_index]; - for (output_port, summaries) in nodes.iter().enumerate() { + for (output_port, summaries) in nodes.iter_ports() { let source = Location { node: location.node, port: Port::Source(output_port) }; for summary in summaries.elements().iter() { if let Some(new_time) = summary.results_in(&time) { @@ -778,17 +778,14 @@ fn summarize_outputs( // Determine the current path summaries from the input port. let location = Location { node: location.node, port: Port::Target(input_port) }; - let antichains = results - .entry(location) - .and_modify(|antichains| antichains.reserve(output)) - .or_insert_with(|| Vec::with_capacity(output)); - - while antichains.len() <= output { antichains.push(Antichain::new()); } + let antichains = results.entry(location).or_default(); + // TODO: This is redundant with `insert_ref` below. + antichains.ensure(output); // Combine each operator-internal summary to the output with `summary`. for operator_summary in summaries[output_port].elements().iter() { if let Some(combined) = operator_summary.followed_by(&summary) { - if antichains[output].insert(combined.clone()) { + if antichains.insert_ref(output, &combined) { worklist.push_back((location, output, combined)); } } @@ -803,14 +800,11 @@ fn summarize_outputs( // Each target should have (at most) one source. if let Some(&source) = reverse.get(&location) { - let antichains = results - .entry(source) - .and_modify(|antichains| antichains.reserve(output)) - .or_insert_with(|| Vec::with_capacity(output)); - - while antichains.len() <= output { antichains.push(Antichain::new()); } + let antichains = results.entry(source).or_default(); + // TODO: This is redundant with `insert_ref` below. + antichains.ensure(output); - if antichains[output].insert(summary.clone()) { + if antichains.insert_ref(output, &summary) { worklist.push_back((source, output, summary.clone())); } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 291792e5fc..11c8af7c2b 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -16,10 +16,10 @@ use crate::logging::TimelySummaryLogger as SummaryLogger; use crate::scheduling::Schedule; use crate::scheduling::activate::Activations; -use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter}; +use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter}; use crate::progress::{Timestamp, Operate, operate::SharedProgress}; use crate::progress::{Location, Port, Source, Target}; -use crate::progress::operate::Connectivity; +use crate::progress::operate::{Connectivity, PortConnectivity}; use crate::progress::ChangeBatch; use crate::progress::broadcast::Progcaster; use crate::progress::reachability; @@ -168,7 +168,7 @@ where let mut builder = reachability::Builder::new(); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. - let summary = (0..outputs).map(|_| (0..inputs).map(|_| Antichain::new()).collect()).collect(); + let summary = (0..outputs).map(|_| PortConnectivity::default_for(inputs)).collect(); builder.add_node(0, outputs, inputs, summary); for (index, child) in self.children.iter().enumerate().skip(1) { builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); @@ -555,10 +555,10 @@ where // Note that we need to have `self.inputs()` elements in the summary // with each element containing `self.outputs()` antichains regardless // of how long `self.scope_summary` is - let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()]; + let mut internal_summary = vec![PortConnectivity::default_for(self.outputs); self.inputs()]; for (input_idx, input) in self.scope_summary.iter().enumerate() { - for (output_idx, output) in input.iter().enumerate() { - let antichain = &mut internal_summary[input_idx][output_idx]; + for (output_idx, output) in input.iter_ports() { + let antichain = internal_summary[input_idx].ensure(output_idx); antichain.reserve(output.elements().len()); antichain.extend(output.elements().iter().cloned().map(TInner::summarize)); } @@ -570,7 +570,7 @@ where "the internal summary should have as many elements as there are inputs", ); debug_assert!( - internal_summary.iter().all(|summary| summary.len() == self.outputs()), + internal_summary.iter().all(|summary| summary.next_port() == self.outputs()), "each element of the internal summary should have as many elements as there are outputs", ); @@ -671,7 +671,7 @@ impl PerOperatorState { inputs, ); assert!( - !internal_summary.iter().any(|x| x.len() != outputs), + !internal_summary.iter().any(|x| x.next_port() != outputs), "operator summary had too few outputs", ); From b7191563a6de850d4c27945db0c0f7972099f265 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 25 Mar 2025 09:32:53 -0400 Subject: [PATCH 3/9] Make PortConnectivity API more explicit --- .../dataflow/operators/generic/builder_raw.rs | 8 +++---- .../dataflow/operators/generic/builder_rc.rs | 4 ++-- timely/src/progress/operate.rs | 23 ++++++++++--------- timely/src/progress/reachability.rs | 6 +---- timely/src/progress/subgraph.rs | 14 +++++------ 5 files changed, 26 insertions(+), 29 deletions(-) diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8e5ad138f2..49283868fe 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -138,15 +138,15 @@ impl OperatorBuilder { /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, StreamCore) { - + let new_output = self.shape.outputs; + self.shape.outputs += 1; let (targets, registrar) = Tee::::new(); - let source = Source::new(self.index, self.shape.outputs); + let source = Source::new(self.index, new_output); let stream = StreamCore::new(source, registrar, self.scope.clone()); - self.shape.outputs += 1; assert_eq!(self.shape.inputs, connection.len()); for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) { - summary.add_port(entry); + summary.add_port(new_output, entry); } (targets, stream) diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 153ee879af..15e28ea22b 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -113,7 +113,7 @@ impl OperatorBuilder { OutputWrapper>, StreamCore ) { - + let new_output = self.shape().outputs(); let (tee, stream) = self.builder.new_output_connection(connection.clone()); let internal = Rc::new(RefCell::new(ChangeBatch::new())); @@ -123,7 +123,7 @@ impl OperatorBuilder { self.produced.push(Rc::clone(buffer.inner().produced())); for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { - summary.borrow_mut().add_port(connection.clone()); + summary.borrow_mut().add_port(new_output, connection.clone()); } (OutputWrapper::new(buffer, internal), stream) diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 6a39a81608..8e8a1e8a0c 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -80,29 +80,30 @@ impl PortConnectivity { Self { list } } /// Ensures an entry exists at `index` and returns a mutable reference to it. - pub fn ensure(&mut self, index: usize) -> &mut Antichain { - while self.next_port() <= index { self.add_port(Antichain::new()); } + fn ensure(&mut self, index: usize) -> &mut Antichain { + while self.list.len() <= index { self.add_port(self.list.len(), Antichain::new()); } &mut self.list[index] } /// Inserts an element by reference, ensuring that the index exists. + pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder + Clone { + self.ensure(index).insert(element) + } + /// Inserts an element by reference, ensuring that the index exists. pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone { self.ensure(index).insert_ref(element) } - /// Introduces a summary for the port `self.next_port()`. - pub fn add_port(&mut self, summary: Antichain) { + /// Introduces a summary for `port`. Panics if a summary already exists. + pub fn add_port(&mut self, port: usize, summary: Antichain) { + assert_eq!(self.list.len(), port); self.list.push(summary); } /// Borrowing iterator of port identifiers and antichains. pub fn iter_ports(&self) -> impl Iterator)> { self.list.iter().enumerate() } - /// Owning iterator of port identifiers and antichains. - pub fn into_iter_ports(self) -> impl Iterator)> { - self.list.into_iter().enumerate() - } - /// Announces the next output port identifier. - pub fn next_port(&self) -> usize { - self.list.len() + /// Announces the largest port identifier, largely for debug asserts. + pub fn max_port(&self) -> usize { + self.list.len() - 1 } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 4564161d78..8385956bc8 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -160,7 +160,7 @@ impl Builder { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); - for x in summary.iter() { debug_assert_eq!(outputs, x.next_port()); } + debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs))); while self.nodes.len() <= index { self.nodes.push(Vec::new()); @@ -779,8 +779,6 @@ fn summarize_outputs( // Determine the current path summaries from the input port. let location = Location { node: location.node, port: Port::Target(input_port) }; let antichains = results.entry(location).or_default(); - // TODO: This is redundant with `insert_ref` below. - antichains.ensure(output); // Combine each operator-internal summary to the output with `summary`. for operator_summary in summaries[output_port].elements().iter() { @@ -801,8 +799,6 @@ fn summarize_outputs( // Each target should have (at most) one source. if let Some(&source) = reverse.get(&location) { let antichains = results.entry(source).or_default(); - // TODO: This is redundant with `insert_ref` below. - antichains.ensure(output); if antichains.insert_ref(output, &summary) { worklist.push_back((source, output, summary.clone())); diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 11c8af7c2b..ec95eed041 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -558,9 +558,9 @@ where let mut internal_summary = vec![PortConnectivity::default_for(self.outputs); self.inputs()]; for (input_idx, input) in self.scope_summary.iter().enumerate() { for (output_idx, output) in input.iter_ports() { - let antichain = internal_summary[input_idx].ensure(output_idx); - antichain.reserve(output.elements().len()); - antichain.extend(output.elements().iter().cloned().map(TInner::summarize)); + for outer in output.elements().iter().cloned().map(TInner::summarize) { + internal_summary[input_idx].insert(output_idx, outer); + } } } @@ -570,8 +570,8 @@ where "the internal summary should have as many elements as there are inputs", ); debug_assert!( - internal_summary.iter().all(|summary| summary.next_port() == self.outputs()), - "each element of the internal summary should have as many elements as there are outputs", + internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())), + "each element of the internal summary should only reference valid outputs", ); // Each child has expressed initial capabilities (their `shared_progress.internals`). @@ -671,8 +671,8 @@ impl PerOperatorState { inputs, ); assert!( - !internal_summary.iter().any(|x| x.next_port() != outputs), - "operator summary had too few outputs", + internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)), + "operator summary references invalid output port", ); PerOperatorState { From c6d7e7304affa126c9399d803df0bf5b591f6c14 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 25 Mar 2025 09:47:04 -0400 Subject: [PATCH 4/9] Support optional path summaries for disconnected ports --- timely/src/dataflow/operators/capability.rs | 36 ++++++++++++++------- timely/src/progress/operate.rs | 17 +++------- timely/src/progress/reachability.rs | 10 +++--- timely/src/progress/subgraph.rs | 4 +-- 4 files changed, 36 insertions(+), 31 deletions(-) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 7779a124ec..8ba9937264 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -246,11 +246,13 @@ pub struct InputCapability { impl CapabilityTrait for InputCapability { fn time(&self) -> &T { self.time() } fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { - let borrow = self.summaries.borrow(); - self.internal.borrow().iter().enumerate().any(|(index, rc)| { - // To be valid, the output buffer must match and the timestamp summary needs to be the default. - Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default() - }) + let summaries_borrow = self.summaries.borrow(); + let internal_borrow = self.internal.borrow(); + // To be valid, the output buffer must match and the timestamp summary needs to be the default. + let result = summaries_borrow.iter_ports().any(|(port, path)| { + Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default() + }); + result } } @@ -281,10 +283,15 @@ impl InputCapability { /// Delays capability for a specific output port. pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; - if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { - Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port])) - } else { - panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time()); + if let Some(path) = self.summaries.borrow().get(output_port) { + if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { + Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port])) + } else { + panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time()); + } + } + else { + panic!("Attempted to delay a capability for a disconnected output"); } } @@ -305,11 +312,16 @@ impl InputCapability { pub fn retain_for_output(self, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; let self_time = self.time().clone(); - if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { - Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port])) + if let Some(path) = self.summaries.borrow().get(output_port) { + if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { + Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port])) + } + else { + panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time); + } } else { - panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time); + panic!("Attempted to retain a capability for a disconnected output"); } } } diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 8e8a1e8a0c..247345374c 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -73,12 +73,6 @@ impl Default for PortConnectivity { } impl PortConnectivity { - /// Introduces default summaries for `0 .. count` ports. - pub fn default_for(count: usize) -> Self { - let mut list = Vec::with_capacity(count); - for _ in 0 .. count { list.push(Default::default()) } - Self { list } - } /// Ensures an entry exists at `index` and returns a mutable reference to it. fn ensure(&mut self, index: usize) -> &mut Antichain { while self.list.len() <= index { self.add_port(self.list.len(), Antichain::new()); } @@ -105,6 +99,10 @@ impl PortConnectivity { pub fn max_port(&self) -> usize { self.list.len() - 1 } + /// Returns the associated path summary, if it exists. + pub fn get(&self, index: usize) -> Option<&Antichain> { + self.list.get(index) + } } impl From>> for PortConnectivity { @@ -113,13 +111,6 @@ impl From>> for PortConnectivity { } } -impl std::ops::Index for PortConnectivity { - type Output = Antichain; - fn index(&self, index: usize) -> &Self::Output { - &self.list[index] - } -} - /// Progress information shared between parent and child. #[derive(Debug)] pub struct SharedProgress { diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 8385956bc8..07a5d67e5b 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -781,10 +781,12 @@ fn summarize_outputs( let antichains = results.entry(location).or_default(); // Combine each operator-internal summary to the output with `summary`. - for operator_summary in summaries[output_port].elements().iter() { - if let Some(combined) = operator_summary.followed_by(&summary) { - if antichains.insert_ref(output, &combined) { - worklist.push_back((location, output, combined)); + if let Some(connection) = summaries.get(output_port) { + for operator_summary in connection.elements().iter() { + if let Some(combined) = operator_summary.followed_by(&summary) { + if antichains.insert_ref(output, &combined) { + worklist.push_back((location, output, combined)); + } } } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index ec95eed041..090d36265a 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -168,7 +168,7 @@ where let mut builder = reachability::Builder::new(); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. - let summary = (0..outputs).map(|_| PortConnectivity::default_for(inputs)).collect(); + let summary = (0..outputs).map(|_| PortConnectivity::default()).collect(); builder.add_node(0, outputs, inputs, summary); for (index, child) in self.children.iter().enumerate().skip(1) { builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); @@ -555,7 +555,7 @@ where // Note that we need to have `self.inputs()` elements in the summary // with each element containing `self.outputs()` antichains regardless // of how long `self.scope_summary` is - let mut internal_summary = vec![PortConnectivity::default_for(self.outputs); self.inputs()]; + let mut internal_summary = vec![PortConnectivity::default(); self.inputs()]; for (input_idx, input) in self.scope_summary.iter().enumerate() { for (output_idx, output) in input.iter_ports() { for outer in output.elements().iter().cloned().map(TInner::summarize) { From a3c83f873633a2b3586d9ff757fc3aaa6e81e4c0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 25 Mar 2025 09:57:47 -0400 Subject: [PATCH 5/9] Swap PortConnectivity implementation from Vec to BTreeMap --- timely/src/progress/operate.rs | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 247345374c..1e352bbe49 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -63,51 +63,47 @@ pub type Connectivity = Vec>; /// Internal connectivity from one port to any number of opposing ports. #[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)] pub struct PortConnectivity { - list: Vec>, + tree: std::collections::BTreeMap>, } impl Default for PortConnectivity { fn default() -> Self { - Self { list: Vec::new() } + Self { tree: std::collections::BTreeMap::new() } } } impl PortConnectivity { - /// Ensures an entry exists at `index` and returns a mutable reference to it. - fn ensure(&mut self, index: usize) -> &mut Antichain { - while self.list.len() <= index { self.add_port(self.list.len(), Antichain::new()); } - &mut self.list[index] - } /// Inserts an element by reference, ensuring that the index exists. pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder + Clone { - self.ensure(index).insert(element) + self.tree.entry(index).or_default().insert(element) } /// Inserts an element by reference, ensuring that the index exists. pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone { - self.ensure(index).insert_ref(element) + self.tree.entry(index).or_default().insert_ref(element) } /// Introduces a summary for `port`. Panics if a summary already exists. pub fn add_port(&mut self, port: usize, summary: Antichain) { - assert_eq!(self.list.len(), port); - self.list.push(summary); + if !summary.is_empty() { + let prior = self.tree.insert(port, summary); + assert!(prior.is_none()); + } + else { + assert!(self.tree.remove(&port).is_none()); + } } /// Borrowing iterator of port identifiers and antichains. pub fn iter_ports(&self) -> impl Iterator)> { - self.list.iter().enumerate() - } - /// Announces the largest port identifier, largely for debug asserts. - pub fn max_port(&self) -> usize { - self.list.len() - 1 + self.tree.iter().map(|(o,p)| (*o, p)) } /// Returns the associated path summary, if it exists. pub fn get(&self, index: usize) -> Option<&Antichain> { - self.list.get(index) + self.tree.get(&index) } } impl From>> for PortConnectivity { fn from(list: Vec>) -> Self { - Self { list } + Self { tree: list.into_iter().enumerate().filter(|(_,p)| !p.is_empty()).collect() } } } From f8a234a44fadde754e73f2f82545ecb6ab3f52b2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 25 Mar 2025 11:31:49 -0400 Subject: [PATCH 6/9] Adjust reachability logic to be more linear --- timely/src/progress/reachability.rs | 49 ++++++++++++++--------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 07a5d67e5b..5fe608a843 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -749,6 +749,19 @@ fn summarize_outputs( } } + // A reverse map from operator outputs to inputs, along their internal summaries. + let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new(); + for (node, connectivity) in nodes.iter().enumerate() { + for (input, outputs) in connectivity.iter().enumerate() { + for (output, summary) in outputs.iter_ports() { + reverse_internal + .entry(Location::new_source(node, output)) + .or_default() + .push((input, summary)); + } + } + } + let mut results: HashMap> = HashMap::new(); let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new(); @@ -766,50 +779,36 @@ fn summarize_outputs( // Loop until we stop discovering novel reachability paths. while let Some((location, output, summary)) = worklist.pop_front() { - match location.port { // This is an output port of an operator, or a scope input. // We want to crawl up the operator, to its inputs. - Port::Source(output_port) => { - - // Consider each input port of the associated operator. - for (input_port, summaries) in nodes[location.node].iter().enumerate() { - - // Determine the current path summaries from the input port. - let location = Location { node: location.node, port: Port::Target(input_port) }; - let antichains = results.entry(location).or_default(); - - // Combine each operator-internal summary to the output with `summary`. - if let Some(connection) = summaries.get(output_port) { - for operator_summary in connection.elements().iter() { - if let Some(combined) = operator_summary.followed_by(&summary) { - if antichains.insert_ref(output, &combined) { - worklist.push_back((location, output, combined)); + Port::Source(_output_port) => { + if let Some(inputs) = reverse_internal.get(&location) { + for (input_port, operator_summary) in inputs.iter() { + let new_location = Location::new_target(location.node, *input_port); + for op_summary in operator_summary.elements().iter() { + if let Some(combined) = op_summary.followed_by(&summary) { + if results.entry(new_location).or_default().insert_ref(output, &combined) { + worklist.push_back((new_location, output, combined)); } } } } } - - }, + } // This is an input port of an operator, or a scope output. // We want to walk back the edges leading to it. Port::Target(_port) => { - // Each target should have (at most) one source. if let Some(&source) = reverse.get(&location) { - let antichains = results.entry(source).or_default(); - - if antichains.insert_ref(output, &summary) { - worklist.push_back((source, output, summary.clone())); + if results.entry(source).or_default().insert_ref(output, &summary) { + worklist.push_back((source, output, summary)); } } - }, } - } results From aa08b30f9a101b925adb5f644ae5a8082ff4dd83 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 25 Mar 2025 12:41:51 -0400 Subject: [PATCH 7/9] Switch builder API to port-identified iterators --- .../src/dataflow/operators/core/feedback.rs | 2 +- .../dataflow/operators/generic/builder_raw.rs | 33 +++++++++++-------- .../dataflow/operators/generic/builder_rc.rs | 29 ++++++++-------- timely/src/progress/operate.rs | 6 ++-- timely/src/progress/reachability.rs | 22 ++++++------- 5 files changed, 49 insertions(+), 43 deletions(-) diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index e008b30291..8fdd4f7087 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -113,7 +113,7 @@ impl ConnectLoop for StreamCore { let summary = handle.summary; let mut output = handle.output; - let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]); + let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]); builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 49283868fe..4659383a95 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations}; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; -use crate::progress::operate::Connectivity; +use crate::progress::operate::{Connectivity, PortConnectivity}; use crate::Container; use crate::dataflow::{StreamCore, Scope}; use crate::dataflow::channels::pushers::Tee; @@ -105,17 +105,19 @@ impl OperatorBuilder { /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller - where - P: ParallelizationContract { - let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs]; + where + P: ParallelizationContract + { + let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default()))); self.new_input_connection(stream, pact, connection) } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: I) -> P::Puller where - P: ParallelizationContract { - + P: ParallelizationContract, + I: IntoIterator::Summary>)>, + { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging); @@ -123,8 +125,9 @@ impl OperatorBuilder { stream.connect_to(target, sender, channel_id); self.shape.inputs += 1; - assert_eq!(self.shape.outputs, connection.len()); - self.summary.push(connection.into()); + let connectivity: PortConnectivity<_> = connection.into_iter().collect(); + assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs)); + self.summary.push(connectivity); receiver } @@ -132,21 +135,23 @@ impl OperatorBuilder { /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. pub fn new_output(&mut self) -> (Tee, StreamCore) { - let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs]; + let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, StreamCore) { + pub fn new_output_connection(&mut self, connection: I) -> (Tee, StreamCore) + where + I: IntoIterator::Summary>)>, + { let new_output = self.shape.outputs; self.shape.outputs += 1; let (targets, registrar) = Tee::::new(); let source = Source::new(self.index, new_output); let stream = StreamCore::new(source, registrar, self.scope.clone()); - assert_eq!(self.shape.inputs, connection.len()); - for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) { - summary.add_port(new_output, entry); + for (input, entry) in connection { + self.summary[input].add_port(new_output, entry); } (targets, stream) diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 15e28ea22b..66957ddf5e 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -64,7 +64,7 @@ impl OperatorBuilder { where P: ParallelizationContract { - let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect(); + let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default()))); self.new_input_connection(stream, pact, connection) } @@ -76,17 +76,18 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore - where - P: ParallelizationContract { - + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: I) -> InputHandleCore + where + P: ParallelizationContract, + I: IntoIterator::Summary>)> + Clone, + { let puller = self.builder.new_input_connection(stream, pact, connection.clone()); let input = PullCounter::new(puller); self.frontier.push(MutableAntichain::new()); self.consumed.push(Rc::clone(input.consumed())); - let shared_summary = Rc::new(RefCell::new(connection.into())); + let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect())); self.summaries.push(Rc::clone(&shared_summary)); new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone()) @@ -94,7 +95,7 @@ impl OperatorBuilder { /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { - let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect(); + let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } @@ -106,13 +107,13 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection( - &mut self, - connection: Vec::Summary>> - ) -> ( + pub fn new_output_connection(&mut self, connection: I) -> ( OutputWrapper>, StreamCore - ) { + ) + where + I: IntoIterator::Summary>)> + Clone, + { let new_output = self.shape().outputs(); let (tee, stream) = self.builder.new_output_connection(connection.clone()); @@ -122,8 +123,8 @@ impl OperatorBuilder { let mut buffer = PushBuffer::new(PushCounter::new(tee)); self.produced.push(Rc::clone(buffer.inner().produced())); - for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { - summary.borrow_mut().add_port(new_output, connection.clone()); + for (input, entry) in connection { + self.summaries[input].borrow_mut().add_port(new_output, entry); } (OutputWrapper::new(buffer, internal), stream) diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 1e352bbe49..633d743bdc 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -101,9 +101,9 @@ impl PortConnectivity { } } -impl From>> for PortConnectivity { - fn from(list: Vec>) -> Self { - Self { tree: list.into_iter().enumerate().filter(|(_,p)| !p.is_empty()).collect() } +impl FromIterator<(usize, Antichain)> for PortConnectivity { + fn from_iter(iter: T) -> Self where T: IntoIterator)> { + Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() } } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 5fe608a843..65450d835c 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -17,9 +17,9 @@ //! let mut builder = Builder::::new(); //! //! // Each node with one input connected to one output. -//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); -//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); -//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]); +//! builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +//! builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +//! builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]); //! //! // Connect nodes in sequence, looping around to the first from the last. //! builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -113,9 +113,9 @@ use crate::progress::timestamp::PathSummary; /// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. -/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); -/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); -/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]); +/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); +/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]); /// /// // Connect nodes in sequence, looping around to the first from the last. /// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -224,9 +224,9 @@ impl Builder { /// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. - /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); - /// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); - /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)].into()]); + /// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); + /// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); + /// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]); /// /// // Connect nodes in sequence, looping around to the first from the last. /// builder.add_edge(Source::new(0, 0), Target::new(1, 0)); @@ -253,8 +253,8 @@ impl Builder { /// /// // Two inputs and outputs, only one of which advances. /// builder.add_node(0, 2, 2, vec![ - /// vec![Antichain::from_elem(0),Antichain::new(),].into(), - /// vec![Antichain::new(),Antichain::from_elem(1),].into(), + /// [(0,Antichain::from_elem(0)),(1,Antichain::new())].into_iter().collect(), + /// [(0,Antichain::new()),(1,Antichain::from_elem(1))].into_iter().collect(), /// ]); /// /// // Connect each output to the opposite input. From 59218d19a56009a63f426252402707f03b50d888 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 25 Mar 2025 16:06:40 -0400 Subject: [PATCH 8/9] Add scaling test --- timely/tests/shape_scaling.rs | 79 +++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 timely/tests/shape_scaling.rs diff --git a/timely/tests/shape_scaling.rs b/timely/tests/shape_scaling.rs new file mode 100644 index 0000000000..ba2268fb9e --- /dev/null +++ b/timely/tests/shape_scaling.rs @@ -0,0 +1,79 @@ +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::Input; +use timely::dataflow::InputHandle; +use timely::Config; + +#[test] fn operator_scaling_1() { operator_scaling(1); } +#[test] fn operator_scaling_10() { operator_scaling(10); } +#[test] fn operator_scaling_100() { operator_scaling(100); } +#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_1000() { operator_scaling(1000); } +#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_10000() { operator_scaling(10000); } +#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_100000() { operator_scaling(100000); } + +fn operator_scaling(scale: u64) { + timely::execute(Config::thread(), move |worker| { + let mut input = InputHandle::new(); + worker.dataflow::(|scope| { + use timely::dataflow::operators::Partition; + let parts = + scope + .input_from(&mut input) + .partition(scale, |()| (0, ())); + + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + let mut builder = OperatorBuilder::new("OpScaling".to_owned(), scope.clone()); + let mut handles = Vec::with_capacity(parts.len()); + let mut outputs = Vec::with_capacity(parts.len()); + for (index, part) in parts.into_iter().enumerate() { + use timely::container::CapacityContainerBuilder; + let (output, stream) = builder.new_output_connection::>,_>([]); + use timely::progress::Antichain; + let connectivity = [(index, Antichain::from_elem(Default::default()))]; + handles.push((builder.new_input_connection(&part, Pipeline, connectivity), output)); + outputs.push(stream); + } + + builder.build(move |_| { + move |_frontiers| { + for (input, output) in handles.iter_mut() { + let mut output = output.activate(); + input.for_each(|time, data| { + let mut output = output.session_with_builder(&time); + for datum in data.drain(..) { + output.give(datum); + } + }); + } + } + }); + }); + }) + .unwrap(); +} + +#[test] fn subgraph_scaling_1() { subgraph_scaling(1); } +#[test] fn subgraph_scaling_10() { subgraph_scaling(10); } +#[test] fn subgraph_scaling_100() { subgraph_scaling(100); } +#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_1000() { subgraph_scaling(1000); } +#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_10000() { subgraph_scaling(10000); } +#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_100000() { subgraph_scaling(100000); } + +fn subgraph_scaling(scale: u64) { + timely::execute(Config::thread(), move |worker| { + let mut input = InputHandle::new(); + worker.dataflow::(|scope| { + use timely::dataflow::operators::Partition; + let parts = + scope + .input_from(&mut input) + .partition(scale, |()| (0, ())); + + use timely::dataflow::Scope; + let _outputs = scope.region(|inner| { + use timely::dataflow::operators::{Enter, Leave}; + parts.into_iter().map(|part| part.enter(inner).leave()).collect::>() + }); + }); + }) + .unwrap(); +} From 62fef0747348a491335455067517850af8f98454 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 26 Mar 2025 13:44:17 -0400 Subject: [PATCH 9/9] Update timely/src/progress/operate.rs Co-authored-by: Moritz Hoffmann --- timely/src/progress/operate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 633d743bdc..1b38482513 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -74,7 +74,7 @@ impl Default for PortConnectivity { impl PortConnectivity { /// Inserts an element by reference, ensuring that the index exists. - pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder + Clone { + pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder { self.tree.entry(index).or_default().insert(element) } /// Inserts an element by reference, ensuring that the index exists.