--- AWSTemplateFormatVersion: "2010-09-09" Description: |- Start, stop and back up AWS resources tagged with cron schedules. github.com/sqlxpert/lights-off-aws/ GPLv3 Copyright Paul Marcelin Parameters: PlaceholderSuggestedStackName: Type: String Default: "LightsOff" PlaceholderHelp: Type: String Default: "github.com/sqlxpert/lights-off-aws#quick-start" Enable: Type: String Description: >- Whether the "Find" AWS Lambda function will run, checking every 10 minutes for AWS resources with scheduled operations Default: "true" AllowedValues: - "false" - "true" EnableSchedCloudFormationOps: Type: String Description: >- Whether the feature that performs scheduled stack update operations on your own CloudFormation stacks is enabled. Setting this to "false" reduces permissions for the "Find" and "Do" AWS Lambda functions. See https://github.com/sqlxpert/lights-off-aws#bonus-delete-and-recreate-expensive-resources-on-a-schedule Default: "true" AllowedValues: - "false" - "true" PlaceholderAdvancedParameters: Type: String Default: "" AllowedValues: - "" BackupRoleName: Type: String Description: >- The IAM role that the AWS Backup service will assume when creating backups. Specify only the name, not the ARN. For a StackSet, the role must exist, and have the same name, in every target AWS account. Roles are account-wide, not regional. AWS Backup creates "service-role/AWSBackupDefaultServiceRole" the first time you start an on-demand backup in the AWS Console. Otherwise, if you want to use this role, you must create it explicitly. See https://docs.aws.amazon.com/aws-backup/latest/devguide/iam-service-roles.html#default-service-roles . Security warning: The AWS Lambda execution role for the "Do" function receives permission to iam:PassRole the backup role to AWS Backup. Default: "service-role/AWSBackupDefaultServiceRole" BackupVaultName: Type: String Description: >- The vault where backups will be stored. Specify only the name, not the ARN. The vault must have been created in the same AWS account and region where you are creating this stack. For a StackSet, the vault must exist, and have the same name, in every target AWS account and region. AWS Backup creates the "Default" vault the first time you access the list of vaults in the AWS Console. Otherwise, you must explicitly create a vault. See https://docs.aws.amazon.com/aws-backup/latest/devguide/create-a-vault.html Default: "Default" BackupStartWindowMinutes: Type: Number Description: >- How many minutes AWS Backup should wait before canceling a backup job that has not started. This does not extend BackupCompleteWindowMinutes . MinValue: 60 Default: 60 MaxValue: 52560000 # 100 years BackupCompleteWindowMinutes: Type: Number Description: >- How many minutes AWS Backup should wait before canceling a backup job that has not finished. This must be at least BackupStartWindowMinutes . Schedule backups for the same resource at least this many minutes apart. MinValue: 60 Default: 360 MaxValue: 52560000 # 100 years BackupColdStorageAfterDays: Type: Number Description: >- How many days AWS Backup should wait before moving a backup to storage that is less expensive per byte but has a minimum billable retention period. To avoid cold storage, specify -1 . For some resource types, requesting cold storage has no effect. If you specify a value other than -1 , OptInToArchiveForSupportedResources will be set to true in backup lifecycle policies. 
See https://docs.aws.amazon.com/aws-backup/latest/devguide/backup-feature-availability.html#features-by-resource Default: -1 BackupDeleteAfterDays: Type: Number Description: >- How many days AWS Backup should wait before deleting a backup. To retain backups indefinitely, specify -1 . If you request both cold storage and deletion (BackupColdStorageAfterDays and BackupDeleteAfterDays are both other than -1), specify at least BackupColdStorageAfterDays + 90 days. Default: -1 FindLambdaFnMemoryMB: Type: Number Description: >- How many megabytes of memory to allocate to the "Find" AWS Lambda function. Increase this only in case of out-of-memory errors. See https://docs.aws.amazon.com/lambda/latest/operatorguide/computing-power.html Default: 128 FindLambdaFnTimeoutSecs: Type: Number Description: >- How many seconds before execution of the "Find" AWS Lambda function is canceled. Increase this only in case of time-out errors. See https://aws.amazon.com/about-aws/whats-new/2018/10/aws-lambda-supports-functions-that-can-run-up-to-15-minutes/ Default: 60 DoLambdaFnReservedConcurrentExecutions: Type: Number Description: >- How many batches of scheduled operation messages can definitely be processed in parallel. To forgo AWS Lambda reserved concurrency, set this to -1. See https://docs.aws.amazon.com/lambda/latest/dg/configuration-concurrency.html#configuration-concurrency-reserved MinValue: -1 Default: -1 DoLambdaFnMaximumConcurrency: Type: Number Description: >- How many batches of scheduled operation messages may be processed in parallel. The minimum is 2. If you set DoLambdaFnReservedConcurrentExecutions to 2 or more, set this no larger than DoLambdaFnReservedConcurrentExecutions. See https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-scaling.html#events-sqs-max-concurrency MinValue: 2 Default: 5 RequireSameAccountKmsKeyPolicyForEc2StartInstances: Type: String Description: >- When starting an EC2 instance with an EBS volume that is encrypted with a custom KMS key, whether to defer to the key policy even if the custom key and the EC2 instance are in the same AWS account. The default, "false", lets the "Do" AWS Lambda function use any custom key in the same AWS account even if the key policy doesn't explicitly allow it. Changing this to "true" treats same-account custom keys just like other-account custom keys: the "Do" function can only use a key if the key policy allows, and otherwise, the request to start the EC2 instance will fail. For a sample key policy statement, see: https://github.com/sqlxpert/lights-off-aws#starting-ec2-instances-with-encrypted-ebs-volumes Default: "false" AllowedValues: - "false" - "true" DoLambdaFnRoleAttachLocalPolicyName: Type: String Description: >- The name of a customer-managed IAM policy to attach to the "Do" function's role. By including "Effect": "Deny" statements, you could, for example, prevent production resources from being stopped. Specify only the name, not the ARN. For a StackSet, the policy must exist, and have exactly the same name, in every target AWS account. Policies are account-wide, not regional. See https://github.com/sqlxpert/lights-off-aws/README.md#security-steps-you-can-take Default: "" DoLambdaFnBatchSize: Type: Number Description: >- How many scheduled operation messages to process in a single invocation. Batching is not important for a sporadic and/or low-volume workload. 
Consider together with QueueMessageBytesMax (for the error queue, which might receive a batch of messages inside an EventBridge Scheduler event), DoLambdaFnMemoryMB and DoLambdaFnTimeoutSecs . MinValue: 1 Default: 3 DoLambdaFnMemoryMB: Type: Number Description: >- How many megabytes of memory to allocate to the "Do" AWS Lambda function. Increase this only in case of out-of-memory errors. Default: 128 DoLambdaFnTimeoutSecs: Type: Number Description: >- How many seconds before execution of the "Do" AWS Lambda function is canceled. Increase this only in case of time-out errors. Default: 30 OperationQueueVisibilityTimeoutSecs: Type: Number Description: >- How many seconds SQS waits for the "Do" AWS Lambda function to accept and process a scheduled operation message. This must be at least DoLambdaFnTimeoutSecs . In case of problems, see https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-queueconfig Default: 90 QueueMessageBytesMax: Type: Number Description: >- The maximum number of bytes in an operation queue message MinValue: 1024 Default: 32768 # 32 KiB (worst case when copying 50 long tags) MaxValue: 262144 # 256 KiB ErrorQueueMessageRetentionPeriodSecs: Type: Number Description: >- How many seconds to keep error queue messages. For consistency with the log retention period, and if CloudWatch Logs and SQS allow, set this to LogRetentionInDays * 86400 . See MessageRetentionPeriod in https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html#API_SetQueueAttributes_RequestParameters Default: 604800 SqsKmsKey: Type: String Description: >- If this is blank, default non-KMS SQS encryption applies. To use the AWS-managed key (which does not support key policy restrictions, or cross-region or cross-account usage), specify "alias/aws/sqs". To use a custom key, specify "ACCOUNT:key/KEY_ID". Whether the custom key is a single-region key, a multi-region key primary, or a multi-region key replica, it must be in the same region where you are creating this stack. Even if the custom key is in the same AWS account as this stack, you must update the key policy to allow usage by SQS. See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-key-management.html#compatibility-with-aws-services . For a StackSet, if you wish to use a custom key, it must be multi-region ("mrk-" prefix in the KEY_ID), and a replica (or the primary key itself) must exist in every target region. Default: "" LogRetentionInDays: Type: Number Description: >- How many days to keep CloudWatch logs from the AWS Lambda functions. See retentionInDays in http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutRetentionPolicy.html Default: 7 LogLevel: Type: String Description: >- Threshold for logging the activities of the AWS Lambda functions. See https://docs.python.org/3/library/logging.html#levels Default: ERROR AllowedValues: - CRITICAL - ERROR - WARNING - INFO - DEBUG - NOTSET CloudWatchLogsKmsKey: Type: String Description: >- If this is blank, default non-KMS CloudWatch Logs encryption applies. To use a KMS key, which must be a custom key, specify "ACCOUNT:key/KEY_ID". Whether the custom key is a single-region key, a multi-region key primary, or a multi-region key replica, it must be in the same region where you are creating this stack. Even if the custom key is in the same AWS account as this stack, you must update the key policy to allow usage by CloudWatch Logs. 
See https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/encrypt-log-data-kms.html#cmk-permissions . For a StackSet, the custom key must be multi-region ("mrk-" prefix in the KEY_ID), and a replica (or the primary key itself) must exist in every target region. Default: "" Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: For Reference Parameters: - PlaceholderSuggestedStackName - PlaceholderHelp - Label: default: Essential Parameters: - Enable - EnableSchedCloudFormationOps - Label: default: Advanced... Parameters: - PlaceholderAdvancedParameters - Label: default: Backups Parameters: - BackupRoleName - BackupVaultName - BackupStartWindowMinutes - BackupCompleteWindowMinutes - BackupColdStorageAfterDays - BackupDeleteAfterDays - Label: default: AWS Lambda function to find resources with schedule tags Parameters: - FindLambdaFnMemoryMB - FindLambdaFnTimeoutSecs - Label: default: AWS Lambda function to do scheduled operations Parameters: - DoLambdaFnReservedConcurrentExecutions - DoLambdaFnMaximumConcurrency - RequireSameAccountKmsKeyPolicyForEc2StartInstances - DoLambdaFnRoleAttachLocalPolicyName - DoLambdaFnBatchSize - DoLambdaFnMemoryMB - DoLambdaFnTimeoutSecs - Label: default: SQS queues for errors and scheduled operations Parameters: - OperationQueueVisibilityTimeoutSecs - QueueMessageBytesMax - ErrorQueueMessageRetentionPeriodSecs - SqsKmsKey - Label: default: Logs Parameters: - LogRetentionInDays - LogLevel - CloudWatchLogsKmsKey ParameterLabels: PlaceholderHelp: default: For help with this stack, see PlaceholderSuggestedStackName: default: Suggested stack name Enable: default: Enabled? EnableSchedCloudFormationOps: default: Scheduled CloudFormation operations enabled? PlaceholderAdvancedParameters: default: Do not change the parameters below, unless necessary! 
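# Illustrative combination (comment only, not part of the template): with
# BackupColdStorageAfterDays set to 30, BackupDeleteAfterDays must be at
# least 120 (30 + 90 days, the minimum billable cold-storage retention
# noted in the parameter descriptions above). Leaving both at -1 skips
# cold storage and retains backups indefinitely.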
BackupRoleName: default: IAM role name BackupVaultName: default: Vault name BackupStartWindowMinutes: default: Minutes before canceling, if not started BackupCompleteWindowMinutes: default: Minutes before canceling, if not finished BackupColdStorageAfterDays: default: Days before moving to lower-cost storage BackupDeleteAfterDays: default: Days before deleting FindLambdaFnMemoryMB: default: Megabytes of memory FindLambdaFnTimeoutSecs: default: Seconds before timeout DoLambdaFnReservedConcurrentExecutions: default: Reserved parallel batches DoLambdaFnMaximumConcurrency: default: Maximum parallel batches RequireSameAccountKmsKeyPolicyForEc2StartInstances: default: Require same-account KMS key policy for EC2 sched-start DoLambdaFnRoleAttachLocalPolicyName: default: Name of local policy to attach DoLambdaFnBatchSize: default: Batch size DoLambdaFnMemoryMB: default: Megabytes of memory DoLambdaFnTimeoutSecs: default: Seconds before timeout OperationQueueVisibilityTimeoutSecs: default: Seconds before re-processing a message QueueMessageBytesMax: default: Maximum bytes in a message ErrorQueueMessageRetentionPeriodSecs: default: Seconds before deleting an error queue message SqsKmsKey: default: KMS encryption key LogRetentionInDays: default: Days before deleting LogLevel: default: Message level CloudWatchLogsKmsKey: default: KMS encryption key Conditions: EnableTrue: !Equals [ !Ref Enable, "true" ] EnableSchedCloudFormationOpsTrue: !Equals [ !Ref EnableSchedCloudFormationOps, "true" ] RequireSameAccountKmsKeyPolicyForEc2StartInstancesTrue: !Equals [ !Ref RequireSameAccountKmsKeyPolicyForEc2StartInstances, "true" ] SqsKmsKeyBlank: !Equals [ !Ref SqsKmsKey, "" ] SqsKmsKeyCustom: Fn::And: - !Not [ !Condition SqsKmsKeyBlank ] - !Not [ !Equals [ !Ref SqsKmsKey, "alias/aws/sqs" ] ] DoLambdaFnRoleAttachLocalPolicyNameBlank: !Equals [ !Ref DoLambdaFnRoleAttachLocalPolicyName, "" ] DoLambdaFnReservedConcurrentExecutionsOff: !Equals [ !Ref DoLambdaFnReservedConcurrentExecutions, -1 ] CloudWatchLogsKmsKeyBlank: !Equals [ !Ref CloudWatchLogsKmsKey, "" ] Resources: ErrorQueue: Type: AWS::SQS::Queue Properties: DelaySeconds: 0 SqsManagedSseEnabled: !If [ SqsKmsKeyBlank, true, false ] KmsMasterKeyId: Fn::If: - SqsKmsKeyBlank - !Ref AWS::NoValue - Fn::If: - SqsKmsKeyCustom - !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${SqsKmsKey}" - !Ref SqsKmsKey KmsDataKeyReusePeriodSeconds: !If [ SqsKmsKeyBlank, !Ref AWS::NoValue, 86400 ] # seconds (24 hours) MessageRetentionPeriod: !Ref ErrorQueueMessageRetentionPeriodSecs ReceiveMessageWaitTimeSeconds: 20 # long polling (lowest cost) VisibilityTimeout: 0 # seconds; dead message retries don't make sense OperationQueue: Type: AWS::SQS::Queue Properties: SqsManagedSseEnabled: !If [ SqsKmsKeyBlank, true, false ] KmsMasterKeyId: Fn::If: - SqsKmsKeyBlank - !Ref AWS::NoValue - Fn::If: - SqsKmsKeyCustom - !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${SqsKmsKey}" - !Ref SqsKmsKey KmsDataKeyReusePeriodSeconds: !If [ SqsKmsKeyBlank, !Ref AWS::NoValue, 86400 ] # seconds (24 hours) MaximumMessageSize: !Ref QueueMessageBytesMax MessageRetentionPeriod: 1200 # seconds # 1200 seconds = 20 minutes = 2 cycles # DoLambdaFn code treats a scheduled operation message as expired # 9 minutes after the start of a cycle, logs the error, and consumes the # message. # SQS silently deletes messages that were never consumed during the # retention period; those do not go to the dead-letter queue. 
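# Worked example (illustrative): a message queued for the 14:40 cycle
# carries expires = 14:49 (cycle start + 9 minutes). If the "Do" function
# keeps failing on it, SQS delivers it up to 3 times (maxReceiveCount,
# below), then moves it to ErrorQueue; a message that is never received
# at all is silently deleted after 1200 seconds.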
ReceiveMessageWaitTimeSeconds: 20 # long polling (lowest cost) VisibilityTimeout: !Ref OperationQueueVisibilityTimeoutSecs RedrivePolicy: maxReceiveCount: 3 deadLetterTargetArn: !GetAtt ErrorQueue.Arn RedriveAllowPolicy: redrivePermission: denyAll # In-line policies apply only to their roles, which, in turn, can only be # assumed by AWS Lambda functions. Separate, "managed" policies could be # attached to other roles or users, allowing permission escalation. # Administrator should restrict iam:PassRole to prevent use of these roles # with arbitrary AWS Lambda functions. FindLambdaFnRole: Type: AWS::IAM::Role Properties: Description: !Sub "For ${AWS::Region} region" AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: { Service: lambda.amazonaws.com } Action: sts:AssumeRole Policies: - PolicyName: CloudWatchLogsCreateLogGroupIfDeleted PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup Resource: !GetAtt FindLambdaFnLogGrp.Arn - PolicyName: CloudWatchLogsWrite PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:PutLogEvents Resource: !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${FindLambdaFnLogGrp}:log-stream:*" # !GetAtt LogGroup.Arn ends with :* instead of allowing us to # append :log-stream:* to make a log stream ARN - PolicyName: Ec2Read PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - ec2:DescribeInstances - ec2:DescribeVolumes - ec2:DescribeTags Resource: "*" - PolicyName: RdsRead PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - rds:DescribeDBInstances - rds:DescribeDBClusters Resource: "*" - Effect: Allow Action: rds:ListTagsForResource Resource: - !Sub "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:db:*" - !Sub "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster:*" - Fn::If: - EnableSchedCloudFormationOpsTrue - PolicyName: CloudFormationRead PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - cloudformation:ListStacks - cloudformation:DescribeStacks Resource: "*" - !Ref AWS::NoValue - Fn::If: - SqsKmsKeyCustom - PolicyName: SqsKmsEncryptNoteComplementsQueuePolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-key-management.html#send-to-encrypted-queue - kms:GenerateDataKey - kms:Decrypt # To verify a new data key! 
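# The key policy on the custom key must grant the matching permissions.
# A minimal sketch (the account ID is a placeholder; this statement
# delegates key use to same-account IAM policies such as the one above):
#   {
#     "Effect": "Allow",
#     "Principal": { "AWS": "arn:aws:iam::123456789012:root" },
#     "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ],
#     "Resource": "*",
#     "Condition":
#       { "StringEquals": { "kms:ViaService": "sqs.REGION.amazonaws.com" } }
#   }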
Resource: !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${SqsKmsKey}" Condition: StringEquals: { "kms:ViaService": !Sub "sqs.${AWS::Region}.amazonaws.com" } - !Ref AWS::NoValue DoLambdaFnRole: Type: AWS::IAM::Role Properties: Description: !Sub "For ${AWS::Region} region" AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: { Service: lambda.amazonaws.com } Action: sts:AssumeRole ManagedPolicyArns: - Fn::If: - DoLambdaFnRoleAttachLocalPolicyNameBlank - !Ref AWS::NoValue - !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:policy/${DoLambdaFnRoleAttachLocalPolicyName}" Policies: - PolicyName: CloudWatchLogsCreateLogGroupIfDeleted PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup Resource: !GetAtt DoLambdaFnLogGrp.Arn - PolicyName: CloudWatchLogsWrite PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:PutLogEvents Resource: !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${DoLambdaFnLogGrp}:log-stream:*" # !GetAtt LogGroup.Arn ends with :* instead of allowing us to # append :log-stream:* to make a log stream ARN - PolicyName: Ec2Write PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: ec2:StartInstances Resource: !Sub "arn:${AWS::Partition}:ec2:${AWS::Region}:${AWS::AccountId}:instance/*" Condition: StringLike: { "aws:ResourceTag/sched-start": "*" } - Effect: Allow Action: ec2:StopInstances Resource: !Sub "arn:${AWS::Partition}:ec2:${AWS::Region}:${AWS::AccountId}:instance/*" Condition: StringLike: { "aws:ResourceTag/sched-stop": "*" } - Effect: Allow Action: ec2:StopInstances Resource: !Sub "arn:${AWS::Partition}:ec2:${AWS::Region}:${AWS::AccountId}:instance/*" Condition: StringLike: { "aws:ResourceTag/sched-hibernate": "*" } - Sid: BackupCreatorCannotDeleteBackup Effect: Deny Action: - ec2:DeregisterImage - ec2:DeleteSnapshot Resource: "*" - Fn::If: - RequireSameAccountKmsKeyPolicyForEc2StartInstancesTrue - PolicyName: KmsForEc2StartInstancesRequireSameAccountKmsKeyPolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - kms:CreateGrant NotResource: !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:*" Condition: StringEquals: { "kms:ViaService": !Sub "ec2.${AWS::Region}.amazonaws.com" } Bool: { "kms:GrantIsForAWSResource": "true" } - !Ref AWS::NoValue - Fn::If: - RequireSameAccountKmsKeyPolicyForEc2StartInstancesTrue - !Ref AWS::NoValue - PolicyName: KmsForEc2StartInstances PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - kms:CreateGrant Resource: "*" Condition: StringEquals: { "kms:ViaService": !Sub "ec2.${AWS::Region}.amazonaws.com" } Bool: { "kms:GrantIsForAWSResource": "true" } - PolicyName: RdsWrite PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - rds:StartDBInstance - rds:StartDBCluster Resource: - !Sub "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:db:*" - !Sub "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster:*" Condition: StringLike: { "aws:ResourceTag/sched-start": "*" } - Effect: Allow Action: - rds:StopDBInstance - rds:StopDBCluster Resource: - !Sub "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:db:*" - !Sub "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster:*" Condition: StringLike: { "aws:ResourceTag/sched-stop": "*" } - Sid: BackupCreatorCannotDeleteBackup Effect: Deny Action: - rds:DeleteDBSnapshot - rds:DeleteDBClusterSnapshot Resource: - !Sub 
"arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:snapshot:*" - !Sub "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster-snapshot:*" - PolicyName: BackupWrite PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: backup:StartBackupJob Resource: !Sub "arn:${AWS::Partition}:backup:${AWS::Region}:${AWS::AccountId}:backup-vault:${BackupVaultName}" - Effect: Allow Action: iam:PassRole Resource: !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/${BackupRoleName}" Condition: StringLike: { "iam:PassedToService": "backup.amazonaws.com" } - Fn::If: - EnableSchedCloudFormationOpsTrue - PolicyName: CloudFormationWrite PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: cloudformation:UpdateStack Resource: !Sub "arn:${AWS::Partition}:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/*" Condition: StringLike: { "aws:ResourceTag/sched-set-Enable-false": "*" } - Effect: Allow Action: cloudformation:UpdateStack Resource: !Sub "arn:${AWS::Partition}:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/*" Condition: StringLike: { "aws:ResourceTag/sched-set-Enable-true": "*" } - !Ref AWS::NoValue - Fn::If: - SqsKmsKeyCustom - PolicyName: SqsKmsDecryptNoteComplementsQueuePolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-key-management.html#receive-from-encrypted-queue - kms:Decrypt Resource: !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${SqsKmsKey}" Condition: StringEquals: { "kms:ViaService": !Sub "sqs.${AWS::Region}.amazonaws.com" } - !Ref AWS::NoValue ErrorQueuePol: Type: AWS::SQS::QueuePolicy Properties: Queues: [ !Ref ErrorQueue ] PolicyDocument: Version: "2012-10-17" Statement: - Sid: RequireTls Effect: Deny Principal: "*" Action: sqs:* Resource: "*" Condition: Bool: { aws:SecureTransport: "false" } - Effect: Allow Principal: "*" Action: sqs:GetQueueAttributes Resource: "*" - Sid: Fn::If: - SqsKmsKeyCustom - SourceScheduleNoteRoleNeedsSqsKmsEncrypt - SourceSchedule Effect: Allow Principal: "*" Action: sqs:SendMessage Resource: "*" Condition: ArnEquals: "aws:PrincipalArn": - !GetAtt InvokeFindLambdaFnRole.Arn - Sid: Fn::If: - SqsKmsKeyCustom - SourceQueueNoteKeyPolicyNeedsSqsKmsEncrypt - SourceQueue Effect: Allow Principal: "*" Action: sqs:SendMessage Resource: "*" Condition: ArnEquals: "aws:SourceArn": - !GetAtt OperationQueue.Arn - Sid: ExclusiveSources Effect: Deny Principal: "*" Action: sqs:SendMessage Resource: "*" Condition: ArnNotEquals: "aws:PrincipalArn": - !GetAtt InvokeFindLambdaFnRole.Arn "aws:SourceArn": - !GetAtt OperationQueue.Arn OperationQueuePol: Type: AWS::SQS::QueuePolicy Properties: Queues: [ !Ref OperationQueue ] PolicyDocument: Version: "2012-10-17" Statement: - Sid: RequireTls Effect: Deny Principal: "*" Action: sqs:* Resource: "*" Condition: Bool: { aws:SecureTransport: "false" } - Effect: Allow Principal: "*" Action: sqs:GetQueueAttributes Resource: "*" - Sid: Fn::If: - SqsKmsKeyCustom - SourceLambdaFnNoteRoleNeedsSqsKmsEncrypt - SourceLambdaFn Effect: Allow Principal: "*" Action: sqs:SendMessage Resource: "*" Condition: ArnEquals: { aws:PrincipalArn: !GetAtt FindLambdaFnRole.Arn } - Sid: ExclusiveSource Effect: Deny Principal: "*" Action: sqs:SendMessage Resource: "*" Condition: ArnNotEquals: { aws:PrincipalArn: !GetAtt FindLambdaFnRole.Arn } - Sid: Fn::If: - SqsKmsKeyCustom - TargetLambdaFnNoteRoleNeedsSqsKmsDecrypt - TargetLambdaFn Effect: Allow Principal: "*" Action: - 
sqs:ChangeMessageVisibility - sqs:ReceiveMessage - sqs:DeleteMessage Resource: "*" Condition: ArnEquals: { aws:PrincipalArn: !GetAtt DoLambdaFnRole.Arn } - Sid: Fn::If: - SqsKmsKeyCustom - TargetDeadLetterQueueNoteKeyPolicyNeedsSqsKmsDecrypt - TargetDeadLetterQueue Effect: Allow Principal: "*" Action: - sqs:ChangeMessageVisibility - sqs:ReceiveMessage - sqs:DeleteMessage Resource: "*" Condition: ArnEquals: { aws:SourceArn: !GetAtt ErrorQueue.Arn } - Sid: ExclusiveTargets Effect: Deny Principal: "*" Action: - sqs:ChangeMessageVisibility - sqs:ReceiveMessage - sqs:DeleteMessage Resource: "*" Condition: ArnNotEquals: aws:PrincipalArn: - !GetAtt DoLambdaFnRole.Arn aws:SourceArn: - !GetAtt ErrorQueue.Arn FindLambdaFnLogGrp: Type: AWS::Logs::LogGroup Properties: RetentionInDays: !Ref LogRetentionInDays KmsKeyId: Fn::If: - CloudWatchLogsKmsKeyBlank - !Ref AWS::NoValue - !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${CloudWatchLogsKmsKey}" FindLambdaFn: Type: AWS::Lambda::Function Properties: Role: !GetAtt FindLambdaFnRole.Arn ReservedConcurrentExecutions: 1 # Only one Find process at a time! Timeout: !Ref FindLambdaFnTimeoutSecs MemorySize: !Ref FindLambdaFnMemoryMB LoggingConfig: LogGroup: !Ref FindLambdaFnLogGrp LogFormat: JSON SystemLogLevel: WARN ApplicationLogLevel: !Ref LogLevel Architectures: - arm64 Runtime: python3.13 # To avoid making users build a source bundle and distribute it to a # bucket in every target region (an AWS Lambda requirement when using # S3), use common, inline source code for both functions... Environment: Variables: # Referenced before handler for either Lambda function is invoked: "QUEUE_URL": !Ref OperationQueue "QUEUE_MSG_BYTES_MAX": !Ref QueueMessageBytesMax "BACKUP_ROLE_ARN": !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/${BackupRoleName}" # Referenced only after "Find" Lambda function handler is invoked: "BACKUP_VAULT_NAME": !Ref BackupVaultName "BACKUP_START_WINDOW_MINUTES": !Ref BackupStartWindowMinutes "BACKUP_COMPLETE_WINDOW_MINUTES": !Ref BackupCompleteWindowMinutes "BACKUP_COLD_STORAGE_AFTER_DAYS": !Ref BackupColdStorageAfterDays "BACKUP_DELETE_AFTER_DAYS": !Ref BackupDeleteAfterDays "ENABLE_SCHED_CLOUDFORMATION_OPS": !If [ EnableSchedCloudFormationOpsTrue, "", !Ref AWS::NoValue ] Handler: index.lambda_handler_find Code: ZipFile: | #!/usr/bin/env python3 """Start, stop and back up AWS resources tagged with cron schedules github.com/sqlxpert/lights-off-aws GPLv3 Copyright Paul Marcelin """ import os import logging import datetime import re import json import botocore import boto3 logger = logging.getLogger() # Skip "credentials in environment" INFO message, unavoidable in AWS Lambda: logging.getLogger("botocore").setLevel(logging.WARNING) def environ_int(environ_var_name): """Take name of an environment variable, return its integer value """ return int(os.environ[environ_var_name]) SCHED_DELIMS = r"\ +" # Exposed space must be escaped for re.VERBOSE SCHED_TERMS = rf"([^ ]+{SCHED_DELIMS})*" # Unescaped space inside char class SCHED_REGEXP_STRFTIME_FMT = rf""" (^|{SCHED_DELIMS}) ( # Specific monthly or weekly day and time, or... (dTH:M=%d|uTH:M=%u)T%H:%M | # Day wildcard, specific day, or specific weekday, any other terms, and... (d=(_|%d)|u=%u){SCHED_DELIMS}{SCHED_TERMS} ( # Specific daily time, or... H:M=%H:%M | # Hour wildcard or specific hour, any other terms, and specific minute. 
H=(_|%H){SCHED_DELIMS}{SCHED_TERMS}M=%M ) ) ({SCHED_DELIMS}|$) """ QUEUE_URL = os.environ["QUEUE_URL"] QUEUE_MSG_BYTES_MAX = environ_int("QUEUE_MSG_BYTES_MAX") QUEUE_MSG_FMT_VERSION = "01" ARN_DELIM = ":" BACKUP_ROLE_ARN = os.environ["BACKUP_ROLE_ARN"] ARN_PARTS = BACKUP_ROLE_ARN.split(ARN_DELIM) # arn:partition:service:region:account-id:resource-type/resource-id # [0] [1] [2] [3] [4] [5] # https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html#configuration-envvars-runtime ARN_PARTS[3] = os.environ.get("AWS_REGION", os.environ["AWS_DEFAULT_REGION"]) # 1. Helpers ################################################################# def log(entry_type, entry_value, log_level=logging.INFO): """Take type and value, and emit a JSON-format log entry """ entry_value_out = json.loads(json.dumps(entry_value, default=str)) # Avoids "Object of type datetime is not JSON serializable" in # https://github.com/aws/aws-lambda-python-runtime-interface-client/blob/9efb462/awslambdaric/lambda_runtime_log_utils.py#L109-L135 # # The JSON encoder in the AWS Lambda Python runtime isn't configured to # serialize datetime values in responses returned by AWS's own Python SDK! # # Alternative considered: # https://docs.powertools.aws.dev/lambda/python/latest/core/logger/ logger.log( log_level, "", extra={"type": entry_type, "value": entry_value_out} ) def sqs_send_message_log( cycle_start_str, send_kwargs, result, result_type, log_level ): """Log scheduled start (on error), send_message kwargs, and outcome """ if log_level > logging.INFO: log("START", cycle_start_str, log_level) log("KWARGS_SQS_SEND_MESSAGE", send_kwargs, log_level) log(result_type, result, log_level) def op_log(event, op_msg, result, result_type, log_level): """Log Lambda event (on error), SQS message (operation), and outcome """ if log_level > logging.INFO: log("LAMBDA_EVENT", event, log_level) log("SQS_MESSAGE", op_msg, log_level) log(result_type, result, log_level) def assess_op_msg(op_msg): """Take an operation queue message, return error message, type, retry flag """ result = None result_type = "" retry = True if msg_attr_str_decode(op_msg, "version") != QUEUE_MSG_FMT_VERSION: result = "Unrecognized operation queue message format" result_type = "WRONG_QUEUE_MSG_FMT" retry = False elif ( int(msg_attr_str_decode(op_msg, "expires")) < int(datetime.datetime.now(datetime.timezone.utc).timestamp()) ): result = ( "Schedule fewer operations per 10-minute cycle or " "increase DoLambdaFnMaximumConcurrency in CloudFormation" ) result_type = "EXPIRED_OP" retry = False return (result, result_type, retry) def assess_op_except(svc, op_method_name, misc_except): """Take an operation and an exception, return retry flag and log level botocore.exceptions.ClientError is general but statically-defined, making comparison easier, in a multi-service context, than for service-specific but dynamically-defined exceptions like boto3.Client("rds").exceptions.InvalidDBClusterStateFault and boto3.Client("rds").exceptions.InvalidDBInstanceStateFault https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services """ retry = True log_level = logging.ERROR if isinstance(misc_except, botocore.exceptions.ClientError): verb = op_method_name.split("_")[0] err_dict = getattr(misc_except, "response", {}).get("Error", {}) err_msg = err_dict.get("Message") match (svc, err_dict.get("Code")): case ("cloudformation", "ValidationError") if ( "No updates are to be performed." 
== err_msg ): retry = False log_level = logging.INFO # Idempotent update_stack (after a recent external update) case ("rds", "InvalidDBClusterStateFault") if ( ((verb == "start") and "is in available" in err_msg) or f"is in {verb}" in err_msg ): retry = False log_level = logging.INFO # Idempotent # start_db_cluster "in available[ state]" or "in start[ing state]" or # stop_db_cluster "in stop[ped state]" or "in stop[ping state]" case ("rds", "InvalidDBInstanceState"): # Fault suffix is missing here! retry = False # Can't decide between idempotent start_db_instance / stop_db_instance # (common) or truly erroneous state (rare), because message does not # mention current, invalid state. Log as potential error, but do not # retry (avoids a duplicate error queue entry). return (retry, log_level) def tag_key_join(tag_key_words): """Take a tuple of strings, add a prefix, join, and return a tag key """ return "-".join(("sched", ) + tag_key_words) def cycle_start_end(datetime_in, cycle_minutes=10, cutoff_minutes=9): """Take a datetime, return 10-minute floor and ceiling less 1 minute """ cycle_minutes_int = int(cycle_minutes) cycle_start = datetime_in.replace( minute=(datetime_in.minute // cycle_minutes_int) * cycle_minutes_int, second=0, microsecond=0, ) cycle_cutoff = cycle_start + datetime.timedelta(minutes=cutoff_minutes) return (cycle_start, cycle_cutoff) def msg_attrs_str_encode(attr_pairs): """Take list of string name, value pairs, return SQS MessageAttributes dict """ return { attr_name: {"DataType": "String", "StringValue": attr_value} for (attr_name, attr_value) in attr_pairs } def msg_attr_str_decode(msg, attr_name): """Take an SQS message, return value of a string attribute (must be present) """ return msg["messageAttributes"][attr_name]["stringValue"] svc_clients = {} def svc_client_get(svc): """Take an AWS service, return a boto3 client, creating it if needed boto3 method references can only be resolved at run-time, against an instance of an AWS service's Client class. http://boto3.readthedocs.io/en/latest/guide/events.html#extensibility-guide Alternatives considered: https://github.com/boto/boto3/issues/3197#issue-1175578228 https://github.com/aws-samples/boto-session-manager-project """ if svc not in svc_clients: svc_clients[svc] = boto3.client( svc, config=botocore.config.Config(retries={"mode": "standard"}) ) return svc_clients[svc] # 2. Custom Classes ########################################################## # See rsrc_types_init() for usage examples. 
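# Example (illustrative): for the cycle starting Monday 2025-06-30 14:40
# UTC, cycle_start.strftime(SCHED_REGEXP_STRFTIME_FMT) compiles to a
# pattern that matches tag values such as
#   "d=_ H:M=14:40"     (every day at 14:40)
#   "u=1 H=_ M=40"      (Mondays, 40 minutes past each hour)
#   "dTH:M=30T14:40"    (the 30th of each month at 14:40)
# whereas "d=_ H:M=14:45" can never match, because cycle starts always
# fall on 10-minute boundaries.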
class AWSRsrcType(): # pylint: disable=too-many-instance-attributes """AWS resource type, with identification properties and various operations """ members = {} # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__( self, svc, rsrc_type_words, ops_dict, rsrc_id_key_suffix="Id", arn_key_suffix="Arn", tags_key="Tags", status_filter_pair=() ): self.svc = svc self.name_in_methods = "_".join(rsrc_type_words).lower() self.name_in_keys = "".join(rsrc_type_words) self.rsrc_id_key = f"{self.name_in_keys}{rsrc_id_key_suffix}" if arn_key_suffix: self.arn_prefix = "" self.arn_key = f"{self.name_in_keys}{arn_key_suffix}" else: self.arn_prefix = ARN_DELIM.join( ARN_PARTS[0:2] + [svc] + ARN_PARTS[3:5] + [f"{self.name_in_methods}/"] ) self.arn_key = self.rsrc_id_key self.tags_key = tags_key self.ops = {} for (op_tag_key_words, op_properties) in ops_dict.items(): op = AWSOp.new(self, op_tag_key_words, **op_properties) self.ops[op.tag_key] = op self.describe_kwargs = {} if status_filter_pair: self.describe_kwargs["Filters"] = [ {"Name": filter_name, "Values": list(filter_values)} for (filter_name, filter_values) in [status_filter_pair, ("tag-key", self.ops_tag_keys)] ] self.__class__.members[(svc, self.name_in_keys)] = self # Register me! def __str__(self): return " ".join([self.__class__.__name__, self.svc, self.name_in_keys]) @property def ops_tag_keys(self): """Return tag keys for all operations on this resource type """ return self.ops.keys() def rsrc_id(self, rsrc): """Take 1 describe_ result, return the resource ID """ return rsrc[self.rsrc_id_key] def arn(self, rsrc): """Take 1 describe_ result, return the ARN """ return f"{self.arn_prefix}{rsrc[self.arn_key]}" def get_describe_pages(self): """Return an iterator over pages of boto3 describe_ responses """ return svc_client_get(self.svc).get_paginator( f"describe_{self.name_in_methods}s" ).paginate(**self.describe_kwargs) def get_rsrcs(self): """Return an iterator over individual boto3 describe_ items """ return ( rsrc for page in self.get_describe_pages() for rsrc in page.get(f"{self.name_in_keys}s", []) ) def rsrc_tags_list(self, rsrc): """Take 1 describe_ result, return raw resource tags """ return rsrc.get(self.tags_key, []) # Key may be missing if no tags def op_tags_match(self, rsrc, sched_regexp): """Scan 1 resource's tags to find operations scheduled for current cycle """ ops_tag_keys = self.ops_tag_keys op_tags_matched = [] for tag_dict in self.rsrc_tags_list(rsrc): tag_key = tag_dict["Key"] if tag_key in ops_tag_keys and sched_regexp.search(tag_dict["Value"]): op_tags_matched.append(tag_key) return op_tags_matched def rsrcs_find(self, sched_regexp, cycle_start_str, cycle_cutoff_epoch_str): """Find resources to operate on, and send details to queue """ for rsrc in self.get_rsrcs(): op_tags_matched = self.op_tags_match(rsrc, sched_regexp) op_tags_matched_count = len(op_tags_matched) if op_tags_matched_count == 1: op = self.ops[op_tags_matched[0]] op.msg_send_to_queue(rsrc, cycle_start_str, cycle_cutoff_epoch_str) elif op_tags_matched_count > 1: log("START", cycle_start_str, logging.ERROR) log( "MULTIPLE_OPS", {"arn": self.arn(rsrc), "tag_keys": op_tags_matched}, logging.ERROR ) class AWSRsrcTypeEc2Inst(AWSRsrcType): """EC2 instance """ def get_rsrcs(self): return ( instance for page in self.get_describe_pages() for reservation in page.get("Reservations", []) for instance in reservation.get("Instances", []) ) class AWSOp(): """Operation on 1 AWS resource """ def __init__(self, rsrc_type, tag_key_words, **kwargs): 
self.tag_key = tag_key_join(tag_key_words) self.svc = rsrc_type.svc self.rsrc_id = rsrc_type.rsrc_id verb = kwargs.get("verb", tag_key_words[0]) # Default: 1st word self.method_name = f"{verb}_{rsrc_type.name_in_methods}" self.kwarg_rsrc_id_key = rsrc_type.rsrc_id_key self.kwargs_add = kwargs.get("kwargs_add", {}) @staticmethod def new(rsrc_type, tag_key_words, **kwargs): """Create an op of the requested, appropriate, or default (sub)class """ if "class" in kwargs: op_class = kwargs["class"] elif tag_key_words == ("backup", ): op_class = AWSOpBackUp else: op_class = AWSOp return op_class(rsrc_type, tag_key_words, **kwargs) def kwarg_rsrc_id(self, rsrc): """Transfer resource ID from a describe_ result to another method's kwarg """ return {self.kwarg_rsrc_id_key: self.rsrc_id(rsrc)} def op_kwargs(self, rsrc, cycle_start_str): # pylint: disable=unused-argument """Take a describe_ result, return another method's kwargs """ op_kwargs_out = self.kwarg_rsrc_id(rsrc) op_kwargs_out.update(self.kwargs_add) return op_kwargs_out def msg_send_to_queue(self, rsrc, cycle_start_str, cycle_cutoff_epoch_str): """Send 1 operation message to the SQS queue """ op_kwargs = self.op_kwargs(rsrc, cycle_start_str) if op_kwargs: send_kwargs = { "QueueUrl": QUEUE_URL, "MessageAttributes": msg_attrs_str_encode(( ("version", QUEUE_MSG_FMT_VERSION), ("expires", cycle_cutoff_epoch_str), ("start", cycle_start_str), ("svc", self.svc), ("op_method_name", self.method_name), )), "MessageBody": op_kwargs, # Raw only for logging in case of an exception during JSON encoding } result = None result_type = "" log_level = logging.ERROR try: msg_body = json.dumps(op_kwargs) send_kwargs.update({"MessageBody": msg_body, }) if QUEUE_MSG_BYTES_MAX < len(bytes(msg_body, "utf-8")): result = "Increase QueueMessageBytesMax in CloudFormation" result_type = "QUEUE_MSG_TOO_LONG" else: result = svc_client_get("sqs").send_message(**send_kwargs) result_type = "AWS_RESPONSE" log_level = logging.INFO except Exception as misc_except: # pylint: disable=broad-exception-caught result = misc_except result_type = "EXCEPTION" sqs_send_message_log( cycle_start_str, send_kwargs, result, result_type, log_level ) def __str__(self): return " ".join([ self.__class__.__name__, self.tag_key, self.svc, self.method_name ]) class AWSOpMultipleRsrcs(AWSOp): """Operation on multiple AWS resources of the same type """ def __init__(self, rsrc_type, tag_key_words, **kwargs): super().__init__(rsrc_type, tag_key_words, **kwargs) self.method_name = self.method_name + "s" def kwarg_rsrc_id(self, rsrc): """Transfer resource ID from a describe_ result to a singleton list kwarg One at a time for consistency and to avoid partial completion risk """ return {f"{self.kwarg_rsrc_id_key}s": [self.rsrc_id(rsrc)]} class AWSOpUpdateStack(AWSOp): """CloudFormation stack update operation """ def __init__(self, rsrc_type, tag_key_words, **kwargs): super().__init__(rsrc_type, tag_key_words, verb="update", **kwargs) # Use of final template instead of original makes this incompatible with # CloudFormation "transforms". describe_stacks does not return templates. 
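# Example (illustrative): a stack that takes an "Enable" parameter and is
# tagged
#   sched-set-Enable-true  : d=_ H:M=07:00
#   sched-set-Enable-false : d=_ H:M=19:00
# gets update_stack calls setting Enable to "true" at 07:00 UTC and to
# "false" at 19:00 UTC daily; all other parameters keep previous values.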
self.kwargs_add.update({ "UsePreviousTemplate": True, "RetainExceptOnCreate": True, }) # Use previous parameter values, except for: # Param Value # Key Out # tag_key sched-set-Enable-true # tag_key sched-set-Enable-false # tag_key_words [-2] [-1] self.changing_param_key = tag_key_words[-2] self.changing_param_value_out = tag_key_words[-1] def op_kwargs(self, rsrc, cycle_start_str): """Take 1 describe_stacks result, return update_stack kwargs An empty dict indicates that no stack update is needed. """ op_kwargs_out = {} params_out = [] if rsrc.get("StackStatus") in ( "UPDATE_COMPLETE", "CREATE_COMPLETE", ): for param in rsrc.get("Parameters", []): param_key = param["ParameterKey"] param_out = { "ParameterKey": param_key, "UsePreviousValue": True, } if param_key == self.changing_param_key: if param.get("ParameterValue") == self.changing_param_value_out: break # One time, if changing_param is present and not already up-to-date param_out.update({ "UsePreviousValue": False, "ParameterValue": self.changing_param_value_out, }) op_kwargs_out = super().op_kwargs(rsrc, cycle_start_str) op_kwargs_out.update({ "ClientRequestToken": f"{self.tag_key}-{cycle_start_str}", "Parameters": params_out, # Continue updating dict in-place }) capabilities = rsrc.get("Capabilities") if capabilities: op_kwargs_out["Capabilities"] = capabilities params_out.append(param_out) else: log( "STACK_STATUS_IRREGULAR", "Fix manually until UPDATE_COMPLETE", logging.ERROR ) log("AWS_RESPONSE_PART", rsrc, logging.ERROR) return op_kwargs_out class AWSOpBackUp(AWSOp): """On-demand AWS Backup operation """ backup_kwargs_add = None lifecycle_base = None @classmethod def backup_kwargs_add_init(cls): """Populate start_backup_job kwargs and base lifecycle, if not yet done """ if not cls.backup_kwargs_add: cls.backup_kwargs_add = { "IamRoleArn": BACKUP_ROLE_ARN, "BackupVaultName": os.environ["BACKUP_VAULT_NAME"], "StartWindowMinutes": environ_int("BACKUP_START_WINDOW_MINUTES"), "CompleteWindowMinutes": environ_int( "BACKUP_COMPLETE_WINDOW_MINUTES" ), } cls.lifecycle_base = {} cold_storage_after_days = environ_int("BACKUP_COLD_STORAGE_AFTER_DAYS") if cold_storage_after_days > 0: cls.lifecycle_base.update({ "OptInToArchiveForSupportedResources": True, "MoveToColdStorageAfterDays": cold_storage_after_days, }) delete_after_days = environ_int("BACKUP_DELETE_AFTER_DAYS") if delete_after_days > 0: cls.lifecycle_base["DeleteAfterDays"] = delete_after_days # pylint: disable=unsupported-assignment-operation def __init__(self, rsrc_type, tag_key_words, **kwargs): super().__init__(rsrc_type, tag_key_words, **kwargs) self.rsrc_id = rsrc_type.arn self.svc = "backup" self.method_name = "start_backup_job" self.kwarg_rsrc_id_key = "ResourceArn" self.__class__.backup_kwargs_add_init() self.kwargs_add.update(self.__class__.backup_kwargs_add) def op_kwargs(self, rsrc, cycle_start_str): """Take a describe_ result, return start_backup_job kwargs """ op_kwargs_out = super().op_kwargs(rsrc, cycle_start_str) op_kwargs_out.update({ "IdempotencyToken": f"{cycle_start_str},{self.rsrc_id(rsrc)}", # As of May, 2025, AWS Backup treats calls to back up different # resources as identical so long as IdempotencyToken matches! This is # contrary to the documentation ("otherwise identical calls"). To assure # uniqueness, combine scheduled start time, ARN. 
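# Example token (illustrative ARN):
# "20250630T1440Z,arn:aws:ec2:us-east-1:123456789012:volume/vol-0a1b2c3d4e5f67890"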
# https://docs.aws.amazon.com/aws-backup/latest/devguide/API_StartBackupJob.html#Backup-StartBackupJob-request-IdempotencyToken "RecoveryPointTags": {tag_key_join(("time", )): cycle_start_str}, }) lifecycle = dict(self.lifecycle_base) # Updatable copy (future need) if lifecycle: op_kwargs_out["Lifecycle"] = lifecycle return op_kwargs_out # 3. Data-Driven Specifications ############################################## def rsrc_types_init(): """Create AWS resource type objects when needed, if not already done """ if not AWSRsrcType.members: AWSRsrcTypeEc2Inst( "ec2", ("Instance", ), { ("start", ): {"class": AWSOpMultipleRsrcs}, ("stop", ): {"class": AWSOpMultipleRsrcs}, ("hibernate", ): { "class": AWSOpMultipleRsrcs, "verb": "stop", "kwargs_add": {"Hibernate": True}, }, ("backup", ): {}, }, arn_key_suffix=None, status_filter_pair=( "instance-state-name", ("running", "stopping", "stopped") ) ) AWSRsrcType( "ec2", ("Volume", ), {("backup", ): {}}, arn_key_suffix=None, status_filter_pair=("status", ("available", "in-use")), ) AWSRsrcType( "rds", ("DB", "Instance"), { ("start", ): {}, ("stop", ): {}, ("backup", ): {}, }, rsrc_id_key_suffix="Identifier", tags_key="TagList", ) AWSRsrcType( "rds", ("DB", "Cluster"), { ("start", ): {}, ("stop", ): {}, ("backup", ): {}, }, rsrc_id_key_suffix="Identifier", tags_key="TagList", ) if "ENABLE_SCHED_CLOUDFORMATION_OPS" in os.environ: AWSRsrcType( "cloudformation", ("Stack", ), { ("set", "Enable", "true"): {"class": AWSOpUpdateStack}, ("set", "Enable", "false"): {"class": AWSOpUpdateStack}, }, rsrc_id_key_suffix="Name", arn_key_suffix="Id", ) # 4. Find Resources Lambda Function Handler ################################## def lambda_handler_find(event, context): # pylint: disable=unused-argument """Find and queue AWS resources for scheduled operations, based on tags """ log("LAMBDA_EVENT", event) (cycle_start, cycle_cutoff) = cycle_start_end( datetime.datetime.now(datetime.timezone.utc) ) # ISO 8601 basic, no punctuation (downstream requirement) cycle_start_str = cycle_start.strftime("%Y%m%dT%H%MZ") cycle_cutoff_epoch_str = str(int(cycle_cutoff.timestamp())) sched_regexp = re.compile( cycle_start.strftime(SCHED_REGEXP_STRFTIME_FMT), re.VERBOSE ) log("START", cycle_start_str) log("SCHED_REGEXP_VERBOSE", sched_regexp.pattern, logging.DEBUG) rsrc_types_init() for rsrc_type in AWSRsrcType.members.values(): try: rsrc_type.rsrcs_find( sched_regexp, cycle_start_str, cycle_cutoff_epoch_str ) except Exception as misc_except: # pylint: disable=broad-exception-caught log("EXCEPTION", misc_except, logging.ERROR) # Allow continuing to the next describe_ call (likely that permissions differ) # 5. 
"Do" Operations Lambda Function Handler ################################# def lambda_handler_do(event, context): # pylint: disable=unused-argument """Perform a queued operation on an AWS resource """ batch_item_failures = [] for op_msg in event.get("Records", []): sqs_msg_id = "" result = None result_type = "" retry = True log_level = logging.ERROR try: sqs_msg_id = op_msg["messageId"] (result, result_type, retry) = assess_op_msg(op_msg) if not result_type: svc = msg_attr_str_decode(op_msg, "svc") op_method_name = msg_attr_str_decode(op_msg, "op_method_name") op_kwargs = json.loads(op_msg["body"]) op_method = getattr(svc_client_get(svc), op_method_name) result = op_method(**op_kwargs) result_type = "AWS_RESPONSE" retry = False log_level = logging.INFO except Exception as misc_except: # pylint: disable=broad-exception-caught result = misc_except result_type = "EXCEPTION" (retry, log_level) = assess_op_except(svc, op_method_name, misc_except) op_log(event, op_msg, result, result_type, log_level) if retry and sqs_msg_id: batch_item_failures.append({"itemIdentifier": sqs_msg_id, }) # https://repost.aws/knowledge-center/lambda-sqs-report-batch-item-failures return {"batchItemFailures": batch_item_failures, } # ZIPFILE_END FindLambdaFnSchedGrp: Type: AWS::Scheduler::ScheduleGroup # Administrator should block other invocation of this AWS Lambda function InvokeFindLambdaFnRole: Type: AWS::IAM::Role Properties: Description: !Sub "For ${AWS::Region} region" AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: { Service: scheduler.amazonaws.com } Action: sts:AssumeRole Condition: ArnEquals: "aws:SourceArn": !GetAtt FindLambdaFnSchedGrp.Arn # Must be a schedule group, not an individual schedule! Policies: - PolicyName: LambdaInvoke PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: lambda:InvokeFunction Resource: !GetAtt FindLambdaFn.Arn - Fn::If: - SqsKmsKeyCustom - PolicyName: SqsKmsEncryptNoteComplementsQueuePolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-key-management.html#send-to-encrypted-queue - kms:GenerateDataKey - kms:Decrypt # To verify a new data key! Resource: !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${SqsKmsKey}" Condition: StringEquals: { "kms:ViaService": !Sub "sqs.${AWS::Region}.amazonaws.com" } - !Ref AWS::NoValue FindLambdaFnSched2: Type: AWS::Scheduler::Schedule Properties: GroupName: !Ref FindLambdaFnSchedGrp Description: >- Every 10 minutes (do not change!): invoke "Find" AWS Lambda function ScheduleExpressionTimezone: UTC ScheduleExpression: "cron(01,11,21,31,41,51 * * * ? *)" # cron(minutes hours day-of-month month day-of-week year) # https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html#cron-based FlexibleTimeWindow: { Mode: "OFF" } # Quotes required! 
Target: RoleArn: !GetAtt InvokeFindLambdaFnRole.Arn Arn: !GetAtt FindLambdaFn.Arn RetryPolicy: MaximumRetryAttempts: 0 # ReservedConcurrentExecutions >= 0 MaximumEventAgeInSeconds: 300 # 5 minutes (future use) DeadLetterConfig: { Arn: !GetAtt ErrorQueue.Arn } State: !If [ EnableTrue, ENABLED, DISABLED ] DoLambdaFnLogGrp: Type: AWS::Logs::LogGroup Properties: RetentionInDays: !Ref LogRetentionInDays KmsKeyId: Fn::If: - CloudWatchLogsKmsKeyBlank - !Ref AWS::NoValue - !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${CloudWatchLogsKmsKey}" DoLambdaFn: Type: AWS::Lambda::Function Properties: Role: !GetAtt DoLambdaFnRole.Arn ReservedConcurrentExecutions: Fn::If: - DoLambdaFnReservedConcurrentExecutionsOff - !Ref AWS::NoValue - !Ref DoLambdaFnReservedConcurrentExecutions Timeout: !Ref DoLambdaFnTimeoutSecs MemorySize: !Ref DoLambdaFnMemoryMB LoggingConfig: LogGroup: !Ref DoLambdaFnLogGrp LogFormat: JSON SystemLogLevel: WARN ApplicationLogLevel: !Ref LogLevel TracingConfig: { Mode: PassThrough } Architectures: - arm64 Runtime: python3.13 # To avoid making users build a source bundle and distribute it to a # bucket in every target region (an AWS Lambda requirement when using # S3), use common, inline source code for both functions... Environment: Variables: # Referenced before handler for either Lambda function is invoked: "QUEUE_URL": !Ref OperationQueue "QUEUE_MSG_BYTES_MAX": !Ref QueueMessageBytesMax "BACKUP_ROLE_ARN": !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/${BackupRoleName}" Handler: index.lambda_handler_do Code: ZipFile: | #!/usr/bin/env python3 """Start, stop and back up AWS resources tagged with cron schedules github.com/sqlxpert/lights-off-aws GPLv3 Copyright Paul Marcelin """ import os import logging import datetime import re import json import botocore import boto3 logger = logging.getLogger() # Skip "credentials in environment" INFO message, unavoidable in AWS Lambda: logging.getLogger("botocore").setLevel(logging.WARNING) def environ_int(environ_var_name): """Take name of an environment variable, return its integer value """ return int(os.environ[environ_var_name]) SCHED_DELIMS = r"\ +" # Exposed space must be escaped for re.VERBOSE SCHED_TERMS = rf"([^ ]+{SCHED_DELIMS})*" # Unescaped space inside char class SCHED_REGEXP_STRFTIME_FMT = rf""" (^|{SCHED_DELIMS}) ( # Specific monthly or weekly day and time, or... (dTH:M=%d|uTH:M=%u)T%H:%M | # Day wildcard, specific day, or specific weekday, any other terms, and... (d=(_|%d)|u=%u){SCHED_DELIMS}{SCHED_TERMS} ( # Specific daily time, or... H:M=%H:%M | # Hour wildcard or specific hour, any other terms, and specific minute. H=(_|%H){SCHED_DELIMS}{SCHED_TERMS}M=%M ) ) ({SCHED_DELIMS}|$) """ QUEUE_URL = os.environ["QUEUE_URL"] QUEUE_MSG_BYTES_MAX = environ_int("QUEUE_MSG_BYTES_MAX") QUEUE_MSG_FMT_VERSION = "01" ARN_DELIM = ":" BACKUP_ROLE_ARN = os.environ["BACKUP_ROLE_ARN"] ARN_PARTS = BACKUP_ROLE_ARN.split(ARN_DELIM) # arn:partition:service:region:account-id:resource-type/resource-id # [0] [1] [2] [3] [4] [5] # https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html#configuration-envvars-runtime ARN_PARTS[3] = os.environ.get("AWS_REGION", os.environ["AWS_DEFAULT_REGION"]) # 1. 
Helpers ################################################################# def log(entry_type, entry_value, log_level=logging.INFO): """Take type and value, and emit a JSON-format log entry """ entry_value_out = json.loads(json.dumps(entry_value, default=str)) # Avoids "Object of type datetime is not JSON serializable" in # https://github.com/aws/aws-lambda-python-runtime-interface-client/blob/9efb462/awslambdaric/lambda_runtime_log_utils.py#L109-L135 # # The JSON encoder in the AWS Lambda Python runtime isn't configured to # serialize datetime values in responses returned by AWS's own Python SDK! # # Alternative considered: # https://docs.powertools.aws.dev/lambda/python/latest/core/logger/ logger.log( log_level, "", extra={"type": entry_type, "value": entry_value_out} ) def sqs_send_message_log( cycle_start_str, send_kwargs, result, result_type, log_level ): """Log scheduled start (on error), send_message kwargs, and outcome """ if log_level > logging.INFO: log("START", cycle_start_str, log_level) log("KWARGS_SQS_SEND_MESSAGE", send_kwargs, log_level) log(result_type, result, log_level) def op_log(event, op_msg, result, result_type, log_level): """Log Lambda event (on error), SQS message (operation), and outcome """ if log_level > logging.INFO: log("LAMBDA_EVENT", event, log_level) log("SQS_MESSAGE", op_msg, log_level) log(result_type, result, log_level) def assess_op_msg(op_msg): """Take an operation queue message, return error message, type, retry flag """ result = None result_type = "" retry = True if msg_attr_str_decode(op_msg, "version") != QUEUE_MSG_FMT_VERSION: result = "Unrecognized operation queue message format" result_type = "WRONG_QUEUE_MSG_FMT" retry = False elif ( int(msg_attr_str_decode(op_msg, "expires")) < int(datetime.datetime.now(datetime.timezone.utc).timestamp()) ): result = ( "Schedule fewer operations per 10-minute cycle or " "increase DoLambdaFnMaximumConcurrency in CloudFormation" ) result_type = "EXPIRED_OP" retry = False return (result, result_type, retry) def assess_op_except(svc, op_method_name, misc_except): """Take an operation and an exception, return retry flag and log level botocore.exceptions.ClientError is general but statically-defined, making comparison easier, in a multi-service context, than for service-specific but dynamically-defined exceptions like boto3.Client("rds").exceptions.InvalidDBClusterStateFault and boto3.Client("rds").exceptions.InvalidDBInstanceStateFault https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services """ retry = True log_level = logging.ERROR if isinstance(misc_except, botocore.exceptions.ClientError): verb = op_method_name.split("_")[0] err_dict = getattr(misc_except, "response", {}).get("Error", {}) err_msg = err_dict.get("Message") match (svc, err_dict.get("Code")): case ("cloudformation", "ValidationError") if ( "No updates are to be performed." == err_msg ): retry = False log_level = logging.INFO # Idempotent update_stack (after a recent external update) case ("rds", "InvalidDBClusterStateFault") if ( ((verb == "start") and "is in available" in err_msg) or f"is in {verb}" in err_msg ): retry = False log_level = logging.INFO # Idempotent # start_db_cluster "in available[ state]" or "in start[ing state]" or # stop_db_cluster "in stop[ped state]" or "in stop[ping state]" case ("rds", "InvalidDBInstanceState"): # Fault suffix is missing here! 
retry = False # Can't decide between idempotent start_db_instance / stop_db_instance # (common) or truly erroneous state (rare), because message does not # mention current, invalid state. Log as potential error, but do not # retry (avoids a duplicate error queue entry). return (retry, log_level) def tag_key_join(tag_key_words): """Take a tuple of strings, add a prefix, join, and return a tag key """ return "-".join(("sched", ) + tag_key_words) def cycle_start_end(datetime_in, cycle_minutes=10, cutoff_minutes=9): """Take a datetime, return 10-minute floor and ceiling less 1 minute """ cycle_minutes_int = int(cycle_minutes) cycle_start = datetime_in.replace( minute=(datetime_in.minute // cycle_minutes_int) * cycle_minutes_int, second=0, microsecond=0, ) cycle_cutoff = cycle_start + datetime.timedelta(minutes=cutoff_minutes) return (cycle_start, cycle_cutoff) def msg_attrs_str_encode(attr_pairs): """Take list of string name, value pairs, return SQS MessageAttributes dict """ return { attr_name: {"DataType": "String", "StringValue": attr_value} for (attr_name, attr_value) in attr_pairs } def msg_attr_str_decode(msg, attr_name): """Take an SQS message, return value of a string attribute (must be present) """ return msg["messageAttributes"][attr_name]["stringValue"] svc_clients = {} def svc_client_get(svc): """Take an AWS service, return a boto3 client, creating it if needed boto3 method references can only be resolved at run-time, against an instance of an AWS service's Client class. http://boto3.readthedocs.io/en/latest/guide/events.html#extensibility-guide Alternatives considered: https://github.com/boto/boto3/issues/3197#issue-1175578228 https://github.com/aws-samples/boto-session-manager-project """ if svc not in svc_clients: svc_clients[svc] = boto3.client( svc, config=botocore.config.Config(retries={"mode": "standard"}) ) return svc_clients[svc] # 2. Custom Classes ########################################################## # See rsrc_types_init() for usage examples. class AWSRsrcType(): # pylint: disable=too-many-instance-attributes """AWS resource type, with identification properties and various operations """ members = {} # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__( self, svc, rsrc_type_words, ops_dict, rsrc_id_key_suffix="Id", arn_key_suffix="Arn", tags_key="Tags", status_filter_pair=() ): self.svc = svc self.name_in_methods = "_".join(rsrc_type_words).lower() self.name_in_keys = "".join(rsrc_type_words) self.rsrc_id_key = f"{self.name_in_keys}{rsrc_id_key_suffix}" if arn_key_suffix: self.arn_prefix = "" self.arn_key = f"{self.name_in_keys}{arn_key_suffix}" else: self.arn_prefix = ARN_DELIM.join( ARN_PARTS[0:2] + [svc] + ARN_PARTS[3:5] + [f"{self.name_in_methods}/"] ) self.arn_key = self.rsrc_id_key self.tags_key = tags_key self.ops = {} for (op_tag_key_words, op_properties) in ops_dict.items(): op = AWSOp.new(self, op_tag_key_words, **op_properties) self.ops[op.tag_key] = op self.describe_kwargs = {} if status_filter_pair: self.describe_kwargs["Filters"] = [ {"Name": filter_name, "Values": list(filter_values)} for (filter_name, filter_values) in [status_filter_pair, ("tag-key", self.ops_tag_keys)] ] self.__class__.members[(svc, self.name_in_keys)] = self # Register me! 
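# Example (illustrative): for the EC2 instance resource type registered in
# rsrc_types_init(), describe_kwargs becomes
#   {"Filters": [
#     {"Name": "instance-state-name",
#      "Values": ["running", "stopping", "stopped"]},
#     {"Name": "tag-key",
#      "Values": ["sched-start", "sched-stop", "sched-hibernate",
#                 "sched-backup"]},
#   ]}
# so describe_instances returns only resources that could have a
# scheduled operation.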
  def __str__(self):
    return " ".join([self.__class__.__name__, self.svc, self.name_in_keys])

  @property
  def ops_tag_keys(self):
    """Return tag keys for all operations on this resource type
    """
    return self.ops.keys()

  def rsrc_id(self, rsrc):
    """Take 1 describe_ result, return the resource ID
    """
    return rsrc[self.rsrc_id_key]

  def arn(self, rsrc):
    """Take 1 describe_ result, return the ARN
    """
    return f"{self.arn_prefix}{rsrc[self.arn_key]}"

  def get_describe_pages(self):
    """Return an iterator over pages of boto3 describe_ responses
    """
    return svc_client_get(self.svc).get_paginator(
      f"describe_{self.name_in_methods}s"
    ).paginate(**self.describe_kwargs)

  def get_rsrcs(self):
    """Return an iterator over individual boto3 describe_ items
    """
    return (
      rsrc
      for page in self.get_describe_pages()
      for rsrc in page.get(f"{self.name_in_keys}s", [])
    )

  def rsrc_tags_list(self, rsrc):
    """Take 1 describe_ result, return raw resource tags
    """
    return rsrc.get(self.tags_key, [])  # Key may be missing if no tags

  def op_tags_match(self, rsrc, sched_regexp):
    """Scan 1 resource's tags to find operations scheduled for current cycle
    """
    ops_tag_keys = self.ops_tag_keys
    op_tags_matched = []
    for tag_dict in self.rsrc_tags_list(rsrc):
      tag_key = tag_dict["Key"]
      if tag_key in ops_tag_keys and sched_regexp.search(tag_dict["Value"]):
        op_tags_matched.append(tag_key)
    return op_tags_matched

  def rsrcs_find(self, sched_regexp, cycle_start_str, cycle_cutoff_epoch_str):
    """Find resources to operate on, and send details to queue
    """
    for rsrc in self.get_rsrcs():
      op_tags_matched = self.op_tags_match(rsrc, sched_regexp)
      op_tags_matched_count = len(op_tags_matched)
      if op_tags_matched_count == 1:
        op = self.ops[op_tags_matched[0]]
        op.msg_send_to_queue(rsrc, cycle_start_str, cycle_cutoff_epoch_str)
      elif op_tags_matched_count > 1:
        log("START", cycle_start_str, logging.ERROR)
        log(
          "MULTIPLE_OPS",
          {"arn": self.arn(rsrc), "tag_keys": op_tags_matched},
          logging.ERROR
        )


class AWSRsrcTypeEc2Inst(AWSRsrcType):
  """EC2 instance
  """

  def get_rsrcs(self):
    return (
      instance
      for page in self.get_describe_pages()
      for reservation in page.get("Reservations", [])
      for instance in reservation.get("Instances", [])
    )


class AWSOp():
  """Operation on 1 AWS resource
  """

  def __init__(self, rsrc_type, tag_key_words, **kwargs):
    self.tag_key = tag_key_join(tag_key_words)
    self.svc = rsrc_type.svc
    self.rsrc_id = rsrc_type.rsrc_id
    verb = kwargs.get("verb", tag_key_words[0])  # Default: 1st word
    self.method_name = f"{verb}_{rsrc_type.name_in_methods}"
    self.kwarg_rsrc_id_key = rsrc_type.rsrc_id_key
    self.kwargs_add = kwargs.get("kwargs_add", {})

  @staticmethod
  def new(rsrc_type, tag_key_words, **kwargs):
    """Create an op of the requested, appropriate, or default (sub)class
    """
    if "class" in kwargs:
      op_class = kwargs["class"]
    elif tag_key_words == ("backup", ):
      op_class = AWSOpBackUp
    else:
      op_class = AWSOp
    return op_class(rsrc_type, tag_key_words, **kwargs)

  def kwarg_rsrc_id(self, rsrc):
    """Transfer resource ID from a describe_ result to another method's kwarg
    """
    return {self.kwarg_rsrc_id_key: self.rsrc_id(rsrc)}

  def op_kwargs(self, rsrc, cycle_start_str):  # pylint: disable=unused-argument
    """Take a describe_ result, return another method's kwargs
    """
    op_kwargs_out = self.kwarg_rsrc_id(rsrc)
    op_kwargs_out.update(self.kwargs_add)
    return op_kwargs_out
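
  # Example (illustrative only; assumes rsrc_types_init() has run in the
  # Lambda environment): how an operation maps a describe_ result to kwargs
  # for the boto3 method it will call. "db1" is a made-up identifier.
  #
  #   op = AWSRsrcType.members[("rds", "DBInstance")].ops["sched-stop"]
  #   op.method_name  # -> "stop_db_instance"
  #   op.op_kwargs({"DBInstanceIdentifier": "db1"}, "20250501T1430Z")
  #   # -> {"DBInstanceIdentifier": "db1"}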
"MessageAttributes": msg_attrs_str_encode(( ("version", QUEUE_MSG_FMT_VERSION), ("expires", cycle_cutoff_epoch_str), ("start", cycle_start_str), ("svc", self.svc), ("op_method_name", self.method_name), )), "MessageBody": op_kwargs, # Raw only for logging in case of an exception during JSON encoding } result = None result_type = "" log_level = logging.ERROR try: msg_body = json.dumps(op_kwargs) send_kwargs.update({"MessageBody": msg_body, }) if QUEUE_MSG_BYTES_MAX < len(bytes(msg_body, "utf-8")): result = "Increase QueueMessageBytesMax in CloudFormation" result_type = "QUEUE_MSG_TOO_LONG" else: result = svc_client_get("sqs").send_message(**send_kwargs) result_type = "AWS_RESPONSE" log_level = logging.INFO except Exception as misc_except: # pylint: disable=broad-exception-caught result = misc_except result_type = "EXCEPTION" sqs_send_message_log( cycle_start_str, send_kwargs, result, result_type, log_level ) def __str__(self): return " ".join([ self.__class__.__name__, self.tag_key, self.svc, self.method_name ]) class AWSOpMultipleRsrcs(AWSOp): """Operation on multiple AWS resources of the same type """ def __init__(self, rsrc_type, tag_key_words, **kwargs): super().__init__(rsrc_type, tag_key_words, **kwargs) self.method_name = self.method_name + "s" def kwarg_rsrc_id(self, rsrc): """Transfer resource ID from a describe_ result to a singleton list kwarg One at a time for consistency and to avoid partial completion risk """ return {f"{self.kwarg_rsrc_id_key}s": [self.rsrc_id(rsrc)]} class AWSOpUpdateStack(AWSOp): """CloudFormation stack update operation """ def __init__(self, rsrc_type, tag_key_words, **kwargs): super().__init__(rsrc_type, tag_key_words, verb="update", **kwargs) # Use of final template instead of original makes this incompatible with # CloudFormation "transforms". describe_stacks does not return templates. self.kwargs_add.update({ "UsePreviousTemplate": True, "RetainExceptOnCreate": True, }) # Use previous parameter values, except for: # Param Value # Key Out # tag_key sched-set-Enable-true # tag_key sched-set-Enable-false # tag_key_words [-2] [-1] self.changing_param_key = tag_key_words[-2] self.changing_param_value_out = tag_key_words[-1] def op_kwargs(self, rsrc, cycle_start_str): """Take 1 describe_stacks result, return update_stack kwargs An empty dict indicates that no stack update is needed. 
""" op_kwargs_out = {} params_out = [] if rsrc.get("StackStatus") in ( "UPDATE_COMPLETE", "CREATE_COMPLETE", ): for param in rsrc.get("Parameters", []): param_key = param["ParameterKey"] param_out = { "ParameterKey": param_key, "UsePreviousValue": True, } if param_key == self.changing_param_key: if param.get("ParameterValue") == self.changing_param_value_out: break # One time, if changing_param is present and not already up-to-date param_out.update({ "UsePreviousValue": False, "ParameterValue": self.changing_param_value_out, }) op_kwargs_out = super().op_kwargs(rsrc, cycle_start_str) op_kwargs_out.update({ "ClientRequestToken": f"{self.tag_key}-{cycle_start_str}", "Parameters": params_out, # Continue updating dict in-place }) capabilities = rsrc.get("Capabilities") if capabilities: op_kwargs_out["Capabilities"] = capabilities params_out.append(param_out) else: log( "STACK_STATUS_IRREGULAR", "Fix manually until UPDATE_COMPLETE", logging.ERROR ) log("AWS_RESPONSE_PART", rsrc, logging.ERROR) return op_kwargs_out class AWSOpBackUp(AWSOp): """On-demand AWS Backup operation """ backup_kwargs_add = None lifecycle_base = None @classmethod def backup_kwargs_add_init(cls): """Populate start_backup_job kwargs and base lifecycle, if not yet done """ if not cls.backup_kwargs_add: cls.backup_kwargs_add = { "IamRoleArn": BACKUP_ROLE_ARN, "BackupVaultName": os.environ["BACKUP_VAULT_NAME"], "StartWindowMinutes": environ_int("BACKUP_START_WINDOW_MINUTES"), "CompleteWindowMinutes": environ_int( "BACKUP_COMPLETE_WINDOW_MINUTES" ), } cls.lifecycle_base = {} cold_storage_after_days = environ_int("BACKUP_COLD_STORAGE_AFTER_DAYS") if cold_storage_after_days > 0: cls.lifecycle_base.update({ "OptInToArchiveForSupportedResources": True, "MoveToColdStorageAfterDays": cold_storage_after_days, }) delete_after_days = environ_int("BACKUP_DELETE_AFTER_DAYS") if delete_after_days > 0: cls.lifecycle_base["DeleteAfterDays"] = delete_after_days # pylint: disable=unsupported-assignment-operation def __init__(self, rsrc_type, tag_key_words, **kwargs): super().__init__(rsrc_type, tag_key_words, **kwargs) self.rsrc_id = rsrc_type.arn self.svc = "backup" self.method_name = "start_backup_job" self.kwarg_rsrc_id_key = "ResourceArn" self.__class__.backup_kwargs_add_init() self.kwargs_add.update(self.__class__.backup_kwargs_add) def op_kwargs(self, rsrc, cycle_start_str): """Take a describe_ result, return start_backup_job kwargs """ op_kwargs_out = super().op_kwargs(rsrc, cycle_start_str) op_kwargs_out.update({ "IdempotencyToken": f"{cycle_start_str},{self.rsrc_id(rsrc)}", # As of May, 2025, AWS Backup treats calls to back up different # resources as identical so long as IdempotencyToken matches! This is # contrary to the documentation ("otherwise identical calls"). To assure # uniqueness, combine scheduled start time, ARN. # https://docs.aws.amazon.com/aws-backup/latest/devguide/API_StartBackupJob.html#Backup-StartBackupJob-request-IdempotencyToken "RecoveryPointTags": {tag_key_join(("time", )): cycle_start_str}, }) lifecycle = dict(self.lifecycle_base) # Updatable copy (future need) if lifecycle: op_kwargs_out["Lifecycle"] = lifecycle return op_kwargs_out # 3. 
# 3. Data-Driven Specifications ##############################################


def rsrc_types_init():
  """Create AWS resource type objects when needed, if not already done
  """
  if not AWSRsrcType.members:

    AWSRsrcTypeEc2Inst(
      "ec2",
      ("Instance", ),
      {
        ("start", ): {"class": AWSOpMultipleRsrcs},
        ("stop", ): {"class": AWSOpMultipleRsrcs},
        ("hibernate", ): {
          "class": AWSOpMultipleRsrcs,
          "verb": "stop",
          "kwargs_add": {"Hibernate": True},
        },
        ("backup", ): {},
      },
      arn_key_suffix=None,
      status_filter_pair=(
        "instance-state-name", ("running", "stopping", "stopped")
      )
    )

    AWSRsrcType(
      "ec2",
      ("Volume", ),
      {("backup", ): {}},
      arn_key_suffix=None,
      status_filter_pair=("status", ("available", "in-use")),
    )

    AWSRsrcType(
      "rds",
      ("DB", "Instance"),
      {
        ("start", ): {},
        ("stop", ): {},
        ("backup", ): {},
      },
      rsrc_id_key_suffix="Identifier",
      tags_key="TagList",
    )

    AWSRsrcType(
      "rds",
      ("DB", "Cluster"),
      {
        ("start", ): {},
        ("stop", ): {},
        ("backup", ): {},
      },
      rsrc_id_key_suffix="Identifier",
      tags_key="TagList",
    )

    if "ENABLE_SCHED_CLOUDFORMATION_OPS" in os.environ:
      AWSRsrcType(
        "cloudformation",
        ("Stack", ),
        {
          ("set", "Enable", "true"): {"class": AWSOpUpdateStack},
          ("set", "Enable", "false"): {"class": AWSOpUpdateStack},
        },
        rsrc_id_key_suffix="Name",
        arn_key_suffix="Id",
      )

# 4. Find Resources Lambda Function Handler ##################################


def lambda_handler_find(event, context):  # pylint: disable=unused-argument
  """Find and queue AWS resources for scheduled operations, based on tags
  """
  log("LAMBDA_EVENT", event)
  (cycle_start, cycle_cutoff) = cycle_start_end(
    datetime.datetime.now(datetime.timezone.utc)
  )
  # ISO 8601 basic, no punctuation (downstream requirement)
  cycle_start_str = cycle_start.strftime("%Y%m%dT%H%MZ")
  cycle_cutoff_epoch_str = str(int(cycle_cutoff.timestamp()))
  sched_regexp = re.compile(
    cycle_start.strftime(SCHED_REGEXP_STRFTIME_FMT), re.VERBOSE
  )
  log("START", cycle_start_str)
  log("SCHED_REGEXP_VERBOSE", sched_regexp.pattern, logging.DEBUG)

  rsrc_types_init()
  for rsrc_type in AWSRsrcType.members.values():
    try:
      rsrc_type.rsrcs_find(
        sched_regexp, cycle_start_str, cycle_cutoff_epoch_str
      )
    except Exception as misc_except:  # pylint: disable=broad-exception-caught
      log("EXCEPTION", misc_except, logging.ERROR)
      # Continue to the next resource type's describe_ call (permissions are
      # likely to differ from one service to another)
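
# Example (illustrative only): a "Find" run at 2025-05-01 14:37:22 UTC floors
# to the 14:30 cycle, so:
#
#   cycle_start_str         -> "20250501T1430Z"
#   cycle_cutoff_epoch_str  -> the Unix timestamp string for 14:39:00 UTC
#
# Any schedule tag whose value matches the regexp built from
# SCHED_REGEXP_STRFTIME_FMT (defined earlier in this file) for that 10-minute
# mark is queued exactly once during the cycle.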
"Do" Operations Lambda Function Handler ################################# def lambda_handler_do(event, context): # pylint: disable=unused-argument """Perform a queued operation on an AWS resource """ batch_item_failures = [] for op_msg in event.get("Records", []): sqs_msg_id = "" result = None result_type = "" retry = True log_level = logging.ERROR try: sqs_msg_id = op_msg["messageId"] (result, result_type, retry) = assess_op_msg(op_msg) if not result_type: svc = msg_attr_str_decode(op_msg, "svc") op_method_name = msg_attr_str_decode(op_msg, "op_method_name") op_kwargs = json.loads(op_msg["body"]) op_method = getattr(svc_client_get(svc), op_method_name) result = op_method(**op_kwargs) result_type = "AWS_RESPONSE" retry = False log_level = logging.INFO except Exception as misc_except: # pylint: disable=broad-exception-caught result = misc_except result_type = "EXCEPTION" (retry, log_level) = assess_op_except(svc, op_method_name, misc_except) op_log(event, op_msg, result, result_type, log_level) if retry and sqs_msg_id: batch_item_failures.append({"itemIdentifier": sqs_msg_id, }) # https://repost.aws/knowledge-center/lambda-sqs-report-batch-item-failures return {"batchItemFailures": batch_item_failures, } # ZIPFILE_END # Administrator should block other invocation of this AWS Lambda function DoLambdaFnInvokeLambdaPerm: Type: AWS::Lambda::Permission DependsOn: OperationQueuePol Properties: Action: lambda:InvokeFunction FunctionName: !Ref DoLambdaFn Principal: sqs.amazonaws.com SourceArn: !GetAtt OperationQueue.Arn DoLambdaFnSqsMapping: Type: AWS::Lambda::EventSourceMapping DependsOn: DoLambdaFnInvokeLambdaPerm Properties: EventSourceArn: !GetAtt OperationQueue.Arn BatchSize: !Ref DoLambdaFnBatchSize MaximumBatchingWindowInSeconds: 60 # Not much time is available to wait, inside a 10-minute cycle whose # first and last minutes are not used, which leaves only 8 minutes. # Note: >= 20 for long polling (lowest cost) FunctionName: !GetAtt DoLambdaFn.Arn ScalingConfig: MaximumConcurrency: !Ref DoLambdaFnMaximumConcurrency FunctionResponseTypes: - ReportBatchItemFailures Enabled: !Ref Enable