Skip to content

Commit 0d9a612

Browse files
committed
Persist: plumb incremental apply APIs through
Depends on #33044
1 parent 2bb9775 commit 0d9a612

File tree

5 files changed

+463
-163
lines changed

5 files changed

+463
-163
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
4040
use crate::internal::encoding::Schemas;
4141
use crate::internal::gc::{GarbageCollector, GcReq};
4242
use crate::internal::machine::Machine;
43-
use crate::internal::trace::{CompactionInput, FueledMergeRes};
43+
use crate::internal::trace::FueledMergeRes;
4444
use crate::rpc::{NoopPubSubSender, PubSubSender};
4545
use crate::write::{WriteHandle, WriterId};
4646
use crate::{
@@ -455,17 +455,17 @@ where
455455
let req = CompactReq {
456456
shard_id,
457457
desc: req.desc,
458-
inputs: req
459-
.inputs
460-
.into_iter()
461-
.map(|b| Arc::unwrap_or_clone(b.batch))
462-
.collect(),
458+
inputs: req.inputs,
463459
};
464-
let parts = req.inputs.iter().map(|x| x.part_count()).sum::<usize>();
460+
let parts = req
461+
.inputs
462+
.iter()
463+
.map(|x| x.batch.part_count())
464+
.sum::<usize>();
465465
let bytes = req
466466
.inputs
467467
.iter()
468-
.map(|x| x.encoded_size_bytes())
468+
.map(|x| x.batch.encoded_size_bytes())
469469
.sum::<usize>();
470470
let start = Instant::now();
471471
info!(
@@ -511,7 +511,7 @@ where
511511
let (apply_res, maintenance) = machine
512512
.merge_res(&FueledMergeRes {
513513
output: res.output,
514-
input: CompactionInput::Legacy,
514+
input: res.input,
515515
new_active_compaction: None,
516516
})
517517
.await;
@@ -743,14 +743,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
743743
let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
744744
for req in reqs {
745745
info!(
746-
"force_compaction {} {} compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
746+
"force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
747747
machine.applier.shard_metrics.name,
748748
machine.applier.shard_metrics.shard_id,
749749
req.inputs.len(),
750-
req.inputs.iter().flat_map(|x| &x.parts).count(),
750+
req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
751+
req.inputs
752+
.iter()
753+
.map(|x| x.batch.runs().count())
754+
.sum::<usize>(),
751755
req.inputs
752756
.iter()
753-
.flat_map(|x| &x.parts)
757+
.flat_map(|x| &x.batch.parts)
754758
.map(|x| x.encoded_size_bytes())
755759
.sum::<usize>(),
756760
req.desc.lower().elements(),
@@ -795,13 +799,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
795799

796800
// NB: This check is intentionally at the end so that it's safe to call
797801
// this method in a loop.
798-
let num_batches = machine.applier.all_batches().len();
799-
if num_batches < 2 {
802+
let num_runs: usize = machine
803+
.applier
804+
.all_batches()
805+
.iter()
806+
.map(|x| x.runs().count())
807+
.sum();
808+
if num_runs <= 1 {
800809
info!(
801-
"force_compaction {} {} exiting with {} batches",
810+
"force_compaction {} {} exiting with {} runs",
802811
machine.applier.shard_metrics.name,
803812
machine.applier.shard_metrics.shard_id,
804-
num_batches
813+
num_runs
805814
);
806815
return;
807816
}

0 commit comments

Comments
 (0)