Commit c2275c9

hanwen-cluster authored and hanwen-pcluste committed
[Integ-tests] Improve test for external slurm dbd
Add a check to verify that accounting information can be retrieved from another cluster.
Add a check to verify that relaunching the Slurm DBD instance does not lose job information.

Signed-off-by: Hanwen <hanwenli@amazon.com>
1 parent d1474d3 commit c2275c9
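
In short, the new inter-cluster check submits jobs on one cluster and reads their accounting records back from the other cluster's head node, since both clusters point at the same external slurmdbd. A minimal sketch of that flow, assuming the fixture-provided cluster and scheduler-commands objects used in the test below:

# Sketch only: cluster_1, scheduler_commands_1 and scheduler_commands_2 are assumed to come
# from the integration-test fixtures (one SlurmCommands per cluster head node).
job_id = scheduler_commands_1.assert_job_submitted(
    scheduler_commands_1.submit_command('echo "${SLURM_JOB_ID}"').stdout
)
scheduler_commands_1.wait_job_completed(job_id)
# Read the record back through the shared slurmdbd, from the other cluster's head node.
records = scheduler_commands_2.get_accounting_job_records(job_id, clusters=cluster_1.name)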

File tree

2 files changed, +59 −5 lines changed


tests/integration-tests/tests/common/schedulers_common.py

Lines changed: 8 additions & 4 deletions
@@ -588,12 +588,16 @@ def get_accounting_users(
         return (dict(zip(fields, columns)) for columns in SlurmCommands._split_accounting_results(users))
 
     def get_accounting_job_records(
-        self, job_id, fields=("jobid", "jobname", "partition", "account", "alloccpus", "state", "exitcode")
+        self,
+        job_id,
+        fields=("jobid", "jobname", "partition", "account", "alloccpus", "state", "exitcode"),
+        clusters=None,
     ):
         """Return job steps of {job_id} as a series of dicts."""
-        records = self._remote_command_executor.run_remote_command(
-            f"sacct -nP -j {job_id} -o {','.join(fields)}"
-        ).stdout
+        command = f"sacct -nP -j {job_id} -o {','.join(fields)}"
+        if clusters:
+            command = command + f" --clusters {clusters}"
+        records = self._remote_command_executor.run_remote_command(command).stdout
        return (dict(zip(fields, columns)) for columns in SlurmCommands._split_accounting_results(records))
 
     @staticmethod
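
For illustration, this is roughly the sacct command the updated helper assembles when a clusters value is passed; the --clusters flag is what makes the cross-cluster lookup against the shared slurmdbd possible. The job id and cluster name below are hypothetical:

# Hypothetical values for illustration only.
job_id, clusters = "42", "cluster-1"
fields = ("jobid", "jobname", "partition", "account", "alloccpus", "state", "exitcode")
command = f"sacct -nP -j {job_id} -o {','.join(fields)}"
if clusters:
    command += f" --clusters {clusters}"
# -> sacct -nP -j 42 -o jobid,jobname,partition,account,alloccpus,state,exitcode --clusters cluster-1
print(command)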

tests/integration-tests/tests/schedulers/test_slurm_accounting.py

Lines changed: 51 additions & 1 deletion
@@ -2,6 +2,7 @@
 import os
 import re
 
+import boto3
 import pytest
 from assertpy import assert_that
 from remote_command_executor import RemoteCommandExecutor
@@ -122,7 +123,11 @@ def _test_jobs_get_recorded(scheduler_commands):
     job_id = scheduler_commands.assert_job_submitted(job_submission_output)
     logging.info(" Submitted Job ID: %s", job_id)
     scheduler_commands.wait_job_completed(job_id)
-    results = scheduler_commands.get_accounting_job_records(job_id)
+    _assert_job_completion_recorded_in_accounting(job_id, scheduler_commands)
+
+
+def _assert_job_completion_recorded_in_accounting(job_id, scheduler_commands, clusters=None):
+    results = scheduler_commands.get_accounting_job_records(job_id, clusters=clusters)
     for row in results:
         logging.info(" Result: %s", row)
         assert_that(row.get("state")).is_equal_to("COMPLETED")
@@ -238,13 +243,19 @@ def test_slurm_accounting_external_dbd(
         **config_params,
     )
     cluster = clusters_factory(cluster_config)
+    # Don't wait on the second cluster creation so that we can test the first cluster in the meantime.
     cluster_2 = clusters_factory(cluster_config, wait=False)
 
+    logging.info("Testing the first cluster")
     _check_cluster_external_dbd(cluster, config_params, region, scheduler_commands_factory, test_resources_dir)
 
+    logging.info("Testing the second cluster")
     cluster_2.wait_cluster_status("CREATE_COMPLETE")
     _check_cluster_external_dbd(cluster_2, config_params, region, scheduler_commands_factory, test_resources_dir)
 
+    logging.info("Testing the inter-clusters slurm accounting information")
+    _check_inter_clusters_external_dbd(cluster, cluster_2, scheduler_commands_factory)
+
 
 def _check_cluster_external_dbd(cluster, config_params, region, scheduler_commands_factory, test_resources_dir):
     headnode_remote_command_executor = RemoteCommandExecutor(cluster)
@@ -262,3 +273,42 @@ def _check_cluster_external_dbd(cluster, config_params, region, scheduler_comman
         headnode_remote_command_executor,
     )
     _test_jobs_get_recorded(scheduler_commands)
+
+
+def _check_inter_clusters_external_dbd(cluster_1, cluster_2, scheduler_commands_factory):
+    """
+    Verify accounting information can be retrieved from another cluster
+    and information is not lost after AutoScaling group replaces Slurm DBD instance.
+    """
+    headnode_remote_command_executor_1 = RemoteCommandExecutor(cluster_1)
+    scheduler_commands_1 = scheduler_commands_factory(headnode_remote_command_executor_1)
+    headnode_remote_command_executor_2 = RemoteCommandExecutor(cluster_2)
+    scheduler_commands_2 = scheduler_commands_factory(headnode_remote_command_executor_2)
+
+    job_ids = []
+    for index in range(20):  # 20 is an arbitrary number and can be changed.
+        job_submission_output = scheduler_commands_1.submit_command(
+            'echo "$(hostname) ${SLURM_JOB_ACCOUNT} ${SLURM_JOB_ID} ${SLURM_JOB_NAME}"',
+        ).stdout
+        job_id = scheduler_commands_1.assert_job_submitted(job_submission_output)
+        job_ids.append(job_id)
+        logging.info(" Submitted Job ID: %s", job_id)
+        if index == 10:
+            logging.info(
+                "Terminating the Slurm DBD instance to test robustness of the setup: "
+                "Job information should be synced after AutoScaling group launches another Slurm DBD instance."
+            )
+            ec2_client = boto3.client("ec2")
+            slurm_dbd_instance_id = ec2_client.describe_instances(
+                Filters=[
+                    {"Name": "instance-state-name", "Values": ["running"]},
+                    {"Name": "tag:aws:cloudformation:logical-id", "Values": ["ExternalSlurmdbdASG"]},
+                ]
+            )["Reservations"][0]["Instances"][0]["InstanceId"]
+            ec2_client.terminate_instances(InstanceIds=[slurm_dbd_instance_id])
+
+    logging.info("Checking jobs information from another cluster.")
+    for job_id in job_ids:
+        retry(stop_max_attempt_number=30, wait_fixed=seconds(20))(_assert_job_completion_recorded_in_accounting)(
+            job_id, scheduler_commands_2, clusters=cluster_1.name
+        )
