Skip to content
This repository was archived by the owner on Nov 19, 2024. It is now read-only.

Commit 7a30d1d

Browse files
committed
feat(lib): zstd decompression optionally from flat files
also adds more docs for functions and tests for zstd compression
1 parent d15fe6c commit 7a30d1d

File tree

3 files changed

+69
-27
lines changed

3 files changed

+69
-27
lines changed

src/lib.rs

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::fs::File;
2626
use std::io::{BufReader, Cursor, Read, Write};
2727
use std::path::PathBuf;
2828
use tokio::join;
29+
use zstd::stream::decode_all;
2930

3031
const MERGE_BLOCK: usize = 15537393;
3132

@@ -34,20 +35,26 @@ pub enum DecodeInput {
3435
Reader(Box<dyn Read>),
3536
}
3637

37-
/**
38-
* Decode & verify flat files from a directory or a single file.
39-
* Input can be a directory or a file.
40-
* headers_dir is optional but must be a directory if provided.
41-
* If headers_dir is provided, the block headers will be verified against the files in the directory.
42-
* Header files must be named after the block number they represent and be in json format (e.g. 123.json).
43-
* If input is a directory, all files with the extension .dbin will be processed.
44-
* If output is provided, the decoded blocks will be written to the directory.
45-
* If output is not provided, the decoded blocks will not be written to disk.
46-
**/
38+
/// Decodes and optionally verifies block flat files from a given directory or single file.
39+
///
40+
/// This function processes input which can be a file or a directory containing multiple `.dbin` files.
41+
/// If `headers_dir` is provided, it verifies the block headers against the files found in this directory.
42+
/// These header files must be in JSON format and named after the block number they represent (e.g., `block-<block number>.json`).
43+
/// it can also handle `zstd` compressed flat files.
44+
///
45+
/// # Arguments
46+
///
47+
/// * `input`: A [`String`] specifying the path to the input directory or file.
48+
/// * `output`: An [`Option<&str>`] specifying the directory where decoded blocks should be written.
49+
/// If `None`, decoded blocks are not written to disk.
50+
/// * `headers_dir`: An [`Option<&str>`] specifying the directory containing header files for verification.
51+
/// Must be a directory if provided.
52+
/// * `decompress`: An [`Option<bool>`] specifying if it is necessary to decompress from zstd.
4753
pub fn decode_flat_files(
4854
input: String,
4955
output: Option<&str>,
5056
headers_dir: Option<&str>,
57+
decompress: Option<bool>,
5158
) -> Result<Vec<Block>, DecodeError> {
5259
let metadata = fs::metadata(&input).map_err(DecodeError::IoError)?;
5360

@@ -56,9 +63,9 @@ pub fn decode_flat_files(
5663
}
5764

5865
if metadata.is_dir() {
59-
decode_flat_files_dir(&input, output, headers_dir)
66+
decode_flat_files_dir(&input, output, headers_dir, decompress)
6067
} else if metadata.is_file() {
61-
handle_file(&PathBuf::from(input), output, headers_dir)
68+
handle_file(&PathBuf::from(input), output, headers_dir, decompress)
6269
} else {
6370
Err(DecodeError::InvalidInput)
6471
}
@@ -68,6 +75,7 @@ fn decode_flat_files_dir(
6875
input: &str,
6976
output: Option<&str>,
7077
headers_dir: Option<&str>,
78+
decompress: Option<bool>,
7179
) -> Result<Vec<Block>, DecodeError> {
7280
let paths = fs::read_dir(input).map_err(DecodeError::IoError)?;
7381

@@ -84,7 +92,7 @@ fn decode_flat_files_dir(
8492
};
8593

8694
println!("Processing file: {}", path.path().display());
87-
match handle_file(&path.path(), output, headers_dir) {
95+
match handle_file(&path.path(), output, headers_dir, decompress) {
8896
Ok(file_blocks) => {
8997
blocks.extend(file_blocks);
9098
}
@@ -97,21 +105,40 @@ fn decode_flat_files_dir(
97105
Ok(blocks)
98106
}
99107

100-
/**
101-
* Decode & verify a single flat file.
102-
* If output is provided, the decoded blocks will be written to the directory.
103-
* If output is not provided, the decoded blocks will not be written to disk.
104-
* headers_dir is optional but must be a directory if provided.
105-
* If headers_dir is provided, the block headers will be verified against the files in the directory.
106-
* Header files must be named after the block number they represent and be in json format. (e.g. 123.json)
107-
**/
108+
/// Decodes and optionally verifies block flat files from a single file.
109+
///
110+
/// This function decodes flat files and, if an `output` directory is provided, writes the decoded blocks to this directory.
111+
/// If no `output` is specified, the decoded blocks are not written to disk. The function can also verify block headers
112+
/// against header files found in an optional `headers_dir`. These header files must be in JSON format and named after
113+
/// the block number they represent (e.g., `block-<block number>.json`). Additionally, the function supports handling `zstd` compressed
114+
/// flat files if decompression is required.
115+
///
116+
/// # Arguments
117+
///
118+
/// * `input`: A [`String`] specifying the path to the file.
119+
/// * `output`: An [`Option<&str>`] specifying the directory where decoded blocks should be written.
120+
/// If `None`, decoded blocks are not written to disk.
121+
/// * `headers_dir`: An [`Option<&str>`] specifying the directory containing header files for verification.
122+
/// Must be a directory if provided.
123+
/// * `decompress`: An [`Option<bool>`] indicating whether decompression from `zstd` format is necessary.
124+
108125
pub fn handle_file(
109126
path: &PathBuf,
110127
output: Option<&str>,
111128
headers_dir: Option<&str>,
129+
decompress: Option<bool>,
112130
) -> Result<Vec<Block>, DecodeError> {
113-
let mut input_file = BufReader::new(File::open(path).map_err(DecodeError::IoError)?);
114-
let dbin_file = DbinFile::try_from_read(&mut input_file)?;
131+
let input_file = BufReader::new(File::open(path).map_err(DecodeError::IoError)?);
132+
// Check if decompression is required and read the file accordingly.
133+
let mut file_contents: Box<dyn Read> = if decompress == Some(true) {
134+
let decompressed_data = decode_all(input_file)
135+
.map_err(|e| DecodeError::IoError(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
136+
Box::new(Cursor::new(decompressed_data))
137+
} else {
138+
Box::new(input_file)
139+
};
140+
141+
let dbin_file = DbinFile::try_from_read(&mut file_contents)?;
115142
if dbin_file.header.content_type != "ETH" {
116143
return Err(DecodeError::InvalidContentType(
117144
dbin_file.header.content_type,
@@ -295,9 +322,20 @@ mod tests {
295322
fn test_handle_file() {
296323
let path = PathBuf::from("example0017686312.dbin");
297324

298-
let result = handle_file(&path, None, None);
325+
let result = handle_file(&path, None, None, None);
326+
327+
assert!(result.is_ok());
328+
}
329+
330+
#[test]
331+
fn test_handle_file_zstd() {
332+
let path = PathBuf::from("./tests/0000000000.dbin.zst");
333+
334+
let result = handle_file(&path, None, None, Some(true));
299335

300336
assert!(result.is_ok());
337+
let blocks: Vec<Block> = result.unwrap();
338+
assert_eq!(blocks[0].number, 0);
301339
}
302340

303341
#[test]

src/main.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ enum Commands {
3232
/// output folder where decoded headers will be stored as .json
3333
#[clap(short, long)]
3434
output: Option<String>,
35+
#[clap(short, long)]
36+
/// optionally decompress zstd compressed flat files
37+
decompress: Option<bool>,
3538
},
3639
}
3740
#[tokio::main]
@@ -44,7 +47,6 @@ async fn main() {
4447
end_block,
4548
} => {
4649
if decompress {
47-
// zst decompress first
4850
let reader =
4951
zstd::stream::Decoder::new(io::stdin()).expect("Failed to create zstd decoder");
5052
let writer = BufWriter::new(io::stdout().lock());
@@ -63,9 +65,11 @@ async fn main() {
6365
input,
6466
headers_dir,
6567
output,
68+
decompress,
6669
} => {
67-
let blocks = decode_flat_files(input, output.as_deref(), headers_dir.as_deref())
68-
.expect("Failed to decode files");
70+
let blocks =
71+
decode_flat_files(input, output.as_deref(), headers_dir.as_deref(), decompress)
72+
.expect("Failed to decode files");
6973

7074
println!("Total blocks: {}", blocks.len());
7175
}

tests/0000000000.dbin.zst

312 KB
Binary file not shown.

0 commit comments

Comments
 (0)