Skip to content

Commit 572f676

Browse files
authored
Remove unused "nums_reconnect_retry" argument from GCS clients (#51298)
This had no effect...

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
1 parent 58e1f34 commit 572f676

File tree

14 files changed: 25 additions and 91 deletions.

python/ray/_private/gcs_aio_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ def __init__(
2020
address: str = None,
2121
loop=None,
2222
executor=None,
23-
nums_reconnect_retry: int = 5,
2423
cluster_id: Optional[str] = None,
2524
):
2625
# This must be consistent with GcsClient.__cinit__ in _raylet.pyx

python/ray/_private/usage/usage_lib.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -854,9 +854,7 @@ def generate_report_data(
854854
"""
855855
assert cluster_id
856856

857-
gcs_client = ray._raylet.GcsClient(
858-
address=gcs_address, nums_reconnect_retry=20, cluster_id=cluster_id
859-
)
857+
gcs_client = ray._raylet.GcsClient(address=gcs_address, cluster_id=cluster_id)
860858

861859
cluster_metadata = get_cluster_metadata(gcs_client)
862860
cluster_status_to_report = get_cluster_status_to_report(gcs_client)

python/ray/_raylet.pyx

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2720,42 +2720,6 @@ cdef class EmptyProfileEvent:
27202720
pass
27212721

27222722

2723-
def _auto_reconnect(f):
2724-
@wraps(f)
2725-
def wrapper(self, *args, **kwargs):
2726-
if "TEST_RAY_COLLECT_KV_FREQUENCY" in os.environ:
2727-
with ray._private.utils._CALLED_FREQ_LOCK:
2728-
ray._private.utils._CALLED_FREQ[f.__name__] += 1
2729-
remaining_retry = self._nums_reconnect_retry
2730-
while True:
2731-
try:
2732-
return f(self, *args, **kwargs)
2733-
except RpcError as e:
2734-
if e.rpc_code in [
2735-
GRPC_STATUS_CODE_UNAVAILABLE,
2736-
GRPC_STATUS_CODE_UNKNOWN,
2737-
]:
2738-
if remaining_retry <= 0:
2739-
logger.error(
2740-
"Failed to connect to GCS. Please check"
2741-
" `gcs_server.out` for more details."
2742-
)
2743-
raise
2744-
logger.debug(
2745-
f"Failed to send request to gcs, reconnecting. Error {e}"
2746-
)
2747-
try:
2748-
self._connect()
2749-
except Exception:
2750-
logger.error(f"Connecting to gcs failed. Error {e}")
2751-
time.sleep(1)
2752-
remaining_retry -= 1
2753-
continue
2754-
raise
2755-
2756-
return wrapper
2757-
2758-
27592723
cdef class GcsClient:
27602724
"""
27612725
Client to the GCS server. Only contains synchronous methods. For async methods,
@@ -2766,10 +2730,8 @@ cdef class GcsClient:
27662730

27672731
cdef InnerGcsClient inner
27682732

2769-
def __cinit__(self, address,
2770-
nums_reconnect_retry=RayConfig.instance().nums_py_gcs_reconnect_retry(
2771-
),
2772-
cluster_id: str = None):
2733+
def __cinit__(self, address: str,
2734+
cluster_id: Optional[str] = None):
27732735
# For timeout (DEADLINE_EXCEEDED): retries once with timeout_ms.
27742736
#
27752737
# For other RpcError (UNAVAILABLE, UNKNOWN): retries indefinitely until it

python/ray/dashboard/agent.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ def __init__(
7777
# Used by the agent and sub-modules.
7878
self.gcs_aio_client = GcsAioClient(
7979
address=self.gcs_address,
80-
nums_reconnect_retry=ray._config.gcs_rpc_server_reconnect_timeout_s(),
8180
cluster_id=self.cluster_id_hex,
8281
)
8382

python/ray/dashboard/head.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,9 @@ async def run(self):
257257
gcs_address = self.gcs_address
258258

259259
# Dashboard will handle connection failure automatically
260-
self.gcs_client = GcsClient(
261-
address=gcs_address, nums_reconnect_retry=0, cluster_id=self.cluster_id_hex
262-
)
260+
self.gcs_client = GcsClient(address=gcs_address, cluster_id=self.cluster_id_hex)
263261
self.gcs_aio_client = GcsAioClient(
264-
address=gcs_address, nums_reconnect_retry=0, cluster_id=self.cluster_id_hex
262+
address=gcs_address, cluster_id=self.cluster_id_hex
265263
)
266264
internal_kv._initialize_internal_kv(self.gcs_client)
267265

python/ray/dashboard/modules/event/tests/test_generate_export_events.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,7 @@ async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa:
163163
"""
164164

165165
address_info = ray.init(address=call_ray_start)
166-
gcs_aio_client = GcsAioClient(
167-
address=address_info["gcs_address"], nums_reconnect_retry=0
168-
)
166+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
169167
job_manager = JobManager(gcs_aio_client, tmp_path)
170168

171169
# Submit a job.

python/ray/dashboard/modules/job/tests/conftest.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ def create_ray_cluster(_tracing_startup_hook=None):
2020

2121
def create_job_manager(ray_cluster, tmp_path):
2222
address_info = ray_cluster
23-
gcs_aio_client = GcsAioClient(
24-
address=address_info["gcs_address"], nums_reconnect_retry=0
25-
)
23+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
2624
return JobManager(gcs_aio_client, tmp_path)
2725

2826

python/ray/dashboard/modules/job/tests/test_job_manager.py

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ async def test_get_scheduling_strategy(
5858
):
5959
monkeypatch.setenv(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0")
6060
address_info = ray.init(address=call_ray_start)
61-
gcs_aio_client = GcsAioClient(
62-
address=address_info["gcs_address"], nums_reconnect_retry=0
63-
)
61+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
6462

6563
job_manager = JobManager(gcs_aio_client, tmp_path)
6664

@@ -104,9 +102,7 @@ async def test_submit_no_ray_address(call_ray_start, tmp_path): # noqa: F811
104102
"""Test that a job script with an unspecified Ray address works."""
105103

106104
address_info = ray.init(address=call_ray_start)
107-
gcs_aio_client = GcsAioClient(
108-
address=address_info["gcs_address"], nums_reconnect_retry=0
109-
)
105+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
110106
job_manager = JobManager(gcs_aio_client, tmp_path)
111107

112108
init_ray_no_address_script = """
@@ -141,9 +137,7 @@ async def test_submit_no_ray_address(call_ray_start, tmp_path): # noqa: F811
141137
async def test_get_all_job_info(call_ray_start, tmp_path): # noqa: F811
142138
"""Test that JobInfo is correctly populated in the GCS get_all_job_info API."""
143139
address_info = ray.init(address=call_ray_start)
144-
gcs_aio_client = GcsAioClient(
145-
address=address_info["gcs_address"], nums_reconnect_retry=0
146-
)
140+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
147141
job_manager = JobManager(gcs_aio_client, tmp_path)
148142

149143
# Submit a job.
@@ -190,9 +184,7 @@ async def test_get_all_job_info_with_is_running_tasks(call_ray_start): # noqa:
190184
"""Test the is_running_tasks bit in the GCS get_all_job_info API."""
191185

192186
address_info = ray.init(address=call_ray_start)
193-
gcs_aio_client = GcsAioClient(
194-
address=address_info["gcs_address"], nums_reconnect_retry=0
195-
)
187+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
196188

197189
@ray.remote
198190
def sleep_forever():
@@ -271,9 +263,7 @@ class Actor:
271263
async def test_job_supervisor_log_json(call_ray_start, tmp_path): # noqa: F811
272264
"""Test JobSupervisor logs are structured JSON logs"""
273265
address_info = ray.init(address=call_ray_start)
274-
gcs_aio_client = GcsAioClient(
275-
address=address_info["gcs_address"], nums_reconnect_retry=0
276-
)
266+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
277267
job_manager = JobManager(gcs_aio_client, tmp_path)
278268
job_id = await job_manager.submit_job(
279269
entrypoint="echo hello 1", submission_id="job_1"
@@ -304,9 +294,7 @@ async def test_job_supervisor_logs_saved(
304294
):
305295
"""Test JobSupervisor logs are saved to jobs/supervisor-{submission_id}.log"""
306296
address_info = ray.init(address=call_ray_start)
307-
gcs_aio_client = GcsAioClient(
308-
address=address_info["gcs_address"], nums_reconnect_retry=0
309-
)
297+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
310298
job_manager = JobManager(gcs_aio_client, tmp_path)
311299
job_id = await job_manager.submit_job(
312300
entrypoint="echo hello 1", submission_id="job_1"
@@ -342,9 +330,7 @@ async def test_runtime_env_setup_logged_to_job_driver_logs(
342330
):
343331
"""Test runtime env setup messages are logged to jobs driver log"""
344332
address_info = ray.init(address=call_ray_start)
345-
gcs_aio_client = GcsAioClient(
346-
address=address_info["gcs_address"], nums_reconnect_retry=0
347-
)
333+
gcs_aio_client = GcsAioClient(address=address_info["gcs_address"])
348334
job_manager = JobManager(gcs_aio_client, tmp_path)
349335

350336
job_id = await job_manager.submit_job(

python/ray/dashboard/utils.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ def gcs_client(self):
169169
if self._gcs_client is None:
170170
self._gcs_client = GcsClient(
171171
address=self._config.gcs_address,
172-
nums_reconnect_retry=0,
173172
cluster_id=self._config.cluster_id_hex,
174173
)
175174
return self._gcs_client
@@ -179,7 +178,6 @@ def gcs_aio_client(self):
179178
if self._gcs_aio_client is None:
180179
self._gcs_aio_client = GcsAioClient(
181180
address=self._config.gcs_address,
182-
nums_reconnect_retry=0,
183181
cluster_id=self._config.cluster_id_hex,
184182
)
185183
if not internal_kv._internal_kv_initialized():
@@ -835,7 +833,7 @@ def ray_address_to_api_server_url(address: Optional[str]) -> str:
835833
"""
836834

837835
address = services.canonicalize_bootstrap_address_or_die(address)
838-
gcs_client = GcsClient(address=address, nums_reconnect_retry=0)
836+
gcs_client = GcsClient(address=address)
839837

840838
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
841839
api_server_url = ray._private.utils.internal_kv_get_with_retry(

python/ray/tests/test_gcs_fault_tolerance.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ def ready(self):
437437

438438
def test_gcs_client_reconnect(ray_start_regular_with_external_redis):
439439
gcs_address = ray._private.worker.global_worker.gcs_client.address
440-
gcs_client = ray._raylet.GcsClient(address=gcs_address, nums_reconnect_retry=20)
440+
gcs_client = ray._raylet.GcsClient(address=gcs_address)
441441

442442
gcs_client.internal_kv_put(b"a", b"b", True, None)
443443
assert gcs_client.internal_kv_get(b"a", None) == b"b"
@@ -467,9 +467,7 @@ def test_gcs_aio_client_reconnect(ray_start_regular_with_external_redis):
467467
passed = [False]
468468

469469
async def async_kv_get():
470-
gcs_aio_client = gcs_utils.GcsAioClient(
471-
address=gcs_address, nums_reconnect_retry=20
472-
)
470+
gcs_aio_client = gcs_utils.GcsAioClient(address=gcs_address)
473471
assert await gcs_aio_client.internal_kv_get(b"a", None) == b"b"
474472
return True
475473

0 commit comments

Comments
 (0)