From b3a4e8df9c756047c0ba8fc21d42aca706a1266d Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 17:19:26 -0700 Subject: [PATCH 01/15] grpo multi-node --- .../grpo/grpo-sync-multi-node.sbatch | 37 +++++++++++ .../grpo/run_in_ray_cluster.sh | 65 +++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 sota-implementations/grpo/grpo-sync-multi-node.sbatch create mode 100644 sota-implementations/grpo/run_in_ray_cluster.sh diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch new file mode 100644 index 00000000000..6b29b91538f --- /dev/null +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -0,0 +1,37 @@ +#!/bin/bash +#SBATCH --job-name=grpo-sync-multi-node +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=96 +#SBATCH --exclusive +#SBATCH --output=logs/%x.job%j.out +#SBATCH --time=1:00:00 + +# Exit on any error +set -euo pipefail + +# Ensure logs directory exists +mkdir -p logs + +# Set up Ray cluster configuration +export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) +export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -i | head -n 1) +export RAY_PORT=6379 + +# Environment variables for the application +export VLLM_USE_V1=0 + +# Optional: Set Ray-specific environment variables +export RAY_DEDUP_LOGS=0 # Avoid duplicate logs +export PYTHONUNBUFFERED=1 # Ensure Python output is not buffered + +echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" +echo "Total nodes: $SLURM_NNODES" +echo "Job ID: $SLURM_JOB_ID" + +CMD="python sota-implementations/grpo/grpo-sync.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4" + +# Run the Ray cluster +srun bash run_in_ray_cluster.sh "$CMD" + +echo "Job completed" diff --git a/sota-implementations/grpo/run_in_ray_cluster.sh b/sota-implementations/grpo/run_in_ray_cluster.sh new file mode 100644 index 00000000000..889a6838284 --- /dev/null +++ b/sota-implementations/grpo/run_in_ray_cluster.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +set -euo pipefail + +# Set up cleanup trap early +trap cleanup EXIT + +# Utility to check required environment variables +check_env_var() { + if [ -z "${!1}" ]; then + echo "Error: Required environment variable $1 is not set" + exit 1 + fi +} + +CURRENT_NODE=$(hostname | cut -d. -f1) +CMD="$1" + +echo "SLURM_NODEID: $SLURM_NODEID" +echo "SLURM_NNODES: $SLURM_NNODES" +echo "Current node: $CURRENT_NODE" +echo "Head node: $HEAD_NODE ($HEAD_NODE_IP)" +echo "Ray port: $RAY_PORT" +echo "Command: $CMD" + +check_env_var "HEAD_NODE" +check_env_var "HEAD_NODE_IP" +check_env_var "RAY_PORT" +check_env_var "SLURM_NODEID" +check_env_var "SLURM_NNODES" + +# Node 0 is the Ray head node +if [ "$SLURM_NODEID" -eq 0 ]; then + echo "Starting Ray head on Node 0" + ray start --head --disable-usage-stats --port=$RAY_PORT + echo "Ray head node started at $HEAD_NODE_IP:$RAY_PORT" + + # Give Ray head time to initialize + sleep 5 + + # Run the command on head node + echo "Running command on head node $CURRENT_NODE" + bash -c "$CMD" + +else + echo "Waiting for Ray head node to be ready..." 
+ sleep 10 + + echo "Starting Ray worker on node $CURRENT_NODE (ID: $SLURM_NODEID)" + ray start --disable-usage-stats --address="$HEAD_NODE_IP:$RAY_PORT" + + # Run the command on worker node + echo "Running command on worker node $CURRENT_NODE" + bash -c "$CMD" +fi + +echo "Node $CURRENT_NODE: Done" + +# Define cleanup function at the end +cleanup() { + if [ -n "$(command -v ray)" ]; then + echo "Stopping Ray on node $CURRENT_NODE" + ray stop + fi +} From 01437984e47b1a2c8db3d6702181de8bb7cdb27f Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 17:27:42 -0700 Subject: [PATCH 02/15] grpo multi-node --- sota-implementations/grpo/grpo-sync-multi-node.sbatch | 3 +++ sota-implementations/grpo/grpo-sync.py | 11 +++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index 6b29b91538f..0df094d9a3f 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -21,6 +21,9 @@ export RAY_PORT=6379 # Environment variables for the application export VLLM_USE_V1=0 +# Indicate that Ray cluster is managed externally +export RAY_CLUSTER_MANAGED_EXTERNALLY=1 + # Optional: Set Ray-specific environment variables export RAY_DEDUP_LOGS=0 # Avoid duplicate logs export PYTHONUNBUFFERED=1 # Ensure Python output is not buffered diff --git a/sota-implementations/grpo/grpo-sync.py b/sota-implementations/grpo/grpo-sync.py index bd88bfd6be2..c9ed2443fec 100644 --- a/sota-implementations/grpo/grpo-sync.py +++ b/sota-implementations/grpo/grpo-sync.py @@ -160,8 +160,11 @@ def train( model_metadata = vLLMUpdater.get_model_metadata(policy_training) # Create weight updater with remote LLM + ray_managed_externally = os.environ.get("RAY_CLUSTER_MANAGED_EXTERNALLY") weight_updater: vLLMUpdater = make_weight_updater( - master_address="localhost", # Since we're running locally + master_address="localhost" + if not ray_managed_externally + else ray.util.get_node_ip_address(), # Since we're running locally master_port=None, # Will auto-assign an open port model_metadata=model_metadata, vllm_tp_size=cfg.inference_model.num_devices @@ -436,7 +439,11 @@ def main(cfg): ray_init_config["runtime_env"]["env_vars"] ) torchrl_logger.info(f"Ray init config: {ray_init_config=}") - ray.init(**ray_init_config) + ray_managed_externally = os.environ.get("RAY_CLUSTER_MANAGED_EXTERNALLY") + if ray_managed_externally: + ray.init(address="auto") + else: + ray.init(**ray_init_config) # Check if num_devices is set if cfg.inference_model.num_devices is None: From ac90875983882c4bc9bfb9ca485d8f72df1156f4 Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 20:05:57 -0700 Subject: [PATCH 03/15] fixes --- .../grpo/grpo-async-multi-node.sbatch | 41 +++++++++++++++++++ sota-implementations/grpo/grpo-async.py | 11 ++++- .../grpo/grpo-sync-multi-node.sbatch | 5 ++- sota-implementations/grpo/grpo-sync.py | 2 +- .../grpo/run_in_ray_cluster.sh | 39 ++++++++---------- 5 files changed, 72 insertions(+), 26 deletions(-) create mode 100644 sota-implementations/grpo/grpo-async-multi-node.sbatch diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch new file mode 100644 index 00000000000..fa15bc3cf7f --- /dev/null +++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch @@ -0,0 +1,41 @@ +#!/bin/bash +#SBATCH --job-name=grpo-sync-multi-node +#SBATCH --nodes=2 +#SBATCH 
--ntasks-per-node=1 +#SBATCH --cpus-per-task=96 +#SBATCH --exclusive +#SBATCH --output=logs/%x.job%j.out +#SBATCH --time=1:00:00 + +# Exit on any error +set -euo pipefail + +# Ensure logs directory exists +mkdir -p logs + +# Set up Ray cluster configuration +export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) +# Grab the first valid IPv4 address only +export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') +export RAY_PORT=6379 + +# Environment variables for the application +export VLLM_USE_V1=0 + +# Indicate that Ray cluster is managed externally +export RAY_CLUSTER_MANAGED_EXTERNALLY=1 + +# Optional: Set Ray-specific environment variables +export RAY_DEDUP_LOGS=0 # Avoid duplicate logs +export PYTHONUNBUFFERED=1 # Ensure Python output is not buffered + +echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" +echo "Total nodes: $SLURM_NNODES" +echo "Job ID: $SLURM_JOB_ID" + +CMD="python grpo-sync.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4" + +# Run the Ray cluster +srun bash run_in_ray_cluster.sh "$CMD" + +echo "Job completed" diff --git a/sota-implementations/grpo/grpo-async.py b/sota-implementations/grpo/grpo-async.py index 6ea882cd5dc..da29876786f 100644 --- a/sota-implementations/grpo/grpo-async.py +++ b/sota-implementations/grpo/grpo-async.py @@ -159,8 +159,11 @@ def train( model_metadata = vLLMUpdater.get_model_metadata(policy_training) # Create weight updater with remote LLM + ray_managed_externally = os.environ.get("RAY_CLUSTER_MANAGED_EXTERNALLY") weight_updater: vLLMUpdater = make_weight_updater( - master_address="localhost", # Since we're running locally + master_address="localhost" + if not ray_managed_externally + else ray.util.get_node_ip_address(), master_port=None, # Will auto-assign an open port model_metadata=model_metadata, vllm_tp_size=cfg.inference_model.num_devices @@ -422,7 +425,11 @@ def main(cfg): ray_init_config["runtime_env"]["env_vars"] ) torchrl_logger.info(f"Ray init config: {ray_init_config=}") - ray.init(**ray_init_config) + ray_managed_externally = os.environ.get("RAY_CLUSTER_MANAGED_EXTERNALLY") + if ray_managed_externally: + ray.init(address="auto") + else: + ray.init(**ray_init_config) # Check if num_devices is set if cfg.inference_model.num_devices is None: diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index 0df094d9a3f..fa15bc3cf7f 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -15,7 +15,8 @@ mkdir -p logs # Set up Ray cluster configuration export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) -export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -i | head -n 1) +# Grab the first valid IPv4 address only +export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') export RAY_PORT=6379 # Environment variables for the application @@ -32,7 +33,7 @@ echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" echo "Total nodes: $SLURM_NNODES" echo "Job ID: $SLURM_JOB_ID" -CMD="python sota-implementations/grpo/grpo-sync.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4" +CMD="python grpo-sync.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 
inference_model.num_devices=4" # Run the Ray cluster srun bash run_in_ray_cluster.sh "$CMD" diff --git a/sota-implementations/grpo/grpo-sync.py b/sota-implementations/grpo/grpo-sync.py index c9ed2443fec..c100688b20e 100644 --- a/sota-implementations/grpo/grpo-sync.py +++ b/sota-implementations/grpo/grpo-sync.py @@ -164,7 +164,7 @@ def train( weight_updater: vLLMUpdater = make_weight_updater( master_address="localhost" if not ray_managed_externally - else ray.util.get_node_ip_address(), # Since we're running locally + else ray.util.get_node_ip_address(), master_port=None, # Will auto-assign an open port model_metadata=model_metadata, vllm_tp_size=cfg.inference_model.num_devices diff --git a/sota-implementations/grpo/run_in_ray_cluster.sh b/sota-implementations/grpo/run_in_ray_cluster.sh index 889a6838284..e364f679b91 100644 --- a/sota-implementations/grpo/run_in_ray_cluster.sh +++ b/sota-implementations/grpo/run_in_ray_cluster.sh @@ -2,6 +2,14 @@ set -euo pipefail +# Define cleanup function BEFORE trap +cleanup() { + if command -v ray &>/dev/null; then + echo "Stopping Ray on node $CURRENT_NODE" + ray stop || true + fi +} + # Set up cleanup trap early trap cleanup EXIT @@ -32,34 +40,23 @@ check_env_var "SLURM_NNODES" # Node 0 is the Ray head node if [ "$SLURM_NODEID" -eq 0 ]; then echo "Starting Ray head on Node 0" - ray start --head --disable-usage-stats --port=$RAY_PORT + ray start --head --disable-usage-stats --port="$RAY_PORT" echo "Ray head node started at $HEAD_NODE_IP:$RAY_PORT" - - # Give Ray head time to initialize - sleep 5 - - # Run the command on head node - echo "Running command on head node $CURRENT_NODE" - bash -c "$CMD" - + + echo "Ray head is running on $CURRENT_NODE — waiting indefinitely to keep cluster alive..." + sleep infinity else echo "Waiting for Ray head node to be ready..." 
sleep 10 - + echo "Starting Ray worker on node $CURRENT_NODE (ID: $SLURM_NODEID)" - ray start --disable-usage-stats --address="$HEAD_NODE_IP:$RAY_PORT" - - # Run the command on worker node + ray start --disable-usage-stats --address="$HEAD_NODE_IP:$RAY_PORT" || { + echo "Failed to start Ray worker" + exit 1 + } + echo "Running command on worker node $CURRENT_NODE" bash -c "$CMD" fi echo "Node $CURRENT_NODE: Done" - -# Define cleanup function at the end -cleanup() { - if [ -n "$(command -v ray)" ]; then - echo "Stopping Ray on node $CURRENT_NODE" - ray stop - fi -} From b2c934fb0dc54ef13f8b799585e7b274449f0c23 Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 20:07:52 -0700 Subject: [PATCH 04/15] fixes --- sota-implementations/grpo/grpo-async-multi-node.sbatch | 5 +---- sota-implementations/grpo/grpo-sync-multi-node.sbatch | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch index fa15bc3cf7f..a6ad4ac22f2 100644 --- a/sota-implementations/grpo/grpo-async-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch @@ -20,15 +20,12 @@ export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | aw export RAY_PORT=6379 # Environment variables for the application +export LIST_TO_STACK=1 export VLLM_USE_V1=0 # Indicate that Ray cluster is managed externally export RAY_CLUSTER_MANAGED_EXTERNALLY=1 -# Optional: Set Ray-specific environment variables -export RAY_DEDUP_LOGS=0 # Avoid duplicate logs -export PYTHONUNBUFFERED=1 # Ensure Python output is not buffered - echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" echo "Total nodes: $SLURM_NNODES" echo "Job ID: $SLURM_JOB_ID" diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index fa15bc3cf7f..a6ad4ac22f2 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -20,15 +20,12 @@ export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | aw export RAY_PORT=6379 # Environment variables for the application +export LIST_TO_STACK=1 export VLLM_USE_V1=0 # Indicate that Ray cluster is managed externally export RAY_CLUSTER_MANAGED_EXTERNALLY=1 -# Optional: Set Ray-specific environment variables -export RAY_DEDUP_LOGS=0 # Avoid duplicate logs -export PYTHONUNBUFFERED=1 # Ensure Python output is not buffered - echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" echo "Total nodes: $SLURM_NNODES" echo "Job ID: $SLURM_JOB_ID" From 55c1e546d98e1a495769099d5a55166cd8ffe3c9 Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 20:10:41 -0700 Subject: [PATCH 05/15] fixes --- sota-implementations/grpo/grpo-async-multi-node.sbatch | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch index a6ad4ac22f2..0aa9d66f20a 100644 --- a/sota-implementations/grpo/grpo-async-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch @@ -30,7 +30,7 @@ echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" echo "Total nodes: $SLURM_NNODES" echo "Job ID: $SLURM_JOB_ID" -CMD="python grpo-sync.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4" +CMD="python grpo-async.py mode=sync 
train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4"
 
 # Run the Ray cluster
 srun bash run_in_ray_cluster.sh "$CMD"
 
 echo "Job completed"

From 5be1e22cc05747bd3dfe039c5fc79626129aeb5d Mon Sep 17 00:00:00 2001
From: albertbou
Date: Sun, 6 Jul 2025 20:11:52 -0700
Subject: [PATCH 06/15] fixes

---
 sota-implementations/grpo/grpo-async-multi-node.sbatch | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch
index 0aa9d66f20a..50b5ccac5b5 100644
--- a/sota-implementations/grpo/grpo-async-multi-node.sbatch
+++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch
@@ -30,7 +30,7 @@ echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)"
 echo "Total nodes: $SLURM_NNODES"
 echo "Job ID: $SLURM_JOB_ID"
 
-CMD="python grpo-async.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4"
+CMD="python grpo-async.py mode=async train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4"
 
 # Run the Ray cluster
 srun bash run_in_ray_cluster.sh "$CMD"
 
 echo "Job completed"

From 0bf03e11e7728709bc1d6e9118171117360f6e91 Mon Sep 17 00:00:00 2001
From: albertbou
Date: Sun, 6 Jul 2025 20:12:44 -0700
Subject: [PATCH 07/15] fixes

---
 sota-implementations/grpo/grpo-async-multi-node.sbatch | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch
index 50b5ccac5b5..245dff2a24c 100644
--- a/sota-implementations/grpo/grpo-async-multi-node.sbatch
+++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch
@@ -1,5 +1,5 @@
 #!/bin/bash
-#SBATCH --job-name=grpo-sync-multi-node
+#SBATCH --job-name=grpo-async-multi-node
 #SBATCH --nodes=2
 #SBATCH --ntasks-per-node=1
 #SBATCH --cpus-per-task=96

From f6eb1845ac5f2ec17549e8fafba08842a9e49287 Mon Sep 17 00:00:00 2001
From: albertbou
Date: Sun, 6 Jul 2025 20:25:56 -0700
Subject: [PATCH 08/15] README

---
 sota-implementations/grpo/README.md | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sota-implementations/grpo/README.md b/sota-implementations/grpo/README.md
index 58f2a4fe633..74ee2b422be 100644
--- a/sota-implementations/grpo/README.md
+++ b/sota-implementations/grpo/README.md
@@ -134,6 +134,21 @@ The async mode offers better performance by:
 - Better throughput
 - More flexible buffer management
 
+### Running GRPO on More Than One Node with SLURM
+
+GRPO can be run across more than one node using Ray and SLURM, enabling distributed training for moderately scaled workloads.
+
+Two scripts are provided for launching multi-node runs:
+
+- `grpo-sync-multi-node.sbatch`: SLURM job script that launches sync GRPO across multiple nodes using Ray.
+- `grpo-async-multi-node.sbatch`: SLURM job script that launches async GRPO across multiple nodes using Ray.
+
+Example Usage:
+
+```bash
+sbatch sota-implementations/grpo/grpo-sync-multi-node.sbatch
+```
+
 ### KL Divergences in PPO: Reference vs Inference
 
 KL divergence is a key regularization term in policy optimization algorithms like PPO and in LLM post-training. It measures how much the updated policy diverges from a baseline or reference policy, helping to prevent the new policy from drifting too far and ensuring stable learning. 
From d22f5001e97fea2e15222a7f00fda49732d9023f Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 20:30:38 -0700 Subject: [PATCH 09/15] fixes --- sota-implementations/grpo/README.md | 2 +- sota-implementations/grpo/grpo-async-multi-node.sbatch | 6 +++--- sota-implementations/grpo/grpo-sync-multi-node.sbatch | 3 --- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sota-implementations/grpo/README.md b/sota-implementations/grpo/README.md index 74ee2b422be..e526338f16e 100644 --- a/sota-implementations/grpo/README.md +++ b/sota-implementations/grpo/README.md @@ -136,7 +136,7 @@ The async mode offers better performance by: ### Running GRPO on More Than One Node with SLURM -GRPO can be run across more than one node using Ray and SLURM, enabling distributed training for moderately scaled workloads. +GRPO can be run across more than one node using SLURM, enabling distributed training for moderately scaled workloads. Two scripts are provided for launching multi-node runs: diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch index 245dff2a24c..add0cc58887 100644 --- a/sota-implementations/grpo/grpo-async-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch @@ -15,17 +15,17 @@ mkdir -p logs # Set up Ray cluster configuration export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) -# Grab the first valid IPv4 address only export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') export RAY_PORT=6379 - -# Environment variables for the application export LIST_TO_STACK=1 export VLLM_USE_V1=0 # Indicate that Ray cluster is managed externally export RAY_CLUSTER_MANAGED_EXTERNALLY=1 +# Indicate that Ray cluster is managed externally +export RAY_CLUSTER_MANAGED_EXTERNALLY=1 + echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" echo "Total nodes: $SLURM_NNODES" echo "Job ID: $SLURM_JOB_ID" diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index a6ad4ac22f2..7ae4acf66e5 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -15,14 +15,11 @@ mkdir -p logs # Set up Ray cluster configuration export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) -# Grab the first valid IPv4 address only export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') export RAY_PORT=6379 -# Environment variables for the application export LIST_TO_STACK=1 export VLLM_USE_V1=0 - # Indicate that Ray cluster is managed externally export RAY_CLUSTER_MANAGED_EXTERNALLY=1 From 2f0983d2f53218e135f92c7e49d5ce3792a9fc53 Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 20:53:19 -0700 Subject: [PATCH 10/15] fixes --- .../grpo/grpo-async-multi-node.sbatch | 17 +---- .../grpo/grpo-sync-multi-node.sbatch | 15 +--- .../grpo/run_in_ray_cluster.sh | 69 ++++++++----------- 3 files changed, 33 insertions(+), 68 deletions(-) diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch index add0cc58887..968fb1a8896 100644 --- a/sota-implementations/grpo/grpo-async-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch @@ -13,26 +13,13 @@ set -euo pipefail # Ensure 
logs directory exists mkdir -p logs -# Set up Ray cluster configuration -export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) -export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') -export RAY_PORT=6379 +# Environment variables export LIST_TO_STACK=1 export VLLM_USE_V1=0 - -# Indicate that Ray cluster is managed externally -export RAY_CLUSTER_MANAGED_EXTERNALLY=1 - -# Indicate that Ray cluster is managed externally export RAY_CLUSTER_MANAGED_EXTERNALLY=1 -echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" -echo "Total nodes: $SLURM_NNODES" -echo "Job ID: $SLURM_JOB_ID" - +# Run command in Ray cluster CMD="python grpo-async.py mode=async train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4" - -# Run the Ray cluster srun bash run_in_ray_cluster.sh "$CMD" echo "Job completed" diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index 7ae4acf66e5..7f3a2d12531 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -13,23 +13,12 @@ set -euo pipefail # Ensure logs directory exists mkdir -p logs -# Set up Ray cluster configuration -export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) -export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') -export RAY_PORT=6379 - +# Environment variables export LIST_TO_STACK=1 export VLLM_USE_V1=0 -# Indicate that Ray cluster is managed externally export RAY_CLUSTER_MANAGED_EXTERNALLY=1 -echo "Starting Ray cluster with head node: $HEAD_NODE ($HEAD_NODE_IP)" -echo "Total nodes: $SLURM_NNODES" -echo "Job ID: $SLURM_JOB_ID" - -CMD="python grpo-sync.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4" - -# Run the Ray cluster +# Run command in Ray cluster srun bash run_in_ray_cluster.sh "$CMD" echo "Job completed" diff --git a/sota-implementations/grpo/run_in_ray_cluster.sh b/sota-implementations/grpo/run_in_ray_cluster.sh index e364f679b91..aebb79440f7 100644 --- a/sota-implementations/grpo/run_in_ray_cluster.sh +++ b/sota-implementations/grpo/run_in_ray_cluster.sh @@ -2,61 +2,50 @@ set -euo pipefail -# Define cleanup function BEFORE trap +# Get command from argument +CMD="$1" + +# Set up Ray cluster configuration +export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) +export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') +export RAY_PORT=6379 + +# Get current node name (normalize hostname) +CURRENT_NODE=$(hostname | cut -d. -f1) +# Set up cleanup function cleanup() { if command -v ray &>/dev/null; then echo "Stopping Ray on node $CURRENT_NODE" ray stop || true fi } - -# Set up cleanup trap early trap cleanup EXIT -# Utility to check required environment variables -check_env_var() { - if [ -z "${!1}" ]; then - echo "Error: Required environment variable $1 is not set" - exit 1 - fi -} - -CURRENT_NODE=$(hostname | cut -d. 
-f1) -CMD="$1" - -echo "SLURM_NODEID: $SLURM_NODEID" -echo "SLURM_NNODES: $SLURM_NNODES" -echo "Current node: $CURRENT_NODE" -echo "Head node: $HEAD_NODE ($HEAD_NODE_IP)" -echo "Ray port: $RAY_PORT" -echo "Command: $CMD" - -check_env_var "HEAD_NODE" -check_env_var "HEAD_NODE_IP" -check_env_var "RAY_PORT" -check_env_var "SLURM_NODEID" -check_env_var "SLURM_NNODES" - -# Node 0 is the Ray head node +# Start Ray based on node role if [ "$SLURM_NODEID" -eq 0 ]; then - echo "Starting Ray head on Node 0" - ray start --head --disable-usage-stats --port="$RAY_PORT" + echo "Starting Ray head node on $CURRENT_NODE" + ray start --head --disable-usage-stats --port=$RAY_PORT echo "Ray head node started at $HEAD_NODE_IP:$RAY_PORT" - - echo "Ray head is running on $CURRENT_NODE — waiting indefinitely to keep cluster alive..." - sleep infinity else - echo "Waiting for Ray head node to be ready..." + echo "Waiting for head node to be ready..." sleep 10 - echo "Starting Ray worker on node $CURRENT_NODE (ID: $SLURM_NODEID)" - ray start --disable-usage-stats --address="$HEAD_NODE_IP:$RAY_PORT" || { - echo "Failed to start Ray worker" - exit 1 - } + ray start --disable-usage-stats --address="$HEAD_NODE_IP:$RAY_PORT" +fi + +# Ensure Ray cluster is ready +sleep 2 - echo "Running command on worker node $CURRENT_NODE" +# Only head node runs the training command +if [ "$SLURM_NODEID" -eq 0 ]; then + echo "Starting training process on head node $CURRENT_NODE" bash -c "$CMD" +else + # Worker nodes just wait for the head to finish + echo "Worker node $CURRENT_NODE waiting for head node to complete..." + while ray status --address="$HEAD_NODE_IP:$RAY_PORT" &>/dev/null; do + sleep 10 + done fi echo "Node $CURRENT_NODE: Done" From a6aad1207c414bf54bc7884f3b63a7af933c6eb9 Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 20:54:15 -0700 Subject: [PATCH 11/15] fixes --- sota-implementations/grpo/grpo-sync-multi-node.sbatch | 1 + 1 file changed, 1 insertion(+) diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index 7f3a2d12531..98c37600fa4 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -19,6 +19,7 @@ export VLLM_USE_V1=0 export RAY_CLUSTER_MANAGED_EXTERNALLY=1 # Run command in Ray cluster +CMD="python grpo-sync.py mode=sync train_model.num_devices=8 ref_model.num_devices=4 inference_model.num_devices=4" srun bash run_in_ray_cluster.sh "$CMD" echo "Job completed" From b76959462dd972b7d51f7b18e2f9a61af8783f8c Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 20:58:24 -0700 Subject: [PATCH 12/15] fixes --- sota-implementations/grpo/grpo-sync-multi-node.sbatch | 5 +++++ sota-implementations/grpo/run_in_ray_cluster.sh | 5 ----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index 98c37600fa4..edd5573fa84 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -13,6 +13,11 @@ set -euo pipefail # Ensure logs directory exists mkdir -p logs +# Set up Ray cluster configuration +export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) +export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') +export RAY_PORT=6379 + # Environment variables export LIST_TO_STACK=1 
export VLLM_USE_V1=0 diff --git a/sota-implementations/grpo/run_in_ray_cluster.sh b/sota-implementations/grpo/run_in_ray_cluster.sh index aebb79440f7..8181c7d303a 100644 --- a/sota-implementations/grpo/run_in_ray_cluster.sh +++ b/sota-implementations/grpo/run_in_ray_cluster.sh @@ -5,11 +5,6 @@ set -euo pipefail # Get command from argument CMD="$1" -# Set up Ray cluster configuration -export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) -export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') -export RAY_PORT=6379 - # Get current node name (normalize hostname) CURRENT_NODE=$(hostname | cut -d. -f1) # Set up cleanup function From fe78c3fe84875cbff6b71c5356b3d2a0b83e44e1 Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 21:00:45 -0700 Subject: [PATCH 13/15] fixes --- .../grpo/grpo-sync-multi-node.sbatch | 5 ----- sota-implementations/grpo/run_in_ray_cluster.sh | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index edd5573fa84..98c37600fa4 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -13,11 +13,6 @@ set -euo pipefail # Ensure logs directory exists mkdir -p logs -# Set up Ray cluster configuration -export HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) -export HEAD_NODE_IP=$(srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" hostname -I | awk '{for(i=1;i<=NF;i++) if ($i ~ /^[0-9]+\./) {print $i; exit}}') -export RAY_PORT=6379 - # Environment variables export LIST_TO_STACK=1 export VLLM_USE_V1=0 diff --git a/sota-implementations/grpo/run_in_ray_cluster.sh b/sota-implementations/grpo/run_in_ray_cluster.sh index 8181c7d303a..df123b7395b 100644 --- a/sota-implementations/grpo/run_in_ray_cluster.sh +++ b/sota-implementations/grpo/run_in_ray_cluster.sh @@ -5,8 +5,22 @@ set -euo pipefail # Get command from argument CMD="$1" +# Set up Ray cluster configuration +HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) +RAY_PORT=6379 + # Get current node name (normalize hostname) CURRENT_NODE=$(hostname | cut -d. -f1) + +# Get HEAD_NODE_IP +if [ "$SLURM_NODEID" -eq 0 ]; then + # We're on the head node, get our own IP + HEAD_NODE_IP=$(hostname -I | awk '{print $1}') +else + # We're on a worker, resolve the head node's IP using DNS + HEAD_NODE_IP=$(getent hosts "$HEAD_NODE" | awk '{print $1}') +fi + # Set up cleanup function cleanup() { if command -v ray &>/dev/null; then From 8bbb9233655ba4d2d2366541533900f529c492b5 Mon Sep 17 00:00:00 2001 From: albertbou Date: Sun, 6 Jul 2025 21:03:19 -0700 Subject: [PATCH 14/15] fixes --- sota-implementations/grpo/run_in_ray_cluster.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sota-implementations/grpo/run_in_ray_cluster.sh b/sota-implementations/grpo/run_in_ray_cluster.sh index df123b7395b..5325e26737c 100644 --- a/sota-implementations/grpo/run_in_ray_cluster.sh +++ b/sota-implementations/grpo/run_in_ray_cluster.sh @@ -9,7 +9,7 @@ CMD="$1" HEAD_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1) RAY_PORT=6379 -# Get current node name (normalize hostname) +# Get current node name CURRENT_NODE=$(hostname | cut -d. 
-f1) # Get HEAD_NODE_IP @@ -51,7 +51,6 @@ if [ "$SLURM_NODEID" -eq 0 ]; then bash -c "$CMD" else # Worker nodes just wait for the head to finish - echo "Worker node $CURRENT_NODE waiting for head node to complete..." while ray status --address="$HEAD_NODE_IP:$RAY_PORT" &>/dev/null; do sleep 10 done From 2f7778e2c44e61b52b40d5754a00cfa743a5326e Mon Sep 17 00:00:00 2001 From: albertbou Date: Mon, 7 Jul 2025 12:40:40 -0700 Subject: [PATCH 15/15] fixes --- sota-implementations/grpo/grpo-async-multi-node.sbatch | 2 +- sota-implementations/grpo/grpo-sync-multi-node.sbatch | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sota-implementations/grpo/grpo-async-multi-node.sbatch b/sota-implementations/grpo/grpo-async-multi-node.sbatch index 968fb1a8896..5abb5d2b167 100644 --- a/sota-implementations/grpo/grpo-async-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-async-multi-node.sbatch @@ -5,7 +5,7 @@ #SBATCH --cpus-per-task=96 #SBATCH --exclusive #SBATCH --output=logs/%x.job%j.out -#SBATCH --time=1:00:00 +#SBATCH --time=24:00:00 # Exit on any error set -euo pipefail diff --git a/sota-implementations/grpo/grpo-sync-multi-node.sbatch b/sota-implementations/grpo/grpo-sync-multi-node.sbatch index 98c37600fa4..b3044279c42 100644 --- a/sota-implementations/grpo/grpo-sync-multi-node.sbatch +++ b/sota-implementations/grpo/grpo-sync-multi-node.sbatch @@ -5,7 +5,7 @@ #SBATCH --cpus-per-task=96 #SBATCH --exclusive #SBATCH --output=logs/%x.job%j.out -#SBATCH --time=1:00:00 +#SBATCH --time=24:00:00 # Exit on any error set -euo pipefail