-
Notifications
You must be signed in to change notification settings - Fork 490
Add SQS event source support for TestTool #2008
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
9d91dd2
Add SQS event source to test tool
normj 49e4570
Add unit tests
normj a10ddaf
Add integ tests
normj 7ef91de
Add change log file
normj ae43475
Update switches description
normj 597d943
Add delay to handle SQS eventual consistency
normj fd1612f
Add dispose of TcpListener when looking for a free port.
normj de271ac
Rework how random ports are picked in tests
normj e484507
Add collection attribute to SQS integ tests
normj 128befd
Add [RetryFact] for new SQS integ tests
normj 016f318
Do more checks to see if port is available
normj 2eca321
Add more logging for tests.
normj 0bc3a19
Add started polling info log message
normj cc45ffa
Merge branch 'dev' into normj/testtool-sqs
normj 3108beb
Address PR comments
normj 377b15d
Merge branch 'dev' into normj/testtool-sqs
normj 76c92c8
Merge branch 'normj/testtool-sqs' of https://github.com/aws/aws-lambd…
normj eeee081
Cancel polling for next invocations if server if request has been can…
normj 2d37ce7
When cleaning up test instances don't block waiting for the cancel to…
normj e5121e4
Rework the RunCommand block on running tasks to take into account the…
normj 484a885
Shutdown cleanup
normj f64f3a3
Try disabling SQS integ tests
normj 6298a1a
Try disabling more tests to track down CI issue
normj 5e06508
Try reverting the use of TestOutputToolInteractiveService
normj fc0e128
Revert more test changes
normj 7cb6f87
Add back tests see if they now work in CI
normj a38f6d7
Tweak SQS tests
normj 3c6e8d5
Try turning off the SQS tests
normj 1a70ee3
Add back one SQS test
normj 8aa7b80
Add back another test
normj 81e8667
Add another test
normj bf19b4b
Add cancellation tokens on HTTP requests.
normj 54e8318
Try rewriting GetFreePort
normj 329c38d
Add some checks that the Lambda Runtime API started correct if not As…
normj f85380f
Add back all the SQS tests.
normj e3be00b
Add delay using the SQS queue before using it
normj 0daf44d
Check if Lambda function task is failed
normj 073f1de
Tweak tests
normj 98b10c0
Add health check for lambda runtime api
normj 7ac01d4
Try turning on trace logging
normj 33386a5
Try redirecting stderr to stdout in tests
normj 9f71778
More tweaks to code looking for a free port.
normj 768286f
Try again without SQS integ tests
normj 6c56167
Switch logging back to Error
normj b7bcbd9
Add Tests
normj File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
11 changes: 11 additions & 0 deletions
11
.autover/changes/e390422f-955d-4699-97cf-67725872e746.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
{ | ||
"Projects": [ | ||
{ | ||
"Name": "Amazon.Lambda.TestTool", | ||
"Type": "Minor", | ||
"ChangelogMessages": [ | ||
"Add SQS event source support" | ||
] | ||
} | ||
] | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
242 changes: 242 additions & 0 deletions
242
...v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,242 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using Amazon.Lambda.Model; | ||
using Amazon.Lambda.SQSEvents; | ||
using Amazon.Runtime; | ||
using Amazon.SQS.Model; | ||
using Amazon.SQS; | ||
using System.Text.Json; | ||
using Amazon.Lambda.TestTool.Services; | ||
|
||
namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; | ||
|
||
/// <summary> | ||
/// IHostedService that will run continually polling the SQS queue for messages and invoking the connected | ||
/// Lambda function with the polled messages. | ||
/// </summary> | ||
public class SQSEventSourceBackgroundService : BackgroundService | ||
{ | ||
private static readonly List<string> DefaultAttributesToReceive = new List<string> { "All" }; | ||
private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions | ||
{ | ||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase | ||
}; | ||
|
||
private readonly ILogger<SQSEventSourceProcess> _logger; | ||
private readonly IAmazonSQS _sqsClient; | ||
private readonly ILambdaClient _lambdaClient; | ||
private readonly SQSEventSourceBackgroundServiceConfig _config; | ||
|
||
/// <summary> | ||
/// Constructs instance of <see cref="SQSEventSourceBackgroundService"/>. | ||
/// </summary> | ||
/// <param name="logger">The logger</param> | ||
/// <param name="sqsClient">The SQS client used to poll messages from a queue.</param> | ||
/// <param name="config">The config of the service</param> | ||
/// <param name="lambdaClient">The Lambda client that can use a different endpoint for each invoke request.</param> | ||
public SQSEventSourceBackgroundService(ILogger<SQSEventSourceProcess> logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient) | ||
{ | ||
_logger = logger; | ||
_sqsClient = sqsClient; | ||
_config = config; | ||
_lambdaClient = lambdaClient; | ||
} | ||
|
||
private async Task<string> GetQueueArn(CancellationToken stoppingToken) | ||
{ | ||
var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest | ||
{ | ||
QueueUrl = _config.QueueUrl, | ||
AttributeNames = new List<string> { "QueueArn" } | ||
}, stoppingToken); | ||
|
||
return response.QueueARN; | ||
} | ||
|
||
/// <summary> | ||
/// Execute the SQSEventSourceBackgroundService. | ||
/// </summary> | ||
/// <param name="stoppingToken">CancellationToken used to end the service.</param> | ||
/// <returns>Task for the background service.</returns> | ||
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
{ | ||
// The queue arn is needed for creating the Lambda event. | ||
var queueArn = await GetQueueArn(stoppingToken); | ||
_logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn); | ||
while (!stoppingToken.IsCancellationRequested) | ||
{ | ||
try | ||
{ | ||
_logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl); | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Read a message from the queue using the ExternalCommands console application. | ||
var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest | ||
{ | ||
QueueUrl = _config.QueueUrl, | ||
WaitTimeSeconds = 20, | ||
MessageAttributeNames = DefaultAttributesToReceive, | ||
MessageSystemAttributeNames = DefaultAttributesToReceive, | ||
MaxNumberOfMessages = _config.BatchSize, | ||
VisibilityTimeout = _config.VisibilityTimeout, | ||
}, stoppingToken); | ||
|
||
if (stoppingToken.IsCancellationRequested) | ||
{ | ||
return; | ||
} | ||
if (response.Messages == null || response.Messages.Count == 0) | ||
{ | ||
_logger.LogDebug("No messages received from while polling SQS"); | ||
// Since there are no messages, sleep a bit to wait for messages to come. | ||
await Task.Delay(200); | ||
continue; | ||
} | ||
|
||
var lambdaPayload = new | ||
{ | ||
Records = ConvertToLambdaMessages(response.Messages, _sqsClient.Config.RegionEndpoint.SystemName, queueArn) | ||
}; | ||
|
||
var invokeRequest = new InvokeRequest | ||
{ | ||
InvocationType = InvocationType.RequestResponse, | ||
FunctionName = _config.FunctionName, | ||
Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) | ||
}; | ||
|
||
_logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count); | ||
var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); | ||
|
||
if (lambdaResponse.FunctionError != null) | ||
{ | ||
_logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError); | ||
continue; | ||
} | ||
|
||
if (!_config.DisableMessageDelete) | ||
{ | ||
List<Message> messagesToDelete; | ||
if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0) | ||
{ | ||
var partialResponse = JsonSerializer.Deserialize<SQSBatchResponse>(lambdaResponse.Payload); | ||
if (partialResponse == null) | ||
{ | ||
lambdaResponse.Payload.Position = 0; | ||
using var reader = new StreamReader(lambdaResponse.Payload); | ||
var payloadString = reader.ReadToEnd(); | ||
_logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString); | ||
continue; | ||
} | ||
|
||
if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0) | ||
{ | ||
_logger.LogDebug("Partial SQS response received with no failures"); | ||
messagesToDelete = response.Messages; | ||
} | ||
else | ||
{ | ||
_logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count); | ||
messagesToDelete = new List<Message>(); | ||
foreach (var message in response.Messages) | ||
{ | ||
if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId))) | ||
{ | ||
messagesToDelete.Add(message); | ||
} | ||
} | ||
} | ||
} | ||
else | ||
{ | ||
_logger.LogDebug("No partial response received. All messages eligible for deletion"); | ||
messagesToDelete = response.Messages; | ||
} | ||
|
||
if (messagesToDelete.Count > 0) | ||
{ | ||
var deleteRequest = new DeleteMessageBatchRequest | ||
{ | ||
QueueUrl = _config.QueueUrl, | ||
Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList() | ||
}; | ||
|
||
_logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count); | ||
await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken); | ||
} | ||
} | ||
} | ||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) | ||
{ | ||
return; | ||
} | ||
catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) | ||
{ | ||
return; | ||
} | ||
catch (Exception e) | ||
{ | ||
_logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message); | ||
|
||
// Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset. | ||
await Task.Delay(3000); | ||
} | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Convert from the SDK's list of messages to the Lambda event's SQS message type. | ||
/// </summary> | ||
/// <param name="messages">List of messages using the SDK's .NET type</param> | ||
/// <param name="awsRegion">The aws region the messages came from.</param> | ||
/// <param name="queueArn">The SQS queue arn the messages came from.</param> | ||
/// <returns>List of messages using the Lambda event's .NET type.</returns> | ||
internal static List<SQSEvent.SQSMessage> ConvertToLambdaMessages(List<Message> messages, string awsRegion, string queueArn) | ||
{ | ||
return messages.Select(m => ConvertToLambdaMessage(m, awsRegion, queueArn)).ToList(); | ||
} | ||
|
||
/// <summary> | ||
/// Convert from the SDK's SQS message to the Lambda event's SQS message type. | ||
/// </summary> | ||
/// <param name="message">Message using the SDK's .NET type</param> | ||
/// <param name="awsRegion">The aws region the message came from.</param> | ||
/// <param name="queueArn">The SQS queue arn the message came from.</param> | ||
/// <returns>Messages using the Lambda event's .NET type.</returns> | ||
internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, string awsRegion, string queueArn) | ||
{ | ||
var lambdaMessage = new SQSEvent.SQSMessage | ||
{ | ||
AwsRegion = awsRegion, | ||
Body = message.Body, | ||
EventSource = "aws:sqs", | ||
EventSourceArn = queueArn, | ||
Md5OfBody = message.MD5OfBody, | ||
Md5OfMessageAttributes = message.MD5OfMessageAttributes, | ||
MessageId = message.MessageId, | ||
ReceiptHandle = message.ReceiptHandle, | ||
}; | ||
|
||
if (message.MessageAttributes != null && message.MessageAttributes.Count > 0) | ||
{ | ||
lambdaMessage.MessageAttributes = new Dictionary<string, SQSEvent.MessageAttribute>(); | ||
foreach (var kvp in message.MessageAttributes) | ||
{ | ||
var lambdaAttribute = new SQSEvent.MessageAttribute | ||
{ | ||
DataType = kvp.Value.DataType, | ||
StringValue = kvp.Value.StringValue, | ||
BinaryValue = kvp.Value.BinaryValue | ||
}; | ||
|
||
lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute); | ||
} | ||
} | ||
|
||
if (message.Attributes != null && message.Attributes.Count > 0) | ||
{ | ||
lambdaMessage.Attributes = message.Attributes; | ||
} | ||
|
||
return lambdaMessage; | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.