1
+ // Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved
2
+ //
3
+ // Licensed under the Apache License, Version 2.0 (the "License");
4
+ // you may not use this file except in compliance with the License.
5
+ // You may obtain a copy of the License at
6
+ //
7
+ // http://www.apache.org/licenses/LICENSE-2.0
8
+ //
9
+ // Unless required by applicable law or agreed to in writing, software
10
+ // distributed under the License is distributed on an "AS IS" BASIS,
11
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ // See the License for the specific language governing permissions and
13
+ // limitations under the License.
14
+
15
+ #include " mdsv2/client/br.h"
16
+
17
+ #include < glog/logging.h>
18
+
19
+ #include < cstdint>
20
+ #include < fstream>
21
+ #include < memory>
22
+
23
+ #include " mdsv2/common/codec.h"
24
+ #include " mdsv2/common/helper.h"
25
+ #include " mdsv2/common/logging.h"
26
+ #include " mdsv2/common/tracing.h"
27
+ #include " mdsv2/filesystem/store_operation.h"
28
+ #include " mdsv2/storage/dingodb_storage.h"
29
+
30
+ namespace dingofs {
31
+ namespace mdsv2 {
32
+ namespace br {
33
+
34
+ class FileOutput ;
35
+ using FileOutputUPtr = std::unique_ptr<FileOutput>;
36
+
37
+ class StdOutput ;
38
+ using StdOutputUPtr = std::unique_ptr<StdOutput>;
39
+
40
+ class S3Output ;
41
+ using S3OutputUPtr = std::unique_ptr<S3Output>;
42
+
43
+ // output to a file
44
+ class FileOutput : public Output {
45
+ public:
46
+ explicit FileOutput (const std::string& file_path) : file_path_(file_path) {
47
+ file_stream_.open (file_path_, std::ios::out | std::ios::binary);
48
+ if (!file_stream_.is_open ()) {
49
+ throw std::runtime_error (" failed open file: " + file_path_);
50
+ }
51
+ }
52
+ ~FileOutput () override {
53
+ if (file_stream_.is_open ()) file_stream_.close ();
54
+ }
55
+
56
+ static FileOutputUPtr New (const std::string& file_path) { return std::make_unique<FileOutput>(file_path); }
57
+
58
+ void Append (const std::string& key, const std::string& value) override {
59
+ file_stream_ << key << " \n " << value << " \n " ;
60
+ }
61
+
62
+ void Flush () override { file_stream_.flush (); }
63
+
64
+ private:
65
+ std::string file_path_;
66
+ std::ofstream file_stream_;
67
+ };
68
+
69
+ // output to standard output
70
+ class StdOutput : public Output {
71
+ public:
72
+ StdOutput (bool is_binary = false ) : is_binary_(is_binary) {}
73
+
74
+ static StdOutputUPtr New (bool is_binary = false ) { return std::make_unique<StdOutput>(is_binary); }
75
+
76
+ void Append (const std::string& key, const std::string& value) override {
77
+ if (is_binary_) {
78
+ std::cout << Helper::StringToHex (key) << " : " << Helper::StringToHex (value) << " \n " ;
79
+
80
+ } else {
81
+ auto desc = MetaCodec::ParseKey (key, value);
82
+ std::cout << fmt::format (" key({}) value({})\n " , desc.first , desc.second );
83
+ }
84
+ }
85
+
86
+ void Flush () override { std::cout.flush (); }
87
+
88
+ private:
89
+ bool is_binary_{false };
90
+ };
91
+
92
+ // output to S3
93
+ class S3Output : public Output {
94
+ public:
95
+ S3Output (const std::string& bucket_name, const std::string& object_name)
96
+ : bucket_name_(bucket_name), object_name_(object_name) {
97
+ // Initialize S3 client here
98
+ }
99
+
100
+ static S3OutputUPtr New (const std::string& bucket_name, const std::string& object_name) {
101
+ return std::make_unique<S3Output>(bucket_name, object_name);
102
+ }
103
+
104
+ void Append (const std::string& key, const std::string& value) override {
105
+ // Upload key-value pair to S3
106
+ }
107
+
108
+ void Flush () override {
109
+ // Finalize the upload to S3
110
+ }
111
+
112
+ private:
113
+ std::string bucket_name_;
114
+ std::string object_name_;
115
+ };
116
+
117
+ Backup::~Backup () { Destroy (); }
118
+
119
+ bool Backup::Init (const std::string& coor_addr) {
120
+ CHECK (!coor_addr.empty ()) << " coor addr is empty." ;
121
+
122
+ auto kv_storage = DingodbStorage::New ();
123
+ CHECK (kv_storage != nullptr ) << " new DingodbStorage fail." ;
124
+
125
+ std::string store_addrs = Helper::ParseCoorAddr (coor_addr);
126
+ if (store_addrs.empty ()) {
127
+ return false ;
128
+ }
129
+
130
+ if (!kv_storage->Init (store_addrs)) {
131
+ return false ;
132
+ }
133
+
134
+ operation_processor_ = OperationProcessor::New (kv_storage);
135
+
136
+ return operation_processor_->Init ();
137
+ }
138
+
139
+ void Backup::Destroy () { operation_processor_->Destroy (); }
140
+
141
+ void Backup::BackupMetaTable (const Options& options) {
142
+ OutputUPtr output;
143
+ switch (options.type ) {
144
+ case Output::Type::kFile :
145
+ output = FileOutput::New (options.file_path );
146
+ break ;
147
+ case Output::Type::kStdout :
148
+ output = StdOutput::New (options.is_binary );
149
+ break ;
150
+ case Output::Type::kS3 :
151
+ output = S3Output::New (" bucket_name" , " object_name" );
152
+ break ;
153
+ default :
154
+ throw std::runtime_error (" unsupported output type" );
155
+ }
156
+
157
+ auto status = BackupMetaTable (std::move (output));
158
+ if (!status.ok ()) {
159
+ DINGO_LOG (ERROR) << fmt::format (" backup meta table fail, status({})." , status.error_str ());
160
+ }
161
+ }
162
+
163
+ void Backup::BackupFsMetaTable (const Options& options, uint32_t fs_id) {
164
+ if (fs_id == 0 ) {
165
+ DINGO_LOG (ERROR) << " fs_id is zero, cannot backup fs meta table." ;
166
+ return ;
167
+ }
168
+
169
+ OutputUPtr output;
170
+ switch (options.type ) {
171
+ case Output::Type::kFile :
172
+ output = FileOutput::New (options.file_path );
173
+ break ;
174
+ case Output::Type::kStdout :
175
+ output = StdOutput::New (options.is_binary );
176
+ break ;
177
+ case Output::Type::kS3 :
178
+ output = S3Output::New (options.bucket_name , options.object_name );
179
+ break ;
180
+ default :
181
+ throw std::runtime_error (" unsupported output type" );
182
+ }
183
+
184
+ auto status = BackupFsMetaTable (fs_id, std::move (output));
185
+ if (!status.ok ()) {
186
+ DINGO_LOG (ERROR) << fmt::format (" backup fsmeta table fail, status({})." , status.error_str ());
187
+ }
188
+ }
189
+
190
+ Status Backup::BackupMetaTable (OutputUPtr output) {
191
+ CHECK (output != nullptr ) << " output is nullptr." ;
192
+
193
+ uint64_t total_count = 0 , lock_count = 0 , auto_increment_id_count = 0 ;
194
+ uint64_t mds_heartbeat_count = 0 , client_heartbeat_count = 0 , fs_count = 0 , fs_quota_count = 0 ;
195
+
196
+ Trace trace;
197
+ ScanMetaTableOperation operation (trace, [&](const std::string& key, const std::string& value) -> bool {
198
+ output->Append (key, value);
199
+
200
+ if (MetaCodec::IsLockKey (key)) {
201
+ ++lock_count;
202
+ } else if (MetaCodec::IsAutoIncrementIDKey (key)) {
203
+ ++auto_increment_id_count;
204
+ } else if (MetaCodec::IsMdsHeartbeatKey (key)) {
205
+ ++mds_heartbeat_count;
206
+ } else if (MetaCodec::IsClientHeartbeatKey (key)) {
207
+ ++client_heartbeat_count;
208
+ } else if (MetaCodec::IsFsKey (key)) {
209
+ ++fs_count;
210
+ } else if (MetaCodec::IsFsQuotaKey (key)) {
211
+ ++fs_quota_count;
212
+ }
213
+
214
+ ++total_count;
215
+
216
+ return true ;
217
+ });
218
+
219
+ auto status = operation_processor_->RunAlone (&operation);
220
+
221
+ std::cout << fmt::format (
222
+ " backup meta table done, total_count({}) lock_count({}) auto_increment_id_count({}) mds_heartbeat_count({}) "
223
+ " client_heartbeat_count({}) fs_count({}) fs_quota_count({})." ,
224
+ total_count, lock_count, auto_increment_id_count, mds_heartbeat_count, client_heartbeat_count, fs_count,
225
+ fs_quota_count);
226
+
227
+ return status;
228
+ }
229
+
230
+ Status Backup::BackupFsMetaTable (uint32_t fs_id, OutputUPtr output) {
231
+ CHECK (output != nullptr ) << " output is nullptr." ;
232
+
233
+ uint64_t total_count = 0 , dir_quota_count = 0 , inode_count = 0 ;
234
+ uint64_t file_session_count = 0 , del_slice_count = 0 , del_file_count = 0 ;
235
+
236
+ Trace trace;
237
+ ScanFsMetaTableOperation operation (trace, fs_id, [&](const std::string& key, const std::string& value) -> bool {
238
+ output->Append (key, value);
239
+
240
+ if (MetaCodec::IsDirQuotaKey (key)) {
241
+ ++dir_quota_count;
242
+ } else if (MetaCodec::IsInodeKey (key)) {
243
+ ++inode_count;
244
+ } else if (MetaCodec::IsFileSessionKey (key)) {
245
+ ++file_session_count;
246
+ } else if (MetaCodec::IsDelSliceKey (key)) {
247
+ ++del_slice_count;
248
+ } else if (MetaCodec::IsDelFileKey (key)) {
249
+ ++del_file_count;
250
+ }
251
+
252
+ ++total_count;
253
+
254
+ return true ;
255
+ });
256
+
257
+ auto status = operation_processor_->RunAlone (&operation);
258
+
259
+ std::cout << fmt::format (
260
+ " backup fsmeta table done, total_count({}) dir_quota_count({}) inode_count({}) file_session_count({}) "
261
+ " del_slice_count({}) del_file_count({})." ,
262
+ total_count, dir_quota_count, inode_count, file_session_count, del_slice_count, del_file_count);
263
+
264
+ return status;
265
+ }
266
+
267
+ bool Restore::Init (const std::string& coor_addr) {
268
+ CHECK (!coor_addr.empty ()) << " coor addr is empty." ;
269
+
270
+ auto kv_storage = DingodbStorage::New ();
271
+ CHECK (kv_storage != nullptr ) << " new DingodbStorage fail." ;
272
+
273
+ std::string store_addrs = Helper::ParseCoorAddr (coor_addr);
274
+ if (store_addrs.empty ()) {
275
+ return false ;
276
+ }
277
+
278
+ if (!kv_storage->Init (store_addrs)) {
279
+ return false ;
280
+ }
281
+
282
+ operation_processor_ = OperationProcessor::New (kv_storage);
283
+
284
+ return operation_processor_->Init ();
285
+ }
286
+
287
+ void Restore::Destroy () {
288
+ if (operation_processor_) {
289
+ operation_processor_->Destroy ();
290
+ }
291
+ }
292
+
293
+ void Restore::RestoreMetaTable (const Options& options) {}
294
+
295
+ void Restore::RestoreFsMetaTable (const Options& options, uint32_t fs_id) {}
296
+
297
+ Status Restore::RestoreMetaTable (InputUPtr input) { return Status::OK (); }
298
+
299
+ Status Restore::RestoreFsMetaTable (uint32_t fs_id, InputUPtr input) { return Status::OK (); }
300
+
301
+ bool BackupCommandRunner::Run (const Options& options, const std::string& coor_addr, const std::string& cmd) {
302
+ using Helper = dingofs::mdsv2::Helper;
303
+
304
+ if (cmd != " backup" ) return false ;
305
+
306
+ if (coor_addr.empty ()) {
307
+ std::cout << " coordinator address is empty." << ' \n ' ;
308
+ return true ;
309
+ }
310
+
311
+ dingofs::mdsv2::br::Backup backup;
312
+ if (!backup.Init (coor_addr)) {
313
+ std::cout << " init backup fail." << ' \n ' ;
314
+ return true ;
315
+ }
316
+
317
+ dingofs::mdsv2::br::Backup::Options inner_options;
318
+ inner_options.type = dingofs::mdsv2::br::Output::Type::kStdout ;
319
+ if (options.output_type == " file" ) {
320
+ inner_options.type = dingofs::mdsv2::br::Output::Type::kFile ;
321
+ inner_options.file_path = options.file_path ;
322
+ } else if (options.output_type == " s3" ) {
323
+ inner_options.type = dingofs::mdsv2::br::Output::Type::kS3 ;
324
+ }
325
+
326
+ if (options.type == Helper::ToLowerCase (" meta" )) {
327
+ backup.BackupMetaTable (inner_options);
328
+
329
+ } else if (options.type == Helper::ToLowerCase (" fsmeta" )) {
330
+ backup.BackupFsMetaTable (inner_options, options.fs_id );
331
+
332
+ } else {
333
+ std::cout << " unknown type: " << options.type << ' \n ' ;
334
+ }
335
+
336
+ return true ;
337
+ }
338
+
339
+ bool RestoreCommandRunner::Run (const Options& options, const std::string& coor_addr, // NOLINT
340
+ const std::string& cmd) { // NOLINT
341
+ if (cmd != " restore" ) return false ;
342
+
343
+ return true ;
344
+ }
345
+
346
+ } // namespace br
347
+ } // namespace mdsv2
348
+ } // namespace dingofs
0 commit comments