From f9c8c7cad780c7779af59ac05c3dc0b24ac7b49b Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Tue, 15 Jul 2025 16:05:52 -0700
Subject: [PATCH 1/4] fix: hdfs read into buffer fully

---
 native/hdfs/src/object_store/hdfs.rs | 37 ++++++++++++++--------------
 1 file changed, 19 insertions(+), 18 deletions(-)

diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs
index 0123782716..c480823037 100644
--- a/native/hdfs/src/object_store/hdfs.rs
+++ b/native/hdfs/src/object_store/hdfs.rs
@@ -88,19 +88,18 @@ impl HadoopFileSystem {
     fn read_range(range: &Range<u64>, file: &HdfsFile) -> Result<Bytes> {
         let to_read = (range.end - range.start) as usize;
+        let mut total_read = 0u64;
         let mut buf = vec![0; to_read];
-        let read = file
-            .read_with_pos(range.start as i64, buf.as_mut_slice())
-            .map_err(to_error)?;
-        assert_eq!(
-            to_read as i32,
-            read,
-            "Read path {} from {} with expected size {} and actual size {}",
-            file.path(),
-            range.start,
-            to_read,
-            read
-        );
+        while total_read < to_read as u64 {
+            let read =
+                file
+                .read_with_pos((range.start + total_read) as i64, buf[total_read as usize..].as_mut())
+                .map_err(to_error)?;
+            if read == -1 {
+                break;
+            }
+            total_read = total_read + read as u64;
+        }
         Ok(buf.into())
     }
 }
@@ -141,13 +140,15 @@ impl ObjectStore for HadoopFileSystem {
         let file_status = file.get_file_status().map_err(to_error)?;
         let to_read = file_status.len();
+        let mut total_read = 0u64;
         let mut buf = vec![0; to_read];
-        let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
-        assert_eq!(
-            to_read as i32, read,
-            "Read path {} with expected size {} and actual size {}",
-            &location, to_read, read
-        );
+        while total_read < to_read as u64 {
+            let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
+            if read == -1 {
+                break;
+            }
+            total_read = total_read + read as u64;
+        }
 
         file.close().map_err(to_error)?;

From 16b5c55bcc32cde4c9d0495712fce657b65c1489 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Wed, 16 Jul 2025 12:55:02 -0700
Subject: [PATCH 2/4] handle error case correctly

---
 native/hdfs/src/object_store/hdfs.rs | 37 ++++++++++++++++++++++++----
 1 file changed, 32 insertions(+), 5 deletions(-)

diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs
index c480823037..98142eaec0 100644
--- a/native/hdfs/src/object_store/hdfs.rs
+++ b/native/hdfs/src/object_store/hdfs.rs
@@ -91,15 +91,30 @@ impl HadoopFileSystem {
         let mut total_read = 0u64;
         let mut buf = vec![0; to_read];
         while total_read < to_read as u64 {
-            let read =
-                file
-                .read_with_pos((range.start + total_read) as i64, buf[total_read as usize..].as_mut())
+            let read = file
+                .read_with_pos(
+                    (range.start + total_read) as i64,
+                    buf[total_read as usize..].as_mut(),
+                )
                 .map_err(to_error)?;
-            if read == -1 {
+            if read <= 0 {
                 break;
             }
             total_read = total_read + read as u64;
         }
+
+        if total_read != to_read as u64 {
+            return Err(Error::Generic {
+                store: "HadoopFileSystem",
+                source: Box::new(HdfsErr::Generic(format!(
+                    "Error reading path {} at position {} with expected size {} and actual size {}",
+                    file.path(),
+                    range.start,
+                    to_read,
+                    total_read
+                ))),
+            });
+        }
         Ok(buf.into())
     }
 }
@@ -144,12 +159,24 @@ impl ObjectStore for HadoopFileSystem {
         let mut buf = vec![0; to_read];
         while total_read < to_read as u64 {
             let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
-            if read == -1 {
+            if read <= 0 {
                 break;
             }
             total_read = total_read + read as u64;
         }
 
+        if total_read != to_read as u64 {
+            return Err(Error::Generic {
+                store: "HadoopFileSystem",
+                source: Box::new(HdfsErr::Generic(format!(
+                    "Error reading path {} with expected size {} and actual size {}",
+                    file.path(),
+                    to_read,
+                    total_read
+                ))),
+            });
+        }
+
         file.close().map_err(to_error)?;
 
         let object_metadata = convert_metadata(file_status.clone(), &hdfs_root);

From e630c1e237a8787f085563fd1cf626235bf59385 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Wed, 16 Jul 2025 13:29:35 -0700
Subject: [PATCH 3/4] clippy

---
 native/hdfs/src/object_store/hdfs.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs
index 98142eaec0..7d0baa762e 100644
--- a/native/hdfs/src/object_store/hdfs.rs
+++ b/native/hdfs/src/object_store/hdfs.rs
@@ -100,7 +100,7 @@ impl HadoopFileSystem {
             if read <= 0 {
                 break;
             }
-            total_read = total_read + read as u64;
+            total_read += read as u64;
         }
 
         if total_read != to_read as u64 {
@@ -162,7 +162,7 @@ impl ObjectStore for HadoopFileSystem {
             if read <= 0 {
                 break;
             }
-            total_read = total_read + read as u64;
+            total_read += read as u64;
         }
 
         if total_read != to_read as u64 {

From b24df14ef27dc1a95a6bd2e54d379458df5b2fd0 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Thu, 17 Jul 2025 10:37:02 -0700
Subject: [PATCH 4/4] use usize for total_read

---
 native/hdfs/src/object_store/hdfs.rs | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs
index 7d0baa762e..bb03001082 100644
--- a/native/hdfs/src/object_store/hdfs.rs
+++ b/native/hdfs/src/object_store/hdfs.rs
@@ -88,22 +88,22 @@ impl HadoopFileSystem {
     fn read_range(range: &Range<u64>, file: &HdfsFile) -> Result<Bytes> {
         let to_read = (range.end - range.start) as usize;
-        let mut total_read = 0u64;
+        let mut total_read = 0;
         let mut buf = vec![0; to_read];
-        while total_read < to_read as u64 {
+        while total_read < to_read {
             let read = file
                 .read_with_pos(
-                    (range.start + total_read) as i64,
-                    buf[total_read as usize..].as_mut(),
+                    (range.start + total_read as u64) as i64,
+                    buf[total_read..].as_mut(),
                 )
                 .map_err(to_error)?;
             if read <= 0 {
                 break;
             }
-            total_read += read as u64;
+            total_read += read as usize;
         }
 
-        if total_read != to_read as u64 {
+        if total_read != to_read {
             return Err(Error::Generic {
                 store: "HadoopFileSystem",
                 source: Box::new(HdfsErr::Generic(format!(
@@ -155,17 +155,17 @@ impl ObjectStore for HadoopFileSystem {
         let file_status = file.get_file_status().map_err(to_error)?;
         let to_read = file_status.len();
-        let mut total_read = 0u64;
+        let mut total_read = 0;
         let mut buf = vec![0; to_read];
-        while total_read < to_read as u64 {
+        while total_read < to_read {
             let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
             if read <= 0 {
                 break;
             }
-            total_read += read as u64;
+            total_read += read as usize;
        }
 
-        if total_read != to_read as u64 {
+        if total_read != to_read {
             return Err(Error::Generic {
                 store: "HadoopFileSystem",
                 source: Box::new(HdfsErr::Generic(format!(
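
The pattern all four patches converge on is the classic read-until-full loop: retry on short reads, treat a non-positive return as end-of-stream, and fail loudly if the buffer could not be filled. For reference, below is a minimal, self-contained sketch of that pattern written against std::io::Read rather than the fs-hdfs API (read_fully and the names in main are illustrative, not part of the patch). Note how each retry reads into buf[total_read..], so bytes from earlier iterations are preserved; std's Read::read_exact implements essentially this loop.

use std::io::{self, Read};

/// Fill `buf` completely from `reader`, retrying on short reads.
/// Errors out if end-of-stream arrives before `buf` is full -- the
/// same invariant the patches above enforce for HDFS reads.
fn read_fully<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<()> {
    let mut total_read = 0;
    while total_read < buf.len() {
        // Read into the unfilled tail so earlier bytes are kept.
        let read = reader.read(&mut buf[total_read..])?;
        if read == 0 {
            // End of stream before the buffer was filled: a short read.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!("expected {} bytes, got {}", buf.len(), total_read),
            ));
        }
        total_read += read;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // `&[u8]` implements `Read` and is allowed to return short reads,
    // so it stands in for the HDFS client here.
    let mut src: &[u8] = b"hello hdfs";
    let mut buf = vec![0u8; 10];
    read_fully(&mut src, &mut buf)?;
    assert_eq!(buf, b"hello hdfs");
    Ok(())
}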