15
15
#include " ray/gcs/gcs_client/accessor.h"
16
16
17
17
#include < future>
18
+ #include < memory>
19
+ #include < string>
20
+ #include < unordered_map>
21
+ #include < utility>
22
+ #include < vector>
18
23
19
24
#include " ray/common/asio/instrumented_io_context.h"
20
25
#include " ray/common/common_protocol.h"
24
29
namespace ray {
25
30
namespace gcs {
26
31
27
- using namespace ray ::rpc;
28
-
29
32
int64_t GetGcsTimeoutMs () {
30
33
return absl::ToInt64Milliseconds (
31
34
absl::Seconds (RayConfig::instance ().gcs_server_request_timeout_seconds ()));
32
35
}
33
36
34
37
JobInfoAccessor::JobInfoAccessor (GcsClient *client_impl) : client_impl_(client_impl) {}
35
38
36
- Status JobInfoAccessor::AsyncAdd (const std::shared_ptr<JobTableData> &data_ptr,
39
+ Status JobInfoAccessor::AsyncAdd (const std::shared_ptr<rpc:: JobTableData> &data_ptr,
37
40
const StatusCallback &callback) {
38
41
JobID job_id = JobID::FromBinary (data_ptr->job_id ());
39
42
RAY_LOG (DEBUG).WithField (job_id)
@@ -70,7 +73,8 @@ Status JobInfoAccessor::AsyncMarkFinished(const JobID &job_id,
70
73
}
71
74
72
75
Status JobInfoAccessor::AsyncSubscribeAll (
73
- const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
76
+ const SubscribeCallback<JobID, rpc::JobTableData> &subscribe,
77
+ const StatusCallback &done) {
74
78
RAY_CHECK (subscribe != nullptr );
75
79
fetch_all_data_operation_ = [this , subscribe](const StatusCallback &done) {
76
80
auto callback = [subscribe, done](const Status &status,
@@ -474,13 +478,13 @@ bool ActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) {
474
478
475
479
NodeInfoAccessor::NodeInfoAccessor (GcsClient *client_impl) : client_impl_(client_impl) {}
476
480
477
- Status NodeInfoAccessor::RegisterSelf (const GcsNodeInfo &local_node_info,
481
+ Status NodeInfoAccessor::RegisterSelf (const rpc:: GcsNodeInfo &local_node_info,
478
482
const StatusCallback &callback) {
479
483
auto node_id = NodeID::FromBinary (local_node_info.node_id ());
480
484
RAY_LOG (DEBUG).WithField (node_id)
481
485
<< " Registering node info, address is = " << local_node_info.node_manager_address ();
482
486
RAY_CHECK (local_node_id_.IsNil ()) << " This node is already connected." ;
483
- RAY_CHECK (local_node_info.state () == GcsNodeInfo::ALIVE);
487
+ RAY_CHECK (local_node_info.state () == rpc:: GcsNodeInfo::ALIVE);
484
488
rpc::RegisterNodeRequest request;
485
489
request.mutable_node_info ()->CopyFrom (local_node_info);
486
490
client_impl_->GetGcsRpcClient ().RegisterNode (
@@ -518,7 +522,7 @@ void NodeInfoAccessor::UnregisterSelf(const rpc::NodeDeathInfo &node_death_info,
518
522
[this , node_id, unregister_done_callback](const Status &status,
519
523
rpc::UnregisterNodeReply &&reply) {
520
524
if (status.ok ()) {
521
- local_node_info_.set_state (GcsNodeInfo::DEAD);
525
+ local_node_info_.set_state (rpc:: GcsNodeInfo::DEAD);
522
526
local_node_id_ = NodeID::Nil ();
523
527
}
524
528
RAY_LOG (INFO).WithField (node_id)
@@ -529,7 +533,7 @@ void NodeInfoAccessor::UnregisterSelf(const rpc::NodeDeathInfo &node_death_info,
529
533
530
534
const NodeID &NodeInfoAccessor::GetSelfId () const { return local_node_id_; }
531
535
532
- const GcsNodeInfo &NodeInfoAccessor::GetSelfInfo () const { return local_node_info_; }
536
+ const rpc:: GcsNodeInfo &NodeInfoAccessor::GetSelfInfo () const { return local_node_info_; }
533
537
534
538
Status NodeInfoAccessor::AsyncRegister (const rpc::GcsNodeInfo &node_info,
535
539
const StatusCallback &callback) {
@@ -614,7 +618,7 @@ Status NodeInfoAccessor::DrainNodes(const std::vector<NodeID> &node_ids,
614
618
return Status::OK ();
615
619
}
616
620
617
- Status NodeInfoAccessor::AsyncGetAll (const MultiItemCallback<GcsNodeInfo> &callback,
621
+ Status NodeInfoAccessor::AsyncGetAll (const MultiItemCallback<rpc:: GcsNodeInfo> &callback,
618
622
int64_t timeout_ms,
619
623
std::optional<NodeID> node_id) {
620
624
RAY_LOG (DEBUG) << " Getting information of all nodes." ;
@@ -625,7 +629,7 @@ Status NodeInfoAccessor::AsyncGetAll(const MultiItemCallback<GcsNodeInfo> &callb
625
629
client_impl_->GetGcsRpcClient ().GetAllNodeInfo (
626
630
request,
627
631
[callback](const Status &status, rpc::GetAllNodeInfoReply &&reply) {
628
- std::vector<GcsNodeInfo> result;
632
+ std::vector<rpc:: GcsNodeInfo> result;
629
633
result.reserve ((reply.node_info_list_size ()));
630
634
for (int index = 0 ; index < reply.node_info_list_size (); ++index) {
631
635
result.emplace_back (reply.node_info_list (index));
@@ -639,14 +643,15 @@ Status NodeInfoAccessor::AsyncGetAll(const MultiItemCallback<GcsNodeInfo> &callb
639
643
}
640
644
641
645
Status NodeInfoAccessor::AsyncSubscribeToNodeChange (
642
- const SubscribeCallback<NodeID, GcsNodeInfo> &subscribe, const StatusCallback &done) {
646
+ const SubscribeCallback<NodeID, rpc::GcsNodeInfo> &subscribe,
647
+ const StatusCallback &done) {
643
648
RAY_CHECK (subscribe != nullptr );
644
649
RAY_CHECK (node_change_callback_ == nullptr );
645
650
node_change_callback_ = subscribe;
646
651
647
652
fetch_node_data_operation_ = [this ](const StatusCallback &done) {
648
653
auto callback = [this , done](const Status &status,
649
- std::vector<GcsNodeInfo> &&node_info_list) {
654
+ std::vector<rpc:: GcsNodeInfo> &&node_info_list) {
650
655
for (auto &node_info : node_info_list) {
651
656
HandleNotification (std::move (node_info));
652
657
}
@@ -658,7 +663,7 @@ Status NodeInfoAccessor::AsyncSubscribeToNodeChange(
658
663
};
659
664
660
665
subscribe_node_operation_ = [this ](const StatusCallback &done) {
661
- auto on_subscribe = [this ](GcsNodeInfo &&data) {
666
+ auto on_subscribe = [this ](rpc:: GcsNodeInfo &&data) {
662
667
HandleNotification (std::move (data));
663
668
};
664
669
return client_impl_->GetGcsSubscriber ().SubscribeAllNodeInfo (on_subscribe, done);
@@ -669,8 +674,8 @@ Status NodeInfoAccessor::AsyncSubscribeToNodeChange(
669
674
});
670
675
}
671
676
672
- const GcsNodeInfo *NodeInfoAccessor::Get (const NodeID &node_id,
673
- bool filter_dead_nodes) const {
677
+ const rpc:: GcsNodeInfo *NodeInfoAccessor::Get (const NodeID &node_id,
678
+ bool filter_dead_nodes) const {
674
679
RAY_CHECK (!node_id.IsNil ());
675
680
auto entry = node_cache_.find (node_id);
676
681
if (entry != node_cache_.end ()) {
@@ -682,7 +687,7 @@ const GcsNodeInfo *NodeInfoAccessor::Get(const NodeID &node_id,
682
687
return nullptr ;
683
688
}
684
689
685
- const absl::flat_hash_map<NodeID, GcsNodeInfo> &NodeInfoAccessor::GetAll () const {
690
+ const absl::flat_hash_map<NodeID, rpc:: GcsNodeInfo> &NodeInfoAccessor::GetAll () const {
686
691
return node_cache_;
687
692
}
688
693
@@ -725,9 +730,9 @@ bool NodeInfoAccessor::IsRemoved(const NodeID &node_id) const {
725
730
return removed_nodes_.count (node_id) == 1 ;
726
731
}
727
732
728
- void NodeInfoAccessor::HandleNotification (GcsNodeInfo &&node_info) {
733
+ void NodeInfoAccessor::HandleNotification (rpc:: GcsNodeInfo &&node_info) {
729
734
NodeID node_id = NodeID::FromBinary (node_info.node_id ());
730
- bool is_alive = (node_info.state () == GcsNodeInfo::ALIVE);
735
+ bool is_alive = (node_info.state () == rpc:: GcsNodeInfo::ALIVE);
731
736
auto entry = node_cache_.find (node_id);
732
737
bool is_notif_new;
733
738
if (entry == node_cache_.end ()) {
@@ -736,7 +741,7 @@ void NodeInfoAccessor::HandleNotification(GcsNodeInfo &&node_info) {
736
741
} else {
737
742
// If the entry is in the cache, then the notification is new if the node
738
743
// was alive and is now dead or resources have been updated.
739
- bool was_alive = (entry->second .state () == GcsNodeInfo::ALIVE);
744
+ bool was_alive = (entry->second .state () == rpc:: GcsNodeInfo::ALIVE);
740
745
is_notif_new = was_alive && !is_alive;
741
746
742
747
// Once a node with a given ID has been removed, it should never be added
@@ -778,7 +783,7 @@ void NodeInfoAccessor::HandleNotification(GcsNodeInfo &&node_info) {
778
783
}
779
784
if (node_change_callback_) {
780
785
// Copy happens!
781
- GcsNodeInfo cache_data_copied = node_cache_[node_id];
786
+ rpc:: GcsNodeInfo cache_data_copied = node_cache_[node_id];
782
787
node_change_callback_ (node_id, std::move (cache_data_copied));
783
788
}
784
789
}
@@ -1160,7 +1165,7 @@ Status InternalKVAccessor::AsyncInternalKVGet(
1160
1165
client_impl_->GetGcsRpcClient ().InternalKVGet (
1161
1166
req,
1162
1167
[callback](const Status &status, rpc::InternalKVGetReply &&reply) {
1163
- if (reply.status ().code () == ( int ) StatusCode::NotFound) {
1168
+ if (reply.status ().code () == static_cast < int >( StatusCode::NotFound) ) {
1164
1169
callback (status, std::nullopt);
1165
1170
} else {
1166
1171
callback (status, reply.value ());
0 commit comments