diff --git a/.autover/changes/e390422f-955d-4699-97cf-67725872e746.json b/.autover/changes/e390422f-955d-4699-97cf-67725872e746.json new file mode 100644 index 000000000..e71fd5f0e --- /dev/null +++ b/.autover/changes/e390422f-955d-4699-97cf-67725872e746.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.TestTool", + "Type": "Minor", + "ChangelogMessages": [ + "Add SQS event source support" + ] + } + ] +} \ No newline at end of file diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj index 2e3fd629c..1841f48fc 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Amazon.Lambda.TestTool.csproj @@ -1,8 +1,8 @@ - + - Exe + Exe A tool to help debug and test your .NET AWS Lambda functions locally. net8.0 enable @@ -17,7 +17,7 @@ dotnet-lambda-test-tool 0.9.1 NU5100 - Major + Major README.md @@ -26,7 +26,10 @@ + + + @@ -38,7 +41,7 @@ - + diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs index 09d03e856..f366c2f83 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/RunCommand.cs @@ -7,6 +7,7 @@ using Amazon.Lambda.TestTool.Extensions; using Amazon.Lambda.TestTool.Models; using Amazon.Lambda.TestTool.Processes; +using Amazon.Lambda.TestTool.Processes.SQSEventSource; using Amazon.Lambda.TestTool.Services; using Amazon.Lambda.TestTool.Services.IO; using Spectre.Console.Cli; @@ -22,6 +23,11 @@ public sealed class RunCommand( public const string LAMBDA_RUNTIME_API_PORT = "LAMBDA_RUNTIME_API_PORT"; public const string API_GATEWAY_EMULATOR_PORT = "API_GATEWAY_EMULATOR_PORT"; + /// + /// Task for the Lambda Runtime API. + /// + public Task LambdRuntimeApiTask { get; private set; } + /// /// The method responsible for executing the . /// @@ -31,10 +37,10 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS { EvaluateEnvironmentVariables(settings); - if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue) + if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig)) { throw new ArgumentException("At least one of the following parameters must be set: " + - "--lambda-emulator-port or --api-gateway-emulator-port"); + "--lambda-emulator-port, --api-gateway-emulator-port or --sqs-eventsource-config"); } var tasks = new List(); @@ -42,6 +48,7 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS if (settings.LambdaEmulatorPort.HasValue) { var testToolProcess = TestToolProcess.Startup(settings, cancellationTokenSource.Token); + LambdRuntimeApiTask = testToolProcess.RunningTask; tasks.Add(testToolProcess.RunningTask); if (!settings.NoLaunchWindow) @@ -74,10 +81,20 @@ public override async Task ExecuteAsync(CommandContext context, RunCommandS tasks.Add(apiGatewayEmulatorProcess.RunningTask); } - await Task.WhenAny(tasks); + if (!string.IsNullOrEmpty(settings.SQSEventSourceConfig)) + { + var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token); + tasks.Add(sqsEventSourceProcess.RunningTask); + } + + await Task.Run(() => Task.WaitAny(tasks.ToArray(), cancellationTokenSource.Token)); return CommandReturnCodes.Success; } + catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested) + { + return CommandReturnCodes.Success; + } catch (Exception e) when (e.IsExpectedException()) { toolInteractiveService.WriteErrorLine(string.Empty); @@ -132,5 +149,15 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings) throw new ArgumentException($"Value for {API_GATEWAY_EMULATOR_PORT} environment variable was not a valid port number"); } } + + if (settings.SQSEventSourceConfig != null && settings.SQSEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase)) + { + var envVariable = settings.SQSEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length); + if (!environmentVariables.Contains(envVariable)) + { + throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty"); + } + settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString(); + } } } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs index cdabc377a..1be04a4dc 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Commands/Settings/RunCommandSettings.cs @@ -53,4 +53,12 @@ public sealed class RunCommandSettings : CommandSettings [CommandOption("--api-gateway-emulator-port ")] [Description("The port number used for the test tool's API Gateway emulator.")] public int? ApiGatewayEmulatorPort { get; set; } + + /// + /// The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=queue-url,FunctionName=function-name,VisibilityTimeout=100\". + /// Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout + /// + [CommandOption("--sqs-eventsource-config ")] + [Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=,FunctionName=,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")] + public string? SQSEventSourceConfig { get; set; } } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Constants.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Constants.cs index 11a940278..680f99398 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Constants.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Constants.cs @@ -84,4 +84,10 @@ public abstract class Constants /// The Visual Studio Marketplace link for the AWS Toolkit for Visual Studio. /// public const string LinkVsToolkitMarketplace = "https://marketplace.visualstudio.com/items?itemName=AmazonWebServices.AWSToolkitforVisualStudio2022"; + + /// + /// Prefix this is used for values of command line arguments that support the value being stored in an environment variable. + /// This used in the Aspire integration where it is often easier to pass configuration via environment variables. + /// + public const string ArgumentEnvironmentVariablePrefix = "env:"; } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs new file mode 100644 index 000000000..2125b6f9a --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs @@ -0,0 +1,242 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.Lambda.Model; +using Amazon.Lambda.SQSEvents; +using Amazon.Runtime; +using Amazon.SQS.Model; +using Amazon.SQS; +using System.Text.Json; +using Amazon.Lambda.TestTool.Services; + +namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; + +/// +/// IHostedService that will run continually polling the SQS queue for messages and invoking the connected +/// Lambda function with the polled messages. +/// +public class SQSEventSourceBackgroundService : BackgroundService +{ + private static readonly List DefaultAttributesToReceive = new List { "All" }; + private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + private readonly ILogger _logger; + private readonly IAmazonSQS _sqsClient; + private readonly ILambdaClient _lambdaClient; + private readonly SQSEventSourceBackgroundServiceConfig _config; + + /// + /// Constructs instance of . + /// + /// The logger + /// The SQS client used to poll messages from a queue. + /// The config of the service + /// The Lambda client that can use a different endpoint for each invoke request. + public SQSEventSourceBackgroundService(ILogger logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient) + { + _logger = logger; + _sqsClient = sqsClient; + _config = config; + _lambdaClient = lambdaClient; + } + + private async Task GetQueueArn(CancellationToken stoppingToken) + { + var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = _config.QueueUrl, + AttributeNames = new List { "QueueArn" } + }, stoppingToken); + + return response.QueueARN; + } + + /// + /// Execute the SQSEventSourceBackgroundService. + /// + /// CancellationToken used to end the service. + /// Task for the background service. + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // The queue arn is needed for creating the Lambda event. + var queueArn = await GetQueueArn(stoppingToken); + _logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn); + while (!stoppingToken.IsCancellationRequested) + { + try + { + _logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl); + // Read a message from the queue using the ExternalCommands console application. + var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = _config.QueueUrl, + WaitTimeSeconds = 20, + MessageAttributeNames = DefaultAttributesToReceive, + MessageSystemAttributeNames = DefaultAttributesToReceive, + MaxNumberOfMessages = _config.BatchSize, + VisibilityTimeout = _config.VisibilityTimeout, + }, stoppingToken); + + if (stoppingToken.IsCancellationRequested) + { + return; + } + if (response.Messages == null || response.Messages.Count == 0) + { + _logger.LogDebug("No messages received from while polling SQS"); + // Since there are no messages, sleep a bit to wait for messages to come. + await Task.Delay(200); + continue; + } + + var lambdaPayload = new + { + Records = ConvertToLambdaMessages(response.Messages, _sqsClient.Config.RegionEndpoint.SystemName, queueArn) + }; + + var invokeRequest = new InvokeRequest + { + InvocationType = InvocationType.RequestResponse, + FunctionName = _config.FunctionName, + Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) + }; + + _logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count); + var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); + + if (lambdaResponse.FunctionError != null) + { + _logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError); + continue; + } + + if (!_config.DisableMessageDelete) + { + List messagesToDelete; + if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0) + { + var partialResponse = JsonSerializer.Deserialize(lambdaResponse.Payload); + if (partialResponse == null) + { + lambdaResponse.Payload.Position = 0; + using var reader = new StreamReader(lambdaResponse.Payload); + var payloadString = reader.ReadToEnd(); + _logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString); + continue; + } + + if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0) + { + _logger.LogDebug("Partial SQS response received with no failures"); + messagesToDelete = response.Messages; + } + else + { + _logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count); + messagesToDelete = new List(); + foreach (var message in response.Messages) + { + if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId))) + { + messagesToDelete.Add(message); + } + } + } + } + else + { + _logger.LogDebug("No partial response received. All messages eligible for deletion"); + messagesToDelete = response.Messages; + } + + if (messagesToDelete.Count > 0) + { + var deleteRequest = new DeleteMessageBatchRequest + { + QueueUrl = _config.QueueUrl, + Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList() + }; + + _logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count); + await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken); + } + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (Exception e) + { + _logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message); + + // Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset. + await Task.Delay(3000); + } + } + } + + /// + /// Convert from the SDK's list of messages to the Lambda event's SQS message type. + /// + /// List of messages using the SDK's .NET type + /// The aws region the messages came from. + /// The SQS queue arn the messages came from. + /// List of messages using the Lambda event's .NET type. + internal static List ConvertToLambdaMessages(List messages, string awsRegion, string queueArn) + { + return messages.Select(m => ConvertToLambdaMessage(m, awsRegion, queueArn)).ToList(); + } + + /// + /// Convert from the SDK's SQS message to the Lambda event's SQS message type. + /// + /// Message using the SDK's .NET type + /// The aws region the message came from. + /// The SQS queue arn the message came from. + /// Messages using the Lambda event's .NET type. + internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, string awsRegion, string queueArn) + { + var lambdaMessage = new SQSEvent.SQSMessage + { + AwsRegion = awsRegion, + Body = message.Body, + EventSource = "aws:sqs", + EventSourceArn = queueArn, + Md5OfBody = message.MD5OfBody, + Md5OfMessageAttributes = message.MD5OfMessageAttributes, + MessageId = message.MessageId, + ReceiptHandle = message.ReceiptHandle, + }; + + if (message.MessageAttributes != null && message.MessageAttributes.Count > 0) + { + lambdaMessage.MessageAttributes = new Dictionary(); + foreach (var kvp in message.MessageAttributes) + { + var lambdaAttribute = new SQSEvent.MessageAttribute + { + DataType = kvp.Value.DataType, + StringValue = kvp.Value.StringValue, + BinaryValue = kvp.Value.BinaryValue + }; + + lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute); + } + } + + if (message.Attributes != null && message.Attributes.Count > 0) + { + lambdaMessage.Attributes = message.Attributes; + } + + return lambdaMessage; + } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundServiceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundServiceConfig.cs new file mode 100644 index 000000000..b07cc386c --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundServiceConfig.cs @@ -0,0 +1,42 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; + +/// +/// Configuration for the service. +/// +public class SQSEventSourceBackgroundServiceConfig +{ + /// + /// The batch size to read and send to Lambda function. This is the upper bound of messages to read and send. + /// SQS will return with less than batch size if there are not enough messages in the queue. + /// + public required int BatchSize { get; init; } = SQSEventSourceProcess.DefaultBatchSize; + + /// + /// If true the will skip deleting messages from the queue after the Lambda function returns. + /// + public required bool DisableMessageDelete { get; init; } + + /// + /// The Lambda function to send the SQS messages to delete to. + /// + public required string FunctionName { get; init; } + + /// + /// The endpoint where the emulated Lambda runtime API is running. The Lambda function identified by FunctionName must be listening for events from this endpoint. + /// + public required string LambdaRuntimeApi { get; init; } + + /// + /// The SQS queue url to poll for messages. + /// + public required string QueueUrl { get; init; } + + /// + /// The visibility timeout used for messages read. This is the length the message will not be visible to be read + /// again once it is returned in the receive call. + /// + public required int VisibilityTimeout { get; init; } = SQSEventSourceProcess.DefaultVisiblityTimeout; +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceConfig.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceConfig.cs new file mode 100644 index 000000000..471fdfcc8 --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceConfig.cs @@ -0,0 +1,54 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; + +/// +/// This class represents the input values from the user. +/// +internal class SQSEventSourceConfig +{ + /// + /// The batch size to read and send to Lambda function. This is the upper bound of messages to read and send. + /// SQS will return with less than batch size if there are not enough messages in the queue. + /// + public int? BatchSize { get; set; } + + /// + /// If true the will skip deleting messages from the queue after the Lambda function returns. + /// + public bool? DisableMessageDelete { get; set; } + + /// + /// The Lambda function to send the SQS messages to delete to. + /// If not set the default function will be used. + /// + public string? FunctionName { get; set; } + + /// + /// The endpoint where the emulated Lambda runtime API is running. The Lambda function identified by FunctionName must be listening for events from this endpoint. + /// If not set the current Test Tool instance will be used assuming it is running a Lambda runtime api emulator. + /// + public string? LambdaRuntimeApi { get; set; } + + /// + /// The AWS profile to use for credentials for fetching messages from the queue. + /// + public string? Profile { get; set; } + + /// + /// The queue url where messages should be polled from. + /// + public string? QueueUrl { get; set; } + + /// + /// The AWS region the queue is in. + /// + public string? Region { get; set; } + + /// + /// The visibility timeout used for messages read. This is the length the message will not be visible to be read + /// again once it is returned in the receive call. + /// + public int? VisibilityTimeout { get; set; } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceProcess.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceProcess.cs new file mode 100644 index 000000000..2b7272cad --- /dev/null +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceProcess.cs @@ -0,0 +1,240 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.SQS; +using Amazon.Lambda.TestTool.Commands.Settings; +using Amazon.Lambda.TestTool.Services; +using System.Text.Json; + +namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; + +/// +/// Process for handling SQS event source for Lambda functions. +/// +public class SQSEventSourceProcess +{ + internal const int DefaultBatchSize = 10; + internal const int DefaultVisiblityTimeout = 30; + + /// + /// The Parent task for all the tasks started for each list SQS event source. + /// + public required Task RunningTask { get; init; } + + /// + /// Startup SQS event sources + /// + /// The settings to launch the tool. + /// CancellationToken to end + /// This instance of that was started. + /// Thrown when the config is invalid. + public static SQSEventSourceProcess Startup(RunCommandSettings settings, CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(settings.SQSEventSourceConfig)) + { + throw new InvalidOperationException($"The {nameof(RunCommandSettings.SQSEventSourceConfig)} can not be null when starting the SQS event source process"); + } + + var sqsEventSourceConfigs = LoadSQSEventSourceConfig(settings.SQSEventSourceConfig); + + var tasks = new List(); + + // Create a separate SQSEventSourceBackgroundService for each SQS event source config listed in the SQSEventSourceConfig + foreach (var sqsEventSourceConfig in sqsEventSourceConfigs) + { + var builder = Host.CreateApplicationBuilder(); + + var sqsConfig = new AmazonSQSConfig(); + if (!string.IsNullOrEmpty(sqsEventSourceConfig.Profile)) + { + sqsConfig.Profile = new Profile(sqsEventSourceConfig.Profile); + } + + if (!string.IsNullOrEmpty(sqsEventSourceConfig.Region)) + { + sqsConfig.RegionEndpoint = RegionEndpoint.GetBySystemName(sqsEventSourceConfig.Region); + } + + var sqsClient = new AmazonSQSClient(sqsConfig); + builder.Services.AddSingleton(sqsClient); + builder.Services.AddSingleton(); + + var queueUrl = sqsEventSourceConfig.QueueUrl; + if (string.IsNullOrEmpty(queueUrl)) + { + throw new InvalidOperationException("QueueUrl is a required property for SQS event source config"); + } + + var lambdaRuntimeApi = sqsEventSourceConfig.LambdaRuntimeApi; + if (string.IsNullOrEmpty(lambdaRuntimeApi)) + { + if (!settings.LambdaEmulatorPort.HasValue) + { + throw new InvalidOperationException("No Lambda runtime api endpoint was given as part of the SQS event source config and the current " + + "instance of the test tool is not running the Lambda runtime api. Either provide a Lambda runtime api endpoint or set a port for " + + "the lambda runtime api when starting the test tool."); + } + lambdaRuntimeApi = $"http://{settings.LambdaEmulatorHost}:{settings.LambdaEmulatorPort}/"; + } + + var backgroundServiceConfig = new SQSEventSourceBackgroundServiceConfig + { + BatchSize = sqsEventSourceConfig.BatchSize ?? DefaultBatchSize, + DisableMessageDelete = sqsEventSourceConfig.DisableMessageDelete ?? false, + FunctionName = sqsEventSourceConfig.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName, + LambdaRuntimeApi = lambdaRuntimeApi, + QueueUrl = queueUrl, + VisibilityTimeout = sqsEventSourceConfig.VisibilityTimeout ?? DefaultVisiblityTimeout + }; + + builder.Services.AddSingleton(backgroundServiceConfig); + builder.Services.AddHostedService(); + + var app = builder.Build(); + var task = app.RunAsync(cancellationToken); + tasks.Add(task); + } + + return new SQSEventSourceProcess + { + RunningTask = Task.WhenAll(tasks) + }; + } + + /// + /// Load the SQS event source configs. The format of the config can be either JSON or comma delimited key pairs. + /// With the JSON format it is possible to configure multiple event sources but special care is required + /// escaping the quotes. The JSON format also provides consistency with the API Gateway configuration. + /// + /// The comma-delimited key pairs allows users to configure a single SQS event source without having + /// to deal with escaping quotes. + /// + /// If the value of sqsEventSourceConfigString points to a file that exists the contents of the file + /// will be read and sued for the value for SQS event source config. + /// + /// The SQS event source config to load. + /// The list of SQS event source configs. + /// Thrown when the config is invalid. + internal static List LoadSQSEventSourceConfig(string sqsEventSourceConfigString) + { + if (File.Exists(sqsEventSourceConfigString)) + { + sqsEventSourceConfigString = File.ReadAllText(sqsEventSourceConfigString); + } + + sqsEventSourceConfigString = sqsEventSourceConfigString.Trim(); + + List? configs = null; + + var jsonOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }; + + // Check to see if the config is in JSON array format. + // The JSON format provides consistency with the API Gateway config style. + if (sqsEventSourceConfigString.StartsWith('[')) + { + try + { + configs = JsonSerializer.Deserialize>(sqsEventSourceConfigString, jsonOptions); + if (configs == null) + { + throw new InvalidOperationException("Failed to parse SQS event source JSON config: " + sqsEventSourceConfigString); + } + } + catch(JsonException e) + { + throw new InvalidOperationException("Failed to parse SQS event source JSON config: " + sqsEventSourceConfigString, e); + } + } + // Config is a single object JSON document. + // The JSON format provides consistency with the API Gateway config style. + else if (sqsEventSourceConfigString.StartsWith('{')) + { + try + { + var config = JsonSerializer.Deserialize(sqsEventSourceConfigString, jsonOptions); + if (config == null) + { + throw new InvalidOperationException("Failed to parse SQS event source JSON config: " + sqsEventSourceConfigString); + } + + configs = new List { config }; + } + catch (JsonException e) + { + throw new InvalidOperationException("Failed to parse SQS event source JSON config: " + sqsEventSourceConfigString, e); + } + } + // Config is a QueueUrl only. The current test tool instance will be assumed the Lambda runtime api and the + // messages will be sent to the default function. Support this format allows for an + // simple CLI experience of just providing a single value for the default scenario. + else if (Uri.TryCreate(sqsEventSourceConfigString, UriKind.Absolute, out _)) + { + configs = new List { new SQSEventSourceConfig { QueueUrl = sqsEventSourceConfigString } }; + } + // Config is in comma delimited key value pair format. This format allows setting all the parameters without having + // to deal with escaping quotes like the JSON format. + else + { + var config = new SQSEventSourceConfig(); + var tokens = sqsEventSourceConfigString.Split(','); + foreach(var token in tokens) + { + if (string.IsNullOrWhiteSpace(token)) + continue; + + var keyValuePair = token.Split('='); + if (keyValuePair.Length != 2) + { + throw new InvalidOperationException("Failed to parse SQS event source config. Format should be \"QueueUrl=,FunctionName=,...\""); + } + + switch (keyValuePair[0].ToLower().Trim()) + { + case "batchsize": + if (!int.TryParse(keyValuePair[1].Trim(), out var batchSize)) + { + throw new InvalidOperationException("Value for batch size is not a formatted integer"); + } + config.BatchSize = batchSize; + break; + case "disablemessagedelete": + if (!bool.TryParse(keyValuePair[1].Trim(), out var disableMessageDelete)) + { + throw new InvalidOperationException("Value for disable message delete is not a formatted boolean"); + } + config.DisableMessageDelete = disableMessageDelete; + break; + case "functionname": + config.FunctionName = keyValuePair[1].Trim(); + break; + case "lambdaruntimeapi": + config.LambdaRuntimeApi = keyValuePair[1].Trim(); + break; + case "profile": + config.Profile = keyValuePair[1].Trim(); + break; + case "queueurl": + config.QueueUrl = keyValuePair[1].Trim(); + break; + case "region": + config.Region = keyValuePair[1].Trim(); + break; + case "visibilitytimeout": + if (!int.TryParse(keyValuePair[1].Trim(), out var visibilityTimeout)) + { + throw new InvalidOperationException("Value for visibility timeout is not a formatted integer"); + } + config.VisibilityTimeout = visibilityTimeout; + break; + } + } + + configs = new List { config }; + } + + return configs; + } +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Services/LambdaRuntimeAPI.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Services/LambdaRuntimeAPI.cs index 3f9666896..b05bb85ea 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Services/LambdaRuntimeAPI.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Services/LambdaRuntimeAPI.cs @@ -20,6 +20,8 @@ internal LambdaRuntimeApi(WebApplication app) { _runtimeApiDataStoreManager = app.Services.GetRequiredService(); + app.MapGet("/lambda-runtime-api/healthcheck", () => "health"); + app.MapPost("/2015-03-31/functions/function/invocations", (Delegate)PostEventDefaultFunction); app.MapPost("/2015-03-31/functions/{functionName}/invocations", PostEvent); @@ -119,7 +121,7 @@ public async Task GetNextInvocation(HttpContext ctx, string functionName) { var runtimeDataStore = _runtimeApiDataStoreManager.GetLambdaRuntimeDataStore(functionName); - EventContainer? activeEvent; + EventContainer? activeEvent = null; // A Lambda function should never call to get the next event till it was done // processing the active event and there is no more active event. If there @@ -132,7 +134,7 @@ public async Task GetNextInvocation(HttpContext ctx, string functionName) } else { - while (!runtimeDataStore.TryActivateEvent(out activeEvent)) + while (!ctx.RequestAborted.IsCancellationRequested && !runtimeDataStore.TryActivateEvent(out activeEvent)) { await Task.Delay(TimeSpan.FromMilliseconds(100)); } diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/appsettings.Development.json b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/appsettings.Development.json index 0b8026458..c95571f6b 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/appsettings.Development.json +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/appsettings.Development.json @@ -1,7 +1,7 @@ { "Logging": { "LogLevel": { - "Default": "Information" + "Default": "Error" } }, "DetailedErrors": true diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Amazon.Lambda.TestTool.IntegrationTests.csproj b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Amazon.Lambda.TestTool.IntegrationTests.csproj index 7c1381008..eda8904a5 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Amazon.Lambda.TestTool.IntegrationTests.csproj +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Amazon.Lambda.TestTool.IntegrationTests.csproj @@ -20,6 +20,7 @@ + diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/ApiGatewayResponseExtensionsAdditionalTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/ApiGatewayResponseExtensionsAdditionalTests.cs index d3019fdc5..22d95b857 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/ApiGatewayResponseExtensionsAdditionalTests.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/ApiGatewayResponseExtensionsAdditionalTests.cs @@ -39,7 +39,7 @@ public async Task ToHttpResponse_RestAPIGatewayV1DecodesBase64() var baseUrl = _fixture.GetAppropriateBaseUrl(ApiGatewayType.RestWithBinarySupport); var url = _fixture.GetRouteUrl(baseUrl, TestRoutes.Ids.DecodeParseBinary); - var actualResponse = await _httpClient.PostAsync(url, new StringContent(JsonSerializer.Serialize(testResponse))); + var actualResponse = await _httpClient.PostAsync(url, new StringContent(JsonSerializer.Serialize(testResponse)), new CancellationTokenSource(5000).Token); await _fixture.ApiGatewayTestHelper.AssertResponsesEqual(actualResponse, httpContext.Response); Assert.Equal(200, (int)actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); @@ -62,7 +62,7 @@ public async Task ToHttpResponse_HttpV1APIGatewayV1DecodesBase64() var baseUrl = _fixture.GetAppropriateBaseUrl(ApiGatewayType.HttpV1); var url = _fixture.GetRouteUrl(baseUrl, TestRoutes.Ids.ParseAndReturnBody); - var actualResponse = await _httpClient.PostAsync(url, new StringContent(JsonSerializer.Serialize(testResponse))); + var actualResponse = await _httpClient.PostAsync(url, new StringContent(JsonSerializer.Serialize(testResponse)), new CancellationTokenSource(5000).Token); await _fixture.ApiGatewayTestHelper.AssertResponsesEqual(actualResponse, httpContext.Response); Assert.Equal(200, (int)actualResponse.StatusCode); diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/BaseApiGatewayTest.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/BaseApiGatewayTest.cs index 9fae56de6..f277ffa2a 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/BaseApiGatewayTest.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/BaseApiGatewayTest.cs @@ -14,14 +14,19 @@ using System.Text; using System.Net.Http; using System.Net.Http.Headers; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using System.Threading; +using Microsoft.Extensions.Hosting; +using Amazon.SecurityToken; namespace Amazon.Lambda.TestTool.IntegrationTests; public abstract class BaseApiGatewayTest { protected readonly ITestOutputHelper TestOutputHelper; - protected readonly Mock MockEnvironmentManager; protected readonly Mock MockInteractiveService; + protected readonly Mock MockEnvironmentManager; protected readonly Mock MockRemainingArgs; protected CancellationTokenSource CancellationTokenSource; protected static readonly object TestLock = new(); @@ -83,7 +88,7 @@ protected async Task WaitForGatewayHealthCheck(int apiGatewayPort) { try { - var response = await client.GetAsync(healthUrl); + var response = await client.GetAsync(healthUrl, new CancellationTokenSource(10000).Token); if (response.IsSuccessStatusCode) { TestOutputHelper.WriteLine("API Gateway health check succeeded"); @@ -122,8 +127,8 @@ protected async Task TestEndpoint(string routeName, int api { "POST" => await client.PostAsync( $"http://localhost:{apiGatewayPort}/{routeName}", - new StringContent(payload ?? "hello world", Encoding.UTF8, new MediaTypeHeaderValue("text/plain"))), - "GET" => await client.GetAsync($"http://localhost:{apiGatewayPort}/{routeName}"), + new StringContent(payload ?? "hello world", Encoding.UTF8, new MediaTypeHeaderValue("text/plain")), new CancellationTokenSource(5000).Token), + "GET" => await client.GetAsync($"http://localhost:{apiGatewayPort}/{routeName}", new CancellationTokenSource(5000).Token), _ => throw new ArgumentException($"Unsupported HTTP method: {httpMethod}") }; } @@ -154,22 +159,48 @@ protected async Task TestEndpoint(string routeName, int api protected int GetFreePort() { - var random = new Random(); - var port = random.Next(49152, 65535); - var listener = new TcpListener(IPAddress.Loopback, port); - try - { - listener.Start(); - return port; - } - catch (SocketException) + Console.WriteLine("Looking for free port"); + var builder = WebApplication.CreateBuilder(); + builder.WebHost.UseUrls("http://127.0.0.1:0"); + var app = builder.Build(); + app.MapGet("/", () => "test"); + + CancellationTokenSource tokenSource = new CancellationTokenSource(); + tokenSource.CancelAfter(5000); + var runTask = app.RunAsync(tokenSource.Token); + var uri = new Uri(app.Urls.First()); + + using var client = new HttpClient(); + string? content = null; + + Console.WriteLine($"Testing port: {uri.Port}"); + var timeout = DateTime.UtcNow.AddSeconds(30); + while(DateTime.UtcNow < timeout) { - return GetFreePort(); + try + { + content = client.GetStringAsync(uri, tokenSource.Token).GetAwaiter().GetResult(); + Console.WriteLine("Port was successful"); + break; + } + catch + { + Thread.Sleep(100); + } } - finally + + if (!string.Equals(content, "test")) { - listener.Stop(); + Console.WriteLine("Port test failed trying again"); + var recursivePort = GetFreePort(); + tokenSource.Cancel(); + return recursivePort; } + + tokenSource.Cancel(); + Task.Delay(2000); + + return uri.Port; } protected async Task StartTestToolProcessWithNullEndpoint(ApiGatewayEmulatorMode apiGatewayMode, string routeName, int lambdaPort, int apiGatewayPort, CancellationTokenSource cancellationTokenSource) diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayHelper.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayHelper.cs index 2f1036951..5818bf3e0 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayHelper.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayHelper.cs @@ -15,13 +15,11 @@ public class ApiGatewayHelper { private readonly IAmazonAPIGateway _apiGatewayV1Client; private readonly IAmazonApiGatewayV2 _apiGatewayV2Client; - private readonly HttpClient _httpClient; public ApiGatewayHelper(IAmazonAPIGateway apiGatewayV1Client, IAmazonApiGatewayV2 apiGatewayV2Client) { _apiGatewayV1Client = apiGatewayV1Client; _apiGatewayV2Client = apiGatewayV2Client; - _httpClient = new HttpClient(); } public async Task WaitForApiAvailability(string apiId, string apiUrl, bool isHttpApi, int maxWaitTimeSeconds = 60) diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayTestHelper.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayTestHelper.cs index 3237a5345..b4584e198 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayTestHelper.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/Helpers/ApiGatewayTestHelper.cs @@ -24,7 +24,7 @@ public ApiGatewayTestHelper() httpContext.Response.Body = new MemoryStream(); await testResponse.ToHttpResponseAsync(httpContext, emulatorMode); var serialized = JsonSerializer.Serialize(testResponse); - var actualResponse = await _httpClient.PostAsync(apiUrl, new StringContent(serialized)); + var actualResponse = await _httpClient.PostAsync(apiUrl, new StringContent(serialized), new CancellationTokenSource(5000).Token); return (actualResponse, httpContext.Response); } @@ -34,7 +34,7 @@ public ApiGatewayTestHelper() testResponseHttpContext.Response.Body = new MemoryStream(); await testResponse.ToHttpResponseAsync(testResponseHttpContext); var serialized = JsonSerializer.Serialize(testResponse); - var actualResponse = await _httpClient.PostAsync(apiUrl, new StringContent(serialized)); + var actualResponse = await _httpClient.PostAsync(apiUrl, new StringContent(serialized), new CancellationTokenSource(5000).Token); return (actualResponse, testResponseHttpContext.Response); } diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/SQSEventSourceTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/SQSEventSourceTests.cs new file mode 100644 index 000000000..fc4aa1882 --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.IntegrationTests/SQSEventSourceTests.cs @@ -0,0 +1,447 @@ +using Amazon.Lambda.TestTool.Commands; +using Amazon.Lambda.TestTool.Commands.Settings; +using Amazon.SQS; +using Spectre.Console.Cli; +using Xunit; +using Xunit.Abstractions; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; +using Amazon.Lambda.Core; +using Amazon.Lambda.SQSEvents; +using Amazon.SQS.Model; +using Amazon.Lambda.TestTool.Tests.Common; +using Amazon.Lambda.TestTool.Tests.Common.Retries; +using Amazon.Lambda.TestTool.Services; +using Moq; + +namespace Amazon.Lambda.TestTool.IntegrationTests; + +[Collection("SQSEventSourceTests")] +public class SQSEventSourceTests : BaseApiGatewayTest +{ + public SQSEventSourceTests(ITestOutputHelper testOutputHelper) + : base(testOutputHelper) + { + } + + [RetryFact] + public async Task ProcessSingleMessage() + { + var cancellationSource = new CancellationTokenSource(); + + var sqsClient = new AmazonSQSClient(); + var queueName = nameof(ProcessSingleMessage) + DateTime.Now.Ticks; + var queueUrl = (await sqsClient.CreateQueueAsync(queueName)).QueueUrl; + await Task.Delay(2000); + var consoleError = Console.Error; + try + { + Console.SetError(Console.Out); + + var lambdaPort = GetFreePort(); + var testToolTask = StartTestToolProcessAsync(lambdaPort, $"QueueUrl={queueUrl},FunctionName=SQSProcessor", cancellationSource); + + var listOfProcessedMessages = new List(); + var handler = (SQSEvent evnt, ILambdaContext context) => + { + TestOutputHelper.WriteLine($"Lambda handler called with {evnt.Records.Count} messages"); + foreach (var message in evnt.Records) + { + listOfProcessedMessages.Add(message); + } + }; + + var lambdaTask = LambdaBootstrapBuilder.Create(handler, new DefaultLambdaJsonSerializer()) + .ConfigureOptions(x => x.RuntimeApiEndpoint = $"localhost:{lambdaPort}/SQSProcessor") + .Build() + .RunAsync(cancellationSource.Token); + + await sqsClient.SendMessageAsync(queueUrl, "TheBody"); + + var startTime = DateTime.UtcNow; + while (listOfProcessedMessages.Count == 0 && DateTime.UtcNow < startTime.AddMinutes(2)) + { + Assert.False(lambdaTask.IsFaulted, "Lambda function failed: " + lambdaTask.Exception?.ToString()); + await Task.Delay(500); + } + + Assert.Single(listOfProcessedMessages); + Assert.Equal("TheBody", listOfProcessedMessages[0].Body); + Assert.Equal(0, await GetNumberOfMessagesInQueueAsync(sqsClient, queueUrl)); + } + finally + { + _ = cancellationSource.CancelAsync(); + await sqsClient.DeleteQueueAsync(queueUrl); + Console.SetError(consoleError); + } + } + + [RetryFact] + public async Task SQSEventSourceComesFromEnvironmentVariable() + { + var cancellationSource = new CancellationTokenSource(); + + var sqsClient = new AmazonSQSClient(); + var queueName = nameof(SQSEventSourceComesFromEnvironmentVariable) + DateTime.Now.Ticks; + var queueUrl = (await sqsClient.CreateQueueAsync(queueName)).QueueUrl; + await Task.Delay(2000); + var consoleError = Console.Error; + try + { + Console.SetError(Console.Out); + + var lambdaPort = GetFreePort(); + var testToolTask = StartTestToolProcessAsync(lambdaPort, $"env:SQS_CONFIG&QueueUrl={queueUrl},FunctionName=SQSProcessor", cancellationSource); + + var listOfProcessedMessages = new List(); + var handler = (SQSEvent evnt, ILambdaContext context) => + { + TestOutputHelper.WriteLine($"Lambda handler called with {evnt.Records.Count} messages"); + foreach (var message in evnt.Records) + { + listOfProcessedMessages.Add(message); + } + }; + + var lambdaTask = LambdaBootstrapBuilder.Create(handler, new DefaultLambdaJsonSerializer()) + .ConfigureOptions(x => x.RuntimeApiEndpoint = $"localhost:{lambdaPort}/SQSProcessor") + .Build() + .RunAsync(cancellationSource.Token); + + await sqsClient.SendMessageAsync(queueUrl, "TheBody"); + + var startTime = DateTime.UtcNow; + while (listOfProcessedMessages.Count == 0 && DateTime.UtcNow < startTime.AddMinutes(2)) + { + Assert.False(lambdaTask.IsFaulted, "Lambda function failed: " + lambdaTask.Exception?.ToString()); + await Task.Delay(500); + } + + Assert.Single(listOfProcessedMessages); + Assert.Equal("TheBody", listOfProcessedMessages[0].Body); + Assert.Equal(0, await GetNumberOfMessagesInQueueAsync(sqsClient, queueUrl)); + } + finally + { + _ = cancellationSource.CancelAsync(); + await sqsClient.DeleteQueueAsync(queueUrl); + Console.SetError(consoleError); + } + } + + [RetryFact] + public async Task ProcessMessagesFromMultipleEventSources() + { + var cancellationSource = new CancellationTokenSource(); + + var sqsClient = new AmazonSQSClient(); + var queueName1 = nameof(ProcessMessagesFromMultipleEventSources) + "-1-" + DateTime.Now.Ticks; + var queueUrl1 = (await sqsClient.CreateQueueAsync(queueName1)).QueueUrl; + + var queueName2 = nameof(ProcessMessagesFromMultipleEventSources) + "-2-" + DateTime.Now.Ticks; + var queueUrl2 = (await sqsClient.CreateQueueAsync(queueName2)).QueueUrl; + await Task.Delay(2000); + + var consoleError = Console.Error; + try + { + Console.SetError(Console.Out); + + var sqsEventSourceConfig = """ + [ + { + "QueueUrl" : "queueUrl1", + "FunctionName" : "SQSProcessor" + }, + { + "QueueUrl" : "queueUrl2", + "FunctionName" : "SQSProcessor" + } + ] + """.Replace("queueUrl1", queueUrl1).Replace("queueUrl2", queueUrl2); + + var lambdaPort = GetFreePort(); + var testToolTask = StartTestToolProcessAsync(lambdaPort, sqsEventSourceConfig, cancellationSource); + + var listOfProcessedMessages = new List(); + var handler = (SQSEvent evnt, ILambdaContext context) => + { + TestOutputHelper.WriteLine($"Lambda handler called with {evnt.Records.Count} messages"); + foreach (var message in evnt.Records) + { + listOfProcessedMessages.Add(message); + } + }; + + var lambdaTask = LambdaBootstrapBuilder.Create(handler, new DefaultLambdaJsonSerializer()) + .ConfigureOptions(x => x.RuntimeApiEndpoint = $"localhost:{lambdaPort}/SQSProcessor") + .Build() + .RunAsync(cancellationSource.Token); + + await sqsClient.SendMessageAsync(queueUrl1, "MessageFromQueue1"); + await sqsClient.SendMessageAsync(queueUrl2, "MessageFromQueue2"); + + var startTime = DateTime.UtcNow; + while (listOfProcessedMessages.Count == 0 && DateTime.UtcNow < startTime.AddMinutes(2)) + { + Assert.False(lambdaTask.IsFaulted, "Lambda function failed: " + lambdaTask.Exception?.ToString()); + await Task.Delay(500); + } + + Assert.Equal(2, listOfProcessedMessages.Count); + Assert.NotEqual(listOfProcessedMessages[0].EventSourceArn, listOfProcessedMessages[1].EventSourceArn); + } + finally + { + _ = cancellationSource.CancelAsync(); + await sqsClient.DeleteQueueAsync(queueUrl1); + await sqsClient.DeleteQueueAsync(queueUrl2); + Console.SetError(consoleError); + } + } + + [RetryFact] + public async Task MessageNotDeleted() + { + var cancellationSource = new CancellationTokenSource(); + var sqsClient = new AmazonSQSClient(); + var queueName = nameof(MessageNotDeleted) + DateTime.Now.Ticks; + var queueUrl = (await sqsClient.CreateQueueAsync(queueName)).QueueUrl; + await Task.Delay(2000); + var consoleError = Console.Error; + try + { + Console.SetError(Console.Out); + + var lambdaPort = GetFreePort(); + var testToolTask = StartTestToolProcessAsync(lambdaPort, $"QueueUrl={queueUrl},FunctionName=SQSProcessor,DisableMessageDelete=true", cancellationSource); + + var listOfProcessedMessages = new List(); + var handler = (SQSEvent evnt, ILambdaContext context) => + { + TestOutputHelper.WriteLine($"Lambda handler called with {evnt.Records.Count} messages"); + foreach (var message in evnt.Records) + { + listOfProcessedMessages.Add(message); + } + }; + + var lambdaTask = LambdaBootstrapBuilder.Create(handler, new DefaultLambdaJsonSerializer()) + .ConfigureOptions(x => x.RuntimeApiEndpoint = $"localhost:{lambdaPort}/SQSProcessor") + .Build() + .RunAsync(cancellationSource.Token); + + await sqsClient.SendMessageAsync(queueUrl, "TheBody"); + + var startTime = DateTime.UtcNow; + while (listOfProcessedMessages.Count == 0 && DateTime.UtcNow < startTime.AddMinutes(2)) + { + Assert.False(lambdaTask.IsFaulted, "Lambda function failed: " + lambdaTask.Exception?.ToString()); + await Task.Delay(500); + } + + Assert.Single(listOfProcessedMessages); + Assert.Equal("TheBody", listOfProcessedMessages[0].Body); + Assert.Equal(1, await GetNumberOfMessagesInQueueAsync(sqsClient, queueUrl)); + } + finally + { + _ = cancellationSource.CancelAsync(); + await sqsClient.DeleteQueueAsync(queueUrl); + Console.SetError(consoleError); + } + } + + [RetryFact] + public async Task LambdaThrowsErrorAndMessageNotDeleted() + { + var cancellationSource = new CancellationTokenSource(); + var sqsClient = new AmazonSQSClient(); + var queueName = nameof(LambdaThrowsErrorAndMessageNotDeleted) + DateTime.Now.Ticks; + var queueUrl = (await sqsClient.CreateQueueAsync(queueName)).QueueUrl; + await Task.Delay(2000); + var consoleError = Console.Error; + try + { + Console.SetError(Console.Out); + var lambdaPort = GetFreePort(); + var testToolTask = StartTestToolProcessAsync(lambdaPort, $"QueueUrl={queueUrl},FunctionName=SQSProcessor", cancellationSource); + + var listOfProcessedMessages = new List(); + var handler = (SQSEvent evnt, ILambdaContext context) => + { + TestOutputHelper.WriteLine($"Lambda handler called with {evnt.Records.Count} messages"); + foreach (var message in evnt.Records) + { + listOfProcessedMessages.Add(message); + } + + throw new Exception("Failed to process message"); + }; + + var lambdaTask = LambdaBootstrapBuilder.Create(handler, new DefaultLambdaJsonSerializer()) + .ConfigureOptions(x => x.RuntimeApiEndpoint = $"localhost:{lambdaPort}/SQSProcessor") + .Build() + .RunAsync(cancellationSource.Token); + + await sqsClient.SendMessageAsync(queueUrl, "TheBody"); + + var startTime = DateTime.UtcNow; + while (listOfProcessedMessages.Count == 0 && DateTime.UtcNow < startTime.AddMinutes(2)) + { + Assert.False(lambdaTask.IsFaulted, "Lambda function failed: " + lambdaTask.Exception?.ToString()); + await Task.Delay(500); + } + + Assert.Single(listOfProcessedMessages); + Assert.Equal("TheBody", listOfProcessedMessages[0].Body); + Assert.Equal(1, await GetNumberOfMessagesInQueueAsync(sqsClient, queueUrl)); + } + finally + { + _ = cancellationSource.CancelAsync(); + await sqsClient.DeleteQueueAsync(queueUrl); + Console.SetError(consoleError); + } + } + + [RetryFact] + public async Task PartialFailureResponse() + { + var cancellationSource = new CancellationTokenSource(); + var sqsClient = new AmazonSQSClient(); + var queueName = nameof(PartialFailureResponse) + DateTime.Now.Ticks; + var queueUrl = (await sqsClient.CreateQueueAsync(queueName)).QueueUrl; + await Task.Delay(2000); + var consoleError = Console.Error; + try + { + Console.SetError(Console.Out); + await sqsClient.SendMessageAsync(queueUrl, "Message1"); + + var lambdaPort = GetFreePort(); + + // Lower VisibilityTimeout to speed up receiving the message at the end to prove the message wasn't deleted. + var testToolTask = StartTestToolProcessAsync(lambdaPort, $"QueueUrl={queueUrl},FunctionName=SQSProcessor,VisibilityTimeout=3", cancellationSource); + + var listOfProcessedMessages = new List(); + var handler = (SQSEvent evnt, ILambdaContext context) => + { + TestOutputHelper.WriteLine($"Lambda handler called with {evnt.Records.Count} messages"); + foreach (var message in evnt.Records) + { + listOfProcessedMessages.Add(message); + } + + var sqsResponse = new SQSBatchResponse(); + sqsResponse.BatchItemFailures = new List + { + new SQSBatchResponse.BatchItemFailure + { + ItemIdentifier = evnt.Records[0].MessageId + } + }; + + return sqsResponse; + }; + + var lambdaTask = LambdaBootstrapBuilder.Create(handler, new DefaultLambdaJsonSerializer()) + .ConfigureOptions(x => x.RuntimeApiEndpoint = $"localhost:{lambdaPort}/SQSProcessor") + .Build() + .RunAsync(cancellationSource.Token); + + await sqsClient.SendMessageAsync(queueUrl, "TheBody"); + + var startTime = DateTime.UtcNow; + while (listOfProcessedMessages.Count == 0 && DateTime.UtcNow < startTime.AddMinutes(2)) + { + Assert.False(lambdaTask.IsFaulted, "Lambda function failed: " + lambdaTask.Exception?.ToString()); + await Task.Delay(500); + } + + // Wait for message to be deleted. + await Task.Delay(2000); + + // Since the message was never deleted by the event source it should still be eligibl for reading. + var response = await sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 20 }); + Assert.Single(response.Messages); + } + finally + { + _ = cancellationSource.CancelAsync(); + await sqsClient.DeleteQueueAsync(queueUrl); + Console.SetError(consoleError); + } + } + + + private async Task GetNumberOfMessagesInQueueAsync(IAmazonSQS sqsClient, string queueUrl) + { + // Add a delay to handle SQS eventual consistency. + await Task.Delay(5000); + var response = await sqsClient.GetQueueAttributesAsync(queueUrl, new List { "All" }); + return response.ApproximateNumberOfMessages + response.ApproximateNumberOfMessagesNotVisible; + } + + // Do not use async/await so we can be sure to hand back the Task that created by RunCommand back to caller. + private Task StartTestToolProcessAsync(int lambdaPort, string sqsEventSourceConfig, CancellationTokenSource cancellationTokenSource) + { + Environment.SetEnvironmentVariable("ASPNETCORE_ENVIRONMENT", "Development"); + + var environmentVariables = new Dictionary { }; + + if (sqsEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix)) + { + var tokens = sqsEventSourceConfig.Split('&'); + if (tokens.Length == 2) + { + sqsEventSourceConfig = tokens[0]; + var envName = tokens[0].Replace(Constants.ArgumentEnvironmentVariablePrefix, ""); + var envValue = tokens[1]; + environmentVariables[envName] = envValue; + } + } + + var settings = new RunCommandSettings + { + LambdaEmulatorPort = lambdaPort, + NoLaunchWindow = true, + SQSEventSourceConfig = sqsEventSourceConfig + }; + + + var command = new RunCommand(MockInteractiveService.Object, new TestEnvironmentManager(environmentVariables)); + var context = new CommandContext(new List(), MockRemainingArgs.Object, "run", null); + Task testToolTask = command.ExecuteAsync(context, settings, cancellationTokenSource); + + var timeout = DateTime.UtcNow.AddMinutes(1); + while (DateTime.UtcNow < timeout && command.LambdRuntimeApiTask == null) + { + Thread.Sleep(100); + } + + Thread.Sleep(2000); + + Assert.NotNull(command.LambdRuntimeApiTask); + Assert.False(command.LambdRuntimeApiTask.IsFaulted, "Task to start Lambda Runtime API failed: " + command.LambdRuntimeApiTask.Exception?.ToString()); + + using var httpClient = new HttpClient(); + + var healthCheckUrl = $"http://localhost:{lambdaPort}/lambda-runtime-api/healthcheck"; + TestOutputHelper.WriteLine($"Attempting health check url for Lambda runtime api: {healthCheckUrl}"); + + try + { + var health = httpClient.GetStringAsync(healthCheckUrl).GetAwaiter().GetResult(); + TestOutputHelper.WriteLine($"Get successful health check: {health}"); + } + catch(Exception ex) + { + Assert.Fail($"Failed to make healthcheck: {ex}"); + } + + Thread.Sleep(2000); + return testToolTask; + } +} diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.Tests.Common/TestEnvironmentManager.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.Tests.Common/TestEnvironmentManager.cs new file mode 100644 index 000000000..d614a06fe --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.Tests.Common/TestEnvironmentManager.cs @@ -0,0 +1,8 @@ +using Amazon.Lambda.TestTool.Services.IO; + +namespace Amazon.Lambda.TestTool.Tests.Common; + +public class TestEnvironmentManager(System.Collections.IDictionary dictionary) : IEnvironmentManager +{ + public System.Collections.IDictionary GetEnvironmentVariables() => dictionary; +} diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Processes/ApiGatewayEmulatorProcessTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Processes/ApiGatewayEmulatorProcessTests.cs index f6d80d58b..fe6a6d790 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Processes/ApiGatewayEmulatorProcessTests.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/Processes/ApiGatewayEmulatorProcessTests.cs @@ -5,12 +5,14 @@ using Amazon.Lambda.TestTool.Commands.Settings; using Amazon.Lambda.TestTool.Models; using Amazon.Lambda.TestTool.Processes; +using Amazon.Lambda.TestTool.Tests.Common; using Amazon.Lambda.TestTool.Tests.Common.Helpers; using Xunit; +using Xunit.Abstractions; namespace Amazon.Lambda.TestTool.UnitTests.Processes; -public class ApiGatewayEmulatorProcessTests +public class ApiGatewayEmulatorProcessTests(ITestOutputHelper testOutputHelper) { [Theory] [InlineData(ApiGatewayEmulatorMode.Rest, HttpStatusCode.Forbidden, "{\"message\":\"Missing Authentication Token\"}")] diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/RuntimeApiTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/RuntimeApiTests.cs index d03c6a8e6..998460411 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/RuntimeApiTests.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/RuntimeApiTests.cs @@ -14,10 +14,12 @@ using Microsoft.Extensions.DependencyInjection; using Xunit; using Environment = System.Environment; +using Xunit.Abstractions; +using Amazon.Lambda.TestTool.Tests.Common; namespace Amazon.Lambda.TestTool.UnitTests; -public class RuntimeApiTests +public class RuntimeApiTests(ITestOutputHelper testOutputHelper) { #if DEBUG [Fact] diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ConvertSDKToLambdaEventTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ConvertSDKToLambdaEventTests.cs new file mode 100644 index 000000000..c8722551d --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ConvertSDKToLambdaEventTests.cs @@ -0,0 +1,78 @@ +using Amazon.Lambda.TestTool.Processes.SQSEventSource; +using Amazon.SQS.Model; +using Xunit; + +namespace Amazon.Lambda.TestTool.UnitTests.SQSEventSource; + +public class ConvertSDKToLambdaEventTests +{ + [Fact] + public void ConvertSDKMessageFull() + { + var sdkMessage = new Message + { + Attributes = new Dictionary { { "key1", "value1" }, { "key2", "value2" } }, + Body = "theBody", + MD5OfBody = "theBodyMD5", + MD5OfMessageAttributes = "attributesMD5", + MessageAttributes = new Dictionary + { + { "key1", new MessageAttributeValue{StringValue = "value1", DataType = "String"} }, + { "key2", new MessageAttributeValue{BinaryValue = new MemoryStream(), DataType = "Binary"} } + }, + MessageId = "id", + ReceiptHandle = "receiptHandle" + }; + + var eventMessage = SQSEventSourceBackgroundService.ConvertToLambdaMessage(sdkMessage, "us-west-2", "queueArn"); + Assert.Equal("us-west-2", eventMessage.AwsRegion); + Assert.Equal("queueArn", eventMessage.EventSourceArn); + Assert.Equal("aws:sqs", eventMessage.EventSource); + + Assert.Equal(sdkMessage.Attributes, eventMessage.Attributes); + Assert.Equal("theBody", eventMessage.Body); + Assert.Equal("theBodyMD5", eventMessage.Md5OfBody); + Assert.Equal("attributesMD5", eventMessage.Md5OfMessageAttributes); + Assert.Equal("id", eventMessage.MessageId); + Assert.Equal("receiptHandle", eventMessage.ReceiptHandle); + + Assert.Equal(2, eventMessage.MessageAttributes.Count); + + Assert.Equal("value1", eventMessage.MessageAttributes["key1"].StringValue); + Assert.Null(eventMessage.MessageAttributes["key1"].BinaryValue); + Assert.Equal("String", eventMessage.MessageAttributes["key1"].DataType); + + Assert.Null(eventMessage.MessageAttributes["key2"].StringValue); + Assert.NotNull(eventMessage.MessageAttributes["key2"].BinaryValue); + Assert.Equal("Binary", eventMessage.MessageAttributes["key2"].DataType); + } + + [Fact] + public void ConvertSDKMessageWithNullCollections() + { + var sdkMessage = new Message + { + Attributes = null, + Body = "theBody", + MD5OfBody = "theBodyMD5", + MD5OfMessageAttributes = "attributesMD5", + MessageAttributes = null, + MessageId = "id", + ReceiptHandle = "receiptHandle" + }; + + var eventMessage = SQSEventSourceBackgroundService.ConvertToLambdaMessage(sdkMessage, "us-west-2", "queueArn"); + Assert.Equal("us-west-2", eventMessage.AwsRegion); + Assert.Equal("queueArn", eventMessage.EventSourceArn); + Assert.Equal("aws:sqs", eventMessage.EventSource); + + Assert.Equal("theBody", eventMessage.Body); + Assert.Equal("theBodyMD5", eventMessage.Md5OfBody); + Assert.Equal("attributesMD5", eventMessage.Md5OfMessageAttributes); + Assert.Equal("id", eventMessage.MessageId); + Assert.Equal("receiptHandle", eventMessage.ReceiptHandle); + + Assert.Null(eventMessage.Attributes); + Assert.Null(eventMessage.MessageAttributes); + } +} diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ParseSQSEventSourceConfigTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ParseSQSEventSourceConfigTests.cs new file mode 100644 index 000000000..b9686f219 --- /dev/null +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ParseSQSEventSourceConfigTests.cs @@ -0,0 +1,142 @@ +using Amazon.Lambda.TestTool.Processes.SQSEventSource; +using Xunit; + +namespace Amazon.Lambda.TestTool.UnitTests.SQSEventSource; + +public class ParseSQSEventSourceConfigTests +{ + [Fact] + public void ParseValidJsonObject() + { + string json = """ +{ + "QueueUrl" : "https://amazonsqs/queueurl", + "FunctionName" : "LambdaFunction", + "BatchSize" : 5, + "DisableMessageDelete" : true, + "LambdaRuntimeApi" : "http://localhost:7777/", + "Profile" : "beta", + "Region" : "us-east-23", + "VisibilityTimeout" : 50 +} +"""; + + var configs = SQSEventSourceProcess.LoadSQSEventSourceConfig(json); + Assert.Single(configs); + Assert.Equal("https://amazonsqs/queueurl", configs[0].QueueUrl); + Assert.Equal("LambdaFunction", configs[0].FunctionName); + Assert.Equal(5, configs[0].BatchSize); + Assert.True(configs[0].DisableMessageDelete); + Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi); + Assert.Equal("beta", configs[0].Profile); + Assert.Equal("us-east-23", configs[0].Region); + Assert.Equal(50, configs[0].VisibilityTimeout); + + } + + [Fact] + public void ParseInvalidJsonObject() + { + string json = """ +{ + "aaa" +} +"""; + + Assert.Throws(() => SQSEventSourceProcess.LoadSQSEventSourceConfig(json)); + } + + + [Fact] + public void ParseValidJsonArray() + { + string json = """ +[ + { + "QueueUrl" : "https://amazonsqs/queueurl", + "FunctionName" : "LambdaFunction", + "BatchSize" : 5, + "DisableMessageDelete" : true, + "LambdaRuntimeApi" : "http://localhost:7777/", + "Profile" : "beta", + "Region" : "us-east-23", + "VisibilityTimeout" : 50 + }, + { + "QueueUrl" : "https://amazonsqs/queueurl", + "FunctionName" : "LambdaFunction", + "BatchSize" : 5, + "DisableMessageDelete" : true, + "LambdaRuntimeApi" : "http://localhost:7777/", + "Profile" : "beta", + "Region" : "us-east-23", + "VisibilityTimeout" : 50 + } +] +"""; + + var configs = SQSEventSourceProcess.LoadSQSEventSourceConfig(json); + Assert.Equal(2, configs.Count); + + foreach (var config in configs) + { + Assert.Equal("https://amazonsqs/queueurl", config.QueueUrl); + Assert.Equal("LambdaFunction", config.FunctionName); + Assert.Equal(5, config.BatchSize); + Assert.True(config.DisableMessageDelete); + Assert.Equal("http://localhost:7777/", config.LambdaRuntimeApi); + Assert.Equal("beta", config.Profile); + Assert.Equal("us-east-23", config.Region); + Assert.Equal(50, config.VisibilityTimeout); + } + } + + [Fact] + public void ParseInvalidJsonArray() + { + string json = """ +[ + {"aaa"} +] +"""; + + Assert.Throws(() => SQSEventSourceProcess.LoadSQSEventSourceConfig(json)); + } + + [Fact] + public void ParseQueueUrl() + { + var configs = SQSEventSourceProcess.LoadSQSEventSourceConfig("https://amazonsqs/queueurl"); + Assert.Single(configs); + Assert.Equal("https://amazonsqs/queueurl", configs[0].QueueUrl); + } + + [Fact] + public void ParseKeyPairs() + { + var configs = SQSEventSourceProcess.LoadSQSEventSourceConfig( + "QueueUrl=https://amazonsqs/queueurl ,functionName =LambdaFunction, batchSize=5, DisableMessageDelete=true," + + "LambdaRuntimeApi=http://localhost:7777/ ,Profile=beta,Region=us-east-23,VisibilityTimeout=50"); + + Assert.Single(configs); + Assert.Equal("https://amazonsqs/queueurl", configs[0].QueueUrl); + Assert.Equal("LambdaFunction", configs[0].FunctionName); + Assert.Equal(5, configs[0].BatchSize); + Assert.True(configs[0].DisableMessageDelete); + Assert.Equal("http://localhost:7777/", configs[0].LambdaRuntimeApi); + Assert.Equal("beta", configs[0].Profile); + Assert.Equal("us-east-23", configs[0].Region); + Assert.Equal(50, configs[0].VisibilityTimeout); + } + + [Theory] + [InlineData("novalue")] + [InlineData("BatchSize=noint")] + [InlineData("VisibilityTimeout=noint")] + [InlineData("DisableMessageDelete=nobool")] + [InlineData("QueueUrl=https://amazonsqs/queueurl FunctionName =LambdaFunction")] + public void InvalidKeyPairString(string keyPairConfig) + { + Assert.Throws(() => SQSEventSourceProcess.LoadSQSEventSourceConfig(keyPairConfig)); + } +}