@@ -43,73 +43,73 @@ def evaluate_compliance(event: dict, context: Any) -> tuple[str, str]: # noqa:
43
43
"""
44
44
LOGGER .info (f"Evaluate Compliance Event: { event } " )
45
45
# Initialize AWS clients
46
- s3 = boto3 .client ('s3' )
46
+ s3 = boto3 .client ("s3" )
47
47
48
48
# Get rule parameters
49
- params = ast .literal_eval (event [' ruleParameters' ])
49
+ params = ast .literal_eval (event [" ruleParameters" ])
50
50
LOGGER .info (f"Parameters: { params } " )
51
- bucket_name = params .get (' BucketName' , '' )
52
- check_retention = params .get (' CheckRetention' , ' true' ).lower () != ' false'
53
- check_encryption = params .get (' CheckEncryption' , ' true' ).lower () != ' false'
54
- check_logging = params .get (' CheckLogging' , ' true' ).lower () != ' false'
55
- check_object_locking = params .get (' CheckObjectLocking' , ' true' ).lower () != ' false'
56
- check_versioning = params .get (' CheckVersioning' , ' true' ).lower () != ' false'
51
+ bucket_name = params .get (" BucketName" , "" )
52
+ check_retention = params .get (" CheckRetention" , " true" ).lower () != " false"
53
+ check_encryption = params .get (" CheckEncryption" , " true" ).lower () != " false"
54
+ check_logging = params .get (" CheckLogging" , " true" ).lower () != " false"
55
+ check_object_locking = params .get (" CheckObjectLocking" , " true" ).lower () != " false"
56
+ check_versioning = params .get (" CheckVersioning" , " true" ).lower () != " false"
57
57
58
58
# Check if the bucket exists
59
59
if not check_bucket_exists (bucket_name ):
60
- return build_evaluation (' NOT_APPLICABLE' , f"Bucket { bucket_name } does not exist or is not accessible" )
60
+ return build_evaluation (" NOT_APPLICABLE" , f"Bucket { bucket_name } does not exist or is not accessible" )
61
61
62
- compliance_type = ' COMPLIANT'
62
+ compliance_type = " COMPLIANT"
63
63
annotation = []
64
64
65
65
# Check retention
66
66
if check_retention :
67
67
try :
68
68
retention = s3 .get_bucket_lifecycle_configuration (Bucket = bucket_name )
69
- if not any (rule .get (' Expiration' ) for rule in retention .get (' Rules' , [])):
70
- compliance_type = ' NON_COMPLIANT'
69
+ if not any (rule .get (" Expiration" ) for rule in retention .get (" Rules" , [])):
70
+ compliance_type = " NON_COMPLIANT"
71
71
annotation .append ("Retention policy not set" )
72
72
except ClientError :
73
- compliance_type = ' NON_COMPLIANT'
73
+ compliance_type = " NON_COMPLIANT"
74
74
annotation .append ("Retention policy not set" )
75
75
76
76
# Check encryption
77
77
if check_encryption :
78
78
try :
79
79
encryption = s3 .get_bucket_encryption (Bucket = bucket_name )
80
- if ' ServerSideEncryptionConfiguration' not in encryption :
81
- compliance_type = ' NON_COMPLIANT'
80
+ if " ServerSideEncryptionConfiguration" not in encryption :
81
+ compliance_type = " NON_COMPLIANT"
82
82
annotation .append ("KMS CMK encryption not enabled" )
83
83
except ClientError :
84
- compliance_type = ' NON_COMPLIANT'
84
+ compliance_type = " NON_COMPLIANT"
85
85
annotation .append ("KMS CMK encryption not enabled" )
86
86
87
87
# Check logging
88
88
if check_logging :
89
89
logging = s3 .get_bucket_logging (Bucket = bucket_name )
90
- if ' LoggingEnabled' not in logging :
91
- compliance_type = ' NON_COMPLIANT'
90
+ if " LoggingEnabled" not in logging :
91
+ compliance_type = " NON_COMPLIANT"
92
92
annotation .append ("Server access logging not enabled" )
93
93
94
94
# Check object locking
95
95
if check_object_locking :
96
96
try :
97
97
object_locking = s3 .get_object_lock_configuration (Bucket = bucket_name )
98
- if ' ObjectLockConfiguration' not in object_locking :
99
- compliance_type = ' NON_COMPLIANT'
98
+ if " ObjectLockConfiguration" not in object_locking :
99
+ compliance_type = " NON_COMPLIANT"
100
100
annotation .append ("Object locking not enabled" )
101
101
except ClientError :
102
- compliance_type = ' NON_COMPLIANT'
102
+ compliance_type = " NON_COMPLIANT"
103
103
annotation .append ("Object locking not enabled" )
104
104
105
105
# Check versioning
106
106
if check_versioning :
107
107
versioning = s3 .get_bucket_versioning (Bucket = bucket_name )
108
- if versioning .get (' Status' ) != ' Enabled' :
109
- compliance_type = ' NON_COMPLIANT'
108
+ if versioning .get (" Status" ) != " Enabled" :
109
+ compliance_type = " NON_COMPLIANT"
110
110
annotation .append ("Versioning not enabled" )
111
111
112
- annotation_str = '; ' .join (annotation ) if annotation else "All checked features are compliant"
112
+ annotation_str = "; " .join (annotation ) if annotation else "All checked features are compliant"
113
113
return build_evaluation (compliance_type , annotation_str )
114
114
115
115
@@ -122,10 +122,10 @@ def check_bucket_exists(bucket_name: str) -> Any:
122
122
Returns:
123
123
Any: True if the bucket exists and is accessible, False otherwise
124
124
"""
125
- s3 = boto3 .client ('s3' )
125
+ s3 = boto3 .client ("s3" )
126
126
try :
127
127
response = s3 .list_buckets ()
128
- buckets = [bucket [' Name' ] for bucket in response [' Buckets' ]]
128
+ buckets = [bucket [" Name" ] for bucket in response [" Buckets" ]]
129
129
return bucket_name in buckets
130
130
except ClientError as e :
131
131
LOGGER .info (f"An error occurred: { e } " )
@@ -143,11 +143,7 @@ def build_evaluation(compliance_type: str, annotation: str) -> Any:
143
143
Any: The evaluation compliance type and annotation
144
144
"""
145
145
LOGGER .info (f"Build Evaluation Compliance Type: { compliance_type } Annotation: { annotation } " )
146
- return {
147
- 'ComplianceType' : compliance_type ,
148
- 'Annotation' : annotation ,
149
- 'OrderingTimestamp' : datetime .now ().isoformat ()
150
- }
146
+ return {"ComplianceType" : compliance_type , "Annotation" : annotation , "OrderingTimestamp" : datetime .now ().isoformat ()}
151
147
152
148
153
149
def lambda_handler (event : dict , context : Any ) -> None :
@@ -160,17 +156,17 @@ def lambda_handler(event: dict, context: Any) -> None:
160
156
LOGGER .info (f"Lambda Handler Context: { context } " )
161
157
LOGGER .info (f"Lambda Handler Event: { event } " )
162
158
evaluation = evaluate_compliance (event , context )
163
- config = boto3 .client (' config' )
164
- params = ast .literal_eval (event [' ruleParameters' ])
159
+ config = boto3 .client (" config" )
160
+ params = ast .literal_eval (event [" ruleParameters" ])
165
161
config .put_evaluations (
166
162
Evaluations = [
167
163
{
168
- ' ComplianceResourceType' : ' AWS::S3::Bucket' ,
169
- ' ComplianceResourceId' : params .get (' BucketName' ),
170
- ' ComplianceType' : evaluation [' ComplianceType' ], # type: ignore
171
- ' Annotation' : evaluation [' Annotation' ], # type: ignore
172
- ' OrderingTimestamp' : evaluation [' OrderingTimestamp' ] # type: ignore
164
+ " ComplianceResourceType" : " AWS::S3::Bucket" ,
165
+ " ComplianceResourceId" : params .get (" BucketName" ),
166
+ " ComplianceType" : evaluation [" ComplianceType" ], # type: ignore
167
+ " Annotation" : evaluation [" Annotation" ], # type: ignore
168
+ " OrderingTimestamp" : evaluation [" OrderingTimestamp" ], # type: ignore
173
169
}
174
170
],
175
- ResultToken = event [' resultToken' ]
171
+ ResultToken = event [" resultToken" ],
176
172
)
0 commit comments