29
29
# Define the AWS Config rule parameters
30
30
RULE_NAME = "sra-bedrock-check-eval-job-bucket"
31
31
SERVICE_NAME = "bedrock.amazonaws.com"
32
+ BUCKET_NAME = ""
32
33
33
34
34
35
def evaluate_compliance (event : dict , context : Any ) -> tuple [str , str ]: # noqa: U100, CCR001, C901
@@ -41,21 +42,32 @@ def evaluate_compliance(event: dict, context: Any) -> tuple[str, str]: # noqa:
41
42
Returns:
42
43
tuple[str, str]: The compliance status and annotation
43
44
"""
45
+ global BUCKET_NAME
44
46
LOGGER .info (f"Evaluate Compliance Event: { event } " )
45
47
# Initialize AWS clients
46
48
s3 = boto3 .client ("s3" )
47
-
49
+ sts = boto3 .client ("sts" )
50
+ account = sts .get_caller_identity ().get ("Account" )
48
51
# Get rule parameters
49
52
params = ast .literal_eval (event ["ruleParameters" ])
50
53
LOGGER .info (f"Parameters: { params } " )
51
- bucket_name = params .get ("BucketName" , "" )
54
+ LOGGER .info (f"Account: { account } " )
55
+ buckets = params .get ("Buckets" , {account : "" })
56
+ LOGGER .info (f"Buckets: { buckets } " )
57
+ buckets = ast .literal_eval (buckets )
58
+ bucket_name = buckets .get (account , "" )
59
+ LOGGER .info (f"Bucket Name: { bucket_name } " )
60
+ BUCKET_NAME = bucket_name
61
+
52
62
check_retention = params .get ("CheckRetention" , "true" ).lower () != "false"
53
63
check_encryption = params .get ("CheckEncryption" , "true" ).lower () != "false"
54
64
check_logging = params .get ("CheckLogging" , "true" ).lower () != "false"
55
65
check_object_locking = params .get ("CheckObjectLocking" , "true" ).lower () != "false"
56
66
check_versioning = params .get ("CheckVersioning" , "true" ).lower () != "false"
57
67
58
68
# Check if the bucket exists
69
+ if bucket_name == "" :
70
+ return build_evaluation ("NOT_APPLICABLE" , "No bucket name provided" )
59
71
if not check_bucket_exists (bucket_name ):
60
72
return build_evaluation ("NOT_APPLICABLE" , f"Bucket { bucket_name } does not exist or is not accessible" )
61
73
@@ -64,6 +76,7 @@ def evaluate_compliance(event: dict, context: Any) -> tuple[str, str]: # noqa:
64
76
65
77
# Check retention
66
78
if check_retention :
79
+ LOGGER .info (f"Checking retention policy for bucket { bucket_name } " )
67
80
try :
68
81
retention = s3 .get_bucket_lifecycle_configuration (Bucket = bucket_name )
69
82
if not any (rule .get ("Expiration" ) for rule in retention .get ("Rules" , [])):
@@ -75,6 +88,7 @@ def evaluate_compliance(event: dict, context: Any) -> tuple[str, str]: # noqa:
75
88
76
89
# Check encryption
77
90
if check_encryption :
91
+ LOGGER .info (f"Checking encryption for bucket { bucket_name } " )
78
92
try :
79
93
encryption = s3 .get_bucket_encryption (Bucket = bucket_name )
80
94
if "ServerSideEncryptionConfiguration" not in encryption :
@@ -86,13 +100,15 @@ def evaluate_compliance(event: dict, context: Any) -> tuple[str, str]: # noqa:
86
100
87
101
# Check logging
88
102
if check_logging :
103
+ LOGGER .info (f"Checking logging for bucket { bucket_name } " )
89
104
logging = s3 .get_bucket_logging (Bucket = bucket_name )
90
105
if "LoggingEnabled" not in logging :
91
106
compliance_type = "NON_COMPLIANT"
92
107
annotation .append ("Server access logging not enabled" )
93
108
94
109
# Check object locking
95
110
if check_object_locking :
111
+ LOGGER .info (f"Checking object locking for bucket { bucket_name } " )
96
112
try :
97
113
object_locking = s3 .get_object_lock_configuration (Bucket = bucket_name )
98
114
if "ObjectLockConfiguration" not in object_locking :
@@ -104,6 +120,7 @@ def evaluate_compliance(event: dict, context: Any) -> tuple[str, str]: # noqa:
104
120
105
121
# Check versioning
106
122
if check_versioning :
123
+ LOGGER .info (f"Checking versioning for bucket { bucket_name } " )
107
124
versioning = s3 .get_bucket_versioning (Bucket = bucket_name )
108
125
if versioning .get ("Status" ) != "Enabled" :
109
126
compliance_type = "NON_COMPLIANT"
@@ -157,12 +174,11 @@ def lambda_handler(event: dict, context: Any) -> None:
157
174
LOGGER .info (f"Lambda Handler Event: { event } " )
158
175
evaluation = evaluate_compliance (event , context )
159
176
config = boto3 .client ("config" )
160
- params = ast .literal_eval (event ["ruleParameters" ])
161
177
config .put_evaluations (
162
178
Evaluations = [
163
179
{
164
180
"ComplianceResourceType" : "AWS::S3::Bucket" ,
165
- "ComplianceResourceId" : params . get ( "BucketName" ) ,
181
+ "ComplianceResourceId" : BUCKET_NAME ,
166
182
"ComplianceType" : evaluation ["ComplianceType" ], # type: ignore
167
183
"Annotation" : evaluation ["Annotation" ], # type: ignore
168
184
"OrderingTimestamp" : evaluation ["OrderingTimestamp" ], # type: ignore
0 commit comments