Skip to content

Commit 8e775b6

Browse files
authored
Refactoring dyn Column (#1502)
1 parent e1f9af4 commit 8e775b6

30 files changed

+366
-549
lines changed

examples/custom_collector.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
// Of course, you can have a look at the tantivy's built-in collectors
88
// such as the `CountCollector` for more examples.
99

10+
use std::sync::Arc;
11+
1012
use fastfield_codecs::Column;
1113
// ---
1214
// Importing tantivy...
1315
use tantivy::collector::{Collector, SegmentCollector};
14-
use tantivy::fastfield::DynamicFastFieldReader;
1516
use tantivy::query::QueryParser;
1617
use tantivy::schema::{Field, Schema, FAST, INDEXED, TEXT};
1718
use tantivy::{doc, Index, Score, SegmentReader};
@@ -96,7 +97,7 @@ impl Collector for StatsCollector {
9697
}
9798

9899
struct StatsSegmentCollector {
99-
fast_field_reader: DynamicFastFieldReader<u64>,
100+
fast_field_reader: Arc<dyn Column<u64>>,
100101
stats: Stats,
101102
}
102103

examples/warmer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::cmp::Reverse;
22
use std::collections::{HashMap, HashSet};
33
use std::sync::{Arc, RwLock, Weak};
44

5-
use fastfield_codecs::Column;
65
use tantivy::collector::TopDocs;
76
use tantivy::query::QueryParser;
87
use tantivy::schema::{Field, Schema, FAST, TEXT};

fastfield_codecs/src/column.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
use std::marker::PhantomData;
2+
3+
use tantivy_bitpacker::minmax;
4+
15
pub trait Column<T = u64> {
26
/// Return the value associated to the given idx.
37
///
@@ -42,8 +46,113 @@ pub trait Column<T = u64> {
4246
fn max_value(&self) -> T;
4347

4448
fn num_vals(&self) -> u64;
49+
4550
/// Returns a iterator over the data
4651
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T> + 'a> {
4752
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
4853
}
4954
}
55+
56+
pub struct VecColumn<'a, T = u64> {
57+
values: &'a [T],
58+
min_value: T,
59+
max_value: T,
60+
}
61+
62+
impl<'a, T: Copy + PartialOrd> Column<T> for VecColumn<'a, T> {
63+
fn get_val(&self, position: u64) -> T {
64+
self.values[position as usize]
65+
}
66+
67+
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
68+
Box::new(self.values.iter().copied())
69+
}
70+
71+
fn min_value(&self) -> T {
72+
self.min_value
73+
}
74+
75+
fn max_value(&self) -> T {
76+
self.max_value
77+
}
78+
79+
fn num_vals(&self) -> u64 {
80+
self.values.len() as u64
81+
}
82+
}
83+
84+
impl<'a, T: Copy + Ord + Default> From<&'a [T]> for VecColumn<'a, T> {
85+
fn from(values: &'a [T]) -> Self {
86+
let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default();
87+
Self {
88+
values,
89+
min_value,
90+
max_value,
91+
}
92+
}
93+
}
94+
95+
struct MonotonicMappingColumn<C, T, Input> {
96+
from_column: C,
97+
monotonic_mapping: T,
98+
_phantom: PhantomData<Input>,
99+
}
100+
101+
/// Creates a view of a column transformed by a monotonic mapping.
102+
pub fn monotonic_map_column<C, T, Input, Output>(
103+
from_column: C,
104+
monotonic_mapping: T,
105+
) -> impl Column<Output>
106+
where
107+
C: Column<Input>,
108+
T: Fn(Input) -> Output,
109+
{
110+
MonotonicMappingColumn {
111+
from_column,
112+
monotonic_mapping,
113+
_phantom: PhantomData,
114+
}
115+
}
116+
117+
impl<C, T, Input, Output> Column<Output> for MonotonicMappingColumn<C, T, Input>
118+
where
119+
C: Column<Input>,
120+
T: Fn(Input) -> Output,
121+
{
122+
fn get_val(&self, idx: u64) -> Output {
123+
let from_val = self.from_column.get_val(idx);
124+
(self.monotonic_mapping)(from_val)
125+
}
126+
127+
fn min_value(&self) -> Output {
128+
let from_min_value = self.from_column.min_value();
129+
(self.monotonic_mapping)(from_min_value)
130+
}
131+
132+
fn max_value(&self) -> Output {
133+
let from_max_value = self.from_column.max_value();
134+
(self.monotonic_mapping)(from_max_value)
135+
}
136+
137+
fn num_vals(&self) -> u64 {
138+
self.from_column.num_vals()
139+
}
140+
}
141+
142+
#[cfg(test)]
143+
mod tests {
144+
use super::*;
145+
146+
#[test]
147+
fn test_monotonic_mapping() {
148+
let vals = &[1u64, 3u64][..];
149+
let col = VecColumn::from(vals);
150+
let mapped = monotonic_map_column(col, |el| el + 4);
151+
assert_eq!(mapped.min_value(), 5u64);
152+
assert_eq!(mapped.max_value(), 7u64);
153+
assert_eq!(mapped.num_vals(), 2);
154+
assert_eq!(mapped.num_vals(), 2);
155+
assert_eq!(mapped.get_val(0), 5);
156+
assert_eq!(mapped.get_val(1), 7);
157+
}
158+
}

fastfield_codecs/src/lib.rs

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub mod linear;
1414

1515
mod column;
1616

17-
pub use self::column::Column;
17+
pub use self::column::{monotonic_map_column, Column, VecColumn};
1818

1919
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
2020
#[repr(u8)]
@@ -56,12 +56,12 @@ impl FastFieldCodecType {
5656

5757
/// The FastFieldSerializerEstimate trait is required on all variants
5858
/// of fast field compressions, to decide which one to choose.
59-
pub trait FastFieldCodec {
59+
pub trait FastFieldCodec: 'static {
6060
/// A codex needs to provide a unique name and id, which is
6161
/// used for debugging and de/serialization.
6262
const CODEC_TYPE: FastFieldCodecType;
6363

64-
type Reader: Column<u64>;
64+
type Reader: Column<u64> + 'static;
6565

6666
/// Reads the metadata and returns the CodecReader
6767
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader>;
@@ -90,35 +90,6 @@ pub struct FastFieldStats {
9090
pub num_vals: u64,
9191
}
9292

93-
struct VecColum<'a>(&'a [u64]);
94-
impl<'a> Column for VecColum<'a> {
95-
fn get_val(&self, position: u64) -> u64 {
96-
self.0[position as usize]
97-
}
98-
99-
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
100-
Box::new(self.0.iter().cloned())
101-
}
102-
103-
fn min_value(&self) -> u64 {
104-
self.0.iter().min().cloned().unwrap_or(0)
105-
}
106-
107-
fn max_value(&self) -> u64 {
108-
self.0.iter().max().cloned().unwrap_or(0)
109-
}
110-
111-
fn num_vals(&self) -> u64 {
112-
self.0.len() as u64
113-
}
114-
}
115-
116-
impl<'a> From<&'a [u64]> for VecColum<'a> {
117-
fn from(data: &'a [u64]) -> Self {
118-
Self(data)
119-
}
120-
}
121-
12293
#[cfg(test)]
12394
mod tests {
12495
use proptest::prelude::*;
@@ -133,10 +104,10 @@ mod tests {
133104
data: &[u64],
134105
name: &str,
135106
) -> Option<(f32, f32)> {
136-
let estimation = Codec::estimate(&VecColum::from(data))?;
107+
let estimation = Codec::estimate(&VecColumn::from(data))?;
137108

138109
let mut out: Vec<u8> = Vec::new();
139-
Codec::serialize(&mut out, &VecColum::from(data)).unwrap();
110+
Codec::serialize(&mut out, &VecColumn::from(data)).unwrap();
140111

141112
let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0);
142113

@@ -233,7 +204,7 @@ mod tests {
233204
#[test]
234205
fn estimation_good_interpolation_case() {
235206
let data = (10..=20000_u64).collect::<Vec<_>>();
236-
let data: VecColum = data.as_slice().into();
207+
let data: VecColumn = data.as_slice().into();
237208

238209
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
239210
assert_le!(linear_interpol_estimation, 0.01);
@@ -249,7 +220,7 @@ mod tests {
249220
fn estimation_test_bad_interpolation_case() {
250221
let data: &[u64] = &[200, 10, 10, 10, 10, 1000, 20];
251222

252-
let data: VecColum = data.into();
223+
let data: VecColumn = data.into();
253224
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
254225
assert_le!(linear_interpol_estimation, 0.32);
255226

@@ -260,7 +231,7 @@ mod tests {
260231
fn estimation_test_bad_interpolation_case_monotonically_increasing() {
261232
let mut data: Vec<u64> = (200..=20000_u64).collect();
262233
data.push(1_000_000);
263-
let data: VecColum = data.as_slice().into();
234+
let data: VecColumn = data.as_slice().into();
264235

265236
// in this case the linear interpolation can't in fact not be worse than bitpacking,
266237
// but the estimator adds some threshold, which leads to estimated worse behavior

src/aggregation/agg_req_with_accessor.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use std::rc::Rc;
44
use std::sync::atomic::AtomicU32;
55
use std::sync::Arc;
66

7+
use fastfield_codecs::Column;
8+
79
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
810
use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
911
use super::metric::{AverageAggregation, StatsAggregation};
1012
use super::segment_agg_result::BucketCount;
1113
use super::VecWithNames;
12-
use crate::fastfield::{
13-
type_and_cardinality, DynamicFastFieldReader, FastType, MultiValuedFastFieldReader,
14-
};
14+
use crate::fastfield::{type_and_cardinality, FastType, MultiValuedFastFieldReader};
1515
use crate::schema::{Cardinality, Type};
1616
use crate::{InvertedIndexReader, SegmentReader, TantivyError};
1717

@@ -37,10 +37,16 @@ impl AggregationsWithAccessor {
3737
#[derive(Clone)]
3838
pub(crate) enum FastFieldAccessor {
3939
Multi(MultiValuedFastFieldReader<u64>),
40-
Single(DynamicFastFieldReader<u64>),
40+
Single(Arc<dyn Column<u64>>),
4141
}
4242
impl FastFieldAccessor {
43-
pub fn as_single(&self) -> Option<&DynamicFastFieldReader<u64>> {
43+
pub fn as_single(&self) -> Option<&dyn Column<u64>> {
44+
match self {
45+
FastFieldAccessor::Multi(_) => None,
46+
FastFieldAccessor::Single(reader) => Some(&**reader),
47+
}
48+
}
49+
pub fn into_single(self) -> Option<Arc<dyn Column<u64>>> {
4450
match self {
4551
FastFieldAccessor::Multi(_) => None,
4652
FastFieldAccessor::Single(reader) => Some(reader),
@@ -118,7 +124,7 @@ impl BucketAggregationWithAccessor {
118124
pub struct MetricAggregationWithAccessor {
119125
pub metric: MetricAggregation,
120126
pub field_type: Type,
121-
pub accessor: DynamicFastFieldReader<u64>,
127+
pub accessor: Arc<dyn Column>,
122128
}
123129

124130
impl MetricAggregationWithAccessor {
@@ -134,9 +140,8 @@ impl MetricAggregationWithAccessor {
134140

135141
Ok(MetricAggregationWithAccessor {
136142
accessor: accessor
137-
.as_single()
138-
.expect("unexpected fast field cardinality")
139-
.clone(),
143+
.into_single()
144+
.expect("unexpected fast field cardinality"),
140145
field_type,
141146
metric: metric.clone(),
142147
})

src/aggregation/bucket/histogram/histogram.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use crate::aggregation::intermediate_agg_result::{
1515
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
1616
};
1717
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
18-
use crate::fastfield::DynamicFastFieldReader;
1918
use crate::schema::Type;
2019
use crate::{DocId, TantivyError};
2120

@@ -264,7 +263,7 @@ impl SegmentHistogramCollector {
264263
req: &HistogramAggregation,
265264
sub_aggregation: &AggregationsWithAccessor,
266265
field_type: Type,
267-
accessor: &DynamicFastFieldReader<u64>,
266+
accessor: &dyn Column<u64>,
268267
) -> crate::Result<Self> {
269268
req.validate()?;
270269
let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);

src/aggregation/bucket/range.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::fmt::Debug;
22
use std::ops::Range;
33

4-
use fastfield_codecs::Column;
54
use fnv::FnvHashMap;
65
use serde::{Deserialize, Serialize};
76

src/aggregation/metric/average.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use fastfield_codecs::Column;
44
use serde::{Deserialize, Serialize};
55

66
use crate::aggregation::f64_from_fastfield_u64;
7-
use crate::fastfield::DynamicFastFieldReader;
87
use crate::schema::Type;
98
use crate::DocId;
109

@@ -58,7 +57,7 @@ impl SegmentAverageCollector {
5857
data: Default::default(),
5958
}
6059
}
61-
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
60+
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column<u64>) {
6261
let mut iter = doc.chunks_exact(4);
6362
for docs in iter.by_ref() {
6463
let val1 = field.get_val(docs[0] as u64);

src/aggregation/metric/stats.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use fastfield_codecs::Column;
22
use serde::{Deserialize, Serialize};
33

44
use crate::aggregation::f64_from_fastfield_u64;
5-
use crate::fastfield::DynamicFastFieldReader;
65
use crate::schema::Type;
76
use crate::{DocId, TantivyError};
87

@@ -164,7 +163,7 @@ impl SegmentStatsCollector {
164163
stats: IntermediateStats::default(),
165164
}
166165
}
167-
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
166+
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column<u64>) {
168167
let mut iter = doc.chunks_exact(4);
169168
for docs in iter.by_ref() {
170169
let val1 = field.get_val(docs[0] as u64);

src/aggregation/segment_agg_result.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,10 @@ impl SegmentMetricResultCollector {
185185
pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) {
186186
match self {
187187
SegmentMetricResultCollector::Average(avg_collector) => {
188-
avg_collector.collect_block(doc, &metric.accessor);
188+
avg_collector.collect_block(doc, &*metric.accessor);
189189
}
190190
SegmentMetricResultCollector::Stats(stats_collector) => {
191-
stats_collector.collect_block(doc, &metric.accessor);
191+
stats_collector.collect_block(doc, &*metric.accessor);
192192
}
193193
}
194194
}

0 commit comments

Comments
 (0)