Skip to content

Commit f58ace5

Browse files
authored
refactor(meta-service): normalize compactor (#18373)
* refactor(meta-service): normalize state machine compactor chore(meta-service): no need to check against prefix when list by prefix chore(meta-service): SMV003.expire_cursor is not used and is removed refactor: compactor acquire process chore: remove useless trait bound * M src/meta/raft-store/src/leveled_store/leveled_map/acquire_compactor_test.rs
1 parent 0c2533f commit f58ace5

File tree

12 files changed

+111
-142
lines changed

12 files changed

+111
-142
lines changed

src/meta/raft-store/src/leveled_store/db_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ impl DBBuilder {
140140
lm.freeze_writable();
141141

142142
let mut compacter = lm.acquire_compactor().await;
143-
let (sys_data, strm) = compacter.compact().await?;
143+
let (sys_data, strm) = compacter.compact_into_stream().await?;
144144

145145
self.append_kv_stream(strm).await?;
146146

src/meta/raft-store/src/leveled_store/immutable_levels.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ use crate::leveled_store::immutable::Immutable;
2424
use crate::leveled_store::level::Level;
2525
use crate::leveled_store::map_api::KVResultStream;
2626
use crate::leveled_store::map_api::MapKey;
27-
use crate::leveled_store::map_api::MapKeyDecode;
28-
use crate::leveled_store::map_api::MapKeyEncode;
2927

3028
/// A readonly leveled map that owns the data.
3129
#[derive(Debug, Default, Clone)]
@@ -71,8 +69,6 @@ impl ImmutableLevels {
7169
impl<K> MapApiRO<K> for ImmutableLevels
7270
where
7371
K: MapKey,
74-
K: MapKeyEncode,
75-
K: MapKeyDecode,
7672
Level: MapApiRO<K>,
7773
Immutable: MapApiRO<K>,
7874
{

src/meta/raft-store/src/leveled_store/leveled_map/acquire_compactor_test.rs

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,47 +17,9 @@ use tokio::time::timeout;
1717

1818
use crate::leveled_store::leveled_map::LeveledMap;
1919

20-
#[tokio::test]
21-
async fn test_try_acquire_ok() -> anyhow::Result<()> {
22-
let mut lm = LeveledMap::default();
23-
24-
let c = lm.try_acquire_compactor();
25-
assert!(c.is_some(), "Ok to try get compactor");
26-
27-
Ok(())
28-
}
29-
30-
#[tokio::test]
31-
async fn test_try_acquire_fail() -> anyhow::Result<()> {
32-
let mut lm = LeveledMap::default();
33-
34-
let _c = lm.acquire_compactor().await;
35-
36-
assert!(
37-
lm.try_acquire_compactor().is_none(),
38-
"can not get two compactor"
39-
);
40-
41-
Ok(())
42-
}
43-
44-
#[tokio::test]
45-
async fn test_try_acquire_ok_after_previous_dropped() -> anyhow::Result<()> {
46-
let mut lm = LeveledMap::default();
47-
48-
let _c = lm.acquire_compactor().await;
49-
50-
assert!(lm.try_acquire_compactor().is_none());
51-
52-
drop(_c);
53-
54-
assert!(lm.try_acquire_compactor().is_some());
55-
Ok(())
56-
}
57-
5820
#[tokio::test]
5921
async fn test_blocking_wait_timeout() -> anyhow::Result<()> {
60-
let mut lm = LeveledMap::default();
22+
let lm = LeveledMap::default();
6123

6224
let _c = lm.acquire_compactor().await;
6325

@@ -79,7 +41,7 @@ async fn test_blocking_wait_timeout() -> anyhow::Result<()> {
7941

8042
#[tokio::test]
8143
async fn test_blocking_wait_ok() -> anyhow::Result<()> {
82-
let mut lm = LeveledMap::default();
44+
let lm = LeveledMap::default();
8345

8446
let _c = lm.acquire_compactor().await;
8547

src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl<'a> CompactingData<'a> {
9393
/// The stream Item is 2 items tuple of key, and value with seq.
9494
///
9595
/// The exported stream contains encoded `String` key and rotbl value [`SeqMarked`]
96-
pub async fn compact(
96+
pub async fn compact_into_stream(
9797
&self,
9898
) -> Result<(SysData, IOResultStream<(String, SeqMarked)>), io::Error> {
9999
fn with_context(e: io::Error, key: &impl fmt::Debug) -> io::Error {

src/meta/raft-store/src/leveled_store/leveled_map/compactor.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ use rotbl::v001::SeqMarked;
2222
use crate::leveled_store::immutable_levels::ImmutableLevels;
2323
use crate::leveled_store::level_index::LevelIndex;
2424
use crate::leveled_store::leveled_map::compacting_data::CompactingData;
25+
use crate::leveled_store::leveled_map::compactor_acquirer::CompactorPermit;
2526

2627
/// Compactor is responsible for compacting the immutable levels and db.
2728
///
2829
/// Only one Compactor can be running at a time.
2930
pub struct Compactor {
30-
/// When dropped, drop the Sender so that the [`LeveledMap`] will be notified.
31-
#[allow(dead_code)]
32-
pub(super) guard: tokio::sync::oneshot::Sender<()>,
31+
/// Acquired permit for this compactor.
32+
///
33+
/// This is used to ensure that only one compactor can run at a time.
34+
pub(crate) _permit: CompactorPermit,
3335

3436
/// In memory immutable levels.
3537
pub(super) immutable_levels: ImmutableLevels,
@@ -68,10 +70,10 @@ impl Compactor {
6870
/// and a stream contains `kv` and `expire` entries.
6971
///
7072
/// The exported stream contains encoded `String` key and rotbl value [`SeqMarked`]
71-
pub async fn compact(
73+
pub async fn compact_into_stream(
7274
&mut self,
7375
) -> Result<(SysData, IOResultStream<(String, SeqMarked)>), io::Error> {
7476
let compacting_data = CompactingData::new(&mut self.immutable_levels, self.db.as_ref());
75-
compacting_data.compact().await
77+
compacting_data.compact_into_stream().await
7678
}
7779
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use tokio::sync::OwnedSemaphorePermit;
18+
use tokio::sync::Semaphore;
19+
20+
/// Acquirer is used to acquire a permit for compaction, without holding lock to the state machine.
21+
pub struct CompactorAcquirer {
22+
sem: Arc<Semaphore>,
23+
}
24+
25+
impl CompactorAcquirer {
26+
pub fn new(sem: Arc<Semaphore>) -> Self {
27+
CompactorAcquirer { sem }
28+
}
29+
30+
pub async fn acquire(self) -> CompactorPermit {
31+
// Safe unwrap: it returns error only when semaphore is closed.
32+
// This semaphore does not close.
33+
let permit = self.sem.acquire_owned().await.unwrap();
34+
CompactorPermit { _permit: permit }
35+
}
36+
}
37+
38+
pub struct CompactorPermit {
39+
_permit: OwnedSemaphorePermit,
40+
}

src/meta/raft-store/src/leveled_store/leveled_map/mod.rs

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,26 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use compactor::Compactor;
1618
use databend_common_meta_types::snapshot_db::DB;
1719
use databend_common_meta_types::sys_data::SysData;
1820
use log::info;
19-
use tokio::sync::oneshot::error::TryRecvError;
21+
use tokio::sync::Semaphore;
2022

2123
use crate::leveled_store::immutable::Immutable;
2224
use crate::leveled_store::immutable_levels::ImmutableLevels;
2325
use crate::leveled_store::level::Level;
2426
use crate::leveled_store::level_index::LevelIndex;
27+
use crate::leveled_store::leveled_map::compactor_acquirer::CompactorAcquirer;
28+
use crate::leveled_store::leveled_map::compactor_acquirer::CompactorPermit;
2529

2630
#[cfg(test)]
2731
mod acquire_compactor_test;
2832
pub mod compacting_data;
2933
pub mod compactor;
34+
pub mod compactor_acquirer;
3035
#[cfg(test)]
3136
mod leveled_map_test;
3237
mod map_api_impl;
@@ -37,12 +42,10 @@ mod map_api_impl;
3742
///
3843
/// The top level is the newest and writable.
3944
/// Others are immutable.
40-
#[derive(Debug, Default)]
45+
#[derive(Debug)]
4146
pub struct LeveledMap {
42-
/// Concurrency control: only one thread can set this field to Some and compact.
43-
///
44-
/// The other should wait for the compaction to finish by blocking on the receiver.
45-
current_compactor: Option<tokio::sync::oneshot::Receiver<()>>,
47+
/// A semaphore that permits at most one compactor to run.
48+
compaction_semaphore: Arc<Semaphore>,
4649

4750
/// The top level is the newest and writable.
4851
writable: Level,
@@ -53,6 +56,18 @@ pub struct LeveledMap {
5356
persisted: Option<DB>,
5457
}
5558

59+
impl Default for LeveledMap {
60+
fn default() -> Self {
61+
Self {
62+
// Only one compactor is allowed.
63+
compaction_semaphore: Arc::new(Semaphore::new(1)),
64+
writable: Default::default(),
65+
immutable_levels: Default::default(),
66+
persisted: None,
67+
}
68+
}
69+
}
70+
5671
impl AsRef<SysData> for LeveledMap {
5772
fn as_ref(&self) -> &SysData {
5873
self.writable.sys_data_ref()
@@ -157,50 +172,25 @@ impl LeveledMap {
157172
info!("compaction finished replacing the db");
158173
}
159174

160-
/// Try to get a singleton `Compactor` instance specific to `self`
161-
/// if it is not currently in use by another thread.
162-
///
163-
/// This method requires a mutable reference to prevent concurrent access to shared data,
164-
/// such as `self.immediate_levels` and `self.persisted`, during the construction of the compactor.
165-
pub(crate) fn try_acquire_compactor(&mut self) -> Option<Compactor> {
166-
if let Some(rx) = &mut self.current_compactor {
167-
match rx.try_recv() {
168-
Err(TryRecvError::Closed) => {
169-
// Ok, released. Continue
170-
}
171-
Err(TryRecvError::Empty) => {
172-
// Another compactor still in use.
173-
return None;
174-
}
175-
Ok(_) => {
176-
unreachable!("it never send any value")
177-
}
178-
}
179-
}
180-
181-
Some(self.new_compactor())
182-
}
183-
184175
/// Get a singleton `Compactor` instance specific to `self`.
185176
///
186177
/// This method requires a mutable reference to prevent concurrent access to shared data,
187178
/// such as `self.immediate_levels` and `self.persisted`, during the construction of the compactor.
188-
pub(crate) async fn acquire_compactor(&mut self) -> Compactor {
189-
if let Some(rx) = self.current_compactor.take() {
190-
let _ = rx.await;
191-
}
179+
pub(crate) async fn acquire_compactor(&self) -> Compactor {
180+
let acquirer = self.new_compactor_acquirer();
192181

193-
self.new_compactor()
194-
}
182+
let permit = acquirer.acquire().await;
195183

196-
fn new_compactor(&mut self) -> Compactor {
197-
let (tx, rx) = tokio::sync::oneshot::channel();
184+
self.new_compactor(permit)
185+
}
198186

199-
// current_compactor must be None, which is guaranteed by caller.
200-
self.current_compactor = Some(rx);
187+
pub(crate) fn new_compactor_acquirer(&self) -> CompactorAcquirer {
188+
CompactorAcquirer::new(self.compaction_semaphore.clone())
189+
}
201190

191+
pub(crate) fn new_compactor(&self, permit: CompactorPermit) -> Compactor {
202192
Compactor {
203-
guard: tx,
193+
_permit: permit,
204194
immutable_levels: self.immutable_levels.clone(),
205195
db: self.persisted.clone(),
206196
since: self.immutable_level_index(),

src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async fn test_compact_3_level() -> anyhow::Result<()> {
149149

150150
let mut compactor = lm.acquire_compactor().await;
151151

152-
let (sys_data, strm) = compactor.compact().await?;
152+
let (sys_data, strm) = compactor.compact_into_stream().await?;
153153
assert_eq!(
154154
r#"{"last_applied":{"leader_id":{"term":3,"node_id":3},"index":3},"last_membership":{"log_id":{"leader_id":{"term":3,"node_id":3},"index":3},"membership":{"configs":[],"nodes":{}}},"nodes":{"3":{"name":"3","endpoint":{"addr":"3","port":3},"grpc_api_advertise_address":null}},"sequence":7}"#,
155155
serde_json::to_string(&sys_data).unwrap()
@@ -176,7 +176,7 @@ async fn test_export_2_level_with_meta() -> anyhow::Result<()> {
176176

177177
let mut compactor = sm.acquire_compactor().await;
178178

179-
let (sys_data, strm) = compactor.compact().await?;
179+
let (sys_data, strm) = compactor.compact_into_stream().await?;
180180
let got = strm
181181
.map_ok(|x| serde_json::to_string(&x).unwrap())
182182
.try_collect::<Vec<_>>()

src/meta/raft-store/src/sm_v003/compact_with_db_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ async fn test_compact_output_3_level() -> anyhow::Result<()> {
269269

270270
let mut compactor = lm.acquire_compactor().await;
271271

272-
let (sys_data, strm) = compactor.compact().await?;
272+
let (sys_data, strm) = compactor.compact_into_stream().await?;
273273

274274
assert_eq!(sys_data.curr_seq(), 7);
275275
assert_eq!(

src/meta/raft-store/src/sm_v003/sm_v003.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ use openraft::entry::RaftEntry;
2727

2828
use crate::applier::Applier;
2929
use crate::leveled_store::leveled_map::compactor::Compactor;
30+
use crate::leveled_store::leveled_map::compactor_acquirer::CompactorAcquirer;
31+
use crate::leveled_store::leveled_map::compactor_acquirer::CompactorPermit;
3032
use crate::leveled_store::leveled_map::LeveledMap;
3133
use crate::leveled_store::sys_data_api::SysDataApiRO;
3234
use crate::sm_v003::sm_v003_kv_api::SMV003KVApi;
33-
use crate::state_machine::ExpireKey;
3435
use crate::state_machine_api::StateMachineApi;
3536

3637
type OnChange = Box<dyn Fn((String, Option<SeqV>, Option<SeqV>)) + Send + Sync>;
@@ -39,9 +40,6 @@ type OnChange = Box<dyn Fn((String, Option<SeqV>, Option<SeqV>)) + Send + Sync>;
3940
pub struct SMV003 {
4041
levels: LeveledMap,
4142

42-
/// The expiration key since which for next clean.
43-
expire_cursor: ExpireKey,
44-
4543
/// Callback when a change is applied to state machine
4644
pub(crate) on_change_applied: Option<OnChange>,
4745
}
@@ -50,7 +48,6 @@ impl fmt::Debug for SMV003 {
5048
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
5149
f.debug_struct("SMV003")
5250
.field("levels", &self.levels)
53-
.field("expire_cursor", &self.expire_cursor)
5451
.field(
5552
"on_change_applied",
5653
&self.on_change_applied.as_ref().map(|_x| "is_set"),
@@ -198,12 +195,18 @@ impl SMV003 {
198195
self.levels.freeze_writable();
199196
}
200197

201-
pub fn try_acquire_compactor(&mut self) -> Option<Compactor> {
202-
self.map_mut().try_acquire_compactor()
198+
/// A shortcut
199+
pub async fn acquire_compactor(&self) -> Compactor {
200+
let permit = self.new_compactor_acquirer().acquire().await;
201+
self.new_compactor(permit)
202+
}
203+
204+
pub fn new_compactor_acquirer(&self) -> CompactorAcquirer {
205+
self.levels.new_compactor_acquirer()
203206
}
204207

205-
pub async fn acquire_compactor(&mut self) -> Compactor {
206-
self.levels.acquire_compactor().await
208+
pub fn new_compactor(&self, permit: CompactorPermit) -> Compactor {
209+
self.levels.new_compactor(permit)
207210
}
208211

209212
/// Replace all the state machine data with the given one.
@@ -220,9 +223,5 @@ impl SMV003 {
220223
);
221224

222225
self.levels = level;
223-
224-
// The installed data may not clean up all expired keys, if it is built with an older state machine.
225-
// So we need to reset the cursor then the next time applying a log it will clean up all expired.
226-
self.expire_cursor = ExpireKey::new(0, 0);
227226
}
228227
}

0 commit comments

Comments
 (0)