
Separate containers crate #578

Draft · wants to merge 1 commit into master

8 changes: 5 additions & 3 deletions Cargo.toml
@@ -1,21 +1,23 @@
[workspace]
members = [
"differential-dataflow",
# "advent_of_code_2017",
"containers",
"differential-dataflow",
"dogsdogsdogs",
"doop",
"experiments",
"interactive",
"server",
"server/dataflows/degr_dist",
"server/dataflows/neighborhood",
"server/dataflows/random_graph",
"server/dataflows/reachability",
#"tpchlike",
"doop"
# "tpchlike",
]
resolver = "2"

[workspace.dependencies]
columnar = { version = "0.4.1", default-features = false }
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.15.2" }
timely = { version = "0.21", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }
14 changes: 14 additions & 0 deletions containers/Cargo.toml
@@ -0,0 +1,14 @@
[package]
name = "differential-containers"
version = "0.1.0"
edition = "2021"

[dependencies]
columnation = "0.1.0"
differential-dataflow = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
timely = { workspace = true }

[dev-dependencies]
bytemuck = { default-features = false, version = "1.21.0" }
columnar = { workspace = true }
@@ -1,17 +1,12 @@
//! Wordcount based on `columnar`.

use {
timely::container::{Container, CapacityContainerBuilder},
timely::dataflow::channels::pact::ExchangeCore,
timely::dataflow::InputHandleCore,
timely::dataflow::ProbeHandle,
};


use differential_dataflow::trace::implementations::ord_neu::ColKeyBuilder;
use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;

use differential_containers::columnation::{ColKeyBuilder, ColKeySpine, ColMerger, TimelyStack};
use differential_dataflow::operators::arrange::arrangement::arrange_core;
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
use timely::container::{Container, CapacityContainerBuilder};
use timely::dataflow::InputHandleCore;
use timely::dataflow::ProbeHandle;
use timely::dataflow::channels::pact::ExchangeCore;

fn main() {

@@ -346,10 +341,6 @@ mod builder {
}


use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
use differential_dataflow::trace::implementations::merge_batcher::ColMerger;
use differential_dataflow::containers::TimelyStack;

/// A batcher for columnar storage.
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<Column<((K,V),T,R)>, batcher::Chunker<TimelyStack<((K,V),T,R)>>, ColMerger<(K,V),T,R>>;
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
@@ -28,7 +28,7 @@ fn main() {

match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
use differential_containers::columnation::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
@@ -42,15 +42,15 @@ fn main() {
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
use differential_containers::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine};
use differential_containers::{PreferredBatcher, PreferredBuilder, PreferredSpine};

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
@@ -1,10 +1,83 @@
//! A columnar container based on the columnation library.

use std::iter::FromIterator;

pub use columnation::*;
use std::rc::Rc;

use columnation::{Columnation, Region};
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder};
use differential_dataflow::trace::implementations::spine_fueled::Spine;
use differential_dataflow::trace::implementations::{BatchContainer, Layout, OffsetList, Update};
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
use timely::container::PushInto;

mod merge_batcher;
mod chunker;

pub use merge_batcher::ColMerger;
pub use chunker::ColumnationChunker;

/// A layout based on timely stacks
pub struct TStack<U: Update> {
phantom: std::marker::PhantomData<U>,
}

impl<U: Update> Layout for TStack<U>
where
U::Key: Columnation,
U::Val: Columnation,
U::Time: Columnation,
U::Diff: Columnation + Ord,
{
type Target = U;
type KeyContainer = TimelyStack<U::Key>;
type ValContainer = TimelyStack<U::Val>;
type TimeContainer = TimelyStack<U::Time>;
type DiffContainer = TimelyStack<U::Diff>;
type OffsetContainer = OffsetList;
}


/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;


/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// A builder for columnar storage
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

// The `ToOwned` requirement exists to satisfy `self.reserve_items`, which must for now
// be presented with the actual contained type, rather than a type that borrows into it.
impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
type Owned = T;
type ReadItem<'a> = &'a T;

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}

fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
let mut new = Self::default();
new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
new
}
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
fn index(&self, index: usize) -> Self::ReadItem<'_> {
&self[index]
}
fn len(&self) -> usize {
self[..].len()
}
}


/// An append-only vector that stores records as columns.
///
/// This container maintains elements that might conventionally own
@@ -274,7 +347,7 @@ mod container {
use timely::Container;
use timely::container::SizableContainer;

use crate::containers::TimelyStack;
use crate::columnation::TimelyStack;

impl<T: Columnation> Container for TimelyStack<T> {
type ItemRef<'a> = &'a T where Self: 'a;
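
For orientation, a minimal sketch (not part of this PR) of the columnar `TimelyStack` container that the new crate re-exports. The crate and module paths, the `with_capacity` and `copy` methods, and the slice indexing are assumptions taken from the imports and the `BatchContainer` implementation in this diff.

use differential_containers::columnation::TimelyStack;

fn main() {
    // Append-only, region-allocated storage: records are copied into the
    // stack's regions rather than owned individually.
    let mut stack = TimelyStack::<(String, u64)>::with_capacity(4);
    for i in 0..4u64 {
        stack.copy(&(format!("key{}", i), i));
    }
    // Reads hand back references into the regions, mirroring the
    // `BatchContainer` implementation above.
    assert_eq!(stack[..].len(), 4);
    assert_eq!(stack[2].1, 2);
}
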
120 changes: 120 additions & 0 deletions containers/src/columnation/chunker.rs
@@ -0,0 +1,120 @@
use std::collections::VecDeque;
use columnation::Columnation;
use timely::container::{ContainerBuilder, PushInto};
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;

use crate::columnation::TimelyStack;

/// Chunk a stream of vectors into chains of vectors.
pub struct ColumnationChunker<T: Columnation> {
pending: Vec<T>,
ready: VecDeque<TimelyStack<T>>,
empty: Option<TimelyStack<T>>,
}


impl<T: Columnation> Default for ColumnationChunker<T> {
fn default() -> Self {
Self {
pending: Vec::default(),
ready: VecDeque::default(),
empty: None,
}
}
}

impl<D,T,R> ColumnationChunker<(D, T, R)>
where
D: Columnation + Ord,
T: Columnation + Ord,
R: Columnation + Semigroup,
{
const BUFFER_SIZE_BYTES: usize = 64 << 10;
fn chunk_capacity() -> usize {
let size = ::std::mem::size_of::<(D, T, R)>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Self::BUFFER_SIZE_BYTES / size
} else {
1
}
}

/// Form chunks out of pending data, if needed. This function is meant to be applied to
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
/// half full when the function returns.
///
/// `form_chunk` does the following:
/// * If pending is full, consolidate.
/// * If after consolidation it's more than half full, peel off chunks,
/// leaving behind any partial chunk in pending.
fn form_chunk(&mut self) {
consolidate_updates(&mut self.pending);
if self.pending.len() >= Self::chunk_capacity() {
while self.pending.len() > Self::chunk_capacity() {
let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
for item in self.pending.drain(..chunk.capacity()) {
chunk.copy(&item);
}
self.ready.push_back(chunk);
}
}
}
}

impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
where
D: Columnation + Ord + Clone,
T: Columnation + Ord + Clone,
R: Columnation + Semigroup + Clone,
{
fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
if self.pending.capacity() < Self::chunk_capacity() * 2 {
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}

let mut drain = container.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
}

impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
where
D: Columnation + Ord + Clone + 'static,
T: Columnation + Ord + Clone + 'static,
R: Columnation + Semigroup + Clone + 'static,
{
type Container = TimelyStack<(D,T,R)>;

fn extract(&mut self) -> Option<&mut Self::Container> {
if let Some(ready) = self.ready.pop_front() {
self.empty = Some(ready);
self.empty.as_mut()
} else {
None
}
}

fn finish(&mut self) -> Option<&mut Self::Container> {
consolidate_updates(&mut self.pending);
while !self.pending.is_empty() {
let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
chunk.copy(&item);
}
self.ready.push_back(chunk);
}
self.empty = self.ready.pop_front();
self.empty.as_mut()
}
}
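
As a rough usage sketch (not part of this PR), the chunker can be driven by hand through `PushInto` and `ContainerBuilder`. The paths and trait methods below follow the imports in this file; the update type `(u64, u64, i64)` and the batch contents are only illustrative choices.

use differential_containers::columnation::ColumnationChunker;
use timely::container::{ContainerBuilder, PushInto};

fn main() {
    // Updates of the form (data, time, diff); plain integers satisfy the
    // `Columnation + Ord` and `Semigroup` bounds.
    let mut chunker = ColumnationChunker::<(u64, u64, i64)>::default();

    // Push a batch; the chunker consolidates and peels off fixed-size
    // `TimelyStack` chunks whenever its pending buffer fills.
    let mut batch: Vec<(u64, u64, i64)> = (0..1_000u64).map(|i| (i % 100, 0, 1)).collect();
    chunker.push_into(&mut batch);

    // Drain any chunks that are already complete, then flush the remainder.
    while let Some(chunk) = chunker.extract() {
        println!("ready chunk of {} updates", chunk[..].len());
    }
    while let Some(chunk) = chunker.finish() {
        println!("final chunk of {} updates", chunk[..].len());
    }
}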
