14
14
15
15
#include " mdsv2/filesystem/file_session.h"
16
16
17
- #include < fmt/format.h>
18
- #include < glog/logging.h>
19
-
20
17
#include < cstdint>
21
18
#include < string>
22
19
#include < utility>
23
20
24
21
#include " dingofs/error.pb.h"
25
- #include " mdsv2/common/codec .h"
22
+ #include " fmt/format .h"
26
23
#include " mdsv2/common/logging.h"
27
24
#include " mdsv2/common/status.h"
28
- #include " mdsv2/storage/storage.h"
25
+ #include " mdsv2/common/type.h"
26
+ #include " mdsv2/filesystem/store_operation.h"
29
27
#include " utils/concurrent/concurrent.h"
28
+ #include " utils/uuid.h"
30
29
31
30
namespace dingofs {
32
31
namespace mdsv2 {
@@ -39,26 +38,20 @@ static const std::string kFileSessionCacheCountMetricsName = "dingofs_file_sessi
39
38
static const std::string kFileSessionTatalCountMetricsName = " dingofs_file_session_total_count" ;
40
39
static const std::string kFileSessionCountMetricsName = " dingofs_file_session_count" ;
41
40
42
- std::string FileSession::EncodeKey ( uint32_t fs_id, uint64_t ino, const std::string& session_id ) {
43
- return MetaCodec::EncodeFileSessionKey (fs_id, ino, session_id );
44
- }
45
-
46
- std::string FileSession::EncodeKey () const { return MetaCodec::EncodeFileSessionKey (fs_id_, ino_, session_id_); }
41
+ static FileSessionPtr NewFileSession ( uint64_t ino, const std::string& client_id ) {
42
+ auto file_session = std::make_shared<FileSessionEntry>( );
43
+ file_session-> set_session_id ( utils::UUIDGenerator::GenerateUUID ());
44
+ file_session-> set_ino (ino);
45
+ file_session-> set_client_id (client_id);
47
46
48
- std::string FileSession::EncodeValue () const {
49
- pb::mdsv2::FileSession file_session;
50
- file_session.set_session_id (session_id_);
51
- file_session.set_ino (ino_);
52
- file_session.set_client_id (client_id_);
53
-
54
- return MetaCodec::EncodeFileSessionValue (file_session);
47
+ return file_session;
55
48
}
56
49
57
50
FileSessionCache::FileSessionCache () : count_metrics_(kFileSessionCacheCountMetricsName ) {}
58
51
59
52
bool FileSessionCache::Put (FileSessionPtr file_session) {
60
53
utils::WriteLockGuard guard (lock_);
61
- auto key = Key{.ino = file_session->Ino (), .session_id = file_session->SessionId ()};
54
+ auto key = Key{.ino = file_session->ino (), .session_id = file_session->session_id ()};
62
55
auto it = file_session_map_.find (key);
63
56
if (it != file_session_map_.end ()) {
64
57
return false ;
@@ -74,7 +67,7 @@ bool FileSessionCache::Put(FileSessionPtr file_session) {
74
67
void FileSessionCache::Upsert (FileSessionPtr file_session) {
75
68
utils::WriteLockGuard guard (lock_);
76
69
77
- auto key = Key{.ino = file_session->Ino (), .session_id = file_session->SessionId ()};
70
+ auto key = Key{.ino = file_session->ino (), .session_id = file_session->session_id ()};
78
71
79
72
auto it = file_session_map_.find (key);
80
73
if (it == file_session_map_.end ()) {
@@ -150,21 +143,14 @@ bool FileSessionCache::IsExist(uint64_t ino, const std::string& session_id) {
150
143
return file_session_map_.find (key) != file_session_map_.end ();
151
144
}
152
145
153
- FileSessionManager::FileSessionManager (uint32_t fs_id, KVStorageSPtr kv_storage )
146
+ FileSessionManager::FileSessionManager (uint32_t fs_id, OperationProcessorSPtr operation_processor )
154
147
: fs_id_(fs_id),
155
- kv_storage_ (kv_storage ),
148
+ operation_processor_ (operation_processor ),
156
149
total_count_metrics_(kFileSessionTatalCountMetricsName ),
157
150
count_metrics_(kFileSessionCountMetricsName ) {}
158
151
159
152
Status FileSessionManager::Create (uint64_t ino, const std::string& client_id, FileSessionPtr& file_session) {
160
- file_session = FileSession::New (fs_id_, ino, client_id);
161
-
162
- KVStorage::WriteOption write_option;
163
- auto status = kv_storage_->Put (write_option, file_session->EncodeKey (), file_session->EncodeValue ());
164
- if (!status.ok ()) {
165
- DINGO_LOG (ERROR) << fmt::format (" [filesession] create fail, {}/{} {}" , ino, client_id, status.error_str ());
166
- return status;
167
- }
153
+ file_session = NewFileSession (ino, client_id);
168
154
169
155
// add to cache
170
156
CHECK (file_session_cache_.Put (file_session))
@@ -230,12 +216,6 @@ Status FileSessionManager::IsExist(uint64_t ino, bool just_cache, bool& is_exist
230
216
}
231
217
232
218
Status FileSessionManager::Delete (uint64_t ino, const std::string& session_id) {
233
- auto status = kv_storage_->Delete (FileSession::EncodeKey (fs_id_, ino, session_id));
234
- if (!status.ok ()) {
235
- DINGO_LOG (ERROR) << fmt::format (" [filesession] delete fail, {}/{} {}" , ino, session_id, status.error_str ());
236
- return status;
237
- }
238
-
239
219
// delete cache
240
220
file_session_cache_.Delete (ino, session_id);
241
221
@@ -245,95 +225,64 @@ Status FileSessionManager::Delete(uint64_t ino, const std::string& session_id) {
245
225
}
246
226
247
227
Status FileSessionManager::Delete (uint64_t ino) {
248
- std::vector<FileSessionPtr> file_sessions;
249
- auto status = GetFileSessionsFromStore (ino, file_sessions);
250
- if (!status.ok ()) {
251
- return status;
252
- }
253
-
254
- if (file_sessions.empty ()) {
255
- return Status::OK ();
256
- }
257
-
258
- int retry = 0 ;
259
- do {
260
- auto txn = kv_storage_->NewTxn ();
261
-
262
- for (auto & file_session : file_sessions) {
263
- txn->Delete (file_session->EncodeKey ());
264
- }
265
-
266
- status = txn->Commit ();
267
- if (status.error_code () != pb::error::ESTORE_MAYBE_RETRY) {
268
- break ;
269
- }
270
-
271
- ++retry;
272
- } while (retry < FLAGS_txn_max_retry_times);
228
+ // delete cache
229
+ file_session_cache_.Delete (ino);
273
230
274
- if (status.ok ()) {
275
- // delete cache
276
- file_session_cache_.Delete (ino);
231
+ // count_metrics_ << (0 - static_cast<int64_t>(file_sessions.size()));
277
232
278
- count_metrics_ << (0 - static_cast <int64_t >(file_sessions.size ()));
279
- }
280
-
281
- return status;
233
+ return Status::OK ();
282
234
}
283
235
284
236
Status FileSessionManager::GetFileSessionsFromStore (uint64_t ino, std::vector<FileSessionPtr>& file_sessions) {
285
- Range range ;
286
- MetaCodec::GetFileSessionRange (fs_id_, ino, range. start_key , range. end_key );
237
+ Trace trace ;
238
+ ScanFileSessionOperation operation (trace, fs_id_, ino );
287
239
288
- auto txn = kv_storage_->NewTxn ();
289
- std::vector<KeyValue> kvs;
290
- do {
291
- kvs.clear ();
292
-
293
- auto status = txn->Scan (range, FLAGS_fs_scan_batch_size, kvs);
294
- if (!status.ok ()) {
295
- DINGO_LOG (ERROR) << fmt::format (" [filesession] scan fail, {} {}" , ino, status.error_str ());
296
- break ;
297
- }
240
+ auto status = operation_processor_->RunAlone (&operation);
241
+ if (!status.ok ()) {
242
+ DINGO_LOG (ERROR) << fmt::format (" [filesession] scan file session fail, status({})." , status.error_str ());
243
+ return status;
244
+ }
298
245
299
- for (auto & kv : kvs) {
300
- file_sessions.push_back (FileSession::New (fs_id_, MetaCodec::DecodeFileSessionValue (kv.value )));
301
- }
246
+ auto & result = operation.GetResult ();
302
247
303
- } while (kvs.size () >= FLAGS_fs_scan_batch_size);
248
+ for (const auto & file_session : result.file_sessions ) {
249
+ file_sessions.push_back (std::make_shared<FileSessionEntry>(file_session));
250
+ }
304
251
305
252
return Status::OK ();
306
253
}
307
254
308
255
Status FileSessionManager::GetFileSessionFromStore (uint64_t ino, const std::string& session_id,
309
256
FileSessionPtr& file_session) {
310
- std::string key = FileSession::EncodeKey (fs_id_, ino, session_id);
257
+ Trace trace;
258
+ GetFileSessionOperation operation (trace, fs_id_, ino, session_id);
311
259
312
- std::string value;
313
- auto status = kv_storage_->Get (key, value);
260
+ auto status = operation_processor_->RunAlone (&operation);
314
261
if (!status.ok ()) {
315
- DINGO_LOG (ERROR) << fmt::format (" [filesession] get fail, {}/{} {} " , ino, session_id , status.error_str ());
262
+ DINGO_LOG (ERROR) << fmt::format (" [filesession] get file session fail, status({}). " , status.error_str ());
316
263
return status;
317
264
}
318
265
319
- file_session = FileSession::New (fs_id_, MetaCodec::DecodeFileSessionValue (value));
266
+ auto & result = operation.GetResult ();
267
+
268
+ file_session = std::make_shared<FileSessionEntry>(result.file_session );
320
269
321
270
return Status::OK ();
322
271
}
323
272
324
273
Status FileSessionManager::IsExistFromStore (uint64_t ino, bool & is_exist) {
325
- Range range ;
326
- MetaCodec::GetFileSessionRange (fs_id_, ino, range. start_key , range. end_key );
274
+ Trace trace ;
275
+ ScanFileSessionOperation operation (trace, fs_id_, ino );
327
276
328
- std::vector<KeyValue> kvs;
329
- auto txn = kv_storage_->NewTxn ();
330
- auto status = txn->Scan (range, 1 , kvs);
277
+ auto status = operation_processor_->RunAlone (&operation);
331
278
if (!status.ok ()) {
332
- DINGO_LOG (ERROR) << fmt::format (" [filesession] scan fail, {} {} " , ino , status.error_str ());
279
+ DINGO_LOG (ERROR) << fmt::format (" [filesession] scan file session fail, status({}). " , status.error_str ());
333
280
return status;
334
281
}
335
282
336
- is_exist = !kvs.empty ();
283
+ auto & result = operation.GetResult ();
284
+
285
+ is_exist = !result.file_sessions .empty ();
337
286
338
287
return Status::OK ();
339
288
}
0 commit comments