Skip to content

Add SQS event source support for TestTool #2008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
9d91dd2
Add SQS event source to test tool
normj Mar 10, 2025
49e4570
Add unit tests
normj Mar 11, 2025
a10ddaf
Add integ tests
normj Mar 11, 2025
7ef91de
Add change log file
normj Mar 11, 2025
ae43475
Update switches description
normj Mar 11, 2025
597d943
Add delay to handle SQS eventual consistency
normj Mar 11, 2025
fd1612f
Add dispose of TcpListener when looking for a free port.
normj Mar 11, 2025
de271ac
Rework how random ports are picked in tests
normj Mar 11, 2025
e484507
Add collection attribute to SQS integ tests
normj Mar 11, 2025
128befd
Add [RetryFact] for new SQS integ tests
normj Mar 11, 2025
016f318
Do more checks to see if port is available
normj Mar 11, 2025
2eca321
Add more logging for tests.
normj Mar 12, 2025
0bc3a19
Add started polling info log message
normj Mar 13, 2025
cc45ffa
Merge branch 'dev' into normj/testtool-sqs
normj Mar 22, 2025
3108beb
Address PR comments
normj Mar 22, 2025
377b15d
Merge branch 'dev' into normj/testtool-sqs
normj Apr 1, 2025
76c92c8
Merge branch 'normj/testtool-sqs' of https://github.com/aws/aws-lambd…
normj Apr 1, 2025
eeee081
Cancel polling for next invocations if server if request has been can…
normj Apr 3, 2025
2d37ce7
When cleaning up test instances don't block waiting for the cancel to…
normj Apr 3, 2025
e5121e4
Rework the RunCommand block on running tasks to take into account the…
normj Apr 3, 2025
484a885
Shutdown cleanup
normj Apr 3, 2025
f64f3a3
Try disabling SQS integ tests
normj Apr 3, 2025
6298a1a
Try disabling more tests to track down CI issue
normj Apr 3, 2025
5e06508
Try reverting the use of TestOutputToolInteractiveService
normj Apr 3, 2025
fc0e128
Revert more test changes
normj Apr 3, 2025
7cb6f87
Add back tests see if they now work in CI
normj Apr 3, 2025
a38f6d7
Tweak SQS tests
normj Apr 3, 2025
3c6e8d5
Try turning off the SQS tests
normj Apr 3, 2025
1a70ee3
Add back one SQS test
normj Apr 3, 2025
8aa7b80
Add back another test
normj Apr 3, 2025
81e8667
Add another test
normj Apr 3, 2025
bf19b4b
Add cancellation tokens on HTTP requests.
normj Apr 3, 2025
54e8318
Try rewriting GetFreePort
normj Apr 3, 2025
329c38d
Add some checks that the Lambda Runtime API started correct if not As…
normj Apr 3, 2025
f85380f
Add back all the SQS tests.
normj Apr 3, 2025
e3be00b
Add delay using the SQS queue before using it
normj Apr 3, 2025
0daf44d
Check if Lambda function task is failed
normj Apr 3, 2025
073f1de
Tweak tests
normj Apr 4, 2025
98b10c0
Add health check for lambda runtime api
normj Apr 4, 2025
7ac01d4
Try turning on trace logging
normj Apr 4, 2025
33386a5
Try redirecting stderr to stdout in tests
normj Apr 4, 2025
9f71778
More tweaks to code looking for a free port.
normj Apr 4, 2025
768286f
Try again without SQS integ tests
normj Apr 4, 2025
6c56167
Switch logging back to Error
normj Apr 4, 2025
b7bcbd9
Add Tests
normj Apr 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/e390422f-955d-4699-97cf-67725872e746.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.TestTool",
"Type": "Minor",
"ChangelogMessages": [
"Add SQS event source support"
]
}
]
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk.Web">

<Import Project="..\..\..\..\buildtools\common.props" />
<PropertyGroup>
<OutputType>Exe</OutputType>
<OutputType>Exe</OutputType>
<Description>A tool to help debug and test your .NET AWS Lambda functions locally.</Description>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
Expand All @@ -17,7 +17,7 @@
<ToolCommandName>dotnet-lambda-test-tool</ToolCommandName>
<Version>0.9.1</Version>
<NoWarn>NU5100</NoWarn>
<RollForward>Major</RollForward>
<RollForward>Major</RollForward>
<PackageReadmeFile>README.md</PackageReadmeFile>
</PropertyGroup>

Expand All @@ -26,7 +26,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Amazon.Lambda.SQSEvents" Version="2.2.0" />
<PackageReference Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.400" />
<PackageReference Include="AWSSDK.Lambda" Version="3.7.411.17" />
<PackageReference Include="AWSSDK.SQS" Version="3.7.400.109" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
<PackageReference Include="Spectre.Console.Cli" Version="0.49.1" />
<PackageReference Include="Amazon.Lambda.APIGatewayEvents" Version="2.7.1" />
Expand All @@ -38,7 +41,7 @@
<Exec Command="dotnet msbuild ../../../../Libraries/src/Amazon.Lambda.RuntimeSupport/Amazon.Lambda.RuntimeSupport.csproj --getProperty:TargetFrameworks" ConsoleToMSBuild="true">
<Output TaskParameter="ConsoleOutput" PropertyName="RuntimeSupportTargetFrameworks" />
</Exec>

<ItemGroup>
<TempFrameworks Include="$(RuntimeSupportTargetFrameworks.Split(';'))" />
<TargetFrameworks Include="@(TempFrameworks)" Condition="'%(Identity)' != 'netstandard2.0'" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Amazon.Lambda.TestTool.Extensions;
using Amazon.Lambda.TestTool.Models;
using Amazon.Lambda.TestTool.Processes;
using Amazon.Lambda.TestTool.Processes.SQSEventSource;
using Amazon.Lambda.TestTool.Services;
using Amazon.Lambda.TestTool.Services.IO;
using Spectre.Console.Cli;
Expand All @@ -22,6 +23,11 @@ public sealed class RunCommand(
public const string LAMBDA_RUNTIME_API_PORT = "LAMBDA_RUNTIME_API_PORT";
public const string API_GATEWAY_EMULATOR_PORT = "API_GATEWAY_EMULATOR_PORT";

/// <summary>
/// Task for the Lambda Runtime API.
/// </summary>
public Task LambdRuntimeApiTask { get; private set; }

/// <summary>
/// The method responsible for executing the <see cref="RunCommand"/>.
/// </summary>
Expand All @@ -31,17 +37,18 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
{
EvaluateEnvironmentVariables(settings);

if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue)
if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig))
{
throw new ArgumentException("At least one of the following parameters must be set: " +
"--lambda-emulator-port or --api-gateway-emulator-port");
"--lambda-emulator-port, --api-gateway-emulator-port or --sqs-eventsource-config");
}

var tasks = new List<Task>();

if (settings.LambdaEmulatorPort.HasValue)
{
var testToolProcess = TestToolProcess.Startup(settings, cancellationTokenSource.Token);
LambdRuntimeApiTask = testToolProcess.RunningTask;
tasks.Add(testToolProcess.RunningTask);

if (!settings.NoLaunchWindow)
Expand Down Expand Up @@ -74,10 +81,20 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
tasks.Add(apiGatewayEmulatorProcess.RunningTask);
}

await Task.WhenAny(tasks);
if (!string.IsNullOrEmpty(settings.SQSEventSourceConfig))
{
var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
tasks.Add(sqsEventSourceProcess.RunningTask);
}

await Task.Run(() => Task.WaitAny(tasks.ToArray(), cancellationTokenSource.Token));

return CommandReturnCodes.Success;
}
catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
{
return CommandReturnCodes.Success;
}
catch (Exception e) when (e.IsExpectedException())
{
toolInteractiveService.WriteErrorLine(string.Empty);
Expand Down Expand Up @@ -132,5 +149,15 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings)
throw new ArgumentException($"Value for {API_GATEWAY_EMULATOR_PORT} environment variable was not a valid port number");
}
}

if (settings.SQSEventSourceConfig != null && settings.SQSEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase))
{
var envVariable = settings.SQSEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length);
if (!environmentVariables.Contains(envVariable))
{
throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty");
}
settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,12 @@ public sealed class RunCommandSettings : CommandSettings
[CommandOption("--api-gateway-emulator-port <PORT>")]
[Description("The port number used for the test tool's API Gateway emulator.")]
public int? ApiGatewayEmulatorPort { get; set; }

/// <summary>
/// The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=queue-url,FunctionName=function-name,VisibilityTimeout=100\".
/// Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout
/// </summary>
[CommandOption("--sqs-eventsource-config <CONFIG>")]
[Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=<queue-url>,FunctionName=<function-name>,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")]
public string? SQSEventSourceConfig { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,10 @@ public abstract class Constants
/// The Visual Studio Marketplace link for the AWS Toolkit for Visual Studio.
/// </summary>
public const string LinkVsToolkitMarketplace = "https://marketplace.visualstudio.com/items?itemName=AmazonWebServices.AWSToolkitforVisualStudio2022";

/// <summary>
/// Prefix this is used for values of command line arguments that support the value being stored in an environment variable.
/// This used in the Aspire integration where it is often easier to pass configuration via environment variables.
/// </summary>
public const string ArgumentEnvironmentVariablePrefix = "env:";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using Amazon.Lambda.Model;
using Amazon.Lambda.SQSEvents;
using Amazon.Runtime;
using Amazon.SQS.Model;
using Amazon.SQS;
using System.Text.Json;
using Amazon.Lambda.TestTool.Services;

namespace Amazon.Lambda.TestTool.Processes.SQSEventSource;

/// <summary>
/// IHostedService that will run continually polling the SQS queue for messages and invoking the connected
/// Lambda function with the polled messages.
/// </summary>
public class SQSEventSourceBackgroundService : BackgroundService
{
private static readonly List<string> DefaultAttributesToReceive = new List<string> { "All" };
private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

private readonly ILogger<SQSEventSourceProcess> _logger;
private readonly IAmazonSQS _sqsClient;
private readonly ILambdaClient _lambdaClient;
private readonly SQSEventSourceBackgroundServiceConfig _config;

/// <summary>
/// Constructs instance of <see cref="SQSEventSourceBackgroundService"/>.
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="sqsClient">The SQS client used to poll messages from a queue.</param>
/// <param name="config">The config of the service</param>
/// <param name="lambdaClient">The Lambda client that can use a different endpoint for each invoke request.</param>
public SQSEventSourceBackgroundService(ILogger<SQSEventSourceProcess> logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient)
{
_logger = logger;
_sqsClient = sqsClient;
_config = config;
_lambdaClient = lambdaClient;
}

private async Task<string> GetQueueArn(CancellationToken stoppingToken)
{
var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest
{
QueueUrl = _config.QueueUrl,
AttributeNames = new List<string> { "QueueArn" }
}, stoppingToken);

return response.QueueARN;
}

/// <summary>
/// Execute the SQSEventSourceBackgroundService.
/// </summary>
/// <param name="stoppingToken">CancellationToken used to end the service.</param>
/// <returns>Task for the background service.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// The queue arn is needed for creating the Lambda event.
var queueArn = await GetQueueArn(stoppingToken);
_logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn);
while (!stoppingToken.IsCancellationRequested)
{
try
{
_logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl);
// Read a message from the queue using the ExternalCommands console application.
var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = _config.QueueUrl,
WaitTimeSeconds = 20,
MessageAttributeNames = DefaultAttributesToReceive,
MessageSystemAttributeNames = DefaultAttributesToReceive,
MaxNumberOfMessages = _config.BatchSize,
VisibilityTimeout = _config.VisibilityTimeout,
}, stoppingToken);

if (stoppingToken.IsCancellationRequested)
{
return;
}
if (response.Messages == null || response.Messages.Count == 0)
{
_logger.LogDebug("No messages received from while polling SQS");
// Since there are no messages, sleep a bit to wait for messages to come.
await Task.Delay(200);
continue;
}

var lambdaPayload = new
{
Records = ConvertToLambdaMessages(response.Messages, _sqsClient.Config.RegionEndpoint.SystemName, queueArn)
};

var invokeRequest = new InvokeRequest
{
InvocationType = InvocationType.RequestResponse,
FunctionName = _config.FunctionName,
Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions)
};

_logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count);
var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi);

if (lambdaResponse.FunctionError != null)
{
_logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError);
continue;
}

if (!_config.DisableMessageDelete)
{
List<Message> messagesToDelete;
if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0)
{
var partialResponse = JsonSerializer.Deserialize<SQSBatchResponse>(lambdaResponse.Payload);
if (partialResponse == null)
{
lambdaResponse.Payload.Position = 0;
using var reader = new StreamReader(lambdaResponse.Payload);
var payloadString = reader.ReadToEnd();
_logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString);
continue;
}

if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0)
{
_logger.LogDebug("Partial SQS response received with no failures");
messagesToDelete = response.Messages;
}
else
{
_logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count);
messagesToDelete = new List<Message>();
foreach (var message in response.Messages)
{
if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId)))
{
messagesToDelete.Add(message);
}
}
}
}
else
{
_logger.LogDebug("No partial response received. All messages eligible for deletion");
messagesToDelete = response.Messages;
}

if (messagesToDelete.Count > 0)
{
var deleteRequest = new DeleteMessageBatchRequest
{
QueueUrl = _config.QueueUrl,
Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList()
};

_logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count);
await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken);
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (Exception e)
{
_logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message);

// Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset.
await Task.Delay(3000);
}
}
}

/// <summary>
/// Convert from the SDK's list of messages to the Lambda event's SQS message type.
/// </summary>
/// <param name="messages">List of messages using the SDK's .NET type</param>
/// <param name="awsRegion">The aws region the messages came from.</param>
/// <param name="queueArn">The SQS queue arn the messages came from.</param>
/// <returns>List of messages using the Lambda event's .NET type.</returns>
internal static List<SQSEvent.SQSMessage> ConvertToLambdaMessages(List<Message> messages, string awsRegion, string queueArn)
{
return messages.Select(m => ConvertToLambdaMessage(m, awsRegion, queueArn)).ToList();
}

/// <summary>
/// Convert from the SDK's SQS message to the Lambda event's SQS message type.
/// </summary>
/// <param name="message">Message using the SDK's .NET type</param>
/// <param name="awsRegion">The aws region the message came from.</param>
/// <param name="queueArn">The SQS queue arn the message came from.</param>
/// <returns>Messages using the Lambda event's .NET type.</returns>
internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, string awsRegion, string queueArn)
{
var lambdaMessage = new SQSEvent.SQSMessage
{
AwsRegion = awsRegion,
Body = message.Body,
EventSource = "aws:sqs",
EventSourceArn = queueArn,
Md5OfBody = message.MD5OfBody,
Md5OfMessageAttributes = message.MD5OfMessageAttributes,
MessageId = message.MessageId,
ReceiptHandle = message.ReceiptHandle,
};

if (message.MessageAttributes != null && message.MessageAttributes.Count > 0)
{
lambdaMessage.MessageAttributes = new Dictionary<string, SQSEvent.MessageAttribute>();
foreach (var kvp in message.MessageAttributes)
{
var lambdaAttribute = new SQSEvent.MessageAttribute
{
DataType = kvp.Value.DataType,
StringValue = kvp.Value.StringValue,
BinaryValue = kvp.Value.BinaryValue
};

lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute);
}
}

if (message.Attributes != null && message.Attributes.Count > 0)
{
lambdaMessage.Attributes = message.Attributes;
}

return lambdaMessage;
}
}
Loading
Loading