Skip to content

Add SQS event source support for TestTool #2008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
9d91dd2
Add SQS event source to test tool
normj Mar 10, 2025
49e4570
Add unit tests
normj Mar 11, 2025
a10ddaf
Add integ tests
normj Mar 11, 2025
7ef91de
Add change log file
normj Mar 11, 2025
ae43475
Update switches description
normj Mar 11, 2025
597d943
Add delay to handle SQS eventual consistency
normj Mar 11, 2025
fd1612f
Add dispose of TcpListener when looking for a free port.
normj Mar 11, 2025
de271ac
Rework how random ports are picked in tests
normj Mar 11, 2025
e484507
Add collection attribute to SQS integ tests
normj Mar 11, 2025
128befd
Add [RetryFact] for new SQS integ tests
normj Mar 11, 2025
016f318
Do more checks to see if port is available
normj Mar 11, 2025
2eca321
Add more logging for tests.
normj Mar 12, 2025
0bc3a19
Add started polling info log message
normj Mar 13, 2025
cc45ffa
Merge branch 'dev' into normj/testtool-sqs
normj Mar 22, 2025
3108beb
Address PR comments
normj Mar 22, 2025
377b15d
Merge branch 'dev' into normj/testtool-sqs
normj Apr 1, 2025
76c92c8
Merge branch 'normj/testtool-sqs' of https://github.com/aws/aws-lambd…
normj Apr 1, 2025
eeee081
Cancel polling for next invocations if server if request has been can…
normj Apr 3, 2025
2d37ce7
When cleaning up test instances don't block waiting for the cancel to…
normj Apr 3, 2025
e5121e4
Rework the RunCommand block on running tasks to take into account the…
normj Apr 3, 2025
484a885
Shutdown cleanup
normj Apr 3, 2025
f64f3a3
Try disabling SQS integ tests
normj Apr 3, 2025
6298a1a
Try disabling more tests to track down CI issue
normj Apr 3, 2025
5e06508
Try reverting the use of TestOutputToolInteractiveService
normj Apr 3, 2025
fc0e128
Revert more test changes
normj Apr 3, 2025
7cb6f87
Add back tests see if they now work in CI
normj Apr 3, 2025
a38f6d7
Tweak SQS tests
normj Apr 3, 2025
3c6e8d5
Try turning off the SQS tests
normj Apr 3, 2025
1a70ee3
Add back one SQS test
normj Apr 3, 2025
8aa7b80
Add back another test
normj Apr 3, 2025
81e8667
Add another test
normj Apr 3, 2025
bf19b4b
Add cancellation tokens on HTTP requests.
normj Apr 3, 2025
54e8318
Try rewriting GetFreePort
normj Apr 3, 2025
329c38d
Add some checks that the Lambda Runtime API started correct if not As…
normj Apr 3, 2025
f85380f
Add back all the SQS tests.
normj Apr 3, 2025
e3be00b
Add delay using the SQS queue before using it
normj Apr 3, 2025
0daf44d
Check if Lambda function task is failed
normj Apr 3, 2025
073f1de
Tweak tests
normj Apr 4, 2025
98b10c0
Add health check for lambda runtime api
normj Apr 4, 2025
7ac01d4
Try turning on trace logging
normj Apr 4, 2025
33386a5
Try redirecting stderr to stdout in tests
normj Apr 4, 2025
9f71778
More tweaks to code looking for a free port.
normj Apr 4, 2025
768286f
Try again without SQS integ tests
normj Apr 4, 2025
6c56167
Switch logging back to Error
normj Apr 4, 2025
b7bcbd9
Add Tests
normj Apr 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/e390422f-955d-4699-97cf-67725872e746.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.TestTool",
"Type": "Minor",
"ChangelogMessages": [
"Add SQS event source support"
]
}
]
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk.Web">

<Import Project="..\..\..\..\buildtools\common.props" />
<PropertyGroup>
Expand All @@ -15,7 +15,7 @@
<PackAsTool>true</PackAsTool>
<PackageId>Amazon.Lambda.TestTool</PackageId>
<ToolCommandName>dotnet-lambda-test-tool</ToolCommandName>
<Version>0.9.1</Version>
<Version>0.9.888</Version>
<NoWarn>NU5100</NoWarn>
<RollForward>Major</RollForward>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand All @@ -26,7 +26,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Amazon.Lambda.SQSEvents" Version="2.2.0" />
<PackageReference Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.400" />
<PackageReference Include="AWSSDK.Lambda" Version="3.7.411.17" />
<PackageReference Include="AWSSDK.SQS" Version="3.7.400.109" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
<PackageReference Include="Spectre.Console.Cli" Version="0.49.1" />
<PackageReference Include="Amazon.Lambda.APIGatewayEvents" Version="2.7.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Amazon.Lambda.TestTool.Extensions;
using Amazon.Lambda.TestTool.Models;
using Amazon.Lambda.TestTool.Processes;
using Amazon.Lambda.TestTool.Processes.SQSEventSource;
using Amazon.Lambda.TestTool.Services;
using Amazon.Lambda.TestTool.Services.IO;
using Spectre.Console.Cli;
Expand All @@ -31,17 +32,17 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
{
EvaluateEnvironmentVariables(settings);

if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue)
if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig))
{
throw new ArgumentException("At least one of the following parameters must be set: " +
"--lambda-emulator-port or --api-gateway-emulator-port");
"--lambda-emulator-port, --api-gateway-emulator-port or --sqs-eventsource-config");
}

var tasks = new List<Task>();

if (settings.LambdaEmulatorPort.HasValue)
{
var testToolProcess = TestToolProcess.Startup(settings, cancellationTokenSource.Token);
var testToolProcess = TestToolProcess.Startup(settings, toolInteractiveService, cancellationTokenSource.Token);
tasks.Add(testToolProcess.RunningTask);

if (!settings.NoLaunchWindow)
Expand Down Expand Up @@ -70,10 +71,16 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
}

var apiGatewayEmulatorProcess =
ApiGatewayEmulatorProcess.Startup(settings, cancellationTokenSource.Token);
ApiGatewayEmulatorProcess.Startup(settings, toolInteractiveService, cancellationTokenSource.Token);
tasks.Add(apiGatewayEmulatorProcess.RunningTask);
}

if (!string.IsNullOrEmpty(settings.SQSEventSourceConfig))
{
var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
tasks.Add(sqsEventSourceProcess.RunningTask);
}

await Task.WhenAny(tasks);

return CommandReturnCodes.Success;
Expand Down Expand Up @@ -132,5 +139,15 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings)
throw new ArgumentException($"Value for {API_GATEWAY_EMULATOR_PORT} environment variable was not a valid port number");
}
}

if (settings.SQSEventSourceConfig != null && settings.SQSEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase))
{
var envVariable = settings.SQSEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length);
if (!environmentVariables.Contains(envVariable))
{
throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty");
}
settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,12 @@ public sealed class RunCommandSettings : CommandSettings
[CommandOption("--api-gateway-emulator-port <PORT>")]
[Description("The port number used for the test tool's API Gateway emulator.")]
public int? ApiGatewayEmulatorPort { get; set; }

/// <summary>
/// The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=queue-url,FunctionName=function-name,VisibilityTimeout=100\".
/// Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout
/// </summary>
[CommandOption("--sqs-eventsource-config <CONFIG>")]
[Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=<queue-url>,FunctionName=<function-name>,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")]
public string? SQSEventSourceConfig { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,10 @@ public abstract class Constants
/// The Visual Studio Marketplace link for the AWS Toolkit for Visual Studio.
/// </summary>
public const string LinkVsToolkitMarketplace = "https://marketplace.visualstudio.com/items?itemName=AmazonWebServices.AWSToolkitforVisualStudio2022";

/// <summary>
/// Prefix used for values of command line arguments that support the value being stored in an environment variable.
/// This used in the Aspire integration where it is often easier to pass configuration via environment variables.
/// </summary>
public const string ArgumentEnvironmentVariablePrefix = "env:";
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ApiGatewayEmulatorProcess
/// <summary>
/// Creates the Web API and runs it in the background.
/// </summary>
public static ApiGatewayEmulatorProcess Startup(RunCommandSettings settings, CancellationToken cancellationToken = default)
public static ApiGatewayEmulatorProcess Startup(RunCommandSettings settings, IToolInteractiveService toolInteractiveService, CancellationToken cancellationToken = default)
{
if (settings.ApiGatewayEmulatorMode is null)
{
Expand All @@ -62,6 +62,11 @@ public static ApiGatewayEmulatorProcess Startup(RunCommandSettings settings, Can

var app = builder.Build();

if (!app.Environment.IsProduction())
{
app.Services.GetRequiredService<ILoggerFactory>().AddProvider(new ToolInteractiveLoggerProvider(toolInteractiveService));
}

app.MapHealthChecks("/__lambda_test_tool_apigateway_health__");

app.Lifetime.ApplicationStarted.Register(() =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using Amazon.Lambda.Model;
using Amazon.Lambda.SQSEvents;
using Amazon.Runtime;
using Amazon.SQS.Model;
using Amazon.SQS;
using System.Text.Json;

namespace Amazon.Lambda.TestTool.Processes.SQSEventSource;

/// <summary>
/// IHostedService that will run continuially polling the SQS queue for messages and invoking the connected
/// Lambda function with the polled messages.
/// </summary>
public class SQSEventSourceBackgroundService : BackgroundService
{
private static readonly List<string> DefaultAttributesToReceive = new List<string> { "All" };
private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

private readonly ILogger<SQSEventSourceProcess> _logger;
private readonly IAmazonSQS _sqsClient;
private readonly IAmazonLambda _lambdaClient;
private readonly SQSEventSourceBackgroundServiceConfig _config;

/// <summary>
/// Constructs instance of SQSEventSourceBackgroundService.
/// </summary>
/// <param name="logger"></param>
/// <param name="sqsClient"></param>
/// <param name="config"></param>
public SQSEventSourceBackgroundService(ILogger<SQSEventSourceProcess> logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config)
{
_logger = logger;
_sqsClient = sqsClient;
_config = config;

_lambdaClient = new AmazonLambdaClient(new BasicAWSCredentials("accessKey", "secretKey"), new AmazonLambdaConfig
{
ServiceURL = _config.LambdaRuntimeApi
});
}

private async Task<string> GetQueueArn(CancellationToken stoppingToken)
{
var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest
{
QueueUrl = _config.QueueUrl,
AttributeNames = new List<string> { "QueueArn" }
}, stoppingToken);

return response.QueueARN;
}

/// <summary>
/// Execute the SQSEventSourceBackgroundService.
/// </summary>
/// <param name="stoppingToken"></param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// The queue arn is needed for creating the Lambda event.
var queueArn = await GetQueueArn(stoppingToken);
_logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn);
while (!stoppingToken.IsCancellationRequested)
{
try
{
_logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl);
// Read a message from the queue using the ExternalCommands console application.
var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = _config.QueueUrl,
WaitTimeSeconds = 20,
MessageAttributeNames = DefaultAttributesToReceive,
MessageSystemAttributeNames = DefaultAttributesToReceive,
MaxNumberOfMessages = _config.BatchSize,
VisibilityTimeout = _config.VisibilityTimeout,
}, stoppingToken);

if (stoppingToken.IsCancellationRequested)
{
return;
}
if (response.Messages == null || response.Messages.Count == 0)
{
_logger.LogDebug("No messages received from while polling SQS");
// Since there are no messages, sleep a bit to wait for messages to come.
await Task.Delay(1000);
continue;
}

var lambdaPayload = new
{
Records = ConvertToLambdaMessages(response.Messages, _sqsClient.Config.RegionEndpoint.SystemName, queueArn)
};

var invokeRequest = new InvokeRequest
{
InvocationType = InvocationType.RequestResponse,
FunctionName = _config.FunctionName,
Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions)
};

_logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count);
var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, stoppingToken);

if (lambdaResponse.FunctionError != null)
{
_logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError);
continue;
}

if (!_config.DisableMessageDelete)
{
List<Message> messagesToDelete;
if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0)
{
var partialResponse = JsonSerializer.Deserialize<SQSBatchResponse>(lambdaResponse.Payload);
if (partialResponse == null)
{
lambdaResponse.Payload.Position = 0;
using var reader = new StreamReader(lambdaResponse.Payload);
var payloadString = reader.ReadToEnd();
_logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString);
continue;
}

if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0)
{
_logger.LogDebug("Partial SQS response received with no failures");
messagesToDelete = response.Messages;
}
else
{
_logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count);
messagesToDelete = new List<Message>();
foreach (var message in response.Messages)
{
if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId)))
{
messagesToDelete.Add(message);
}
}
}
}
else
{
_logger.LogDebug("No partial response received. All messages eligible for deletion");
messagesToDelete = response.Messages;
}

if (messagesToDelete.Count > 0)
{
var deleteRequest = new DeleteMessageBatchRequest
{
QueueUrl = _config.QueueUrl,
Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList()
};

_logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count);
await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken);
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (Exception e)
{
_logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message);

// Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset.
await Task.Delay(3000);
}
}
}

/// <summary>
/// Convert from the SDK's list of messages to the Lambda event's SQS message type.
/// </summary>
/// <param name="message"></param>
/// <param name="awsRegion"></param>
/// <param name="queueArn"></param>
/// <returns></returns>
internal static List<SQSEvent.SQSMessage> ConvertToLambdaMessages(List<Message> message, string awsRegion, string queueArn)
{
return message.Select(m => ConvertToLambdaMessage(m, awsRegion, queueArn)).ToList();
}

/// <summary>
/// Convert from the SDK's SQS message to the Lambda event's SQS message type.
/// </summary>
/// <param name="message"></param>
/// <param name="awsRegion"></param>
/// <param name="queueArn"></param>
/// <returns></returns>
internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, string awsRegion, string queueArn)
{
var lambdaMessage = new SQSEvent.SQSMessage
{
AwsRegion = awsRegion,
Body = message.Body,
EventSource = "aws:sqs",
EventSourceArn = queueArn,
Md5OfBody = message.MD5OfBody,
Md5OfMessageAttributes = message.MD5OfMessageAttributes,
MessageId = message.MessageId,
ReceiptHandle = message.ReceiptHandle,
};

if (message.MessageAttributes != null && message.MessageAttributes.Count > 0)
{
lambdaMessage.MessageAttributes = new Dictionary<string, SQSEvent.MessageAttribute>();
foreach (var kvp in message.MessageAttributes)
{
var lambdaAttribute = new SQSEvent.MessageAttribute
{
DataType = kvp.Value.DataType,
StringValue = kvp.Value.StringValue,
BinaryValue = kvp.Value.BinaryValue
};

lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute);
}
}

if (message.Attributes != null && message.Attributes.Count > 0)
{
lambdaMessage.Attributes = message.Attributes;
}

return lambdaMessage;
}
}
Loading