30
30
config_client = boto3 .client ("config" , region_name = AWS_REGION )
31
31
s3_client = boto3 .client ("s3" , region_name = AWS_REGION )
32
32
33
+ # Global variables
34
+ BUCKET_NAME = ""
33
35
34
36
def evaluate_compliance (rule_parameters : dict ) -> tuple [str , str ]: # noqa: CFQ004, CCR001, C901
35
37
"""Evaluate if Bedrock Model Invocation Logging is properly configured for S3.
@@ -41,6 +43,7 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: CFQ0
41
43
tuple[str, str]: Compliance status and annotation message.
42
44
43
45
"""
46
+ global BUCKET_NAME
44
47
# Parse rule parameters
45
48
params = json .loads (json .dumps (rule_parameters )) if rule_parameters else {}
46
49
check_retention = params .get ("check_retention" , "true" ).lower () == "true"
@@ -57,17 +60,22 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: CFQ0
57
60
LOGGER .info (f"Bedrock Model Invocation S3 config: { s3_config } " )
58
61
bucket_name = s3_config .get ("bucketName" , "" )
59
62
LOGGER .info (f"Bedrock Model Invocation S3 bucketName: { bucket_name } " )
60
-
63
+ BUCKET_NAME = bucket_name
61
64
if not s3_config or not bucket_name :
62
65
return "NON_COMPLIANT" , "S3 logging is not enabled for Bedrock Model Invocation Logging"
63
66
64
67
# Check S3 bucket configurations
65
68
issues = []
66
69
67
70
if check_retention :
68
- lifecycle = s3_client .get_bucket_lifecycle_configuration (Bucket = bucket_name )
69
- if not any (rule .get ("Expiration" ) for rule in lifecycle .get ("Rules" , [])):
70
- issues .append ("retention not set" )
71
+ try :
72
+ lifecycle = s3_client .get_bucket_lifecycle_configuration (Bucket = bucket_name )
73
+ if not any (rule .get ("Expiration" ) for rule in lifecycle .get ("Rules" , [])):
74
+ issues .append ("retention not set" )
75
+ except botocore .exceptions .ClientError as client_error :
76
+ if client_error .response ['Error' ]['Code' ] == 'NoSuchLifecycleConfiguration' :
77
+ LOGGER .info (f"No lifecycle configuration found for S3 bucket: { bucket_name } " )
78
+ issues .append ("lifecycle not set" )
71
79
72
80
if check_encryption :
73
81
encryption = s3_client .get_bucket_encryption (Bucket = bucket_name )
@@ -98,12 +106,11 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: CFQ0
98
106
return "INSUFFICIENT_DATA" , f"Error evaluating Object Lock configuration: { str (error )} "
99
107
100
108
if issues :
101
- return "NON_COMPLIANT" , f"S3 logging enabled but { ', ' .join (issues )} "
109
+ return "NON_COMPLIANT" , f"S3 logging to { BUCKET_NAME } enabled but { ', ' .join (issues )} "
102
110
return "COMPLIANT" , f"S3 logging properly configured for Bedrock Model Invocation Logging. Bucket: { bucket_name } "
103
-
104
- except Exception as e :
105
- LOGGER .error (f"Error evaluating Bedrock Model Invocation Logging configuration: { str (e )} " )
106
- return "INSUFFICIENT_DATA" , f"Error evaluating compliance: { str (e )} "
111
+ except botocore .exceptions .ClientError as client_error :
112
+ LOGGER .error (f"Error evaluating Bedrock Model Invocation Logging configuration: { str (client_error )} " )
113
+ return "INSUFFICIENT_DATA" , f"Error evaluating compliance: { str (client_error )} "
107
114
108
115
109
116
def lambda_handler (event : dict , context : Any ) -> None : # noqa: U100
0 commit comments