Skip to content

Linear reachability, take 2 #658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
42 changes: 27 additions & 15 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use std::cell::RefCell;
use std::fmt::{self, Debug};

use crate::order::PartialOrder;
use crate::progress::Antichain;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::operate::PortConnectivity;
use crate::scheduling::Activations;
use crate::dataflow::channels::pullers::counter::ConsumedGuard;

Expand Down Expand Up @@ -238,26 +238,28 @@ pub struct InputCapability<T: Timestamp> {
/// Output capability buffers, for use in minting capabilities.
internal: CapabilityUpdates<T>,
/// Timestamp summaries for each output.
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
consumed_guard: ConsumedGuard<T>,
}

impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
fn time(&self) -> &T { self.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
let borrow = self.summaries.borrow();
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
})
let summaries_borrow = self.summaries.borrow();
let internal_borrow = self.internal.borrow();
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
let result = summaries_borrow.iter_ports().any(|(port, path)| {
Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
});
result
}
}

impl<T: Timestamp> InputCapability<T> {
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
/// the provided [`ChangeBatch`].
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
InputCapability {
internal,
summaries,
Expand All @@ -281,10 +283,15 @@ impl<T: Timestamp> InputCapability<T> {
/// Delays capability for a specific output port.
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
use crate::progress::timestamp::PathSummary;
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
} else {
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
if let Some(path) = self.summaries.borrow().get(output_port) {
if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
} else {
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
}
}
else {
panic!("Attempted to delay a capability for a disconnected output");
}
}

Expand All @@ -305,11 +312,16 @@ impl<T: Timestamp> InputCapability<T> {
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
use crate::progress::timestamp::PathSummary;
let self_time = self.time().clone();
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
if let Some(path) = self.summaries.borrow().get(output_port) {
if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
}
else {
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
}
}
else {
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
panic!("Attempted to retain a capability for a disconnected output");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
let summary = handle.summary;
let mut output = handle.output;

let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);

builder.build(move |_capability| move |_frontier| {
let mut output = output.activate();
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};

use crate::scheduling::{Schedule, Activator};

use crate::progress::frontier::Antichain;
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
use crate::progress::Source;

use crate::progress::operate::Connectivity;
use crate::{Container, Data};
use crate::communication::Push;
use crate::dataflow::{Scope, ScopeParent, StreamCore};
Expand Down Expand Up @@ -205,7 +204,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
fn inputs(&self) -> usize { 0 }
fn outputs(&self) -> usize { 1 }

fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
(Vec::new(), Rc::clone(&self.shared_progress))
}
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder};

use crate::scheduling::{Schedule, ActivateOnDrop};

use crate::progress::frontier::Antichain;
use crate::progress::{Operate, operate::SharedProgress, Timestamp};
use crate::progress::Source;
use crate::progress::ChangeBatch;

use crate::progress::operate::Connectivity;
use crate::dataflow::channels::pushers::{Counter, Tee};
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};

Expand Down Expand Up @@ -134,7 +133,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
fn inputs(&self) -> usize { 0 }
fn outputs(&self) -> usize { 1 }

fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
let mut borrow = self.internal.borrow_mut();
for (time, count) in borrow.drain() {
self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
Expand Down
45 changes: 25 additions & 20 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};

use crate::progress::{Source, Target};
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};

use crate::progress::operate::{Connectivity, PortConnectivity};
use crate::Container;
use crate::dataflow::{StreamCore, Scope};
use crate::dataflow::channels::pushers::Tee;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct OperatorBuilder<G: Scope> {
global: usize,
address: Rc<[usize]>, // path to the operator (ending with index).
shape: OperatorShape,
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
}

impl<G: Scope> OperatorBuilder<G> {
Expand Down Expand Up @@ -105,48 +105,53 @@ impl<G: Scope> OperatorBuilder<G> {

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> P::Puller
where
P: ParallelizationContract<G::Timestamp, C> {
let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
where
P: ParallelizationContract<G::Timestamp, C>
{
let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
self.new_input_connection(stream, pact, connection)
}

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> P::Puller
where
P: ParallelizationContract<G::Timestamp, C> {

P: ParallelizationContract<G::Timestamp, C>,
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
{
let channel_id = self.scope.new_identifier();
let logging = self.scope.logging();
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging);
let target = Target::new(self.index, self.shape.inputs);
stream.connect_to(target, sender, channel_id);

self.shape.inputs += 1;
assert_eq!(self.shape.outputs, connection.len());
self.summary.push(connection);
let connectivity: PortConnectivity<_> = connection.into_iter().collect();
assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
self.summary.push(connectivity);

receiver
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {

let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
self.new_output_connection(connection)
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output_connection<C: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {

pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<G::Timestamp, C>, StreamCore<G, C>)
where
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
{
let new_output = self.shape.outputs;
self.shape.outputs += 1;
let (targets, registrar) = Tee::<G::Timestamp,C>::new();
let source = Source::new(self.index, self.shape.outputs);
let source = Source::new(self.index, new_output);
let stream = StreamCore::new(source, registrar, self.scope.clone());

self.shape.outputs += 1;
assert_eq!(self.shape.inputs, connection.len());
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
summary.push(entry);
for (input, entry) in connection {
self.summary[input].add_port(new_output, entry);
}

(targets, stream)
Expand Down Expand Up @@ -188,7 +193,7 @@ where
logic: L,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
activations: Rc<RefCell<Activations>>,
summary: Vec<Vec<Antichain<T::Summary>>>,
summary: Connectivity<T::Summary>,
}

impl<T, L> Schedule for OperatorCore<T, L>
Expand All @@ -213,7 +218,7 @@ where
fn outputs(&self) -> usize { self.shape.outputs }

// announce internal topology as fully connected, and hold all default capabilities.
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {

// Request the operator to be scheduled at least once.
self.activations.borrow_mut().activate(&self.address[..]);
Expand Down
35 changes: 18 additions & 17 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability;
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;

use crate::progress::operate::PortConnectivity;
use crate::logging::TimelyLogger as Logger;

use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
Expand All @@ -33,7 +33,7 @@ pub struct OperatorBuilder<G: Scope> {
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
/// For each input, a shared list of summaries to each output.
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
logging: Option<Logger>,
}
Expand Down Expand Up @@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
where
P: ParallelizationContract<G::Timestamp, C> {

let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
self.new_input_connection(stream, pact, connection)
}

Expand All @@ -76,25 +76,26 @@ impl<G: Scope> OperatorBuilder<G> {
///
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
/// antichain indicating that there is no connection from the input to the output.
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, C, P::Puller>
where
P: ParallelizationContract<G::Timestamp, C> {

pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
where
P: ParallelizationContract<G::Timestamp, C>,
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
{
let puller = self.builder.new_input_connection(stream, pact, connection.clone());

let input = PullCounter::new(puller);
self.frontier.push(MutableAntichain::new());
self.consumed.push(Rc::clone(input.consumed()));

let shared_summary = Rc::new(RefCell::new(connection));
let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
self.summaries.push(Rc::clone(&shared_summary));

new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
self.new_output_connection(connection)
}

Expand All @@ -106,14 +107,14 @@ impl<G: Scope> OperatorBuilder<G> {
///
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
/// antichain indicating that there is no connection from the input to the output.
pub fn new_output_connection<CB: ContainerBuilder>(
&mut self,
connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>
) -> (
pub fn new_output_connection<CB: ContainerBuilder, I>(&mut self, connection: I) -> (
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
StreamCore<G, CB::Container>
) {

)
where
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
{
let new_output = self.shape().outputs();
let (tee, stream) = self.builder.new_output_connection(connection.clone());

let internal = Rc::new(RefCell::new(ChangeBatch::new()));
Expand All @@ -122,8 +123,8 @@ impl<G: Scope> OperatorBuilder<G> {
let mut buffer = PushBuffer::new(PushCounter::new(tee));
self.produced.push(Rc::clone(buffer.inner().produced()));

for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
summary.borrow_mut().push(connection.clone());
for (input, entry) in connection {
self.summaries[input].borrow_mut().add_port(new_output, entry);
}

(OutputWrapper::new(buffer, internal), stream)
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::Antichain;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::frontier::MutableAntichain;
use crate::progress::operate::PortConnectivity;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
Expand All @@ -30,7 +30,7 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
///
/// Each timestamp received through this input may only produce output timestamps
/// greater or equal to the input timestamp subjected to at least one of these summaries.
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
logging: Option<Logger>,
}

Expand Down Expand Up @@ -149,7 +149,7 @@ pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
pull_counter: PullCounter<T, C, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
logging: Option<Logger>
) -> InputHandleCore<T, C, P> {
InputHandleCore {
Expand Down
3 changes: 2 additions & 1 deletion timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use crate::Container;
use crate::container::CapacityContainerBuilder;
use crate::dataflow::operators::capture::{Event, EventPusher};
use crate::progress::operate::Connectivity;

/// Logs events as a timely stream, with progress statements.
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
Expand Down Expand Up @@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent<TS> {
/// Worker-unique identifier for the operator.
pub id: usize,
/// Timestamp action summaries for (input, output) pairs.
pub summary: Vec<Vec<crate::progress::Antichain<TS>>>,
pub summary: Connectivity<TS>,
}

#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
Expand Down
Loading