Skip to content

Commit e47af17

Browse files
authored
Add support for Data Repository Associations with Pcluster Managed FSx for Lustre (#5693)
* Add support for DRAs as an option for s3 association with an FSx file system. FSx released Persistent_2 which no longer supports the use of ExportPath and ImportPath. Instead, it uses DRAs to configure S3 association. * Add integration and unit tests to cover the creation, modification, and deletion of DRAs
1 parent 95de94b commit e47af17

File tree

17 files changed

+438
-17
lines changed

17 files changed

+438
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ CHANGELOG
55
------
66

77
**ENHANCEMENTS**
8-
8+
- Add support for Data Repository Associations when using PERSISTENT_2 as DeploymentType for a managed FSx for Lustre.
99
**CHANGES**
1010

1111
**BUG FIXES**

cli/src/pcluster/aws/fsx.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def __init__(self):
2121
self.svm_cache = {}
2222
self.volume_cache = {}
2323
self.fc_cache = {}
24+
self.dra_cache = {}
2425

2526
@AWSExceptionHandler.handle_client_exception
2627
def get_file_systems_info(self, fsx_fs_ids):
@@ -84,6 +85,24 @@ def describe_volumes(self, volume_ids):
8485
result.append(volume)
8586
return result
8687

88+
@AWSExceptionHandler.handle_client_exception
89+
def describe_data_repository_associations(self, dra_ids):
90+
"""Describe FSx data repository associations."""
91+
result = []
92+
missed_dra_ids = []
93+
for dra_id in dra_ids:
94+
cached_data = self.dra_cache.get(dra_id)
95+
if cached_data:
96+
result.append(cached_data)
97+
else:
98+
missed_dra_ids.append(dra_id)
99+
if missed_dra_ids:
100+
response = self._client.describe_data_repository_associations(AssociationIds=missed_dra_ids)["Associations"]
101+
for dra in response:
102+
self.dra_cache[dra.get("AssociationId")] = dra
103+
result.append(dra)
104+
return result
105+
87106
@AWSExceptionHandler.handle_client_exception
88107
def describe_backup(self, backup_id):
89108
"""Describe backup."""

cli/src/pcluster/config/cluster_config.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
FsxAutoImportValidator,
143143
FsxBackupIdValidator,
144144
FsxBackupOptionsValidator,
145+
FsxDraValidator,
145146
FsxPersistentOptionsValidator,
146147
FsxS3Validator,
147148
FsxStorageCapacityValidator,
@@ -431,6 +432,38 @@ def existing_dns_name(self):
431432
return self.file_system_data.dns_name if self.is_unmanaged else ""
432433

433434

435+
class DataRepositoryAssociation(Resource):
436+
"""Represent the Data Repository Association resource."""
437+
438+
def __init__(
439+
self,
440+
name: str,
441+
data_repository_path: str,
442+
file_system_path: str,
443+
batch_import_meta_data_on_create: bool = None,
444+
imported_file_chunk_size: int = None,
445+
auto_export_policy: List[str] = None,
446+
auto_import_policy: List[str] = None,
447+
**kwargs,
448+
):
449+
super().__init__(**kwargs)
450+
self.name = Resource.init_param(name)
451+
self.batch_import_meta_data_on_create = Resource.init_param(batch_import_meta_data_on_create, default=False)
452+
self.data_repository_path = Resource.init_param(data_repository_path)
453+
self.file_system_path = Resource.init_param(file_system_path)
454+
self.imported_file_chunk_size = Resource.init_param(imported_file_chunk_size, default=1024)
455+
self.auto_export_policy = Resource.init_param(auto_export_policy)
456+
self.auto_import_policy = Resource.init_param(auto_import_policy)
457+
458+
def _register_validators(self, context: ValidatorContext = None):
459+
super()._register_validators(context)
460+
self._register_validator(SharedStorageNameValidator, name=self.name)
461+
self._register_validator(S3BucketUriValidator, url=self.data_repository_path)
462+
self._register_validator(
463+
FsxAutoImportValidator, auto_import_policy=self.auto_import_policy, import_path=self.data_repository_path
464+
)
465+
466+
434467
class SharedFsxLustre(BaseSharedFsx):
435468
"""Represent the shared FSX resource."""
436469

@@ -454,6 +487,7 @@ def __init__(
454487
drive_cache_type: str = None,
455488
fsx_storage_type: str = None,
456489
deletion_policy: str = None,
490+
data_repository_associations: List[DataRepositoryAssociation] = None,
457491
**kwargs,
458492
):
459493
super().__init__(**kwargs)
@@ -481,9 +515,16 @@ def __init__(
481515
self.deletion_policy = Resource.init_param(
482516
deletion_policy, default=DELETE_POLICY if not file_system_id else None
483517
)
518+
self.data_repository_associations = Resource.init_param(data_repository_associations)
484519

485520
def _register_validators(self, context: ValidatorContext = None):
486521
super()._register_validators(context)
522+
self._register_validator(
523+
FsxDraValidator,
524+
data_repository_associations=self.data_repository_associations,
525+
import_path=self.import_path,
526+
export_path=self.export_path,
527+
)
487528
self._register_validator(
488529
FsxS3Validator,
489530
import_path=self.import_path,

cli/src/pcluster/config/config_patch.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -189,18 +189,19 @@ def _compare_list(self, base_section, target_section, param_path, data_key, fiel
189189
)
190190
)
191191
# Then, compare all non visited base sections vs target config.
192-
for base_nested_section in base_section.get(data_key, []):
193-
if not base_nested_section.get("visited", False):
194-
self.changes.append(
195-
Change(
196-
param_path,
197-
data_key,
198-
base_nested_section,
199-
None,
200-
change_update_policy,
201-
is_list=True,
192+
if base_section:
193+
for base_nested_section in base_section.get(data_key, []):
194+
if not base_nested_section.get("visited", False):
195+
self.changes.append(
196+
Change(
197+
param_path,
198+
data_key,
199+
base_nested_section,
200+
None,
201+
change_update_policy,
202+
is_list=True,
203+
)
202204
)
203-
)
204205

205206
@property
206207
def update_policy_level(self):

cli/src/pcluster/schemas/cluster_schema.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
CustomActions,
4343
Dashboards,
4444
Database,
45+
DataRepositoryAssociation,
4546
Dcv,
4647
DirectoryService,
4748
Dns,
@@ -355,6 +356,38 @@ def validate_existence_of_mode_throughput(self, data, **kwargs):
355356
)
356357

357358

359+
class DataRepositoryAssociationSchema(BaseSchema):
360+
"""Represent the DRA schema."""
361+
362+
name = fields.Str(required=True, metadata={"update_policy": UpdatePolicy.UNSUPPORTED})
363+
batch_import_meta_data_on_create = fields.Bool(metadata={"update_policy": UpdatePolicy.UNSUPPORTED})
364+
data_repository_path = fields.Str(required=True, metadata={"update_policy": UpdatePolicy.UNSUPPORTED})
365+
file_system_path = fields.Str(
366+
required=True,
367+
validate=validate.Regexp(r"^[^\u0000\u0085\u2028\u2029\r\n]{1,4096}$"),
368+
metadata={"update_policy": UpdatePolicy.UNSUPPORTED},
369+
)
370+
imported_file_chunk_size = fields.Int(
371+
validate=validate.Range(min=1, max=512000, error="has a minimum size of 1 MiB, and max size of 512,000 MiB"),
372+
metadata={"update_policy": UpdatePolicy.SUPPORTED},
373+
)
374+
auto_export_policy = fields.List(
375+
fields.Str(validate=validate.OneOf(["NEW", "CHANGED", "DELETED"])),
376+
validate=validate.Length(max=3),
377+
metadata={"update_policy": UpdatePolicy.SUPPORTED},
378+
)
379+
auto_import_policy = fields.List(
380+
fields.Str(validate=validate.OneOf(["NEW", "CHANGED", "DELETED"])),
381+
validate=validate.Length(max=3),
382+
metadata={"update_policy": UpdatePolicy.SUPPORTED},
383+
)
384+
385+
@post_load
386+
def make_resource(self, data, **kwargs):
387+
"""Generate resource."""
388+
return DataRepositoryAssociation(**data)
389+
390+
358391
class FsxLustreSettingsSchema(BaseSchema):
359392
"""Represent the FSX schema."""
360393

@@ -408,6 +441,14 @@ class FsxLustreSettingsSchema(BaseSchema):
408441
deletion_policy = fields.Str(
409442
validate=validate.OneOf(DELETION_POLICIES), metadata={"update_policy": UpdatePolicy.SUPPORTED}
410443
)
444+
data_repository_associations = fields.Nested(
445+
DataRepositoryAssociationSchema,
446+
many=True,
447+
metadata={
448+
"update_policy": UpdatePolicy.SUPPORTED,
449+
"update_key": "Name",
450+
},
451+
)
411452

412453
@validates_schema
413454
def validate_file_system_id_ignored_parameters(self, data, **kwargs):

cli/src/pcluster/templates/cluster_stack.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,9 @@ def _add_fsx_storage(self, id: str, shared_fsx: BaseSharedFsx):
962962
) = convert_deletion_policy(shared_fsx.deletion_policy)
963963

964964
fsx_id = fsx_resource.ref
965+
966+
self._add_dra(fsx_id, shared_fsx)
967+
965968
# Get MountName for new filesystem. DNSName cannot be retrieved from CFN and will be generated in cookbook
966969
mount_name = fsx_resource.attr_lustre_mount_name
967970

@@ -976,6 +979,30 @@ def _add_fsx_storage(self, id: str, shared_fsx: BaseSharedFsx):
976979

977980
return fsx_id
978981

982+
def _add_dra(self, fsx_id: str, shared_fsx: BaseSharedFsx):
983+
"""Add Cfn Data Repository Association Resources."""
984+
if shared_fsx.data_repository_associations:
985+
for dra in shared_fsx.data_repository_associations:
986+
dra_id = "{0}{1}".format(dra.name, create_hash_suffix(dra.name))
987+
fsx.CfnDataRepositoryAssociation(
988+
self.stack,
989+
dra_id,
990+
batch_import_meta_data_on_create=dra.batch_import_meta_data_on_create,
991+
data_repository_path=dra.data_repository_path,
992+
file_system_id=fsx_id,
993+
file_system_path=dra.file_system_path,
994+
imported_file_chunk_size=dra.imported_file_chunk_size,
995+
s3=fsx.CfnDataRepositoryAssociation.S3Property(
996+
auto_export_policy=fsx.CfnDataRepositoryAssociation.AutoExportPolicyProperty(
997+
events=dra.auto_export_policy
998+
),
999+
auto_import_policy=fsx.CfnDataRepositoryAssociation.AutoImportPolicyProperty(
1000+
events=dra.auto_import_policy
1001+
),
1002+
),
1003+
tags=[CfnTag(key="Name", value=dra.name)],
1004+
)
1005+
9791006
def _add_efs_storage(self, id: str, shared_efs: SharedEfs):
9801007
"""Add specific Cfn Resources to map the EFS storage."""
9811008
# EFS FileSystem

cli/src/pcluster/validators/fsx_validators.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,28 @@ def _validate(
4848
)
4949

5050

51+
class FsxDraValidator(Validator):
52+
"""
53+
FSX Dra validator.
54+
55+
Verify compatibility of given S3 association options for FSX.
56+
"""
57+
58+
def _validate(self, data_repository_associations, import_path, export_path):
59+
if data_repository_associations and (import_path or export_path):
60+
self._add_failure(
61+
"When specifying data repository associations, import and export path "
62+
"can not be used on the same file system.",
63+
FailureLevel.ERROR,
64+
)
65+
66+
if data_repository_associations and len(data_repository_associations) > 8:
67+
self._add_failure(
68+
"The number of data repository association used for one file system cannot be greater that 8.",
69+
FailureLevel.ERROR,
70+
)
71+
72+
5173
class FsxPersistentOptionsValidator(Validator):
5274
"""
5375
FSX persistent options validator.

cli/tests/pcluster/aws/test_fsx.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,45 @@ def get_describe_file_caches_mocked_request(file_cache_ids):
134134
)
135135

136136

137+
def get_describe_data_repository_associations_mocked_request(dra_ids, lifecycle):
138+
return MockedBoto3Request(
139+
method="describe_data_repository_associations",
140+
response={"Associations": [{"AssociationId": dra_id, "Lifecycle": lifecycle} for dra_id in dra_ids]},
141+
expected_params={"AssociationIds": dra_ids},
142+
)
143+
144+
145+
def test_describe_data_repository_associations(boto3_stubber):
146+
dra_id = "dra-012345678"
147+
additional_dra_id = "dra-876543210"
148+
# The first mocked request and the third are about the same fsx. However, the lifecycle of the fsx changes
149+
# from CREATING to AVAILABLE. The second mocked request is about another fsx
150+
mocked_requests = [
151+
get_describe_data_repository_associations_mocked_request([dra_id], "CREATING"),
152+
get_describe_data_repository_associations_mocked_request([additional_dra_id], "CREATING"),
153+
get_describe_data_repository_associations_mocked_request([dra_id], "AVAILABLE"),
154+
]
155+
boto3_stubber("fsx", mocked_requests)
156+
assert_that(AWSApi.instance().fsx.describe_data_repository_associations([dra_id])[0]["Lifecycle"]).is_equal_to(
157+
"CREATING"
158+
)
159+
160+
# Second boto3 call with more volumes. The volume already cached should not be included in the boto3 call.
161+
response = AWSApi.instance().fsx.describe_data_repository_associations([dra_id, additional_dra_id])
162+
assert_that(response).is_length(2)
163+
164+
# Third boto3 call. The result should be from cache even if the lifecycle of the SVM is different
165+
assert_that(AWSApi.instance().fsx.describe_data_repository_associations([dra_id])[0]["Lifecycle"]).is_equal_to(
166+
"CREATING"
167+
)
168+
169+
# Fourth boto3 call after resetting the AWSApi instance. The latest fsx lifecycle should be retrieved from boto3
170+
AWSApi.reset()
171+
assert_that(AWSApi.instance().fsx.describe_data_repository_associations([dra_id])[0]["Lifecycle"]).is_equal_to(
172+
"AVAILABLE"
173+
)
174+
175+
137176
def test_describe_file_caches(boto3_stubber):
138177
file_cache_id = "fc-12345678"
139178
additional_file_cache_id = "fc-123456789012345678"

cli/tests/pcluster/example_configs/slurm.full.yaml

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,10 @@ SharedStorage:
182182
FsxLustreSettings:
183183
StorageCapacity: 3600
184184
DeploymentType: PERSISTENT_1 # PERSISTENT_1 | PERSISTENT_2 | SCRATCH_1 | SCRATCH_2
185-
ImportedFileChunkSize: 1024
185+
# ImportedFileChunkSize: 1024 # ImportedFileChunkSize cannot coexist with some of the fields
186186
DataCompressionType: LZ4
187-
ExportPath: s3://bucket/folder
188-
ImportPath: s3://bucket
187+
# ExportPath: s3://bucket/folder # ExportPath cannot coexist with some of the fields
188+
# ImportPath: s3://bucket # ImportPath cannot coexist with some of the fields
189189
WeeklyMaintenanceStartTime: "1:00:00"
190190
AutomaticBackupRetentionDays: 0
191191
CopyTagsToBackups: true
@@ -194,9 +194,17 @@ SharedStorage:
194194
# BackupId: backup-fedcba98 # BackupId cannot coexist with some of the fields
195195
KmsKeyId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
196196
# FileSystemId: fs-12345678123456789 # FileSystemId cannot coexist with other fields
197-
AutoImportPolicy: NEW # NEW | NEW_CHANGED | NEW_CHANGED_DELETED
197+
# AutoImportPolicy: NEW # NEW | NEW_CHANGED | NEW_CHANGED_DELETED # AutoImportPolicy cannot coexist with some of the fields
198198
DriveCacheType: READ # READ
199199
StorageType: HDD # HDD | SSD
200+
DataRepositoryAssociations:
201+
- Name: dra
202+
BatchImportMetaDataOnCreate: false
203+
DataRepositoryPath: s3://bucket/folder
204+
FileSystemPath: /
205+
ImportedFileChunkSize: 1024
206+
AutoExportPolicy: [ NEW, CHANGED, DELETED ]
207+
AutoImportPolicy: [ NEW, CHANGED, DELETED ]
200208
- MountDir: /my/mount/point4
201209
Name: name4
202210
StorageType: FsxOpenZfs

0 commit comments

Comments
 (0)