Skip to content

Commit 5e73f46

Browse files
authored
Initial commit of gcs modular file system plugin (vnvo2409) (#1203)
* Initial commit of gcs modular file system plugin (vnvo2409) This is the initial commit of gcs modular file system plugin authored by vnvo2409. This is the first step with code moved here. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Namespace changes to avoid collision & conflict Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add wrapper to GCSFileSystem so that it will only lazy-loaded. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Disable scheme check Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Migrate GetTempFileName Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add placeholder APIs Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Disable for now. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Empty commit to trigger GitHub Actions Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Rename the guards for header inclusion based on review feedback. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
1 parent 47a3c25 commit 5e73f46

13 files changed

+2532
-1
lines changed

tensorflow_io/core/plugins/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ cc_library(
3030
linkstatic = True,
3131
deps = [
3232
"//tensorflow_io/core/plugins/az",
33+
"//tensorflow_io/core/plugins/gs",
3334
"//tensorflow_io/core/plugins/hdfs",
3435
"//tensorflow_io/core/plugins/http",
3536
"//tensorflow_io/core/plugins/s3",

tensorflow_io/core/plugins/file_system_plugins.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ limitations under the License.
1818
void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
1919
info->plugin_memory_allocate = tensorflow::io::plugin_memory_allocate;
2020
info->plugin_memory_free = tensorflow::io::plugin_memory_free;
21-
info->num_schemes = 6;
21+
info->num_schemes = 7;
2222
info->ops = static_cast<TF_FilesystemPluginOps*>(
2323
tensorflow::io::plugin_memory_allocate(info->num_schemes *
2424
sizeof(info->ops[0])));
@@ -28,4 +28,5 @@ void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
2828
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[3], "hdfse");
2929
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[4], "viewfse");
3030
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[5], "hare");
31+
tensorflow::io::gs::ProvideFilesystemSupportFor(&info->ops[6], "gse");
3132
}

tensorflow_io/core/plugins/file_system_plugins.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);
3232

3333
} // namespace az
3434

35+
namespace gs {
36+
37+
void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);
38+
39+
} // namespace gs
40+
3541
namespace hdfs {
3642

3743
void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);

tensorflow_io/core/plugins/gs/BUILD

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
licenses(["notice"]) # Apache 2.0
2+
3+
package(default_visibility = ["//visibility:public"])
4+
5+
load(
6+
"//:tools/build/tensorflow_io.bzl",
7+
"tf_io_copts",
8+
)
9+
10+
cc_library(
11+
name = "gs",
12+
srcs = [
13+
"cleanup.h",
14+
"expiring_lru_cache.h",
15+
"gcs_env.cc",
16+
"gcs_env.h",
17+
"gcs_filesystem.cc",
18+
"gcs_helper.cc",
19+
"gcs_helper.h",
20+
"ram_file_block_cache.cc",
21+
"ram_file_block_cache.h",
22+
],
23+
copts = tf_io_copts(),
24+
linkstatic = True,
25+
deps = [
26+
"//tensorflow_io/core/plugins:plugins_header",
27+
"@com_github_googleapis_google_cloud_cpp//:storage_client",
28+
"@com_google_absl//absl/base:core_headers",
29+
"@com_google_absl//absl/strings",
30+
"@com_google_absl//absl/synchronization",
31+
"@com_google_absl//absl/types:variant",
32+
],
33+
alwayslink = 1,
34+
)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
==============================================================================*/
15+
16+
// MakeCleanup(f) returns an RAII cleanup object that calls 'f' in its
17+
// destructor. The easiest way to use MakeCleanup is with a lambda argument,
18+
// capturing the return value in an 'auto' local variable. Most users will not
19+
// need more sophisticated syntax than that.
20+
//
21+
// Example:
22+
// void func() {
23+
// FILE* fp = fopen("data.txt", "r");
24+
// if (fp == nullptr) return;
25+
// auto fp_cleaner = tf_gcs_filesystem::MakeCleanup([fp] { fclose(fp); });
26+
// // No matter what, fclose(fp) will happen.
27+
// DataObject d;
28+
// while (ReadDataObject(fp, &d)) {
29+
// if (d.IsBad()) {
30+
// LOG(ERROR) << "Bad Data";
31+
// return;
32+
// }
33+
// PushGoodData(d);
34+
// }
35+
// }
36+
//
37+
// You can use Cleanup<F> directly, instead of using MakeCleanup and auto,
38+
// but there's rarely a reason to do that.
39+
//
40+
// You can call 'release()' on a Cleanup object to cancel the cleanup.
41+
42+
#ifndef TENSORFLOW_IO_CORE_PLUGINS_GS_CLEANUP_H_
43+
#define TENSORFLOW_IO_CORE_PLUGINS_GS_CLEANUP_H_
44+
45+
#include <type_traits>
46+
#include <utility>
47+
48+
namespace tensorflow {
49+
namespace io {
50+
namespace gs {
51+
52+
namespace tf_gcs_filesystem {
53+
54+
// A move-only RAII object that calls a stored cleanup functor when
// destroyed. Cleanup<F> is the return type of MakeCleanup(F).
//
// F must be a non-reference type that is callable with no arguments. A
// Cleanup is either "armed" (the functor runs on destruction) or "released"
// (the functor is never run by this object).
template <typename F>
class Cleanup {
 public:
  // Creates an already-released Cleanup; its functor is never invoked.
  // Requires F to be default-constructible.
  Cleanup() : released_(true), f_() {}

  // Creates an armed Cleanup that stores `f` and invokes it on destruction
  // unless release() is called first.
  template <typename G>
  explicit Cleanup(G&& f)          // NOLINT
      : f_(std::forward<G>(f)) {}  // NOLINT(build/c++11)

  // Takes over the pending cleanup of `src`, leaving `src` released.
  // `released_` is declared before `f_`, so the released state of `src` is
  // read before src.release() marks it as released.
  Cleanup(Cleanup&& src)  // NOLINT
      : released_(src.is_released()), f_(src.release()) {}

  // Implicitly move-constructible from any compatible Cleanup<G>.
  // The source will be released as if src.release() were called.
  // A moved-from Cleanup can be safely destroyed or reassigned.
  template <typename G>
  Cleanup(Cleanup<G>&& src)  // NOLINT
      : released_(src.is_released()), f_(src.release()) {}

  // Assignment to a Cleanup object behaves like destroying it
  // and making a new one in its place, analogous to unique_ptr
  // semantics.
  //
  // Self-move-assignment is a no-op: without this guard the pending cleanup
  // would run immediately and the object would end up released, so the
  // cleanup would not run at the point the owner expects.
  Cleanup& operator=(Cleanup&& src) {  // NOLINT
    if (this == &src) return *this;
    if (!released_) f_();
    released_ = src.released_;
    f_ = src.release();
    return *this;
  }

  // Runs the stored functor unless this object has been released.
  ~Cleanup() {
    if (!released_) f_();
  }

  // Releases the cleanup function instead of running it, and returns it to
  // the caller by value.
  // Hint: use c.release()() to run early.
  F release() {
    released_ = true;
    return std::move(f_);
  }

  // Returns true if the functor will not be run by this object.
  bool is_released() const { return released_; }

 private:
  static_assert(!std::is_reference<F>::value, "F must not be a reference");

  // Must stay declared before `f_`; the move constructors rely on this
  // initialization order.
  bool released_ = false;
  F f_;
};
104+
105+
template <int&... ExplicitParameterBarrier, typename F,
106+
typename DecayF = typename std::decay<F>::type>
107+
Cleanup<DecayF> MakeCleanup(F&& f) {
108+
return Cleanup<DecayF>(std::forward<F>(f));
109+
}
110+
111+
} // namespace tf_gcs_filesystem
112+
113+
} // namespace gs
114+
} // namespace io
115+
} // namespace tensorflow
116+
117+
#endif // TENSORFLOW_IO_CORE_PLUGINS_GS_CLEANUP_H_
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
==============================================================================*/
15+
16+
// NOTE: the guard is named after this file's path in tensorflow_io (matching
// cleanup.h). It must not reuse TensorFlow's
// TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_* naming, so it cannot
// collide with a guard defined by an upstream TensorFlow header.
#ifndef TENSORFLOW_IO_CORE_PLUGINS_GS_EXPIRING_LRU_CACHE_H_
#define TENSORFLOW_IO_CORE_PLUGINS_GS_EXPIRING_LRU_CACHE_H_

#include <cstddef>
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "tensorflow/c/tf_status.h"
#include "tensorflow_io/core/plugins/gs/gcs_env.h"

namespace tensorflow {
namespace io {
namespace gs {

namespace tf_gcs_filesystem {

/// \brief An LRU cache of string keys and arbitrary values, with configurable
/// max item age (in seconds) and max entries.
///
/// This class is thread safe.
template <typename T>
class ExpiringLRUCache {
 public:
  /// A `max_age` of 0 means that nothing is cached. A `max_entries` of 0 means
  /// that there is no limit on the number of entries in the cache (however, if
  /// `max_age` is also 0, the cache will not be populated).
  ///
  /// `timer_seconds` supplies the current time in seconds; it is used both to
  /// timestamp inserted entries and to decide whether an entry has expired.
  ExpiringLRUCache(uint64_t max_age, size_t max_entries,
                   std::function<uint64_t()> timer_seconds = GCSNowSeconds)
      : max_age_(max_age),
        max_entries_(max_entries),
        timer_seconds_(std::move(timer_seconds)) {}

  /// Insert `value` with key `key`. This will replace any previous entry with
  /// the same key. Does nothing when `max_age` is 0.
  void Insert(const std::string& key, const T& value) {
    if (max_age_ == 0) {
      return;
    }
    absl::MutexLock lock(&mu_);
    InsertLocked(key, value);
  }

  /// Delete the entry with key `key`. Returns true if an entry was found for
  /// `key`, false otherwise. In both cases no entry with key `key` exists
  /// after the call.
  bool Delete(const std::string& key) {
    absl::MutexLock lock(&mu_);
    return DeleteLocked(key);
  }

  /// Look up the entry with key `key` and copy it to `value` if found. Returns
  /// true if an entry was found for `key`, and its timestamp is not more than
  /// max_age_ seconds in the past. Always returns false when `max_age` is 0.
  bool Lookup(const std::string& key, T* value) {
    if (max_age_ == 0) {
      return false;
    }
    absl::MutexLock lock(&mu_);
    return LookupLocked(key, value);
  }

  /// Signature of the callback used by `LookupOrCompute` to produce a value
  /// for a key that is not (or no longer) cached.
  using ComputeFunc = std::function<void(const std::string&, T*, TF_Status*)>;

  /// Look up the entry with key `key` and copy it to `value` if found. If not
  /// found, call `compute_func`. If `compute_func` set `status` to `TF_OK`,
  /// store a copy of the output parameter in the cache, and another copy in
  /// `value`.
  ///
  /// When `max_age` is 0 the cache is bypassed and `compute_func` is called
  /// directly without taking the lock.
  void LookupOrCompute(const std::string& key, T* value,
                       const ComputeFunc& compute_func, TF_Status* status) {
    if (max_age_ == 0) {
      return compute_func(key, value, status);
    }

    // Note: we hold onto mu_ for the rest of this function. In practice, this
    // is okay, as stat requests are typically fast, and concurrent requests are
    // often for the same file. Future work can split this up into one lock per
    // key if this proves to be a significant performance bottleneck.
    absl::MutexLock lock(&mu_);
    if (LookupLocked(key, value)) {
      return TF_SetStatus(status, TF_OK, "");
    }
    compute_func(key, value, status);
    if (TF_GetCode(status) == TF_OK) {
      InsertLocked(key, *value);
    }
  }

  /// Clear the cache.
  void Clear() {
    absl::MutexLock lock(&mu_);
    cache_.clear();
    lru_list_.clear();
  }

  /// Accessors for cache parameters.
  uint64_t max_age() const { return max_age_; }
  size_t max_entries() const { return max_entries_; }

 private:
  struct Entry {
    /// The timestamp (seconds) at which the entry was added to the cache.
    uint64_t timestamp;

    /// The entry's value.
    T value;

    /// A list iterator pointing to the entry's position in the LRU list.
    std::list<std::string>::iterator lru_iterator;
  };

  /// Looks up `key` with `mu_` held. An entry older than `max_age_` seconds is
  /// erased and reported as a miss; a live entry is copied to `value` and
  /// moved to the front of the LRU list.
  bool LookupLocked(const std::string& key, T* value)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    auto it = cache_.find(key);
    if (it == cache_.end()) {
      return false;
    }
    // The LRU node is removed up front: it is either dropped together with
    // the expired entry or re-added at the front below.
    lru_list_.erase(it->second.lru_iterator);
    if (timer_seconds_() - it->second.timestamp > max_age_) {
      cache_.erase(it);
      return false;
    }
    *value = it->second.value;
    lru_list_.push_front(it->first);
    it->second.lru_iterator = lru_list_.begin();
    return true;
  }

  /// Inserts or replaces `key` with `mu_` held, stamping the entry with the
  /// current time and making it the most recently used. If a new key pushes
  /// the cache past `max_entries_`, the least recently used entry is evicted.
  void InsertLocked(const std::string& key, const T& value)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    lru_list_.push_front(key);
    Entry entry{timer_seconds_(), value, lru_list_.begin()};
    auto insert = cache_.insert(std::make_pair(key, entry));
    if (!insert.second) {
      // Key already present: drop its old LRU node and overwrite the entry so
      // it points at the node pushed above.
      lru_list_.erase(insert.first->second.lru_iterator);
      insert.first->second = entry;
    } else if (max_entries_ > 0 && cache_.size() > max_entries_) {
      cache_.erase(lru_list_.back());
      lru_list_.pop_back();
    }
  }

  /// Removes `key` from both the map and the LRU list with `mu_` held.
  /// Returns whether an entry was present.
  bool DeleteLocked(const std::string& key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    auto it = cache_.find(key);
    if (it == cache_.end()) {
      return false;
    }
    lru_list_.erase(it->second.lru_iterator);
    cache_.erase(it);
    return true;
  }

  /// The maximum age of entries in the cache, in seconds. A value of 0 means
  /// that no entry is ever placed in the cache.
  const uint64_t max_age_;

  /// The maximum number of entries in the cache. A value of 0 means there is no
  /// limit on entry count.
  const size_t max_entries_;

  /// The callback to read timestamps.
  std::function<uint64_t()> timer_seconds_;

  /// Guards access to the cache and the LRU list.
  absl::Mutex mu_;

  /// The cache (a map from string key to Entry).
  std::map<std::string, Entry> cache_ ABSL_GUARDED_BY(mu_);

  /// The LRU list of entries. The front of the list identifies the most
  /// recently accessed entry.
  std::list<std::string> lru_list_ ABSL_GUARDED_BY(mu_);
};

}  // namespace tf_gcs_filesystem

}  // namespace gs
}  // namespace io
}  // namespace tensorflow

#endif  // TENSORFLOW_IO_CORE_PLUGINS_GS_EXPIRING_LRU_CACHE_H_

0 commit comments

Comments
 (0)