-
Notifications
You must be signed in to change notification settings - Fork 490
Add SQS event source support for TestTool #2008
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 13 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
9d91dd2
Add SQS event source to test tool
normj 49e4570
Add unit tests
normj a10ddaf
Add integ tests
normj 7ef91de
Add change log file
normj ae43475
Update switches description
normj 597d943
Add delay to handle SQS eventual consistency
normj fd1612f
Add dispose of TcpListener when looking for a free port.
normj de271ac
Rework how random ports are picked in tests
normj e484507
Add collection attribute to SQS integ tests
normj 128befd
Add [RetryFact] for new SQS integ tests
normj 016f318
Do more checks to see if port is available
normj 2eca321
Add more logging for tests.
normj 0bc3a19
Add started polling info log message
normj cc45ffa
Merge branch 'dev' into normj/testtool-sqs
normj 3108beb
Address PR comments
normj 377b15d
Merge branch 'dev' into normj/testtool-sqs
normj 76c92c8
Merge branch 'normj/testtool-sqs' of https://github.com/aws/aws-lambd…
normj eeee081
Cancel polling for next invocations if server if request has been can…
normj 2d37ce7
When cleaning up test instances don't block waiting for the cancel to…
normj e5121e4
Rework the RunCommand block on running tasks to take into account the…
normj 484a885
Shutdown cleanup
normj f64f3a3
Try disabling SQS integ tests
normj 6298a1a
Try disabling more tests to track down CI issue
normj 5e06508
Try reverting the use of TestOutputToolInteractiveService
normj fc0e128
Revert more test changes
normj 7cb6f87
Add back tests see if they now work in CI
normj a38f6d7
Tweak SQS tests
normj 3c6e8d5
Try turning off the SQS tests
normj 1a70ee3
Add back one SQS test
normj 8aa7b80
Add back another test
normj 81e8667
Add another test
normj bf19b4b
Add cancellation tokens on HTTP requests.
normj 54e8318
Try rewriting GetFreePort
normj 329c38d
Add some checks that the Lambda Runtime API started correct if not As…
normj f85380f
Add back all the SQS tests.
normj e3be00b
Add delay using the SQS queue before using it
normj 0daf44d
Check if Lambda function task is failed
normj 073f1de
Tweak tests
normj 98b10c0
Add health check for lambda runtime api
normj 7ac01d4
Try turning on trace logging
normj 33386a5
Try redirecting stderr to stdout in tests
normj 9f71778
More tweaks to code looking for a free port.
normj 768286f
Try again without SQS integ tests
normj 6c56167
Switch logging back to Error
normj b7bcbd9
Add Tests
normj File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
11 changes: 11 additions & 0 deletions
11
.autover/changes/e390422f-955d-4699-97cf-67725872e746.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
{ | ||
"Projects": [ | ||
{ | ||
"Name": "Amazon.Lambda.TestTool", | ||
"Type": "Minor", | ||
"ChangelogMessages": [ | ||
"Add SQS event source support" | ||
] | ||
} | ||
] | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
244 changes: 244 additions & 0 deletions
244
...v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,244 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using Amazon.Lambda.Model; | ||
using Amazon.Lambda.SQSEvents; | ||
using Amazon.Runtime; | ||
using Amazon.SQS.Model; | ||
using Amazon.SQS; | ||
using System.Text.Json; | ||
|
||
namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; | ||
|
||
/// <summary> | ||
/// IHostedService that will run continuially polling the SQS queue for messages and invoking the connected | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// Lambda function with the polled messages. | ||
/// </summary> | ||
public class SQSEventSourceBackgroundService : BackgroundService | ||
{ | ||
private static readonly List<string> DefaultAttributesToReceive = new List<string> { "All" }; | ||
private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions | ||
{ | ||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase | ||
}; | ||
|
||
private readonly ILogger<SQSEventSourceProcess> _logger; | ||
private readonly IAmazonSQS _sqsClient; | ||
private readonly IAmazonLambda _lambdaClient; | ||
private readonly SQSEventSourceBackgroundServiceConfig _config; | ||
|
||
/// <summary> | ||
/// Constructs instance of SQSEventSourceBackgroundService. | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// </summary> | ||
/// <param name="logger"></param> | ||
/// <param name="sqsClient"></param> | ||
/// <param name="config"></param> | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public SQSEventSourceBackgroundService(ILogger<SQSEventSourceProcess> logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config) | ||
{ | ||
_logger = logger; | ||
_sqsClient = sqsClient; | ||
_config = config; | ||
|
||
_lambdaClient = new AmazonLambdaClient(new BasicAWSCredentials("accessKey", "secretKey"), new AmazonLambdaConfig | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
ServiceURL = _config.LambdaRuntimeApi | ||
}); | ||
} | ||
|
||
private async Task<string> GetQueueArn(CancellationToken stoppingToken) | ||
{ | ||
var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest | ||
{ | ||
QueueUrl = _config.QueueUrl, | ||
AttributeNames = new List<string> { "QueueArn" } | ||
}, stoppingToken); | ||
|
||
return response.QueueARN; | ||
} | ||
|
||
/// <summary> | ||
/// Execute the SQSEventSourceBackgroundService. | ||
/// </summary> | ||
/// <param name="stoppingToken"></param> | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// <returns></returns> | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
{ | ||
// The queue arn is needed for creating the Lambda event. | ||
var queueArn = await GetQueueArn(stoppingToken); | ||
_logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn); | ||
while (!stoppingToken.IsCancellationRequested) | ||
{ | ||
try | ||
{ | ||
_logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl); | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Read a message from the queue using the ExternalCommands console application. | ||
var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest | ||
{ | ||
QueueUrl = _config.QueueUrl, | ||
WaitTimeSeconds = 20, | ||
MessageAttributeNames = DefaultAttributesToReceive, | ||
MessageSystemAttributeNames = DefaultAttributesToReceive, | ||
MaxNumberOfMessages = _config.BatchSize, | ||
VisibilityTimeout = _config.VisibilityTimeout, | ||
}, stoppingToken); | ||
|
||
if (stoppingToken.IsCancellationRequested) | ||
{ | ||
return; | ||
} | ||
if (response.Messages == null || response.Messages.Count == 0) | ||
{ | ||
_logger.LogDebug("No messages received from while polling SQS"); | ||
// Since there are no messages, sleep a bit to wait for messages to come. | ||
await Task.Delay(1000); | ||
continue; | ||
} | ||
|
||
var lambdaPayload = new | ||
{ | ||
Records = ConvertToLambdaMessages(response.Messages, _sqsClient.Config.RegionEndpoint.SystemName, queueArn) | ||
}; | ||
|
||
var invokeRequest = new InvokeRequest | ||
{ | ||
InvocationType = InvocationType.RequestResponse, | ||
FunctionName = _config.FunctionName, | ||
Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) | ||
}; | ||
|
||
_logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count); | ||
var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, stoppingToken); | ||
|
||
if (lambdaResponse.FunctionError != null) | ||
{ | ||
_logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError); | ||
continue; | ||
} | ||
|
||
if (!_config.DisableMessageDelete) | ||
{ | ||
List<Message> messagesToDelete; | ||
if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0) | ||
{ | ||
var partialResponse = JsonSerializer.Deserialize<SQSBatchResponse>(lambdaResponse.Payload); | ||
if (partialResponse == null) | ||
{ | ||
lambdaResponse.Payload.Position = 0; | ||
using var reader = new StreamReader(lambdaResponse.Payload); | ||
var payloadString = reader.ReadToEnd(); | ||
_logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString); | ||
continue; | ||
} | ||
|
||
if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0) | ||
{ | ||
_logger.LogDebug("Partial SQS response received with no failures"); | ||
messagesToDelete = response.Messages; | ||
} | ||
else | ||
{ | ||
_logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count); | ||
messagesToDelete = new List<Message>(); | ||
foreach (var message in response.Messages) | ||
{ | ||
if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId))) | ||
{ | ||
messagesToDelete.Add(message); | ||
} | ||
} | ||
} | ||
} | ||
else | ||
{ | ||
_logger.LogDebug("No partial response received. All messages eligible for deletion"); | ||
messagesToDelete = response.Messages; | ||
} | ||
|
||
if (messagesToDelete.Count > 0) | ||
{ | ||
var deleteRequest = new DeleteMessageBatchRequest | ||
{ | ||
QueueUrl = _config.QueueUrl, | ||
Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList() | ||
}; | ||
|
||
_logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count); | ||
await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken); | ||
} | ||
} | ||
} | ||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) | ||
{ | ||
return; | ||
} | ||
catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) | ||
{ | ||
return; | ||
} | ||
catch (Exception e) | ||
{ | ||
_logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message); | ||
|
||
// Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset. | ||
await Task.Delay(3000); | ||
} | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Convert from the SDK's list of messages to the Lambda event's SQS message type. | ||
/// </summary> | ||
/// <param name="message"></param> | ||
/// <param name="awsRegion"></param> | ||
/// <param name="queueArn"></param> | ||
/// <returns></returns> | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
internal static List<SQSEvent.SQSMessage> ConvertToLambdaMessages(List<Message> message, string awsRegion, string queueArn) | ||
{ | ||
return message.Select(m => ConvertToLambdaMessage(m, awsRegion, queueArn)).ToList(); | ||
} | ||
|
||
/// <summary> | ||
/// Convert from the SDK's SQS message to the Lambda event's SQS message type. | ||
/// </summary> | ||
/// <param name="message"></param> | ||
/// <param name="awsRegion"></param> | ||
/// <param name="queueArn"></param> | ||
/// <returns></returns> | ||
philasmar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, string awsRegion, string queueArn) | ||
{ | ||
var lambdaMessage = new SQSEvent.SQSMessage | ||
{ | ||
AwsRegion = awsRegion, | ||
Body = message.Body, | ||
EventSource = "aws:sqs", | ||
EventSourceArn = queueArn, | ||
Md5OfBody = message.MD5OfBody, | ||
Md5OfMessageAttributes = message.MD5OfMessageAttributes, | ||
MessageId = message.MessageId, | ||
ReceiptHandle = message.ReceiptHandle, | ||
}; | ||
|
||
if (message.MessageAttributes != null && message.MessageAttributes.Count > 0) | ||
{ | ||
lambdaMessage.MessageAttributes = new Dictionary<string, SQSEvent.MessageAttribute>(); | ||
foreach (var kvp in message.MessageAttributes) | ||
{ | ||
var lambdaAttribute = new SQSEvent.MessageAttribute | ||
{ | ||
DataType = kvp.Value.DataType, | ||
StringValue = kvp.Value.StringValue, | ||
BinaryValue = kvp.Value.BinaryValue | ||
}; | ||
|
||
lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute); | ||
} | ||
} | ||
|
||
if (message.Attributes != null && message.Attributes.Count > 0) | ||
{ | ||
lambdaMessage.Attributes = message.Attributes; | ||
} | ||
|
||
return lambdaMessage; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.