Skip to content

Commit b0a7b7d

Browse files
author
Omar Marzouk
committed
Adjustments to Reading, Single Event Queue Handle, Paths Caching, and FileSize Caching
2 parents b0d5ad2 + 1af1181 commit b0a7b7d

File tree

3 files changed

+279
-128
lines changed

3 files changed

+279
-128
lines changed

tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc

Lines changed: 101 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,76 @@ typedef struct DFSRandomAccessFile {
1717
dfs_obj_t* daos_file;
1818
std::vector<ReadBuffer> buffers;
1919
daos_size_t file_size;
20-
daos_handle_t mEventQueueHandle{};
21-
22-
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
23-
: dfs_path(std::move(dfs_path)) {
20+
bool caching;
21+
size_t buff_size;
22+
size_t num_of_buffers;
23+
DFSRandomAccessFile(std::string aDfs_path, dfs_t* file_system, dfs_obj_t* obj,
24+
daos_handle_t eq_handle)
25+
: dfs_path(std::move(aDfs_path)) {
2426
daos_fs = file_system;
2527
daos_file = obj;
26-
dfs_get_size(daos_fs, obj, &file_size);
27-
size_t num_of_buffers;
28-
size_t buff_size;
29-
int rc = daos_eq_create(&mEventQueueHandle);
30-
assert(rc == 0);
31-
32-
if (char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
33-
num_of_buffers = atoi(env_num_of_buffers);
28+
if (DFS::size_map.count(aDfs_path) == 0) {
29+
dfs_get_size(daos_fs, obj, &file_size);
30+
DFS::size_map[aDfs_path] = file_size;
3431
} else {
35-
num_of_buffers = NUM_OF_BUFFERS;
32+
file_size = DFS::size_map[aDfs_path];
3633
}
37-
38-
if (char* env_buff_size = std::getenv("TF_IO_DAOS_BUFFER_SIZE")) {
39-
buff_size = GetStorageSize(env_buff_size);
34+
if (char* env_caching = std::getenv("TF_IO_DAOS_CACHING")) {
35+
caching = atoi(env_caching) > 0;
4036
} else {
41-
buff_size = BUFF_SIZE;
37+
caching = false;
38+
}
39+
40+
if (caching) {
41+
if (char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
42+
num_of_buffers = atoi(env_num_of_buffers);
43+
} else {
44+
num_of_buffers = NUM_OF_BUFFERS;
45+
}
46+
47+
if (char* env_buff_size = std::getenv("TF_IO_DAOS_BUFFER_SIZE")) {
48+
buff_size = GetStorageSize(env_buff_size);
49+
} else {
50+
buff_size = BUFF_SIZE;
51+
}
52+
for (size_t i = 0; i < num_of_buffers; i++) {
53+
buffers.push_back(ReadBuffer(i, eq_handle, buff_size));
54+
}
55+
}
56+
}
57+
58+
int64_t ReadNoCache(uint64_t offset, size_t n, char* buffer,
59+
TF_Status* status) {
60+
int rc;
61+
d_sg_list_t rsgl;
62+
d_iov_t iov;
63+
d_iov_set(&iov, (void*)buffer, n);
64+
rsgl.sg_nr = 1;
65+
rsgl.sg_iovs = &iov;
66+
67+
daos_size_t read_size;
68+
69+
rc = dfs_read(daos_fs, daos_file, &rsgl, offset, &read_size, NULL);
70+
if (rc) {
71+
TF_SetStatus(status, TF_INTERNAL, "");
72+
return read_size;
4273
}
43-
for (size_t i = 0; i < num_of_buffers; i++) {
44-
buffers.push_back(ReadBuffer(i, mEventQueueHandle, buff_size));
74+
75+
if (read_size != n) {
76+
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
77+
return read_size;
4578
}
79+
80+
TF_SetStatus(status, TF_OK, "");
81+
return read_size;
4682
}
4783
} DFSRandomAccessFile;
4884

4985
void Cleanup(TF_RandomAccessFile* file) {
86+
int rc = 0;
5087
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
5188
dfs_file->buffers.clear();
5289

53-
int rc = daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
54-
assert(rc == 0);
5590
rc = dfs_release(dfs_file->daos_file);
5691
assert(rc == 0);
5792
dfs_file->daos_fs = nullptr;
@@ -66,6 +101,10 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
66101
return -1;
67102
}
68103

104+
if (!dfs_file->caching) {
105+
return dfs_file->ReadNoCache(offset, n, ret, status);
106+
}
107+
69108
size_t ret_offset = 0;
70109
size_t curr_offset = offset;
71110
int64_t total_bytes = 0;
@@ -96,8 +135,8 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
96135
for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
97136
if (async_offset > dfs_file->file_size) break;
98137
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs, dfs_file->daos_file,
99-
async_offset);
100-
async_offset += BUFF_SIZE;
138+
async_offset, dfs_file->file_size);
139+
async_offset += dfs_file->buff_size;
101140
}
102141
}
103142

@@ -240,6 +279,7 @@ void NewFile(const TF_Filesystem* filesystem, const char* path, File_Mode mode,
240279
}
241280
std::string pool, cont, file_path;
242281
rc = daos->Setup(path, pool, cont, file_path, status);
282+
243283
if (rc) return;
244284
daos->dfsNewFile(file_path, mode, flags, obj, status);
245285
}
@@ -269,10 +309,18 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
269309
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
270310
return;
271311
}
272-
auto random_access_file =
273-
new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, obj);
274-
random_access_file->buffers[0].ReadAsync(daos->daos_fs,
275-
random_access_file->daos_file, 0);
312+
auto random_access_file = new tf_random_access_file::DFSRandomAccessFile(
313+
path, daos->daos_fs, obj, daos->mEventQueueHandle);
314+
if (random_access_file->caching) {
315+
size_t async_offset = 0;
316+
for (size_t i = 0; i < random_access_file->num_of_buffers; i++) {
317+
if (async_offset > random_access_file->file_size) break;
318+
random_access_file->buffers[i].ReadAsync(
319+
daos->daos_fs, random_access_file->daos_file, async_offset,
320+
random_access_file->file_size);
321+
async_offset += random_access_file->buff_size;
322+
}
323+
}
276324
file->plugin_file = random_access_file;
277325
TF_SetStatus(status, TF_OK, "");
278326
}
@@ -304,12 +352,15 @@ void PathExists(const TF_Filesystem* filesystem, const char* path,
304352
rc = daos->Setup(path, pool, cont, file, status);
305353
if (rc) return;
306354
dfs_obj_t* obj;
355+
307356
rc = daos->dfsPathExists(file, &obj);
308357
if (rc) {
309358
TF_SetStatus(status, TF_NOT_FOUND, "");
310359
} else {
311360
TF_SetStatus(status, TF_OK, "");
312361
}
362+
363+
dfs_release(obj);
313364
}
314365

315366
void CreateDir(const TF_Filesystem* filesystem, const char* path,
@@ -371,7 +422,6 @@ void DeleteFileSystemEntry(const TF_Filesystem* filesystem, const char* path,
371422
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
372423
return;
373424
}
374-
375425
daos->dfsDeleteObject(dir_path, is_dir, recursive, status);
376426
}
377427

@@ -417,15 +467,13 @@ bool IsDir(const TF_Filesystem* filesystem, const char* path,
417467
}
418468

419469
dfs_obj_t* obj;
420-
rc = daos->dfsPathExists(file, &obj, 0);
470+
rc = daos->dfsPathExists(file, &obj, true);
421471
if (rc) {
422472
TF_SetStatus(status, TF_NOT_FOUND, "");
423473
} else {
424474
is_dir = S_ISDIR(obj->mode);
425475
}
426476

427-
dfs_release(obj);
428-
429477
if (is_dir) {
430478
TF_SetStatus(status, TF_OK, "");
431479
} else {
@@ -446,9 +494,11 @@ int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path,
446494
std::string pool, cont, file;
447495
rc = daos->Setup(path, pool, cont, file, status);
448496
if (rc) return -1;
449-
497+
if (DFS::size_map.count(path) != 0) {
498+
return DFS::size_map[path];
499+
}
450500
dfs_obj_t* obj;
451-
rc = daos->dfsPathExists(file, &obj, 0);
501+
rc = daos->dfsPathExists(file, &obj, false);
452502
if (rc) {
453503
TF_SetStatus(status, TF_NOT_FOUND, "");
454504
return -1;
@@ -460,6 +510,8 @@ int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path,
460510
TF_SetStatus(status, TF_OK, "");
461511
daos_size_t size;
462512
dfs_get_size(daos->daos_fs, obj, &size);
513+
DFS::size_map[path] = size;
514+
463515
dfs_release(obj);
464516
return size;
465517
}
@@ -482,14 +534,14 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
482534
}
483535
int allow_cont_creation = 1;
484536
std::string pool_src, cont_src, file_src;
485-
rc = ParseDFSPath(src, pool_src, cont_src, file_src);
537+
rc = daos->ParseDFSPath(src, pool_src, cont_src, file_src);
486538
if (rc) {
487539
TF_SetStatus(status, TF_FAILED_PRECONDITION, "");
488540
return;
489541
}
490542

491543
std::string pool_dst, cont_dst, file_dst;
492-
rc = ParseDFSPath(dst, pool_dst, cont_dst, file_dst);
544+
rc = daos->ParseDFSPath(dst, pool_dst, cont_dst, file_dst);
493545
if (rc) {
494546
TF_SetStatus(status, TF_FAILED_PRECONDITION, "");
495547
return;
@@ -515,23 +567,22 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
515567
file_dst = "/" + file_dst;
516568

517569
dfs_obj_t* temp_obj;
518-
rc = daos->dfsPathExists(file_src, &temp_obj, 0);
570+
rc = daos->dfsPathExists(file_src, &temp_obj, false);
519571
if (rc) {
520572
TF_SetStatus(status, TF_NOT_FOUND, "");
521573
return;
522574
} else {
523575
if (S_ISDIR(temp_obj->mode)) {
524576
TF_SetStatus(status, TF_FAILED_PRECONDITION, "");
525-
dfs_release(temp_obj);
526577
return;
527578
}
528579
}
529580

530581
dfs_release(temp_obj);
531-
rc = daos->dfsPathExists(file_dst, &temp_obj, 0);
582+
583+
rc = daos->dfsPathExists(file_dst, &temp_obj, false);
532584
if (!rc && S_ISDIR(temp_obj->mode)) {
533585
TF_SetStatus(status, TF_FAILED_PRECONDITION, "");
534-
dfs_release(temp_obj);
535586
return;
536587
}
537588

@@ -543,7 +594,6 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
543594
rc = daos->dfsFindParent(file_src, &parent_src);
544595
if (rc) {
545596
TF_SetStatus(status, TF_NOT_FOUND, "");
546-
dfs_release(parent_src);
547597
return;
548598
}
549599

@@ -553,13 +603,11 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
553603
rc = daos->dfsFindParent(file_dst, &parent_dst);
554604
if (rc) {
555605
TF_SetStatus(status, TF_NOT_FOUND, "");
556-
dfs_release(parent_dst);
557606
return;
558607
}
559608

560609
if (!S_ISDIR(parent_dst->mode)) {
561610
TF_SetStatus(status, TF_FAILED_PRECONDITION, "");
562-
dfs_release(parent_dst);
563611
return;
564612
}
565613

@@ -571,8 +619,6 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
571619
rc = dfs_move(daos->daos_fs, parent_src, name, parent_dst, new_name, NULL);
572620
free(name);
573621
free(new_name);
574-
dfs_release(parent_src);
575-
dfs_release(parent_dst);
576622
if (rc) {
577623
TF_SetStatus(status, TF_INTERNAL, "");
578624
return;
@@ -594,7 +640,8 @@ void Stat(const TF_Filesystem* filesystem, const char* path,
594640
if (rc) return;
595641

596642
dfs_obj_t* obj;
597-
rc = daos->dfsPathExists(dir_path, &obj, 0);
643+
644+
rc = daos->dfsPathExists(dir_path, &obj);
598645
if (rc) {
599646
TF_SetStatus(status, TF_NOT_FOUND, "");
600647
return;
@@ -606,7 +653,13 @@ void Stat(const TF_Filesystem* filesystem, const char* path,
606653
} else {
607654
stats->is_directory = false;
608655
daos_size_t size;
609-
dfs_get_size(daos->daos_fs, obj, &size);
656+
if (DFS::size_map.count(path) == 0) {
657+
dfs_get_size(daos->daos_fs, obj, &size);
658+
DFS::size_map[path] = size;
659+
} else {
660+
size = DFS::size_map[path];
661+
}
662+
610663
stats->length = size;
611664
}
612665

@@ -617,7 +670,6 @@ void Stat(const TF_Filesystem* filesystem, const char* path,
617670
stats->mtime_nsec = static_cast<int64_t>(stbuf.st_mtime) * 1e9;
618671

619672
dfs_release(obj);
620-
621673
TF_SetStatus(status, TF_OK, "");
622674
}
623675

@@ -634,29 +686,24 @@ int GetChildren(const TF_Filesystem* filesystem, const char* path,
634686
if (rc) return -1;
635687

636688
dfs_obj_t* obj;
637-
rc = daos->dfsPathExists(dir_path, &obj, 0);
689+
rc = daos->dfsPathExists(dir_path, &obj, true);
638690
if (rc) {
639691
TF_SetStatus(status, TF_NOT_FOUND, "");
640-
dfs_release(obj);
641692
return -1;
642693
}
643694

644695
if (!S_ISDIR(obj->mode)) {
645696
TF_SetStatus(status, TF_FAILED_PRECONDITION, "");
646-
dfs_release(obj);
647697
return -1;
648698
}
649699

650700
std::vector<std::string> children;
651701
rc = daos->dfsReadDir(obj, children);
652702
if (rc) {
653703
TF_SetStatus(status, TF_INTERNAL, "");
654-
dfs_release(obj);
655704
return -1;
656705
}
657706

658-
dfs_release(obj);
659-
660707
uint32_t nr = children.size();
661708

662709
CopyEntries(entries, children);
@@ -675,6 +722,7 @@ void FlushCaches(const TF_Filesystem* filesystem) {
675722
return;
676723
}
677724
daos->ClearConnections();
725+
daos->path_map.clear();
678726
}
679727

680728
} // namespace tf_dfs_filesystem

0 commit comments

Comments
 (0)