Skip to content

Commit 47b96b9

Browse files
author
Omar Marzouk
committed
Read Ahead Bug Fixes
1 parent 1d10986 commit 47b96b9

File tree

4 files changed

+64
-28
lines changed

4 files changed

+64
-28
lines changed

tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,49 @@ void Cleanup(TF_RandomAccessFile* file) {
5252
int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
5353
char* ret, TF_Status* status) {
5454
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
55-
for (auto& read_buf : dfs_file->buffers) {
56-
if (read_buf.CacheHit(offset, n))
57-
return read_buf.CopyFromCache(ret, offset, n, dfs_file->file_size,
58-
status);
59-
}
55+
if (offset > dfs_file->file_size) return -1;
56+
size_t ret_offset = 0;
57+
size_t curr_offset = offset;
58+
int64_t total_bytes = 0;
59+
size_t ret_size = offset + n;
60+
while (curr_offset < ret_size && curr_offset < dfs_file->file_size) {
61+
size_t read_bytes = 0;
62+
for (auto& read_buf : dfs_file->buffers) {
63+
if (read_buf.CacheHit(curr_offset)) {
64+
read_bytes = read_buf.CopyFromCache(ret, ret_offset, offset, n,
65+
dfs_file->file_size, status);
66+
}
67+
}
6068

61-
size_t curr_offset = offset + BUFF_SIZE;
62-
for (size_t i = 1; i < dfs_file->buffers.size(); i++) {
63-
if (curr_offset > dfs_file->file_size) break;
64-
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs, dfs_file->daos_file.file,
65-
curr_offset);
66-
curr_offset += BUFF_SIZE;
67-
}
69+
if (read_bytes > 0) {
70+
curr_offset += read_bytes;
71+
ret_offset += read_bytes;
72+
total_bytes += read_bytes;
73+
n -= read_bytes;
74+
continue;
75+
}
6876

69-
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file,
70-
offset);
77+
size_t async_offset = curr_offset + BUFF_SIZE;
78+
for (size_t i = 1; i < dfs_file->buffers.size(); i++) {
79+
if (async_offset > dfs_file->file_size) break;
80+
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs,
81+
dfs_file->daos_file.file, async_offset);
82+
async_offset += BUFF_SIZE;
83+
}
84+
85+
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file,
86+
curr_offset);
87+
88+
read_bytes = dfs_file->buffers[0].CopyFromCache(
89+
ret, ret_offset, curr_offset, n, dfs_file->file_size, status);
90+
91+
curr_offset += read_bytes;
92+
ret_offset += read_bytes;
93+
total_bytes += read_bytes;
94+
n -= read_bytes;
95+
}
7196

72-
return dfs_file->buffers[0].CopyFromCache(ret, offset, n, dfs_file->file_size,
73-
status);
97+
return total_bytes;
7498
}
7599

76100
} // namespace tf_random_access_file

tensorflow_io/core/filesystems/dfs/dfs_utils.cc

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ ReadBuffer::ReadBuffer(size_t id, daos_handle_t eqh, size_t size)
515515
: id(id), buffer_size(size), eqh(eqh) {
516516
buffer = new char[size];
517517
buffer_offset = 0;
518+
initialized = false;
518519
event = new daos_event_t;
519520
daos_event_init(event, eqh, nullptr);
520521
valid = false;
@@ -540,11 +541,12 @@ ReadBuffer::ReadBuffer(ReadBuffer&& read_buffer) {
540541
valid = false;
541542
read_buffer.buffer = nullptr;
542543
read_buffer.event = nullptr;
544+
initialized = false;
543545
}
544546

545-
bool ReadBuffer::CacheHit(const size_t pos, const size_t len) {
546-
return pos >= buffer_offset && len <= buffer_size &&
547-
(pos + len <= buffer_offset + buffer_size);
547+
bool ReadBuffer::CacheHit(const size_t pos) {
548+
return pos >= buffer_offset && (pos < buffer_offset + buffer_size) &&
549+
initialized;
548550
}
549551

550552
int ReadBuffer::WaitEvent() {
@@ -575,6 +577,7 @@ int ReadBuffer::ReadAsync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
575577
rsgl.sg_iovs = &iov;
576578
valid = false;
577579
buffer_offset = off;
580+
initialized = true;
578581
dfs_read(daos_fs, file, &rsgl, buffer_offset, &read_size, event);
579582
return 0;
580583
}
@@ -587,23 +590,29 @@ int ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
587590
rsgl.sg_iovs = &iov;
588591
valid = false;
589592
buffer_offset = off;
593+
initialized = true;
590594
rc = dfs_read(daos_fs, file, &rsgl, off, &read_size, NULL);
591595
if (!rc) valid = true;
592596
return rc;
593597
}
594598

595-
int ReadBuffer::CopyData(char* ret, const size_t off, const size_t n) {
599+
int ReadBuffer::CopyData(char* ret, const size_t ret_offset, const size_t off,
600+
const size_t n) {
596601
int rc = WaitEvent();
597602
if (rc) return rc;
598-
memcpy(ret, buffer + (off - buffer_offset), n);
603+
memcpy(ret + ret_offset, buffer + (off - buffer_offset), n);
599604
return 0;
600605
}
601606

602-
int ReadBuffer::CopyFromCache(char* ret, const size_t off, const size_t n,
607+
int ReadBuffer::CopyFromCache(char* ret, const size_t ret_offset,
608+
const size_t off, const size_t n,
603609
const daos_size_t file_size, TF_Status* status) {
604610
size_t read_size;
605611
read_size = off + n > file_size ? file_size - off : n;
606-
int rc = CopyData(ret, off, read_size);
612+
read_size = off + read_size > buffer_offset + buffer_size
613+
? buffer_offset + buffer_size - off
614+
: read_size;
615+
int rc = CopyData(ret, ret_offset, off, read_size);
607616
if (rc) {
608617
TF_SetStatus(status, TF_INTERNAL, "");
609618
return 0;

tensorflow_io/core/filesystems/dfs/dfs_utils.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ class ReadBuffer {
215215

216216
~ReadBuffer();
217217

218-
bool CacheHit(const size_t pos, const size_t off);
218+
bool CacheHit(const size_t pos);
219219

220220
int WaitEvent();
221221

@@ -225,10 +225,12 @@ class ReadBuffer {
225225

226226
int ReadSync(dfs_t* dfs, dfs_obj_t* file, const size_t off);
227227

228-
int CopyData(char* ret, const size_t offset, const size_t n);
228+
int CopyData(char* ret, const size_t ret_offset, const size_t offset,
229+
const size_t n);
229230

230-
int CopyFromCache(char* ret, const size_t off, const size_t n,
231-
const daos_size_t file_size, TF_Status* status);
231+
int CopyFromCache(char* ret, const size_t ret_offset, const size_t off,
232+
const size_t n, const daos_size_t file_size,
233+
TF_Status* status);
232234

233235
private:
234236
size_t id;
@@ -241,6 +243,7 @@ class ReadBuffer {
241243
d_iov_t iov;
242244
bool valid;
243245
daos_size_t read_size;
246+
bool initialized;
244247
};
245248

246249
#endif // TENSORFLOW_IO_CORE_FILESYSTEMS_DFS_DFS_FILESYSTEM_H_

tests/test_dfs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def test_list_directory(self):
139139
# Setup and check preconditions.
140140
dir_name = self._path_to("listdir")
141141
tf.io.gfile.mkdir(dir_name)
142-
file_names = [self._path_to(f"listdir/{i}") for i in range(1, 4)]
142+
file_names = [self._path_to(f"listdir/{i}") for i in range(1, 33)]
143143

144144
for file_name in file_names:
145145
with tf.io.gfile.GFile(file_name, "w") as write_file:

0 commit comments

Comments
 (0)