Skip to content

Commit f035a90

Browse files
committed
Persist: Add new API for run based spine replacement
1 parent a547087 commit f035a90

File tree

7 files changed

+1616
-345
lines changed

7 files changed

+1616
-345
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,11 @@ where
509509
start.elapsed(),
510510
);
511511
let (apply_res, maintenance) = machine
512-
.merge_res(&FueledMergeRes { output: res.output })
512+
.merge_res(&FueledMergeRes {
513+
output: res.output,
514+
inputs: vec![],
515+
new_active_compaction: None,
516+
})
513517
.await;
514518
if !maintenance.is_empty() {
515519
info!("ignoring non-empty requested maintenance: {maintenance:?}")

src/persist-client/src/internal/compact.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,11 @@ where
396396
);
397397
let res = Self::compact_all(stream, req.clone()).await?;
398398
let maintenance = Self::apply(
399-
FueledMergeRes { output: res.output },
399+
FueledMergeRes {
400+
output: res.output,
401+
inputs: vec![],
402+
new_active_compaction: None,
403+
},
400404
&metrics_clone,
401405
&machine_clone,
402406
)

src/persist-client/src/internal/machine.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2464,7 +2464,11 @@ pub mod datadriven {
24642464
.clone();
24652465
let (merge_res, maintenance) = datadriven
24662466
.machine
2467-
.merge_res(&FueledMergeRes { output: batch })
2467+
.merge_res(&FueledMergeRes {
2468+
output: batch,
2469+
inputs: vec![],
2470+
new_active_compaction: None,
2471+
})
24682472
.await;
24692473
datadriven.routine.push(maintenance);
24702474
Ok(format!(

src/persist-client/src/internal/state.rs

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,6 +1053,29 @@ impl<T> HollowBatch<T> {
10531053
}
10541054
}
10551055

1056+
#[cfg(test)]
1057+
pub(crate) fn new_run_for_test(
1058+
desc: Description<T>,
1059+
parts: Vec<RunPart<T>>,
1060+
len: usize,
1061+
run_id: RunId,
1062+
) -> Self {
1063+
let run_meta = if parts.is_empty() {
1064+
vec![]
1065+
} else {
1066+
let mut meta = RunMeta::default();
1067+
meta.id = Some(run_id);
1068+
vec![meta]
1069+
};
1070+
Self {
1071+
desc,
1072+
len,
1073+
parts,
1074+
run_splits: vec![],
1075+
run_meta,
1076+
}
1077+
}
1078+
10561079
/// An empty hollow batch, representing no updates over the given desc.
10571080
pub(crate) fn empty(desc: Description<T>) -> Self {
10581081
Self {
@@ -1810,7 +1833,9 @@ where
18101833
return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
18111834
}
18121835

1813-
let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1836+
let apply_merge_result = self
1837+
.trace
1838+
.apply_merge_res_checked_classic::<D>(res, metrics);
18141839
Continue(apply_merge_result)
18151840
}
18161841

@@ -2164,6 +2189,8 @@ where
21642189
// able to append that batch in the first place.
21652190
let fake_merge = FueledMergeRes {
21662191
output: HollowBatch::empty(desc),
2192+
inputs: vec![],
2193+
new_active_compaction: None,
21672194
};
21682195
let result = self.trace.apply_tombstone_merge(&fake_merge);
21692196
assert!(
@@ -2833,34 +2860,92 @@ pub(crate) mod tests {
28332860
}
28342861
}
28352862

2863+
pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2864+
num_runs: usize,
2865+
) -> impl Strategy<Value = HollowBatch<T>> {
2866+
(
2867+
any::<T>(),
2868+
any::<T>(),
2869+
any::<T>(),
2870+
proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2871+
any::<usize>(),
2872+
)
2873+
.prop_map(move |(t0, t1, since, parts, len)| {
2874+
let (lower, upper) = if t0 <= t1 {
2875+
(Antichain::from_elem(t0), Antichain::from_elem(t1))
2876+
} else {
2877+
(Antichain::from_elem(t1), Antichain::from_elem(t0))
2878+
};
2879+
let since = Antichain::from_elem(since);
2880+
2881+
let run_splits = (1..num_runs)
2882+
.map(|i| i * parts.len() / num_runs)
2883+
.collect::<Vec<_>>();
2884+
2885+
let run_meta = (0..num_runs)
2886+
.map(|_| {
2887+
let mut meta = RunMeta::default();
2888+
meta.id = Some(RunId::new());
2889+
meta
2890+
})
2891+
.collect::<Vec<_>>();
2892+
2893+
HollowBatch::new(
2894+
Description::new(lower, upper, since),
2895+
parts,
2896+
len % 10,
2897+
run_meta,
2898+
run_splits,
2899+
)
2900+
})
2901+
}
2902+
28362903
pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
28372904
Strategy::prop_map(
28382905
(
28392906
any::<T>(),
28402907
any::<T>(),
28412908
any::<T>(),
2842-
proptest::collection::vec(any_run_part::<T>(), 0..3),
2909+
proptest::collection::vec(any_run_part::<T>(), 0..20),
28432910
any::<usize>(),
2844-
any::<bool>(),
2911+
0..=10usize,
2912+
proptest::collection::vec(any::<RunId>(), 10),
28452913
),
2846-
|(t0, t1, since, parts, len, runs)| {
2914+
|(t0, t1, since, parts, len, num_runs, run_ids)| {
28472915
let (lower, upper) = if t0 <= t1 {
28482916
(Antichain::from_elem(t0), Antichain::from_elem(t1))
28492917
} else {
28502918
(Antichain::from_elem(t1), Antichain::from_elem(t0))
28512919
};
28522920
let since = Antichain::from_elem(since);
2853-
if runs && parts.len() > 2 {
2854-
let split_at = parts.len() / 2;
2921+
if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2922+
let run_splits = (1..num_runs)
2923+
.map(|i| i * parts.len() / num_runs)
2924+
.collect::<Vec<_>>();
2925+
2926+
let run_meta = (0..num_runs)
2927+
.enumerate()
2928+
.map(|(i, _)| {
2929+
let mut meta = RunMeta::default();
2930+
meta.id = Some(run_ids[i]);
2931+
meta
2932+
})
2933+
.collect::<Vec<_>>();
2934+
28552935
HollowBatch::new(
28562936
Description::new(lower, upper, since),
28572937
parts,
28582938
len % 10,
2859-
vec![RunMeta::default(), RunMeta::default()],
2860-
vec![split_at],
2939+
run_meta,
2940+
run_splits,
28612941
)
28622942
} else {
2863-
HollowBatch::new_run(Description::new(lower, upper, since), parts, len % 10)
2943+
HollowBatch::new_run_for_test(
2944+
Description::new(lower, upper, since),
2945+
parts,
2946+
len % 10,
2947+
run_ids[0],
2948+
)
28642949
}
28652950
},
28662951
)

src/persist-client/src/internal/state_diff.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -877,7 +877,11 @@ fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
877877

878878
// Fast-path: compaction
879879
if let Some((_inputs, output)) = sniff_compaction(&diffs) {
880-
let res = FueledMergeRes { output };
880+
let res = FueledMergeRes {
881+
output,
882+
inputs: vec![],
883+
new_active_compaction: None,
884+
};
881885
// We can't predict how spine will arrange the batches when it's
882886
// hydrated. This means that something that is maintaining a Spine
883887
// starting at some seqno may not exactly match something else
@@ -1444,7 +1448,11 @@ mod tests {
14441448
leader
14451449
.collections
14461450
.trace
1447-
.apply_merge_res_unchecked(&FueledMergeRes { output });
1451+
.apply_merge_res_unchecked(&FueledMergeRes {
1452+
output,
1453+
inputs: vec![],
1454+
new_active_compaction: None,
1455+
});
14481456
}
14491457
}
14501458
}

0 commit comments

Comments
 (0)