diff --git a/Cargo.lock b/Cargo.lock index 4a6e44a..e27ca71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -700,6 +700,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "wickdb" version = "0.1.0" dependencies = [ + "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "crc32c 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index c94248e..4cf330d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ num-derive = "0.3" slog = "2.5.2" slog-term = "2.5.0" slog-async = "2.4.0" - +chrono = "0.4" [dev-dependencies] criterion = "0.3.0" diff --git a/src/db/mod.rs b/src/db/mod.rs index 7104f72..9f8024f 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -177,7 +177,7 @@ impl DB for WickDB { impl WickDB { /// Create a new WickDB pub fn open_db(mut options: Options, db_name: &'static str, storage: S) -> Result { - options.initialize(db_name.to_owned(), &storage); + options.initialize(db_name, &storage); debug!("Open db: '{}'", db_name); let mut db = DBImpl::new(options, db_name, storage); let (mut edit, should_save_manifest) = db.recover()?; diff --git a/src/logger.rs b/src/logger.rs index f8cc04a..4c1f817 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -12,17 +12,27 @@ // limitations under the License. use crate::db::filename::{generate_filename, FileType}; +use crate::error::Result; use crate::storage::{File, Storage}; use log::{LevelFilter, Log, Metadata, Record}; use slog::{o, Drain, Level}; +use chrono::prelude::*; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; /// A `slog` based logger which can be used with `log` crate /// /// See `slog` at https://github.com/slog-rs/slog /// See `log` at https://github.com/rust-lang/log +/// + +pub fn create_file(storage: &S, db_path: &str, timestamp: i64) -> Result { + let log_path = generate_filename(db_path, FileType::Log, timestamp as u64); + storage.create(log_path) +} + pub struct Logger { inner: slog::Logger, level: LevelFilter, @@ -35,11 +45,11 @@ impl Logger { /// If `inner` is `None` /// - In dev mode, use a std output /// - In release mode, use a storage specific file with name `LOG` - pub fn new( + pub fn new( inner: Option, level: LevelFilter, - storage: &S, - db_path: &str, + storage: S, + db_path: &'static str, ) -> Self { let inner = match inner { Some(l) => l, @@ -51,13 +61,13 @@ impl Logger { slog::Logger::root(drain, o!()) } else { // Use a file `Log` to record all logs - // TODO: add file rotation - let file = storage - .create(generate_filename(db_path, FileType::InfoLog, 0).as_str()) - .unwrap(); - let drain = slog_async::Async::new(FileBasedDrain::new(file)) - .build() - .fuse(); + let file = create_file(&storage, db_path, Local::now().timestamp()).unwrap(); + let file_fn = move |path: String| { + create_file(&storage, path.as_str(), Local::now().timestamp()) + }; + let drain = FileBasedDrain::new(file, db_path, file_fn) + .add_rotator(RotatedFileBySize::new(1)); + let drain = slog_async::Async::new(drain).build().fuse(); slog::Logger::root(drain, o!()) } } @@ -104,7 +114,6 @@ impl Log for Logger { } } } - fn flush(&self) {} } @@ -120,16 +129,53 @@ fn log_to_slog_level(level: log::Level) -> Level { struct FileBasedDrain { inner: Mutex, + rotators: Vec>, + db_path: String, + new_file: Box Result>, } impl FileBasedDrain { - fn new(f: F) -> Self { + fn new(f: F, path: &str, new_file: H) -> Self + where + H: 'static + Send + Fn(String) -> Result, + { FileBasedDrain { + db_path: path.to_string(), inner: Mutex::new(f), + rotators: vec![], + new_file: Box::new(new_file), } } -} + fn add_rotator(mut self, rotator: R) -> Self { + if rotator.is_enabled() { + self.rotators.push(Box::new(rotator)); + } + for rotator in self.rotators.iter() { + rotator.prepare(&*self.inner.lock().unwrap()).unwrap(); + } + self + } + + fn flush(&self) -> Result<()> { + for rotator in self.rotators.iter() { + if rotator.should_rotate() { + let new_file = (self.new_file)(self.db_path.clone()).unwrap(); + + let mut old_file = self.inner.lock().unwrap(); + *old_file = new_file; + + for rotator in self.rotators.iter() { + rotator.on_rotate(); + } + + return Ok(()); + } + } + + self.inner.lock().unwrap().flush() + } +} impl Drain for FileBasedDrain { type Ok = (); type Err = slog::Never; @@ -138,19 +184,80 @@ impl Drain for FileBasedDrain { &self, record: &slog::Record, values: &slog::OwnedKVList, - ) -> Result { - // Ignore errors here - let _ = self.inner.lock().unwrap().write( - format!( - "[{}] : {:?} \n {:?} \n", - record.level(), - record.msg(), - values - ) - .as_bytes(), + ) -> std::result::Result { + let by = format!( + "[{}] : {:?} \n {:?} \n", + record.level(), + record.msg(), + values ); + + for rotator in self.rotators.iter() { + rotator.on_write(by.as_bytes()).unwrap(); + } + + self.flush().unwrap(); + + //Ignore errors here + let _ = self.inner.lock().unwrap().write(by.as_bytes()); + + Ok(()) + } +} + +trait Rotator: Send { + /// Check if the option is enabled in configuration. + /// Return true if the `rotator` is valid. + fn is_enabled(&self) -> bool; + + /// Call by operator, initializes the states of rotators. + fn prepare(&self, file: &dyn File) -> Result<()>; + + /// Return if the file need to be rotated. + fn should_rotate(&self) -> bool; + + /// Call by operator, update rotators' state while the operator try to write some data. + fn on_write(&self, buf: &[u8]) -> Result<()>; + + // Call by operator, update rotators' state while the operator execute a rotation. + fn on_rotate(&self); +} + +struct RotatedFileBySize { + rotation_size: u64, + file_size: AtomicU64, +} + +impl RotatedFileBySize { + fn new(rotation_size: u64) -> Self { + RotatedFileBySize { + rotation_size, + file_size: AtomicU64::new(0), + } + } +} + +impl Rotator for RotatedFileBySize { + fn is_enabled(&self) -> bool { + self.rotation_size != 0 + } + fn prepare(&self, file: &dyn File) -> Result<()> { + self.file_size.store(file.len().unwrap(), Ordering::Relaxed); + Ok(()) + } + + fn should_rotate(&self) -> bool { + self.file_size.load(Ordering::Relaxed) > self.rotation_size + } + fn on_write(&self, buf: &[u8]) -> Result<()> { + let size = self.file_size.load(Ordering::Relaxed) + buf.len() as u64; + self.file_size.store(size, Ordering::Relaxed); Ok(()) } + + fn on_rotate(&self) { + self.file_size.store(0, Ordering::Relaxed) + } } #[cfg(test)] @@ -158,20 +265,65 @@ mod tests { use super::*; use crate::storage::mem::MemStorage; - use std::thread; use std::time::Duration; #[test] fn test_default_logger() { let s = MemStorage::default(); + // let s = &'static s; let db_path = "test"; - let logger = Logger::new(None, LevelFilter::Debug, &s, db_path); + let logger = Logger::new(None, LevelFilter::Debug, s, db_path); // Ignore the error if the logger have been set let _ = log::set_logger(Box::leak(Box::new(logger))); log::set_max_level(LevelFilter::Debug); - info!("Hello World"); + log::info!("Hello World"); // Wait for the async logger print the result thread::sleep(Duration::from_millis(100)); } + + #[test] + fn test_rotate_by_size() { + let db_path = "log"; + + let storage = MemStorage::default(); + let stor2 = storage.clone(); + let _ = storage.mkdir_all(db_path); + let file = create_file(&storage, db_path, 0).unwrap(); + let new_path = generate_filename(db_path, FileType::Log, 1); + + { + let file_fn = move |path: String| create_file(&storage, path.as_str(), 1); + + let drain = + FileBasedDrain::new(file, db_path, file_fn).add_rotator(RotatedFileBySize::new(1)); + let drain = slog_async::Async::new(drain).build().fuse(); + let _log = slog::Logger::root(drain, o!()); + slog::info!(_log, "Test log file rotated by size"); + } + assert_eq!(true, stor2.exists(new_path)); + } + + #[test] + fn test_not_rotate_by_size() { + let db_path = "norotate"; + + let storage = MemStorage::default(); + let stor2 = storage.clone(); + let _ = storage.mkdir_all(db_path); + let file = create_file(&storage, db_path, 0).unwrap(); + let new_path = generate_filename(db_path, FileType::Log, 1); + + { + let file_fn = move |path: String| create_file(&storage, path.as_str(), 1); + + let drain = FileBasedDrain::new(file, db_path, file_fn) + .add_rotator(RotatedFileBySize::new(100)); + let drain = slog_async::Async::new(drain).build().fuse(); + let _log = slog::Logger::root(drain, o!()); + slog::info!(_log, "Test log file rotated by size"); + } + assert_eq!(true, stor2.exists("norotate/000000.log")); + assert_eq!(false, stor2.exists(new_path)); + } } diff --git a/src/options.rs b/src/options.rs index 8e98e2d..5abdd9b 100644 --- a/src/options.rs +++ b/src/options.rs @@ -205,9 +205,9 @@ impl Options { } /// Initialize Options by limiting ranges of some flags, applying customized Logger and etc. - pub(crate) fn initialize>( + pub(crate) fn initialize + Clone + 'static>( &mut self, - db_name: String, + db_name: &'static str, storage: &S, ) { self.max_open_files = @@ -215,7 +215,7 @@ impl Options { self.write_buffer_size = Self::clip_range(self.write_buffer_size, 64 << 10, 1 << 30); self.max_file_size = Self::clip_range(self.max_file_size, 1 << 20, 1 << 30); self.block_size = Self::clip_range(self.block_size, 1 << 10, 4 << 20); - self.apply_logger(storage, &db_name); + self.apply_logger(storage, db_name); if self.block_cache.is_none() { self.block_cache = Some(Arc::new(LRUCache::new(8 << 20, None))) } @@ -225,9 +225,9 @@ impl Options { } #[allow(unused_must_use)] - fn apply_logger(&mut self, storage: &S, db_path: &str) { + fn apply_logger(&mut self, storage: &S, db_path: &'static str) { let user_logger = std::mem::replace(&mut self.logger, None); - let logger = Logger::new(user_logger, self.logger_level, storage, db_path); + let logger = Logger::new(user_logger, self.logger_level, storage.clone(), db_path); let static_logger: &'static dyn Log = Box::leak(Box::new(logger)); log::set_logger(static_logger); log::set_max_level(self.logger_level); diff --git a/src/storage/mem.rs b/src/storage/mem.rs index 88af4ac..13acd17 100644 --- a/src/storage/mem.rs +++ b/src/storage/mem.rs @@ -690,6 +690,7 @@ mod tests { let store = MemStorage::default(); // Test `create` let mut f = store.create("test1").unwrap(); + assert!(store.exists("test1")); f.write(b"hello world").unwrap(); diff --git a/src/version/version_edit.rs b/src/version/version_edit.rs index 3bca433..a7e4452 100644 --- a/src/version/version_edit.rs +++ b/src/version/version_edit.rs @@ -480,6 +480,7 @@ mod tests { let mut edit = VersionEdit::new(7); let filename = String::from("Hello"); edit.set_comparator_name(filename); + assert_eq!("Hello", edit.comparator_name.unwrap().as_str()); } @@ -488,6 +489,7 @@ mod tests { let mut edit = VersionEdit::new(7); let log_num = u64::max_value(); edit.set_log_number(log_num); + assert_eq!(edit.log_number.unwrap(), log_num); }