Skip to content

fix: hdfs read into buffer fully #2031

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions native/hdfs/src/object_store/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,33 @@ impl HadoopFileSystem {

fn read_range(range: &Range<u64>, file: &HdfsFile) -> Result<Bytes> {
let to_read = (range.end - range.start) as usize;
let mut total_read = 0;
let mut buf = vec![0; to_read];
let read = file
.read_with_pos(range.start as i64, buf.as_mut_slice())
.map_err(to_error)?;
assert_eq!(
to_read as i32,
read,
"Read path {} from {} with expected size {} and actual size {}",
file.path(),
range.start,
to_read,
read
);
while total_read < to_read {
let read = file
.read_with_pos(
(range.start + total_read as u64) as i64,
buf[total_read..].as_mut(),
)
.map_err(to_error)?;
if read <= 0 {
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this return an unexpected eof error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, otherwise the function result is probably not predictable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this implementation is incorrect. read_with_pos never returns -1. Instead, it returns 0 when hitting EOF.

libhdfs behaves strangely, and so does fs-hdfs.

References:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this @Kontinuation! I based this on the return value of FSDataInputStream, but looks like both fs-hdfs and the official hadoop libhdfs behave differently.
Let me address this.
In fs-hdfs
: a return value < 0 is an error
: a return value of 0 is OK even if the buffer is not fully read into. However, it looks like other object store implementations treat this as an error. So I will treat both as an error (maybe reintroduce the original assertion).

}
total_read += read as usize;
}

if total_read != to_read {
return Err(Error::Generic {
store: "HadoopFileSystem",
source: Box::new(HdfsErr::Generic(format!(
"Error reading path {} at position {} with expected size {} and actual size {}",
file.path(),
range.start,
to_read,
total_read
))),
});
}
Ok(buf.into())
}
}
Expand Down Expand Up @@ -141,13 +155,27 @@ impl ObjectStore for HadoopFileSystem {
let file_status = file.get_file_status().map_err(to_error)?;

let to_read = file_status.len();
let mut total_read = 0;
let mut buf = vec![0; to_read];
let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
assert_eq!(
to_read as i32, read,
"Read path {} with expected size {} and actual size {}",
&location, to_read, read
);
while total_read < to_read {
let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
if read <= 0 {
break;
}
total_read += read as usize;
}

if total_read != to_read {
return Err(Error::Generic {
store: "HadoopFileSystem",
source: Box::new(HdfsErr::Generic(format!(
"Error reading path {} with expected size {} and actual size {}",
file.path(),
to_read,
total_read
))),
});
}

file.close().map_err(to_error)?;

Expand Down
Loading