Skip to content

Commit 56110d8

Browse files
Merge branch 'master' into rotation
2 parents a5f9c07 + 8ff9aea commit 56110d8

File tree

10 files changed

+195
-66
lines changed

10 files changed

+195
-66
lines changed

Cargo.lock

Lines changed: 3 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ crossbeam-channel = "0.4.0"
99
crossbeam-utils = "0.7.0"
1010
log = "0.4.6"
1111
rand = "0.7.2"
12-
snap = "0.2.5"
12+
snap = "1.0.0"
1313
crc32c = "0.4.0"
1414
fs2 = "0.4.3"
1515
fxhash = "0.2.1"

src/db/mod.rs

Lines changed: 108 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ pub trait DB {
9191
#[derive(Clone)]
9292
pub struct WickDB<S: Storage + Clone + 'static> {
9393
inner: Arc<DBImpl<S>>,
94+
shutdown_batch_processing_thread: (Sender<()>, Receiver<()>),
95+
shutdown_compaction_thread: (Sender<()>, Receiver<()>),
9496
}
9597

9698
pub type WickDBIterator<S> = DBIterator<
@@ -152,15 +154,19 @@ impl<S: Storage + Clone> DB for WickDB<S> {
152154

153155
fn close(&mut self) -> Result<()> {
154156
self.inner.is_shutting_down.store(true, Ordering::Release);
155-
match &self.inner.db_lock {
156-
Some(lock) => lock.unlock(),
157-
None => Ok(()),
158-
}
157+
self.inner.schedule_close_batch();
158+
let _ = self.shutdown_batch_processing_thread.1.recv();
159+
// Send a signal to avoid blocking forever
160+
let _ = self.inner.do_compaction.0.send(());
161+
let _ = self.shutdown_compaction_thread.1.recv();
162+
self.inner.close()?;
163+
debug!("DB {} closed", &self.inner.db_name);
164+
Ok(())
159165
}
160166

161167
fn destroy(&mut self) -> Result<()> {
162168
let db = self.inner.clone();
163-
db.is_shutting_down.store(true, Ordering::Release);
169+
self.close()?;
164170
db.env.remove_dir(&db.db_name, true)
165171
}
166172

@@ -173,6 +179,7 @@ impl<S: Storage + Clone> WickDB<S> {
173179
/// Create a new WickDB
174180
pub fn open_db(mut options: Options, db_name: &'static str, storage: S) -> Result<Self> {
175181
options.initialize(db_name.to_owned(), &storage);
182+
debug!("Open db: '{}'", db_name);
176183
let mut db = DBImpl::new(options, db_name, storage);
177184
let (mut edit, should_save_manifest) = db.recover()?;
178185
let mut versions = db.versions.lock().unwrap();
@@ -188,16 +195,20 @@ impl<S: Storage + Clone> WickDB<S> {
188195
if should_save_manifest {
189196
edit.set_prev_log_number(0);
190197
edit.set_log_number(versions.log_number());
198+
dbg!("log_and_apply in open_db");
191199
versions.log_and_apply(&mut edit)?;
192200
}
193201

194202
let current = versions.current();
195203
db.delete_obsolete_files(versions);
196204
let wick_db = WickDB {
197205
inner: Arc::new(db),
206+
shutdown_batch_processing_thread: crossbeam_channel::bounded(1),
207+
shutdown_compaction_thread: crossbeam_channel::bounded(1),
198208
};
199209
wick_db.process_compaction();
200210
wick_db.process_batch();
211+
// Schedule a compaction to current version for potential unfinished work
201212
wick_db.inner.maybe_schedule_compaction(current);
202213
Ok(wick_db)
203214
}
@@ -223,9 +234,17 @@ impl<S: Storage + Clone> WickDB<S> {
223234
// 5. Update sequence of version set
224235
fn process_batch(&self) {
225236
let db = self.inner.clone();
237+
let shutdown = self.shutdown_batch_processing_thread.0.clone();
226238
thread::spawn(move || {
227239
loop {
228240
if db.is_shutting_down.load(Ordering::Acquire) {
241+
// Cleanup all the batch queue
242+
let mut queue = db.batch_queue.lock().unwrap();
243+
while let Some(batch) = queue.pop_front() {
244+
let _ = batch.signal.send(Err(Error::DBClosed(
245+
"DB is closing. Clean up all the batch in queue".to_owned(),
246+
)));
247+
}
229248
break;
230249
}
231250
let first = {
@@ -236,6 +255,9 @@ impl<S: Storage + Clone> WickDB<S> {
236255
}
237256
queue.pop_front().unwrap()
238257
};
258+
if first.stop_process {
259+
break;
260+
}
239261
let force = first.force_mem_compaction;
240262
// TODO: The VersionSet is locked when processing `make_room_for_write`
241263
match db.make_room_for_write(force) {
@@ -313,13 +335,16 @@ impl<S: Storage + Clone> WickDB<S> {
313335
}
314336
}
315337
}
338+
shutdown.send(()).unwrap();
339+
debug!("batch processing thread shut down");
316340
});
317341
}
318342

319343
// Process a compaction work when receiving the signal.
320344
// The compaction might run recursively since we produce new table files.
321345
fn process_compaction(&self) {
322346
let db = self.inner.clone();
347+
let shutdown = self.shutdown_compaction_thread.0.clone();
323348
thread::spawn(move || {
324349
while let Ok(()) = db.do_compaction.1.recv() {
325350
if db.is_shutting_down.load(Ordering::Acquire) {
@@ -338,6 +363,8 @@ impl<S: Storage + Clone> WickDB<S> {
338363
let current = db.versions.lock().unwrap().current();
339364
db.maybe_schedule_compaction(current);
340365
}
366+
shutdown.send(()).unwrap();
367+
debug!("compaction thread shut down");
341368
});
342369
}
343370
}
@@ -389,9 +416,18 @@ unsafe impl<S: Storage + Clone> Send for DBImpl<S> {}
389416
impl<S: Storage + Clone> Drop for DBImpl<S> {
390417
#[allow(unused_must_use)]
391418
fn drop(&mut self) {
419+
if !self.is_shutting_down.load(Ordering::Acquire) {
420+
let _ = self.close();
421+
}
422+
}
423+
}
424+
425+
impl<S: Storage + Clone> DBImpl<S> {
426+
fn close(&self) -> Result<()> {
392427
self.is_shutting_down.store(true, Ordering::Release);
393-
if let Some(lock) = self.db_lock.as_ref() {
394-
lock.unlock();
428+
match &self.db_lock {
429+
Some(lock) => lock.unlock(),
430+
None => Ok(()),
395431
}
396432
}
397433
}
@@ -492,13 +528,16 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {
492528
new_db.set_log_number(0);
493529
new_db.set_next_file(2);
494530
new_db.set_last_sequence(0);
531+
// Create manifest
495532
let manifest_filenum = 1;
496533
let manifest_filename =
497534
generate_filename(self.db_name, FileType::Manifest, manifest_filenum);
535+
debug!("Create manifest file: {}", &manifest_filename);
498536
let manifest = self.env.create(manifest_filename.as_str())?;
499537
let mut manifest_writer = Writer::new(manifest);
500538
let mut record = vec![];
501539
new_db.encode_to(&mut record);
540+
debug!("Append manifest record: {:?}", &new_db);
502541
match manifest_writer.add_record(&record) {
503542
Ok(()) => update_current(&self.env, self.db_name, manifest_filenum)?,
504543
Err(e) => {
@@ -530,7 +569,7 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {
530569
let prev_log = versions.prev_log_number();
531570
let all_files = self.env.list(self.db_name)?;
532571
let mut logs_to_recover = vec![];
533-
for filename in all_files.iter() {
572+
for filename in all_files {
534573
if let Some((file_type, file_number)) = parse_filename(filename) {
535574
if file_type == FileType::Log && (file_number >= min_log || file_number == prev_log)
536575
{
@@ -701,6 +740,20 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {
701740
}
702741
}
703742

743+
// Schedule a WriteBatch to close batch processing thread for gracefully shutting down db
744+
fn schedule_close_batch(&self) {
745+
let (send, _) = crossbeam_channel::bounded(0);
746+
let task = BatchTask {
747+
stop_process: true,
748+
force_mem_compaction: false,
749+
batch: WriteBatch::default(),
750+
signal: send,
751+
options: WriteOptions::default(),
752+
};
753+
self.batch_queue.lock().unwrap().push_back(task);
754+
self.process_batch_sem.notify_all();
755+
}
756+
704757
// Schedule the WriteBatch and wait for the result from the receiver.
705758
// This function wakes up the thread in `process_batch`.
706759
// An empty `WriteBatch` will trigger a force memtable compaction.
@@ -718,6 +771,7 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {
718771
}
719772
let (send, recv) = crossbeam_channel::bounded(0);
720773
let task = BatchTask {
774+
stop_process: false,
721775
force_mem_compaction,
722776
batch,
723777
signal: send,
@@ -747,7 +801,8 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {
747801
// Group several batches from queue
748802
while !queue.is_empty() {
749803
let current = queue.pop_front().unwrap();
750-
if current.options.sync && !grouped.options.sync {
804+
if current.stop_process || (current.options.sync && !grouped.options.sync) {
805+
// Do not include a stop process batch
751806
// Do not include a sync write into a batch handled by a non-sync write.
752807
queue.push_front(current);
753808
break;
@@ -1291,6 +1346,7 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {
12911346

12921347
// A wrapper struct for scheduling `WriteBatch`
12931348
struct BatchTask {
1349+
stop_process: bool, // flag for shutdown the batch processing thread gracefully
12941350
force_mem_compaction: bool,
12951351
batch: WriteBatch,
12961352
signal: Sender<Result<()>>,
@@ -1418,7 +1474,7 @@ mod tests {
14181474
}
14191475

14201476
fn new_test_options(o: TestOption) -> Options {
1421-
match o {
1477+
let opt = match o {
14221478
TestOption::Default => Options::default(),
14231479
TestOption::Reuse => {
14241480
let mut o = Options::default();
@@ -1436,10 +1492,12 @@ mod tests {
14361492
o.compression = CompressionType::NoCompression;
14371493
o
14381494
}
1439-
}
1495+
};
1496+
opt
14401497
}
14411498
struct DBTest {
1442-
store: MemStorage, // With the same db's inner storage
1499+
store: MemStorage, // Used as the db's inner storage
1500+
opt: Options, // Used as the db's options
14431501
db: WickDB<MemStorage>,
14441502
}
14451503

@@ -1477,8 +1535,16 @@ mod tests {
14771535
fn new(opt: Options) -> Self {
14781536
let store = MemStorage::default();
14791537
let name = "db_test";
1480-
let db = WickDB::open_db(opt, name, store.clone()).unwrap();
1481-
DBTest { store, db }
1538+
let db = WickDB::open_db(opt.clone(), name, store.clone()).unwrap();
1539+
DBTest { store, opt, db }
1540+
}
1541+
1542+
// Close the inner db without destroy the contents and establish a new WickDB on same db path with same option
1543+
fn reopen(&mut self) -> Result<()> {
1544+
self.db.close()?;
1545+
let db = WickDB::open_db(self.opt.clone(), self.db.inner.db_name, self.store.clone())?;
1546+
self.db = db;
1547+
Ok(())
14821548
}
14831549

14841550
// Put entries with default `WriteOptions`
@@ -1641,8 +1707,8 @@ mod tests {
16411707
let store = MemStorage::default();
16421708
let name = "db_test";
16431709
let opt = new_test_options(TestOption::Default);
1644-
let db = WickDB::open_db(opt, name, store.clone()).unwrap();
1645-
DBTest { store, db }
1710+
let db = WickDB::open_db(opt.clone(), name, store.clone()).unwrap();
1711+
DBTest { store, opt, db }
16461712
}
16471713
}
16481714

@@ -2064,4 +2130,30 @@ mod tests {
20642130
assert_iter_entry(&iter, "a", "va");
20652131
}
20662132
}
2133+
2134+
#[test]
2135+
fn test_reopen_with_empty_db() {
2136+
for mut t in default_cases() {
2137+
t.reopen().unwrap();
2138+
t.reopen().unwrap();
2139+
}
2140+
}
2141+
2142+
#[test]
2143+
fn test_recover_with_entries() {
2144+
for mut t in default_cases() {
2145+
t.put_entries(vec![("foo", "v1"), ("baz", "v5")]);
2146+
t.reopen().unwrap();
2147+
assert_eq!(t.get("foo", None).unwrap(), "v1");
2148+
assert_eq!(t.get("baz", None).unwrap(), "v5");
2149+
2150+
t.put_entries(vec![("bar", "v2"), ("foo", "v3")]);
2151+
t.reopen().unwrap();
2152+
assert_eq!(t.get("foo", None).unwrap(), "v3");
2153+
t.put("foo", "v4").unwrap();
2154+
assert_eq!(t.get("bar", None).unwrap(), "v2");
2155+
assert_eq!(t.get("foo", None).unwrap(), "v4");
2156+
assert_eq!(t.get("baz", None).unwrap(), "v5");
2157+
}
2158+
}
20672159
}

src/logger.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,22 @@ impl Logger {
4646
let inner = match inner {
4747
Some(l) => l,
4848
None => {
49-
let drain = if cfg!(debug_assertions) {
49+
if cfg!(debug_assertions) {
5050
// Use std out
5151
let decorator = slog_term::TermDecorator::new().build();
52-
let drain = slog_term::FullFormat::new(decorator).build().fuse();
53-
slog_async::Async::new(drain).build().fuse()
52+
let drain = Mutex::new(slog_term::FullFormat::new(decorator).build()).fuse();
53+
slog::Logger::root(drain, o!())
5454
} else {
5555
// Use a file `Log` to record all logs
56+
// TODO: add file rotation
5657
let file = storage
5758
.create(generate_filename(db_path, FileType::InfoLog, 0).as_str())
5859
.unwrap();
59-
slog_async::Async::new(FileBasedDrain::new(file))
60+
let drain = slog_async::Async::new(FileBasedDrain::new(file))
6061
.build()
61-
.fuse()
62-
};
63-
slog::Logger::root(drain, o!())
62+
.fuse();
63+
slog::Logger::root(drain, o!())
64+
}
6465
}
6566
};
6667
Self { inner, level }
@@ -95,7 +96,16 @@ impl Log for Logger {
9596
level,
9697
tag: target,
9798
};
98-
self.inner.log(&slog::Record::new(&s, args, slog::b!()))
99+
if cfg!(debug_assertions) {
100+
let meta_info = format!("{}:{}", file, line);
101+
self.inner.log(&slog::Record::new(
102+
&s,
103+
args,
104+
slog::b!("[location]" => meta_info),
105+
))
106+
} else {
107+
self.inner.log(&slog::Record::new(&s, args, slog::b!()))
108+
}
99109
}
100110
}
101111
fn flush(&self) {}

src/options.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ impl From<u8> for CompressionType {
4444
}
4545

4646
/// Options to control the behavior of a database (passed to `DB::Open`)
47+
#[derive(Clone)]
4748
pub struct Options {
4849
// -------------------
4950
// Parameters that affect behavior:

0 commit comments

Comments
 (0)