Skip to content

Commit 7d428c6

Browse files
MengjinYan and edoakes authored
[Core] Add Warning Log for the Infeasible Task Early Termination Feature (#51364)
This PR adds a warning log for the infeasible task early termination feature. Whenever a report contains more infeasible requests than exist in the autoscaling state, we print a warning that describes the infeasible task behavior and suggests either editing the cluster config or enabling the infeasible task early termination feature. The log was tested manually, and the following shows up in `gcs.out`: ``` [2025-03-13 23:06:14,518 W 42352 3849655] (gcs_server) gcs_autoscaler_state_manager.cc:72: There are tasks with infeasible resource requests and won't be scheduled. See https://docs.ray.io/en/latest/ray-core/scheduling/index.html#ray-scheduling-resources for more details. Please consider taking the following actions to solve the issue: 1. Updating the ray cluster to include nodes with all required resources in order to let the tasks be scheduled. 2. To prevent the tasks from hanging, you can consider enabling the infeasible task early exit feature by setting the 'enable_infeasible_task_early_exit' system config to 'true'.In a future release of Ray, we are planning to enable infeasible task early exit by default. ``` --------- Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com> Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
1 parent d7601c1 commit 7d428c6

File tree

5 files changed

+55
-15
lines changed

5 files changed

+55
-15
lines changed

python/ray/_private/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2239,7 +2239,7 @@ def listen_error_messages(worker, threads_stopped):
22392239
_, error_data = worker.gcs_error_subscriber.poll()
22402240
if error_data is None:
22412241
continue
2242-
if error_data["job_id"] not in [
2242+
if error_data["job_id"] is not None and error_data["job_id"] not in [
22432243
worker.current_job_id.binary(),
22442244
JobID.nil().binary(),
22452245
]:

src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,16 @@ GcsAutoscalerStateManager::GcsAutoscalerStateManager(
3434
const GcsPlacementGroupManager &gcs_placement_group_manager,
3535
rpc::NodeManagerClientPool &raylet_client_pool,
3636
InternalKVInterface &kv,
37-
instrumented_io_context &io_context)
37+
instrumented_io_context &io_context,
38+
GcsPublisher *gcs_publisher)
3839
: session_name_(std::move(session_name)),
3940
gcs_node_manager_(gcs_node_manager),
4041
gcs_actor_manager_(gcs_actor_manager),
4142
gcs_placement_group_manager_(gcs_placement_group_manager),
4243
raylet_client_pool_(raylet_client_pool),
4344
kv_(kv),
44-
io_context_(io_context) {}
45+
io_context_(io_context),
46+
gcs_publisher_(gcs_publisher) {}
4547

4648
void GcsAutoscalerStateManager::HandleGetClusterResourceState(
4749
rpc::autoscaler::GetClusterResourceStateRequest request,
@@ -65,23 +67,50 @@ void GcsAutoscalerStateManager::HandleReportAutoscalingState(
6567
rpc::SendReplyCallback send_reply_callback) {
6668
RAY_CHECK(thread_checker_.IsOnSameThread());
6769

68-
// Never seen any autoscaling state before - so just takes this.
69-
if (!autoscaling_state_.has_value()) {
70-
autoscaling_state_ = *std::move(request.mutable_autoscaling_state());
71-
send_reply_callback(ray::Status::OK(), nullptr, nullptr);
72-
return;
73-
}
74-
75-
// Cancel the infeasible requests if the feature is enabled
76-
std::function<void()> callback = [this]() {
70+
// Create the callback to cancel the infeasible requests if the feature is enabled
71+
bool has_new_infeasible_requests = false;
72+
std::function<void()> callback = [this, &has_new_infeasible_requests]() {
7773
bool enable_infeasible_task_early_exit =
7874
RayConfig::instance().enable_infeasible_task_early_exit();
7975

8076
if (enable_infeasible_task_early_exit) {
8177
this->CancelInfeasibleRequests();
78+
} else if (has_new_infeasible_requests) {
79+
// publish error message
80+
std::string error_message =
81+
"There are tasks with infeasible resource requests that cannot "
82+
"be scheduled. See "
83+
"https://docs.ray.io/en/latest/ray-core/scheduling/"
84+
"index.html#ray-scheduling-resources "
85+
"for more details. Possible solutions: "
86+
"1. Updating the ray cluster to include nodes with all required resources "
87+
"2. To cause the tasks with infeasible requests to raise an error instead "
88+
"of hanging, set the 'RAY_enable_infeasible_task_early_exit=true'. "
89+
"This feature will be turned on by default in a future release of Ray.";
90+
RAY_LOG(WARNING) << error_message;
91+
92+
if (gcs_publisher_ != nullptr) {
93+
std::string error_type = "infeasible_resource_requests";
94+
auto error_data_ptr = gcs::CreateErrorTableData(
95+
error_type, error_message, absl::FromUnixMillis(current_time_ms()));
96+
RAY_CHECK_OK(
97+
gcs_publisher_->PublishError(session_name_, *error_data_ptr, nullptr));
98+
}
8299
}
83100
};
84101

102+
// Never seen any autoscaling state before - so just takes this.
103+
if (!autoscaling_state_.has_value()) {
104+
autoscaling_state_ = *std::move(request.mutable_autoscaling_state());
105+
106+
if (autoscaling_state_->infeasible_resource_requests_size() > 0) {
107+
has_new_infeasible_requests = true;
108+
}
109+
110+
send_reply_callback(ray::Status::OK(), callback, nullptr);
111+
return;
112+
}
113+
85114
// We have a state cached. We discard the incoming state if it's older than the
86115
// cached state.
87116
if (request.autoscaling_state().autoscaler_state_version() <
@@ -96,6 +125,10 @@ void GcsAutoscalerStateManager::HandleReportAutoscalingState(
96125
}
97126

98127
// We should overwrite the cache version.
128+
if (autoscaling_state_->infeasible_resource_requests_size() <
129+
request.mutable_autoscaling_state()->infeasible_resource_requests_size()) {
130+
has_new_infeasible_requests = true;
131+
}
99132
autoscaling_state_ = std::move(*request.mutable_autoscaling_state());
100133
send_reply_callback(ray::Status::OK(), callback, nullptr);
101134
}

src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "ray/gcs/gcs_server/gcs_init_data.h"
2323
#include "ray/gcs/gcs_server/gcs_kv_manager.h"
24+
#include "ray/gcs/pubsub/gcs_pub_sub.h"
2425
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
2526
#include "ray/rpc/node_manager/node_manager_client_pool.h"
2627
#include "ray/util/thread_checker.h"
@@ -42,7 +43,8 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler
4243
const GcsPlacementGroupManager &gcs_placement_group_manager,
4344
rpc::NodeManagerClientPool &raylet_client_pool,
4445
InternalKVInterface &kv,
45-
instrumented_io_context &io_context);
46+
instrumented_io_context &io_context,
47+
GcsPublisher *gcs_publisher);
4648

4749
void HandleGetClusterResourceState(
4850
rpc::autoscaler::GetClusterResourceStateRequest request,
@@ -188,6 +190,9 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler
188190
InternalKVInterface &kv_;
189191
instrumented_io_context &io_context_;
190192

193+
// A publisher for publishing gcs messages.
194+
GcsPublisher *gcs_publisher_;
195+
191196
// The default value of the last seen version for the request is 0, which indicates
192197
// no version has been reported. So the first reported version should be 1.
193198
// We currently provide two guarantees for this version:

src/ray/gcs/gcs_server/gcs_server.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,8 @@ void GcsServer::InitGcsAutoscalerStateManager(const GcsInitData &gcs_init_data)
708708
*gcs_placement_group_manager_,
709709
*raylet_client_pool_,
710710
kv_manager_->GetInstance(),
711-
io_context_provider_.GetDefaultIOContext());
711+
io_context_provider_.GetDefaultIOContext(),
712+
gcs_publisher_.get());
712713
gcs_autoscaler_state_manager_->Initialize(gcs_init_data);
713714

714715
autoscaler_state_service_.reset(new rpc::autoscaler::AutoscalerStateGrpcService(

src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
9696
*gcs_placement_group_manager_,
9797
*client_pool_,
9898
kv_manager_->GetInstance(),
99-
io_service_));
99+
io_service_,
100+
/*gcs_publisher=*/nullptr));
100101
}
101102

102103
public:

0 commit comments

Comments
 (0)