Skip to content

Absorb builder into batcher #606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ fn main() {
let data_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);
let keys_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);

let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");
let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_, ColKeyBuilder<_,_,_>>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_, ColKeyBuilder<_,_,_>>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
Expand Down Expand Up @@ -351,8 +351,8 @@ use differential_dataflow::trace::implementations::merge_batcher::ColMerger;
use differential_dataflow::containers::TimelyStack;

/// A batcher for columnar storage.
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<Column<((K,V),T,R)>, batcher::Chunker<TimelyStack<((K,V),T,R)>>, ColMerger<(K,V),T,R>>;
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
pub type Col2ValBatcher<K,V,T,R,Bu> = MergeBatcher<Column<((K,V),T,R)>, batcher::Chunker<TimelyStack<((K,V),T,R)>>, ColMerger<(K,V),T,R>, Bu>;
pub type Col2KeyBatcher<K,T,R,Bu> = Col2ValBatcher<K, (), T, R, Bu>;

/// Types for consolidating, merging, and extracting columnar update collections.
pub mod batcher {
Expand Down
16 changes: 8 additions & 8 deletions differential-dataflow/examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ fn main() {
match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let data = data.arrange::<ColKeyBatcher<_,_,_,ColKeyBuilder<_,_,_>>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_,ColKeyBuilder<_,_,_>>, ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let data = data.arrange::<OrdKeyBatcher<_,_,_,RcOrdKeyBuilder<_,_,_>>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_,RcOrdKeyBuilder<_,_,_>>, OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_,VecBuilder<_,(),_,_>>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_,VecBuilder<_,(),_,_>>, VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
Expand All @@ -54,11 +54,11 @@ fn main() {

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.arrange::<PreferredBatcher<[u8],[u8],_,_,PreferredBuilder<[u8],[u8],_,_>>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.arrange::<PreferredBatcher<[u8],u8,_,_,PreferredBuilder<[u8],u8,_,_>>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
Expand Down
49 changes: 21 additions & 28 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ use super::TraceAgent;
/// computation, memory) required to produce and maintain an indexed representation of a collection.
pub struct Arranged<G: Scope, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader+Clone,
Tr: TraceReader,
{
/// A stream containing arranged updates.
///
Expand All @@ -62,7 +61,7 @@ where

impl<G, Tr> Clone for Arranged<G, Tr>
where
G: Scope<Timestamp=Tr::Time>,
G: Scope,
Tr: TraceReader + Clone,
{
fn clone(&self) -> Self {
Expand Down Expand Up @@ -347,27 +346,26 @@ where
}

/// A type that can be arranged as if a collection of updates.
///
/// The type `C` is a container that a batcher can accept, and `G` is the scope.
pub trait Arrange<G, C>
where
G: Scope,
G::Timestamp: Lattice,
{
/// Arranges updates into a shared trace.
fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
fn arrange<Ba, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Ba: Batcher<Input=C, Time=G::Timestamp, Output=Tr::Batch> + 'static,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
{
self.arrange_named::<Ba, Bu, Tr>("Arrange")
self.arrange_named::<Ba, Tr>("Arrange")
}

/// Arranges updates into a shared trace, with a supplied name.
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Ba: Batcher<Input=C, Time=G::Timestamp, Output=Tr::Batch> + 'static,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
;
Expand All @@ -376,20 +374,18 @@ where
impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
where
G: Scope,
G::Timestamp: Lattice,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
{
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp, Output=Tr::Batch> + 'static,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
arrange_core::<_, _, Ba, _>(&self.inner, exchange, name)
}
}

Expand All @@ -398,14 +394,12 @@ where
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
pub fn arrange_core<G, P, Ba, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope,
G::Timestamp: Lattice,
P: ParallelizationContract<G::Timestamp, Ba::Input>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Ba: Batcher<Time=G::Timestamp, Output=Tr::Batch> + 'static,
Ba::Input: Container + Clone + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
{
Expand Down Expand Up @@ -512,7 +506,7 @@ where
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal::<Bu>(upper.clone());
let batch = batcher.seal(upper.clone());

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand Down Expand Up @@ -540,7 +534,7 @@ where
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
let _batch = batcher.seal(input.frontier().frontier().to_owned());
writer.seal(input.frontier().frontier().to_owned());
}

Expand All @@ -559,15 +553,14 @@ impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, V
where
G::Timestamp: Lattice+Ord,
{
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp, Output=Tr::Batch> + 'static,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
{
let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
arrange_core::<_,_,Ba,_>(&self.map(|k| (k, ())).inner, exchange, name)
}
}

Expand Down Expand Up @@ -598,7 +591,7 @@ where
}

fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
self.arrange_named::<ValBatcher<_,_,_,_,ValBuilder<_,_,_,_>>,_>(name)
}
}

Expand Down Expand Up @@ -633,6 +626,6 @@ where

fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
self.map(|k| (k, ()))
.arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
.arrange_named::<KeyBatcher<_,_,_,KeyBuilder<_,_,_>>,_>(name)
}
}
11 changes: 5 additions & 6 deletions differential-dataflow/src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::difference::Semigroup;

use crate::Data;
use crate::lattice::Lattice;
use crate::trace::{Batcher, Builder};
use crate::trace::Batcher;

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
Expand Down Expand Up @@ -46,21 +46,20 @@ where
/// ```
pub fn consolidate(&self) -> Self {
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate")
self.consolidate_named::<KeyBatcher<_,_,_,KeyBuilder<_,_,_>>, KeySpine<_,_,_>>("Consolidate")
}

/// As `consolidate` but with the ability to name the operator and specify the trace type.
pub fn consolidate_named<Ba, Bu, Tr>(&self, name: &str) -> Self
pub fn consolidate_named<Ba, Tr>(&self, name: &str) -> Self
where
Ba: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
Ba: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp, Output=Tr::Batch> + 'static,
Tr: crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>,
Tr::Batch: crate::trace::Batch,
Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
{
use crate::operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
.arrange_named::<Ba, Bu, Tr>(name)
.arrange_named::<Ba, Tr>(name)
.as_collection(|d, _| d.into_owned())
}

Expand Down
17 changes: 9 additions & 8 deletions differential-dataflow/src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::trace::{Batcher, Builder, Description};
///
/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
/// and must produce outputs of type `M::Chunk`.
pub struct MergeBatcher<Input, C, M: Merger> {
pub struct MergeBatcher<Input, C, M: Merger, Bu> {
/// Transforms input streams to chunks of sorted, consolidated data.
chunker: C,
/// A sequence of power-of-two length lists of sorted, consolidated containers.
Expand All @@ -44,18 +44,19 @@ pub struct MergeBatcher<Input, C, M: Merger> {
/// Timely operator ID.
operator_id: usize,
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
_marker: PhantomData<Input>,
_marker: PhantomData<(Input, Bu)>,
}

impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
impl<Input, C, M, Bu> Batcher for MergeBatcher<Input, C, M, Bu>
where
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
M: Merger,
M::Time: Timestamp,
Bu: Builder<Time=M::Time, Input=M::Chunk>,
{
type Input = Input;
type Time = M::Time;
type Output = M::Chunk;
type Output = Bu::Output;

fn new(logger: Option<Logger>, operator_id: usize) -> Self {
Self {
Expand Down Expand Up @@ -85,7 +86,7 @@ where
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
fn seal(&mut self, upper: Antichain<M::Time>) -> Bu::Output {
// Finish
while let Some(chunk) = self.chunker.finish() {
let chunk = std::mem::take(chunk);
Expand Down Expand Up @@ -115,7 +116,7 @@ where
self.stash.clear();

let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
let seal = B::seal(&mut readied, description);
let seal = Bu::seal(&mut readied, description);
self.lower = upper;
seal
}
Expand All @@ -127,7 +128,7 @@ where
}
}

impl<Input, C, M> MergeBatcher<Input, C, M>
impl<Input, C, M, Bu> MergeBatcher<Input, C, M, Bu>
where
M: Merger,
{
Expand Down Expand Up @@ -194,7 +195,7 @@ where
}
}

impl<Input, C, M> Drop for MergeBatcher<Input, C, M>
impl<Input, C, M, Bu> Drop for MergeBatcher<Input, C, M, Bu>
where
M: Merger,
{
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
pub type OrdValBatcher<K, V, T, R, Bu> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>, Bu>;
/// A builder using ordered lists.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -34,14 +34,14 @@ pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R
/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
pub type ColValBatcher<K, V, T, R, Bu> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>, Bu>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
pub type OrdKeyBatcher<K, T, R, Bu> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>, Bu>;
/// A builder for ordered lists.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

Expand All @@ -51,14 +51,14 @@ pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>
/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
pub type ColKeyBatcher<K, T, R, Bu> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>, Bu>;
/// A builder for columnar storage
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
pub type PreferredBatcher<K, V, T, R, Bu> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>, Bu>;
/// A builder for columnar storage.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;

Expand Down
Loading
Loading