Skip to content

Commit 85231f7

Browse files
author
Omar Marzouk
committed
Event Queue De-Centralization
1 parent 47b96b9 commit 85231f7

File tree

3 files changed

+37
-21
lines changed

3 files changed

+37
-21
lines changed

tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ typedef struct DFSRandomAccessFile {
1313
DAOS_FILE daos_file;
1414
std::vector<ReadBuffer> buffers;
1515
daos_size_t file_size;
16-
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system,
17-
daos_handle_t eqh, dfs_obj_t* obj)
16+
daos_handle_t mEventQueueHandle{};
17+
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
1818
: dfs_path(std::move(dfs_path)) {
1919
daos_fs = file_system;
2020
daos_file.file = obj;
2121
dfs_get_size(daos_fs, obj, &file_size);
2222
size_t num_of_buffers;
2323
size_t buff_size;
24+
int rc = daos_eq_create(&mEventQueueHandle);
2425

2526
if (char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
2627
num_of_buffers = atoi(env_num_of_buffers);
@@ -34,16 +35,18 @@ typedef struct DFSRandomAccessFile {
3435
buff_size = BUFF_SIZE;
3536
}
3637
for (size_t i = 0; i < num_of_buffers; i++) {
37-
buffers.push_back(ReadBuffer(i, eqh, buff_size));
38+
buffers.push_back(ReadBuffer(i, mEventQueueHandle, buff_size));
3839
}
3940
}
4041
} DFSRandomAccessFile;
4142

4243
void Cleanup(TF_RandomAccessFile* file) {
4344
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
44-
for (auto& read_buf : dfs_file->buffers) {
45-
read_buf.AbortEvent();
45+
for (auto& buffer : dfs_file->buffers) {
46+
buffer.FinalizeEvent();
4647
}
48+
49+
daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
4750
dfs_release(dfs_file->daos_file.file);
4851
dfs_file->daos_fs = nullptr;
4952
delete dfs_file;
@@ -52,7 +55,11 @@ void Cleanup(TF_RandomAccessFile* file) {
5255
int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
5356
char* ret, TF_Status* status) {
5457
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
55-
if (offset > dfs_file->file_size) return -1;
58+
if (offset > dfs_file->file_size) {
59+
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
60+
return -1;
61+
}
62+
5663
size_t ret_offset = 0;
5764
size_t curr_offset = offset;
5865
int64_t total_bytes = 0;
@@ -61,7 +68,7 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
6168
size_t read_bytes = 0;
6269
for (auto& read_buf : dfs_file->buffers) {
6370
if (read_buf.CacheHit(curr_offset)) {
64-
read_bytes = read_buf.CopyFromCache(ret, ret_offset, offset, n,
71+
read_bytes = read_buf.CopyFromCache(ret, ret_offset, curr_offset, n,
6572
dfs_file->file_size, status);
6673
}
6774
}
@@ -92,6 +99,12 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
9299
ret_offset += read_bytes;
93100
total_bytes += read_bytes;
94101
n -= read_bytes;
102+
103+
if (curr_offset >= dfs_file->file_size) {
104+
for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
105+
dfs_file->buffers[i].WaitEvent();
106+
}
107+
}
95108
}
96109

97110
return total_bytes;
@@ -227,8 +240,8 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
227240
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
228241
return;
229242
}
230-
auto random_access_file = new tf_random_access_file::DFSRandomAccessFile(
231-
path, daos->daos_fs, daos->mEventQueueHandle, obj);
243+
auto random_access_file =
244+
new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, obj);
232245
random_access_file->buffers[0].ReadAsync(
233246
daos->daos_fs, random_access_file->daos_file.file, 0);
234247
file->plugin_file = random_access_file;

tensorflow_io/core/filesystems/dfs/dfs_utils.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,7 @@ DFS* DFS::Load() {
116116
return this;
117117
}
118118

119-
int DFS::dfsInit() {
120-
int rc = daos_init();
121-
if (rc) return rc;
122-
return daos_eq_create(&mEventQueueHandle);
123-
}
119+
int DFS::dfsInit() { return daos_init(); }
124120

125121
void DFS::dfsCleanup() {
126122
Teardown();
@@ -149,12 +145,6 @@ int DFS::Setup(const std::string& path, std::string& pool_string,
149145
}
150146

151147
void DFS::Teardown() {
152-
daos_event_t* temp_event;
153-
int ret;
154-
do {
155-
ret = daos_eq_poll(mEventQueueHandle, 1, -1, 1, &(temp_event));
156-
} while (ret == 1);
157-
daos_eq_destroy(mEventQueueHandle, 0);
158148
Unmount();
159149
ClearConnections();
160150
}
@@ -544,6 +534,18 @@ ReadBuffer::ReadBuffer(ReadBuffer&& read_buffer) {
544534
initialized = false;
545535
}
546536

537+
int ReadBuffer::FinalizeEvent() {
538+
int rc = 0;
539+
if (event != nullptr) {
540+
bool event_status;
541+
daos_event_test(event, 0, &event_status);
542+
rc = daos_event_fini(event);
543+
}
544+
delete event;
545+
event = nullptr;
546+
return rc;
547+
}
548+
547549
bool ReadBuffer::CacheHit(const size_t pos) {
548550
return pos >= buffer_offset && (pos < buffer_offset + buffer_size) &&
549551
initialized;

tensorflow_io/core/filesystems/dfs/dfs_utils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ class DFS {
146146
id_handle_t pool;
147147
id_handle_t container;
148148
std::map<std::string, pool_info_t*> pools;
149-
daos_handle_t mEventQueueHandle{};
150149

151150
DFS();
152151

@@ -221,6 +220,8 @@ class ReadBuffer {
221220

222221
int AbortEvent();
223222

223+
int FinalizeEvent();
224+
224225
int ReadAsync(dfs_t* dfs, dfs_obj_t* file, const size_t off);
225226

226227
int ReadSync(dfs_t* dfs, dfs_obj_t* file, const size_t off);

0 commit comments

Comments
 (0)