Skip to content

Commit d0812d5

Browse files
authored
fix: hdfs read into buffer fully (#2031)
* fix: hdfs read into buffer fully
1 parent b4ac876 commit d0812d5

File tree

1 file changed

+46
-18
lines changed
  • native/hdfs/src/object_store

1 file changed

+46
-18
lines changed

native/hdfs/src/object_store/hdfs.rs

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,33 @@ impl HadoopFileSystem {
8888

8989
fn read_range(range: &Range<u64>, file: &HdfsFile) -> Result<Bytes> {
9090
let to_read = (range.end - range.start) as usize;
91+
let mut total_read = 0;
9192
let mut buf = vec![0; to_read];
92-
let read = file
93-
.read_with_pos(range.start as i64, buf.as_mut_slice())
94-
.map_err(to_error)?;
95-
assert_eq!(
96-
to_read as i32,
97-
read,
98-
"Read path {} from {} with expected size {} and actual size {}",
99-
file.path(),
100-
range.start,
101-
to_read,
102-
read
103-
);
93+
while total_read < to_read {
94+
let read = file
95+
.read_with_pos(
96+
(range.start + total_read as u64) as i64,
97+
buf[total_read..].as_mut(),
98+
)
99+
.map_err(to_error)?;
100+
if read <= 0 {
101+
break;
102+
}
103+
total_read += read as usize;
104+
}
105+
106+
if total_read != to_read {
107+
return Err(Error::Generic {
108+
store: "HadoopFileSystem",
109+
source: Box::new(HdfsErr::Generic(format!(
110+
"Error reading path {} at position {} with expected size {} and actual size {}",
111+
file.path(),
112+
range.start,
113+
to_read,
114+
total_read
115+
))),
116+
});
117+
}
104118
Ok(buf.into())
105119
}
106120
}
@@ -141,13 +155,27 @@ impl ObjectStore for HadoopFileSystem {
141155
let file_status = file.get_file_status().map_err(to_error)?;
142156

143157
let to_read = file_status.len();
158+
let mut total_read = 0;
144159
let mut buf = vec![0; to_read];
145-
let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
146-
assert_eq!(
147-
to_read as i32, read,
148-
"Read path {} with expected size {} and actual size {}",
149-
&location, to_read, read
150-
);
160+
while total_read < to_read {
161+
let read = file.read(buf.as_mut_slice()).map_err(to_error)?;
162+
if read <= 0 {
163+
break;
164+
}
165+
total_read += read as usize;
166+
}
167+
168+
if total_read != to_read {
169+
return Err(Error::Generic {
170+
store: "HadoopFileSystem",
171+
source: Box::new(HdfsErr::Generic(format!(
172+
"Error reading path {} with expected size {} and actual size {}",
173+
file.path(),
174+
to_read,
175+
total_read
176+
))),
177+
});
178+
}
151179

152180
file.close().map_err(to_error)?;
153181

0 commit comments

Comments
 (0)