From f386944583b85e0833adda9b95e92c069f87e929 Mon Sep 17 00:00:00 2001
From: Dirk Winkelhardt
Date: Wed, 23 Apr 2025 17:01:01 +0200
Subject: [PATCH] add support for compression level 1

---
 Cargo.lock                                    |  35 ++--
 Cargo.toml                                    |   1 +
 .../readers/file_readers/tdf_blob_reader.rs   | 191 ++++++++++++++++--
 src/io/readers/frame_reader.rs                |  64 ++++--
 src/io/readers/metadata_reader.rs             |   3 +
 src/ms_data/metadata.rs                       |   1 +
 6 files changed, 253 insertions(+), 42 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 612b5a4..d83a457 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4
 
 [[package]]
 name = "adler2"
@@ -713,6 +713,12 @@ dependencies = [
  "twox-hash",
 ]
 
+[[package]]
+name = "lzf"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c034ef7ca69017088f8522e6f74301232f13ba43b5640941abc4dce26f530521"
+
 [[package]]
 name = "memchr"
 version = "2.7.4"
@@ -907,9 +913,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.88"
+version = "1.0.95"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9"
+checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
 dependencies = [
  "unicode-ident",
 ]
@@ -1039,7 +1045,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.100",
 ]
 
 [[package]]
@@ -1091,9 +1097,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.79"
+version = "2.0.100"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
+checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1102,22 +1108,22 @@
 [[package]]
 name = "thiserror"
-version = "1.0.64"
+version = "2.0.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
+checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
 dependencies = [
  "thiserror-impl",
 ]
 
 [[package]]
 name = "thiserror-impl"
-version = "1.0.64"
+version = "2.0.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
+checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.100",
 ]
 
 [[package]]
@@ -1144,6 +1150,7 @@ dependencies = [
  "bytemuck",
  "criterion",
  "linreg",
+ "lzf",
  "memmap2",
  "parquet",
  "rayon",
@@ -1240,7 +1247,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.100",
  "wasm-bindgen-shared",
 ]
 
@@ -1262,7 +1269,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.100",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]
@@ -1400,7 +1407,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.100",
 ]
 
 [[package]]

diff --git a/Cargo.toml b/Cargo.toml
index 1923f06..64ca11d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ parquet = { version = "53.0.0", optional = true }
 serde = { version = "1.0.210", features = ["derive"], optional = true }
 serde_json = { version = "1.0.128", optional = true }
 timscompress = {version = "0.1.0", optional=true}
+lzf = "1.0.0"
 
 [features]
 tdf = ["rusqlite"]

diff --git a/src/io/readers/file_readers/tdf_blob_reader.rs b/src/io/readers/file_readers/tdf_blob_reader.rs
index b4d2e81..28b1465 100644
--- a/src/io/readers/file_readers/tdf_blob_reader.rs
+++ b/src/io/readers/file_readers/tdf_blob_reader.rs
@@ -1,5 +1,6 @@
 mod tdf_blobs;
 
+use lzf::decompress as lzf_decompress;
 use memmap2::Mmap;
 use std::fs::File;
 use std::io;
@@ -9,6 +10,7 @@ use zstd::decode_all;
 use crate::readers::{TimsTofFileType, TimsTofPathError, TimsTofPathLike};
 
 const U32_SIZE: usize = std::mem::size_of::<u32>();
+
 const HEADER_SIZE: usize = 2;
 
 #[derive(Debug)]
@@ -23,7 +25,14 @@ impl TdfBlobReader {
         Ok(reader)
     }
 
-    pub fn get(&self, offset: usize) -> Result<TdfBlob, TdfBlobReaderError> {
+    /// Returns a TDF blob containing the uncompressed data
+    ///
+    pub fn get(
+        &self,
+        offset: usize,
+        compression_type: u8,
+        max_peaks_per_scan: usize,
+    ) -> Result<TdfBlob, TdfBlobReaderError> {
         let offset = self.bin_file_reader.global_file_offset + offset;
         let byte_count = self
             .bin_file_reader
@@ -36,10 +45,126 @@ impl TdfBlobReader {
         if data.len() == 0 {
             return Err(TdfBlobReaderError::EmptyData);
         }
-        let bytes =
-            decode_all(data).map_err(|_| TdfBlobReaderError::Decompression)?;
-        let blob = TdfBlob::new(bytes)?;
-        Ok(blob)
+        if compression_type == 1 {
+            let bytes = self.decompress_v1(
+                offset,
+                byte_count,
+                data,
+                max_peaks_per_scan,
+            )?;
+            let blob = TdfBlob::new(bytes)?;
+            Ok(blob)
+        } else {
+            let bytes = decode_all(data)
+                .map_err(|_| TdfBlobReaderError::Decompression)?;
+            let blob: TdfBlob = TdfBlob::new(bytes)?;
+            Ok(blob)
+        }
+    }
+
+    /// Decompress a TDF blob compressed with compression type 1 (LZF).
+    /// Basically a reimplementation of the alphatims implementation.
+    /// Returns the uncompressed data in the same layout as a compression type 2 blob:
+    /// * scan_count: 4 bytes
+    /// * scan_indices: scan_count * 4 bytes
+    /// * scans: remaining bytes
+    ///
+    /// # Arguments
+    /// * `offset` - The offset of the blob in the binary file
+    /// * `byte_count` - The total size of the blob in bytes
+    /// * `data` - The compressed data
+    /// * `max_peaks_per_scan` - The maximum number of peaks per scan from the metadata
+    fn decompress_v1(
+        &self,
+        offset: usize,
+        byte_count: usize,
+        data: &[u8],
+        max_peaks_per_scan: usize,
+    ) -> Result<Vec<u8>, TdfBlobReaderError> {
+        // bin_size = int.from_bytes(infile.read(4), "little")
+        // bin_size == byte_count
+
+        // scan_count = int.from_bytes(infile.read(4), "little")
+        let scan_count = self
+            .bin_file_reader
+            .get_scan_count(offset)
+            .ok_or(TdfBlobReaderError::NoScanCount)?;
+
+        // TODO: frame_end - frame_start should be equal to bin_size/byte_count?
+        // max_peak_count = min(
+        //     max_peaks_per_scan,
+        //     frame_end - frame_start
+        // )
+        let max_peak_count = std::cmp::min(max_peaks_per_scan, byte_count);
+
+        // compression_offset = 8 + (scan_count + 1) * 4
+        let compression_offset = (scan_count + 1) * U32_SIZE;
+
+        // TODO: For some reason scan offsets were i32 not u32. Convert to u32, then to usize for easier indexing
+        // scan_offsets = np.frombuffer(
+        //     infile.read((scan_count + 1) * 4),
+        //     dtype=np.int32
+        // ) - compression_offset
+        let mut scan_offsets = self
+            .bin_file_reader
+            .get_scan_offsets(offset, scan_count)
+            .ok_or(TdfBlobReaderError::CorruptData)?;
+        scan_offsets = scan_offsets.iter_mut().map(|x| *x - compression_offset).collect();
+
+        // scan_count + scan_offsets + decompressed scan data (up to scan_count * max_peak_count bytes)
+        let tdf_bytes_capacity = U32_SIZE + U32_SIZE + scan_offsets.len() * U32_SIZE + scan_count * max_peak_count;
+
+        // this is basically the uncompressed frame
+        // scan_count: 4 bytes
+        // scan_indices: (scan_count) * 4 bytes
+        // scans: remaining bytes
+        let mut tdf_bytes = Vec::with_capacity(tdf_bytes_capacity);
+        tdf_bytes.extend_from_slice(&(scan_count as u32).to_le_bytes());
+
+        let mut scan_indexes: Vec<u8> = Vec::with_capacity(scan_count * U32_SIZE);
+        let mut scans: Vec<u8> = Vec::with_capacity(byte_count);
+
+        let mut scan_start: u32 = 0;
+        // for scan_index in range(scan_count):
+        for scan_index in 0..scan_count {
+            // start = scan_offsets[scan_index]
+            let start = scan_offsets[scan_index];
+
+            // end = scan_offsets[scan_index + 1]
+            let end = scan_offsets[scan_index + 1];
+
+            // if start == end:
+            //     continue
+            if start == end {
+                continue;
+            }
+
+            // decompressed_bytes = lzf.decompress(
+            //     compressed_data[start: end],
+            //     max_peak_count * 4 * 2
+            // )
+            let mut decompressed_bytes = match lzf_decompress(
+                &data[start as usize..end as usize],
+                max_peak_count * U32_SIZE * 2,
+            ) {
+                Ok(bytes) => bytes,
+                Err(_) => return Err(TdfBlobReaderError::Decompression),
+            };
+
+            if decompressed_bytes.len() % U32_SIZE != 0 {
+                return Err(TdfBlobReaderError::CorruptData);
+            }
+
+            scan_indexes.extend_from_slice(&scan_start.to_le_bytes());
+            scan_start = decompressed_bytes.len() as u32;
+            scans.append(&mut decompressed_bytes);
+        }
+
+        tdf_bytes.append(&mut scan_indexes);
+        tdf_bytes.append(&mut scans);
+
+        Ok(tdf_bytes)
     }
 }
 
@@ -68,6 +193,8 @@ impl TdfBinFileReader {
         Ok(reader)
     }
 
+    /// Get the byte count, stored in the first 4 bytes of the blob
+    ///
     fn get_byte_count(&self, offset: usize) -> Option<usize> {
         let start = offset as usize;
         let end = start + U32_SIZE as usize;
@@ -77,14 +204,33 @@ impl TdfBinFileReader {
         Some(byte_count)
     }
 
-    // fn get_scan_count(&self, offset: usize) -> Option<usize> {
-    //     let start = (offset + U32_SIZE) as usize;
-    //     let end = start + U32_SIZE as usize;
-    //     let raw_scan_count = self.mmap.get(start..end)?;
-    //     let scan_count =
-    //         u32::from_le_bytes(raw_scan_count.try_into().ok()?) as usize;
-    //     Some(scan_count)
-    // }
+    /// Get the scan count, stored in the second 4 bytes of the blob
+    ///
+    fn get_scan_count(&self, offset: usize) -> Option<usize> {
+        let start = (offset + U32_SIZE) as usize;
+        let end = start + U32_SIZE as usize;
+        let raw_scan_count = self.mmap.get(start..end)?;
+        let scan_count =
+            u32::from_le_bytes(raw_scan_count.try_into().ok()?) as usize;
+        Some(scan_count)
+    }
+
+    /// Get the scan offsets, stored as (scan_count + 1) u32 values after the first 8 bytes of the blob
+    ///
+    fn get_scan_offsets(&self, offset: usize, scan_count: usize) -> Option<Vec<usize>> {
+        let start = (offset + U32_SIZE * 2) as usize;
+        let end = start + U32_SIZE * (scan_count + 1) as usize;
+        let raw_scan_offsets = self.mmap.get(start..end)?;
+        if raw_scan_offsets.len() % U32_SIZE != 0 {
+            return None;
+        }
+        let scan_offsets = raw_scan_offsets
+            .chunks_exact(U32_SIZE)
+            .map(|x| u32::from_le_bytes(x.try_into().unwrap()))
+            .map(|x| x as usize)
+            .collect::<Vec<usize>>();
+        Some(scan_offsets)
+    }
 
     fn get_data(&self, offset: usize, byte_count: usize) -> Option<&[u8]> {
         let start = offset + HEADER_SIZE * U32_SIZE;
@@ -106,10 +252,11 @@ impl IndexedTdfBlobReader {
         path: impl TimsTofPathLike,
         binary_offsets: Vec<usize>,
     ) -> Result<Self, IndexedTdfBlobReaderError> {
+
         let blob_reader = TdfBlobReader::new(path)?;
         let reader = Self {
             binary_offsets,
-            blob_reader: blob_reader,
+            blob_reader,
         };
         Ok(reader)
     }
@@ -122,7 +269,13 @@ impl IndexedTdfBlobReader {
             .binary_offsets
             .get(index)
             .ok_or(IndexedTdfBlobReaderError::InvalidIndex(index))?;
-        let blob = self.blob_reader.get(offset)?;
+        let blob = self.blob_reader.get(
+            offset,
+            // TODO: Compression type 1 seems to be irrelevant for minitdf. Correct?
+            // Use compression type 2 (the current format) and max_peaks_per_scan = 0, which is only relevant for type 1
+            2,
+            0
+        )?;
         Ok(blob)
     }
 }
@@ -145,6 +298,14 @@ pub enum TdfBlobReaderError {
     TimsTofPathError(#[from] TimsTofPathError),
     #[error("No binary file found")]
     NoBinary,
+    #[error("No scan count found")]
+    NoScanCount,
+    #[error("No binary size found")]
+    NoBinarySize,
+    #[error("Scan offset error")]
+    ScanOffsetError,
+    #[error("No scan offsets found")]
+    NoScanOffsets,
 }
 
 #[derive(Debug, thiserror::Error)]

diff --git a/src/io/readers/frame_reader.rs b/src/io/readers/frame_reader.rs
index 8417f3c..1f0b4db 100644
--- a/src/io/readers/frame_reader.rs
+++ b/src/io/readers/frame_reader.rs
@@ -12,7 +12,9 @@ use super::{
             frame_groups::SqlWindowGroup, frames::SqlFrame, ReadableSqlTable,
             SqlReader, SqlReaderError,
         },
-        tdf_blob_reader::{TdfBlob, TdfBlobReader, TdfBlobReaderError},
+        tdf_blob_reader::{
+            TdfBlob, TdfBlobReader, TdfBlobReaderError,
+        },
     },
     MetadataReader, MetadataReaderError, QuadrupoleSettingsReader,
     QuadrupoleSettingsReaderError, TimsTofPathLike,
@@ -28,23 +30,27 @@ pub struct FrameReader {
     offsets: Vec<usize>,
     dia_windows: Option<Vec<Arc<QuadrupoleSettings>>>,
     compression_type: u8,
+    max_peaks_per_scan: usize,
     #[cfg(feature = "timscompress")]
     scan_count: usize,
 }
 
 impl FrameReader {
     pub fn new(path: impl TimsTofPathLike) -> Result<Self, FrameReaderError> {
-        let compression_type =
-            match MetadataReader::new(&path)?.compression_type {
-                2 => 2,
-                #[cfg(feature = "timscompress")]
-                3 => 3,
-                compression_type => {
-                    return Err(FrameReaderError::CompressionTypeError(
-                        compression_type,
-                    ))
-                },
-            };
+        let metadata = MetadataReader::new(&path)?;
+        let compression_type = match metadata.compression_type {
+            1 => 1,
+            2 => 2,
+            #[cfg(feature = "timscompress")]
+            3 => 3,
+            compression_type => {
+                return Err(FrameReaderError::CompressionTypeError(
+                    compression_type,
+                ))
+            },
+        };
+
+        let max_peaks_per_scan = metadata.max_peaks_per_scan;
 
         let tdf_sql_reader = SqlReader::open(&path)?;
         let sql_frames = SqlFrame::from_sql_reader(&tdf_sql_reader)?;
@@ -110,6 +116,7 @@ impl FrameReader {
                 _ => None,
             },
             compression_type,
+            max_peaks_per_scan,
             #[cfg(feature = "timscompress")]
             compressed_reader,
             #[cfg(feature = "timscompress")]
@@ -149,6 +156,7 @@ impl FrameReader {
 
     pub fn get(&self, index: usize) -> Result<Frame, FrameReaderError> {
         match self.compression_type {
+            1 => self.get_from_compression_type_1(index),
             2 => self.get_from_compression_type_2(index),
             #[cfg(feature = "timscompress")]
             3 => self.get_from_compression_type_3(index),
@@ -158,6 +166,34 @@
         }
     }
 
+    // TODO: As the resulting TdfBlob contains the uncompressed data in the same format as in
+    // `get_from_compression_type_2`, the two functions can be merged.
+    fn get_from_compression_type_1(
+        &self,
+        index: usize,
+    ) -> Result<Frame, FrameReaderError> {
+        // NOTE: get does it by 0-offsetting the vec, not by Frame index!!!
+        let mut frame = self.get_frame_without_coordinates(index)?;
+        let offset = self.get_binary_offset(index);
+        let blob = self
+            .tdf_bin_reader
+            .get(offset, self.compression_type, self.max_peaks_per_scan)?;
+        let scan_count: usize =
+            blob.get(0).ok_or(FrameReaderError::CorruptFrame)? as usize;
+        let peak_count: usize = (blob.len() - scan_count) / 2;
+
+        frame.scan_offsets = read_scan_offsets(scan_count, peak_count, &blob)?;
+        frame.intensities = read_intensities(scan_count, peak_count, &blob)?;
+        frame.tof_indices = read_tof_indices(
+            scan_count,
+            peak_count,
+            &blob,
+            &frame.scan_offsets,
+        )?;
+
+        Ok(frame)
+    }
+
     fn get_from_compression_type_2(
         &self,
         index: usize,
@@ -165,7 +201,7 @@ impl FrameReader {
         // NOTE: get does it by 0-offsetting the vec, not by Frame index!!!
         let mut frame = self.get_frame_without_coordinates(index)?;
         let offset = self.get_binary_offset(index);
-        let blob = self.tdf_bin_reader.get(offset)?;
+        let blob = self.tdf_bin_reader.get(offset, self.compression_type, self.max_peaks_per_scan)?;
         let scan_count: usize =
             blob.get(0).expect("Blob cannot be empty") as usize;
         let peak_count: usize = (blob.len() - scan_count) / 2;
@@ -335,4 +371,6 @@ pub enum FrameReaderError {
     IndexOutOfBounds,
     #[error("Compression type {0} not understood")]
     CompressionTypeError(u8),
+    #[error("Got unexpected TdfBlob type")]
+    UnexpectedTdfBlobError,
 }

diff --git a/src/io/readers/metadata_reader.rs b/src/io/readers/metadata_reader.rs
index a5ba1db..04ed519 100644
--- a/src/io/readers/metadata_reader.rs
+++ b/src/io/readers/metadata_reader.rs
@@ -25,6 +25,8 @@ impl MetadataReader {
             SqlMetadata::from_sql_reader(&tdf_sql_reader)?;
         let compression_type =
             parse_value(&sql_metadata, "TimsCompressionType")?;
+        let max_peaks_per_scan =
+            parse_value(&sql_metadata, "MaxNumPeaksPerScan")?;
         let (mz_min, mz_max) = get_mz_bounds(&sql_metadata)?;
         let (im_min, im_max) = get_im_bounds(&sql_metadata)?;
         let rt_values: Vec<f64> =
@@ -52,6 +54,7 @@ impl MetadataReader {
             lower_mz: mz_min,
             upper_mz: mz_max,
             compression_type,
+            max_peaks_per_scan,
         };
         Ok(metadata)
     }

diff --git a/src/ms_data/metadata.rs b/src/ms_data/metadata.rs
index 06a3ead..b95a8a8 100644
--- a/src/ms_data/metadata.rs
+++ b/src/ms_data/metadata.rs
@@ -10,6 +10,7 @@ pub struct Metadata {
     pub im_converter: Scan2ImConverter,
     pub mz_converter: Tof2MzConverter,
     pub compression_type: u8,
+    pub max_peaks_per_scan: usize,
     pub lower_rt: f64,
     pub upper_rt: f64,
     pub lower_im: f64,
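
Below is a minimal usage sketch of the new code path; it is not part of the patch. It assumes the surrounding crate is timsrust with FrameReader re-exported as timsrust::readers::FrameReader, that a plain path string satisfies the TimsTofPathLike bound of FrameReader::new, and the .d path is a placeholder; error handling is reduced to `?`.

    use timsrust::readers::FrameReader;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // FrameReader::new reads TimsCompressionType and MaxNumPeaksPerScan from the
        // metadata; with this patch a compression type of 1 is accepted instead of
        // being rejected with CompressionTypeError(1).
        let reader = FrameReader::new("/path/to/run.d")?;

        // get() dispatches on the stored compression type, so LZF (type 1) and
        // zstd (type 2) blobs are decompressed transparently; frames are addressed
        // by 0-based index, not by the Frame id from the SQL table.
        let frame = reader.get(0)?;
        println!(
            "scans: {}, peaks: {}",
            frame.scan_offsets.len().saturating_sub(1),
            frame.tof_indices.len()
        );
        Ok(())
    }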