Skip to content

Layout extension trait #627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 0 additions & 204 deletions differential-dataflow/src/into_owned.rs

This file was deleted.

2 changes: 0 additions & 2 deletions differential-dataflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ use std::fmt::Debug;
pub use collection::{Collection, AsCollection};
pub use hashable::Hashable;
pub use difference::Abelian as Diff;
pub use into_owned::IntoOwned;

/// Data type usable in differential dataflow.
///
Expand All @@ -106,7 +105,6 @@ pub mod logging;
pub mod consolidation;
pub mod capture;
pub mod containers;
mod into_owned;

/// Configuration options for differential dataflow.
#[derive(Default)]
Expand Down
11 changes: 5 additions & 6 deletions differential-dataflow/src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ pub struct TraceAgent<Tr: TraceReader> {
logging: Option<crate::logging::Logger>,
}

use crate::trace::implementations::WithLayout;
impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
type Layout = Tr::Layout;
}

impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
type Key<'a> = Tr::Key<'a>;
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type TimeGat<'a> = Tr::TimeGat<'a>;
type Diff = Tr::Diff;
type DiffGat<'a> = Tr::DiffGat<'a>;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
Expand Down
12 changes: 7 additions & 5 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::dataflow::operators::Capability;

use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned};
use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
Expand Down Expand Up @@ -285,10 +285,11 @@ where
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, K, V, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
T1: TraceReader<KeyOwn = K>,
T2: for<'a> Trace<
Key<'a>= T1::Key<'a>,
Val<'a> : IntoOwned<'a, Owned = V>,
KeyOwn = K,
ValOwn = V,
Time=T1::Time,
Diff: Abelian,
>+'static,
Expand All @@ -309,10 +310,11 @@ where
/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, K, V, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
T1: TraceReader<KeyOwn = K>,
T2: for<'a> Trace<
Key<'a>=T1::Key<'a>,
Val<'a> : IntoOwned<'a, Owned = V>,
KeyOwn = K,
ValOwn = V,
Time=T1::Time,
>+'static,
K: Ord + 'static,
Expand Down
32 changes: 16 additions & 16 deletions differential-dataflow/src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
//! use differential_dataflow::operators::arrange::upsert;
//!
//! let stream = scope.input_from(&mut input);
//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
//!
//! arranged
//! .as_collection(|k,v| (k.clone(), v.clone()))
Expand Down Expand Up @@ -111,7 +111,9 @@ use timely::dataflow::operators::Capability;
use crate::operators::arrange::arrangement::Arranged;
use crate::trace::{Builder, Description};
use crate::trace::{self, Trace, TraceReader, Cursor};
use crate::{ExchangeData, Hashable, IntoOwned};
use crate::{ExchangeData, Hashable};

use crate::trace::implementations::containers::BatchContainer;

use super::TraceAgent;

Expand All @@ -125,21 +127,19 @@ use super::TraceAgent;
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, K, V, Bu, Tr>(
stream: &Stream<G, (K, Option<V>, G::Timestamp)>,
pub fn arrange_from_upsert<G, Bu, Tr>(
stream: &Stream<G, (Tr::KeyOwn, Option<Tr::ValOwn>, G::Timestamp)>,
name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr: Trace + for<'a> TraceReader<
Key<'a> : IntoOwned<'a, Owned = K>,
Val<'a> : IntoOwned<'a, Owned = V>,
Tr: for<'a> Trace<
KeyOwn: ExchangeData+Hashable+std::hash::Hash,
ValOwn: ExchangeData,
Time: TotalOrder+ExchangeData,
Diff=isize,
>+'static,
K: ExchangeData+Hashable+std::hash::Hash,
V: ExchangeData,
Bu: Builder<Time=G::Timestamp, Input = Vec<((K, V), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
Bu: Builder<Time=G::Timestamp, Input = Vec<((Tr::KeyOwn, Tr::ValOwn), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand All @@ -148,7 +148,7 @@ where

let reader = &mut reader;

let exchange = Exchange::new(move |update: &(K,Option<V>,G::Timestamp)| (update.0).hashed().into());
let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option<Tr::ValOwn>,G::Timestamp)| (update.0).hashed().into());

stream.unary_frontier(exchange, name, move |_capability, info| {

Expand All @@ -173,7 +173,7 @@ where
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, K, Option<V>)>>::new();
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::KeyOwn, Option<Tr::ValOwn>)>>::new();
let mut updates = Vec::new();

move |input, output| {
Expand Down Expand Up @@ -236,19 +236,19 @@ where
for (key, mut list) in to_process {

// The prior value associated with the key.
let mut prev_value: Option<V> = None;
let mut prev_value: Option<Tr::ValOwn> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key));
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) {
trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key));
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&Tr::KeyContainer::borrow_as(&key))).unwrap_or(false) {
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
trace_cursor.map_times(&trace_storage, |_time, diff| count += Tr::owned_diff(diff));
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
prev_value = Some(val.into_owned());
prev_value = Some(Tr::owned_val(val));
}
trace_cursor.step_val(&trace_storage);
}
Expand Down
Loading
Loading