Skip to content

Commit f2883fe

Browse files
authored
[Subnet Prioritization] Support capacity-optimized-prioritized and prioritized Allocation Strategy (#6865)
* [Subnet Prioritization] Add prioritized|capacity-optimized-prioritized to AllocationStrategy configuration; Add new configuration of enable_single_availability_zone; Signed-off-by: Hanxuan Zhang <hanxuanz@amazon.com> * [Subnet Prioritization] Add test cases for instance allocation strategy validator * [Subnet Prioritization] Update the default value and update policy of enable_single_availability_zone * [Subnet Prioritization] Move AllocationStrategy Enum from pcluster.config.cluster_config to pcluster.config.common * [Subnet Prioritization] Add validator and validator test for enable_single_availability_zone * [Subnet Prioritization] Move AllocationStrategy Enum from cluster_config.py to common.py * Revert "[Subnet Prioritization] Move AllocationStrategy Enum from cluster_config.py to common.py" This reverts commit 13d8cfa. * [Subnet Prioritization] Register enable_single_availability_zone_validator in cluster_config.py and add registration tests * [Subnet Prioritization] Change default value of enable_single_availability_zone from False to None * [Subnet Prioritization] Add enable_single_availability_zone parameter to slurm.full_config.snapshot.yaml * [Subnet Prioritization] Fix format issues * [Subnet Prioritization] Update CHANGELOG.md * [Subnet Prioritization] Remove duplicated AllocationStrategy Enum * [Subnet Prioritization] Remove duplicated EnableSingleAvailabilityZoneValidator registration * [Subnet Prioritization] Update the failure message of InstancesAllocationStrategyValidator * [Subnet Prioritization] Update enable_single_availability_zone_validator registration test * [Subnet Prioritization] Update format * [Subnet Prioritization] Fix EnableSingleAvailabilityZoneValidator * [Subnet Prioritization] Fix format issue * [Subnet Prioritization] Add integration test for subnet prioritization * [Subnet Prioritization] Update integration test for subnet prioritization * [Subnet Prioritization] Remove EnableSingleAvailabilityZone parameter from configuration * [Subnet Prioritization] Update Integration Test * [Subnet Prioritization] Fix format issues * [Subnet Prioritization] Remove EnableSingleAvailabilityZone from integration test * [Subnet Prioritization] Move AllocationStrategy Enum from common.py back to cluster_config.py * [Subnet Prioritization] Update integration test and format * [Subnet Prioritization] Remove custom hardcode settings from integration test config file * [Subnet Prioritization] Update format in instances_validators.py --------- Signed-off-by: Hanxuan Zhang <hanxuanz@amazon.com>
1 parent 60f744d commit f2883fe

File tree

8 files changed

+153
-13
lines changed

8 files changed

+153
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CHANGELOG
66

77
**CHANGES**
88
- Ubuntu 20.04 is no longer supported.
9+
- Support prioritized and capacity-optimized-prioritized Allocation Strategy
910
- Support DCV on Amazon Linux 2023.
1011
- Upgrade Python runtime used by Lambda functions to python3.12 (from python3.9).
1112
- Remove `berkshelf`. All cookbooks are local and do not need `berkshelf` dependency management.

cli/src/pcluster/config/cluster_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2578,6 +2578,8 @@ class AllocationStrategy(Enum):
25782578
LOWEST_PRICE = "lowest-price"
25792579
CAPACITY_OPTIMIZED = "capacity-optimized"
25802580
PRICE_CAPACITY_OPTIMIZED = "price-capacity-optimized"
2581+
PRIORITIZED = "prioritized"
2582+
CAPACITY_OPTIMIZED_PRIORITIZED = "capacity-optimized-prioritized"
25812583

25822584

25832585
class SlurmQueue(_CommonQueue):

cli/src/pcluster/validators/instances_validators.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,17 +221,22 @@ class InstancesAllocationStrategyValidator(Validator, _FlexibleInstanceTypesVali
221221
"""Confirm Allocation Strategy matches with the Capacity Type."""
222222

223223
def _validate(self, compute_resource_name: str, capacity_type: Enum, allocation_strategy: Enum, **kwargs):
224-
"""On-demand Capacity type only supports "lowest-price" allocation strategy."""
224+
"""On-demand Capacity type only supports "lowest-price" and "prioritized" allocation strategy."""
225+
valid_on_demand_allocation_strategy = {
226+
cluster_config.AllocationStrategy.LOWEST_PRICE,
227+
cluster_config.AllocationStrategy.PRIORITIZED,
228+
}
225229
if (
226230
capacity_type == cluster_config.CapacityType.ONDEMAND
227231
and allocation_strategy
228-
and allocation_strategy != cluster_config.AllocationStrategy.LOWEST_PRICE
232+
and allocation_strategy not in valid_on_demand_allocation_strategy
229233
):
230234
alloc_strategy_msg = allocation_strategy.value if allocation_strategy else "not set"
231235
self._add_failure(
232-
f"Compute Resource {compute_resource_name} is using an OnDemand CapacityType but the Allocation "
233-
f"Strategy specified is {alloc_strategy_msg}. OnDemand CapacityType can only use '"
234-
f"{cluster_config.AllocationStrategy.LOWEST_PRICE.value}' allocation strategy.",
236+
f"Compute Resource {compute_resource_name} is using an OnDemand CapacityType but "
237+
f"the Allocation Strategy specified is {alloc_strategy_msg}. OnDemand CapacityType can only use '"
238+
f"{cluster_config.AllocationStrategy.LOWEST_PRICE.value}' or "
239+
f"'{cluster_config.AllocationStrategy.PRIORITIZED.value}' allocation strategy.",
235240
FailureLevel.ERROR,
236241
)
237242
if capacity_type == cluster_config.CapacityType.CAPACITY_BLOCK and allocation_strategy:
@@ -241,6 +246,19 @@ def _validate(self, compute_resource_name: str, capacity_type: Enum, allocation_
241246
"When using CAPACITY_BLOCK CapacityType, allocation strategy should not be set.",
242247
FailureLevel.ERROR,
243248
)
249+
if (
250+
capacity_type == cluster_config.CapacityType.SPOT
251+
and allocation_strategy == cluster_config.AllocationStrategy.PRIORITIZED
252+
):
253+
self._add_failure(
254+
f"Compute Resource {compute_resource_name} is using a SPOT CapacityType but the "
255+
f"Allocation Strategy specified is {allocation_strategy.value}. SPOT CapacityType can only use "
256+
f"'{cluster_config.AllocationStrategy.LOWEST_PRICE.value}', "
257+
f"'{cluster_config.AllocationStrategy.CAPACITY_OPTIMIZED.value}', "
258+
f"'{cluster_config.AllocationStrategy.PRICE_CAPACITY_OPTIMIZED.value}' "
259+
f"or '{cluster_config.AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED.value}' allocation strategy.",
260+
FailureLevel.ERROR,
261+
)
244262

245263

246264
class InstancesMemorySchedulingWarningValidator(Validator):

cli/tests/pcluster/validators/test_all_validators/test_slurm_validators_are_called_with_correct_argument/slurm.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ Scheduling:
3939
- Name: compute_resource2
4040
InstanceType: c4.2xlarge
4141
- Name: queue2
42+
AllocationStrategy: "prioritized"
4243
Networking:
4344
SubnetIds:
4445
- subnet-23456789

cli/tests/pcluster/validators/test_instances_validators.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -591,31 +591,49 @@ def test_instances_networking_validator(
591591
CapacityType.ONDEMAND,
592592
AllocationStrategy.CAPACITY_OPTIMIZED,
593593
"Compute Resource TestComputeResource is using an OnDemand CapacityType but the Allocation Strategy "
594-
"specified is capacity-optimized. OnDemand CapacityType can only use 'lowest-price' allocation strategy.",
594+
"specified is capacity-optimized. "
595+
"OnDemand CapacityType can only use 'lowest-price' or 'prioritized' allocation strategy.",
595596
),
596597
(
597598
CapacityType.ONDEMAND,
598599
AllocationStrategy.PRICE_CAPACITY_OPTIMIZED,
599600
"Compute Resource TestComputeResource is using an OnDemand CapacityType but the Allocation Strategy "
600601
"specified is price-capacity-optimized. "
601-
"OnDemand CapacityType can only use 'lowest-price' allocation strategy.",
602+
"OnDemand CapacityType can only use 'lowest-price' or 'prioritized' allocation strategy.",
602603
),
604+
(
605+
CapacityType.ONDEMAND,
606+
AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED,
607+
"Compute Resource TestComputeResource is using an OnDemand CapacityType but the Allocation "
608+
"Strategy specified is capacity-optimized-prioritized. "
609+
"OnDemand CapacityType can only use 'lowest-price' or 'prioritized' allocation strategy.",
610+
),
611+
(CapacityType.ONDEMAND, AllocationStrategy.PRIORITIZED, ""),
603612
(CapacityType.ONDEMAND, AllocationStrategy.LOWEST_PRICE, ""),
604613
(CapacityType.ONDEMAND, None, ""),
605614
# Spot Capacity type supports both "lowest-price" and "capacity-optimized" allocation strategy
606615
(CapacityType.SPOT, AllocationStrategy.LOWEST_PRICE, ""),
607616
(CapacityType.SPOT, AllocationStrategy.CAPACITY_OPTIMIZED, ""),
608617
(CapacityType.SPOT, AllocationStrategy.PRICE_CAPACITY_OPTIMIZED, ""),
618+
(CapacityType.SPOT, AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED, ""),
609619
(CapacityType.SPOT, None, ""),
620+
(
621+
CapacityType.SPOT,
622+
AllocationStrategy.PRIORITIZED,
623+
"Compute Resource TestComputeResource is using a SPOT CapacityType but the "
624+
"Allocation Strategy specified is prioritized. SPOT CapacityType can only use "
625+
"'lowest-price', "
626+
"'capacity-optimized', "
627+
"'price-capacity-optimized' "
628+
"or 'capacity-optimized-prioritized' allocation strategy.",
629+
),
610630
# Capacity Block type supports does not support any allocation strategy
611631
(
612632
CapacityType.CAPACITY_BLOCK,
613633
AllocationStrategy.CAPACITY_OPTIMIZED,
614-
(
615-
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation "
616-
"Strategy specified is capacity-optimized. When using CAPACITY_BLOCK CapacityType, "
617-
"allocation strategy should not be set."
618-
),
634+
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation "
635+
"Strategy specified is capacity-optimized. When using CAPACITY_BLOCK CapacityType, "
636+
"allocation strategy should not be set.",
619637
),
620638
(
621639
CapacityType.CAPACITY_BLOCK,
@@ -632,6 +650,19 @@ def test_instances_networking_validator(
632650
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation Strategy "
633651
"specified is lowest-price. When using CAPACITY_BLOCK CapacityType, allocation strategy should not be set.",
634652
),
653+
(
654+
CapacityType.CAPACITY_BLOCK,
655+
AllocationStrategy.PRIORITIZED,
656+
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation Strategy "
657+
"specified is prioritized. When using CAPACITY_BLOCK CapacityType, allocation strategy should not be set.",
658+
),
659+
(
660+
CapacityType.CAPACITY_BLOCK,
661+
AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED,
662+
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation Strategy "
663+
"specified is capacity-optimized-prioritized. When using CAPACITY_BLOCK CapacityType, "
664+
"allocation strategy should not be set.",
665+
),
635666
(CapacityType.CAPACITY_BLOCK, None, ""),
636667
],
637668
)

tests/integration-tests/tests/networking/test_cluster_networking.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import boto3
1515
import pytest
16-
from assertpy import assert_that
16+
from assertpy import assert_that, soft_assertions
1717
from cfn_stacks_factory import CfnStack
1818
from constants import OSU_BENCHMARK_VERSION, UNSUPPORTED_OSES_FOR_DCV
1919
from fabric import Connection
@@ -24,16 +24,19 @@
2424
check_pcluster_list_cluster_log_streams,
2525
generate_stack_name,
2626
get_compute_nodes_instance_ids,
27+
get_compute_nodes_subnet_ids,
2728
get_username_for_os,
2829
is_dcv_supported,
2930
render_jinja_template,
3031
)
3132

3233
from tests.common.assertions import (
3334
assert_lambda_vpc_settings_are_correct,
35+
assert_msg_in_log,
3436
assert_no_errors_in_logs,
3537
assert_no_msg_in_logs,
3638
wait_for_num_instances_in_cluster,
39+
wait_for_num_instances_in_queue,
3740
)
3841
from tests.common.osu_common import compile_osu
3942
from tests.common.schedulers_common import SlurmCommands
@@ -238,3 +241,33 @@ def _run_mpi_jobs(mpi_variants, remote_command_executor, test_datadir, slurm_com
238241
slurm_commands.assert_job_succeeded(job_id)
239242
logging.info("Checking cluster has two nodes after running MPI jobs") # 1 static node + 1 dynamic node
240243
assert_that(len(get_compute_nodes_instance_ids(cluster.cfn_name, region))).is_equal_to(2)
244+
245+
246+
@pytest.mark.usefixtures("instance")
247+
def test_cluster_with_subnet_prioritization(
248+
region, os, pcluster_config_reader, clusters_factory, vpc_stack, scheduler_commands_factory
249+
):
250+
# Create cluster with subnet prioritization
251+
init_config_file = pcluster_config_reader(config_file="pcluster.config.yaml")
252+
cluster = clusters_factory(init_config_file)
253+
254+
remote_command_executor = RemoteCommandExecutor(cluster)
255+
scheduler_commands = scheduler_commands_factory(remote_command_executor)
256+
public_subnets = vpc_stack.get_all_public_subnets()
257+
queues = ["queue1", "queue2"]
258+
logging.info(f"Public subnets: {public_subnets}")
259+
# Check that all instances are launched in the subnet with the highest priority
260+
with soft_assertions():
261+
for queue in queues:
262+
scheduler_commands.submit_command("sleep 60", nodes=5, partition=queue)
263+
wait_for_num_instances_in_queue(cluster.cfn_name, cluster.region, desired=5, queue=queue)
264+
265+
subnet_ids = get_compute_nodes_subnet_ids(cluster.cfn_name, region, node_type="Compute", queue_name=queue)
266+
logging.info(f"Subnets: {subnet_ids}")
267+
for subnet_id in subnet_ids:
268+
assert_that(subnet_id).is_equal_to(public_subnets[0])
269+
270+
# Check that the CreateFleet request contains priorities for each subnet
271+
slurm_resume_log = "/var/log/parallelcluster/slurm_resume.log"
272+
assert_msg_in_log(remote_command_executor, slurm_resume_log, f"'SubnetId': '{public_subnets[0]}', 'Priority': 0.0")
273+
assert_msg_in_log(remote_command_executor, slurm_resume_log, f"'SubnetId': '{public_subnets[1]}', 'Priority': 1.0")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
Image:
2+
Os: {{ os }}
3+
HeadNode:
4+
SharedStorageType: {{ shared_headnode_storage_type }}
5+
InstanceType: {{ instance }}
6+
Networking:
7+
SubnetId: {{ public_subnet_id }}
8+
Ssh:
9+
KeyName: {{ key_name }}
10+
Scheduling:
11+
Scheduler: slurm
12+
SlurmQueues:
13+
- Name: queue1
14+
CapacityType: ONDEMAND
15+
AllocationStrategy: prioritized
16+
ComputeResources:
17+
- Name: queue1-i1
18+
Instances:
19+
- InstanceType: {{ instance }}
20+
MinCount: 0
21+
MaxCount: 10
22+
Networking:
23+
SubnetIds:
24+
- {{ public_subnet_ids[0] }}
25+
- {{ public_subnet_ids[1] }}
26+
- Name: queue2
27+
CapacityType: SPOT
28+
AllocationStrategy: capacity-optimized-prioritized
29+
ComputeResources:
30+
- Name: queue2-i1
31+
Instances:
32+
- InstanceType: {{ instance }}
33+
MinCount: 0
34+
MaxCount: 10
35+
Networking:
36+
SubnetIds:
37+
- {{ public_subnet_ids[0] }}
38+
- {{ public_subnet_ids[1] }}

tests/integration-tests/utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,22 @@ def get_cluster_nodes_instance_ids(stack_name, region, instance_types=None, node
365365
raise
366366

367367

368+
def get_compute_nodes_subnet_ids(stack_name, region, instance_types=None, node_type=None, queue_name=None):
369+
"""Return a list of cluster Instances Subnet Ids."""
370+
try:
371+
instances = describe_cluster_instances(
372+
stack_name,
373+
region,
374+
filter_by_node_type=node_type,
375+
filter_by_instance_types=instance_types,
376+
filter_by_queue_name=queue_name,
377+
)
378+
return [instance["SubnetId"] for instance in instances]
379+
except Exception as e:
380+
logging.error("Failed retrieving instance ids with exception: %s", e)
381+
raise
382+
383+
368384
def get_compute_nodes_instance_ips(stack_name, region):
369385
"""Return a list of compute Instances Ip's."""
370386
try:

0 commit comments

Comments
 (0)