Skip to content

Commit 2802f06

Browse files
authored
Merge pull request #508 from stac-utils/jcw/fix-sns-delivery
2 parents df7f8e8 + fafa599 commit 2802f06

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

src/lib/ingest.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,11 @@ function updateLinksWithinRecord(record) {
187187
return record
188188
}
189189

190-
export async function publishResultsToSns(results, topicArn) {
191-
results.forEach((result) => {
190+
export function publishResultsToSns(results, topicArn) {
191+
results.forEach(async (result) => {
192192
if (result.record && !result.error) {
193193
updateLinksWithinRecord(result.record)
194194
}
195-
publishRecordToSns(topicArn, result.record, result.error)
195+
await publishRecordToSns(topicArn, result.record, result.error)
196196
})
197197
}

tests/system/test-ingest.js

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -384,17 +384,35 @@ test('Ingest collection failure is published to post-ingest SNS topic', async (t
384384
t.is(attrs.recordType.Value, 'Collection')
385385
})
386386

387+
async function emptyPostIngestQueue(t) {
388+
// We initially tried calling
389+
// await sqs().purgeQueue({ QueueUrl: postIngestQueueUrl }).promise()
390+
// But at least one test would intermittently fail because of an additional
391+
// message in the queue.
392+
// The documentation for the purgeQueue method says:
393+
// "The message deletion process takes up to 60 seconds.
394+
// We recommend waiting for 60 seconds regardless of your queue's size."
395+
let result
396+
do {
397+
// eslint-disable-next-line no-await-in-loop
398+
result = await sqs().receiveMessage({
399+
QueueUrl: t.context.postIngestQueueUrl,
400+
WaitTimeSeconds: 1
401+
}).promise()
402+
} while (result.Message && result.Message.length > 0)
403+
}
404+
387405
async function ingestCollectionAndPurgePostIngestQueue(t) {
388-
const { ingestFixture, postIngestQueueUrl } = t.context
406+
const { ingestFixture } = t.context
389407

390408
const collection = await ingestFixture(
391409
'landsat-8-l1-collection.json',
392410
{ id: randomId('collection') }
393411
)
394412

395-
// Purging the post-ingest queue ensures that subsequent calls to testPostIngestSNS
413+
// Emptying the post-ingest queue ensures that subsequent calls to testPostIngestSNS
396414
// only see the message posted after the final ingest
397-
await sqs().purgeQueue({ QueueUrl: postIngestQueueUrl }).promise()
415+
await emptyPostIngestQueue(t)
398416

399417
return collection
400418
}

0 commit comments

Comments
 (0)