Skip to content

Commit 5a6a93f

Browse files
committed
Publish ingest results to post-ingest SNS topic
1 parent a956961 commit 5a6a93f

File tree

8 files changed

+411
-33
lines changed

8 files changed

+411
-33
lines changed

src/lambdas/ingest/index.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable import/prefer-default-export */
22
import got from 'got' // eslint-disable-line import/no-unresolved
33
import { createIndex } from '../../lib/databaseClient.js'
4-
import { ingestItems } from '../../lib/ingest.js'
4+
import { ingestItems, publishResultsToSns } from '../../lib/ingest.js'
55
import getObjectJson from '../../lib/s3-utils.js'
66
import logger from '../../lib/logger.js'
77

@@ -60,8 +60,17 @@ export const handler = async (event, _context) => {
6060
: [event]
6161

6262
try {
63-
await ingestItems(stacItems)
63+
const results = await ingestItems(stacItems)
6464
logger.debug('Ingested %d items: %j', stacItems.length, stacItems)
65+
66+
const postIngestTopicArn = process.env['POST_INGEST_TOPIC_ARN']
67+
68+
if (postIngestTopicArn) {
69+
logger.debug('Publishing to post-ingest topic: %s', postIngestTopicArn)
70+
publishResultsToSns(results, postIngestTopicArn)
71+
} else {
72+
logger.debug('Skkipping post-ingest notification since no topic is configured')
73+
}
6574
} catch (error) {
6675
logger.error(error)
6776
throw (error)

src/lib/ingest.js

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { getItemCreated } from './database.js'
22
import { dbClient, createIndex } from './databaseClient.js'
33
import logger from './logger.js'
4+
import { publishRecordToSns } from './sns.js'
5+
import { isCollection, isItem } from './stac-utils.js'
46

57
const COLLECTIONS_INDEX = process.env['COLLECTIONS_INDEX'] || 'collections'
68

@@ -10,12 +12,12 @@ export async function convertIngestObjectToDbObject(
1012
) {
1113
let index = ''
1214
logger.debug('data', data)
13-
if (data && data.type === 'Collection') {
15+
if (isCollection(data)) {
1416
index = COLLECTIONS_INDEX
15-
} else if (data && data.type === 'Feature') {
17+
} else if (isItem(data)) {
1618
index = data.collection
1719
} else {
18-
return null
20+
throw new Error(`Expeccted data.type to be "Collection" or "Feature" not ${data.type}`)
1921
}
2022

2123
// remove any hierarchy links in a non-mutating way
@@ -112,33 +114,40 @@ export async function writeRecordsInBulkToDb(records) {
112114
}
113115
}
114116

115-
async function asyncMapInSequence(objects, asyncFn) {
117+
function logIngestItemsResults(results) {
118+
results.forEach((result) => {
119+
if (result.error) {
120+
logger.error('Error while ingesting item', result.error)
121+
} else {
122+
logger.debug('Ingested item %j', result)
123+
}
124+
})
125+
}
126+
127+
export async function ingestItems(items) {
116128
const results = []
117-
for (const object of objects) {
129+
for (const record of items) {
130+
let dbRecord
131+
let result
132+
let error
118133
try {
119-
// This helper is inteneted to be used with the objects must be processed
120-
// in sequence so we intentionally await each iteration.
134+
// We are intentionally writing records one at a time in sequence so we
135+
// disable this rule
121136
// eslint-disable-next-line no-await-in-loop
122-
const result = await asyncFn(object)
123-
results.push(result)
124-
} catch (error) {
125-
results.push(error)
137+
dbRecord = await convertIngestObjectToDbObject(record)
138+
// eslint-disable-next-line no-await-in-loop
139+
result = await writeRecordToDb(dbRecord)
140+
} catch (e) {
141+
error = e
126142
}
143+
results.push({ record, dbRecord, result, error })
127144
}
145+
logIngestItemsResults(results)
128146
return results
129147
}
130148

131-
function logErrorResults(results) {
149+
export async function publishResultsToSns(results, topicArn) {
132150
results.forEach((result) => {
133-
if (result instanceof Error) {
134-
logger.error('Error while ingesting item', result)
135-
}
151+
publishRecordToSns(topicArn, result.record, result.error)
136152
})
137153
}
138-
139-
export async function ingestItems(items) {
140-
const records = await asyncMapInSequence(items, convertIngestObjectToDbObject)
141-
const results = await asyncMapInSequence(records, writeRecordToDb)
142-
logErrorResults(results)
143-
return results
144-
}

src/lib/sns.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { sns } from './aws-clients.js'
2+
import logger from './logger.js'
3+
import { isCollection, isItem } from './stac-utils.js'
4+
5+
const attrsFromPayload = function (payload) {
6+
let type = 'unknown'
7+
let collection = ''
8+
if (isCollection(payload.record)) {
9+
type = 'Collection'
10+
collection = payload.record.id || ''
11+
} else if (isItem(payload.record)) {
12+
type = 'Item'
13+
collection = payload.record.collection || ''
14+
}
15+
16+
return {
17+
recordType: {
18+
DataType: 'String',
19+
StringValue: type
20+
},
21+
ingestStatus: {
22+
DataType: 'String',
23+
StringValue: payload.error ? 'failed' : 'successful'
24+
},
25+
collection: {
26+
DataType: 'String',
27+
StringValue: collection
28+
}
29+
}
30+
}
31+
32+
/* eslint-disable-next-line import/prefer-default-export */
33+
export async function publishRecordToSns(topicArn, record, error) {
34+
const payload = { record, error }
35+
try {
36+
await sns().publish({
37+
Message: JSON.stringify(payload),
38+
TopicArn: topicArn,
39+
MessageAttributes: attrsFromPayload(payload)
40+
}).promise()
41+
logger.info(`Wrote record ${record.id} to ${topicArn}`)
42+
} catch (err) {
43+
logger.error(`Failed to write record ${record.id} to ${topicArn}: ${err}`)
44+
}
45+
}

src/lib/stac-utils.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export function isCollection(record) {
2+
return record && record.type === 'Collection'
3+
}
4+
5+
export function isItem(record) {
6+
return record && record.type === 'Feature'
7+
}

tests/fixtures/stac/ingest-item.json

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
{
2+
"assets": {
3+
"ANG": {
4+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_ANG.txt",
5+
"title": "Angle coefficients file",
6+
"type": "text/plain"
7+
},
8+
"B1": {
9+
"eo:bands": [
10+
0
11+
],
12+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B1.TIF",
13+
"title": "Band 1 (coastal)",
14+
"type": "image/tiff; application=geotiff"
15+
},
16+
"B10": {
17+
"eo:bands": [
18+
9
19+
],
20+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B10.TIF",
21+
"title": "Band 10 (lwir)",
22+
"type": "image/tiff; application=geotiff"
23+
},
24+
"B11": {
25+
"eo:bands": [
26+
10
27+
],
28+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B11.TIF",
29+
"title": "Band 11 (lwir)",
30+
"type": "image/tiff; application=geotiff"
31+
},
32+
"B2": {
33+
"eo:bands": [
34+
1
35+
],
36+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B2.TIF",
37+
"title": "Band 2 (blue)",
38+
"type": "image/tiff; application=geotiff"
39+
},
40+
"B3": {
41+
"eo:bands": [
42+
2
43+
],
44+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B3.TIF",
45+
"title": "Band 3 (green)",
46+
"type": "image/tiff; application=geotiff"
47+
},
48+
"B4": {
49+
"eo:bands": [
50+
3
51+
],
52+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B4.TIF",
53+
"title": "Band 4 (red)",
54+
"type": "image/tiff; application=geotiff"
55+
},
56+
"B5": {
57+
"eo:bands": [
58+
4
59+
],
60+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B5.TIF",
61+
"title": "Band 5 (nir)",
62+
"type": "image/tiff; application=geotiff"
63+
},
64+
"B6": {
65+
"eo:bands": [
66+
5
67+
],
68+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B6.TIF",
69+
"title": "Band 6 (swir16)",
70+
"type": "image/tiff; application=geotiff"
71+
},
72+
"B7": {
73+
"eo:bands": [
74+
6
75+
],
76+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B7.TIF",
77+
"title": "Band 7 (swir22)",
78+
"type": "image/tiff; application=geotiff"
79+
},
80+
"B8": {
81+
"eo:bands": [
82+
7
83+
],
84+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B8.TIF",
85+
"title": "Band 8 (pan)",
86+
"type": "image/tiff; application=geotiff"
87+
},
88+
"B9": {
89+
"eo:bands": [
90+
8
91+
],
92+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_B9.TIF",
93+
"title": "Band 9 (cirrus)",
94+
"type": "image/tiff; application=geotiff"
95+
},
96+
"BQA": {
97+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_BQA.TIF",
98+
"title": "Band quality data",
99+
"type": "image/tiff; application=geotiff"
100+
},
101+
"MTL": {
102+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_MTL.txt",
103+
"title": "original metadata file",
104+
"type": "text/plain"
105+
},
106+
"index": {
107+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/index.html",
108+
"title": "HTML index page",
109+
"type": "text/html"
110+
},
111+
"thumbnail": {
112+
"href": "https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/010/LC80100102015082LGN00/LC80100102015082LGN00_thumb_large.jpg",
113+
"title": "Thumbnail image",
114+
"type": "image/jpeg"
115+
}
116+
},
117+
"bbox": [
118+
-52.70915,
119+
69.68455,
120+
-45.21422,
121+
72.07651
122+
],
123+
"geometry": {
124+
"coordinates": [
125+
[
126+
[
127+
-52.70915,
128+
72.07651
129+
],
130+
[
131+
-45.21422,
132+
71.9981
133+
],
134+
[
135+
-45.8512,
136+
69.68455
137+
],
138+
[
139+
-52.52,
140+
69.75337
141+
],
142+
[
143+
-52.70915,
144+
72.07651
145+
]
146+
]
147+
],
148+
"type": "Polygon"
149+
},
150+
"id": "itemId",
151+
"links": [
152+
{
153+
"href": "LC80100102015050LGN00.json",
154+
"rel": "self"
155+
},
156+
{
157+
"href": "catalog.json",
158+
"rel": "root"
159+
},
160+
{
161+
"href": "collection.json",
162+
"rel": "parent"
163+
},
164+
{
165+
"href": "collection.json",
166+
"rel": "collection"
167+
}
168+
],
169+
"collection": "ingest",
170+
"properties": {
171+
"datetime": "2015-03-23T15:05:56.200728+00:00",
172+
"view:sun_azimuth": 175.47823280,
173+
"view:sun_elevation": 20.20572191,
174+
"landsat:path": "10",
175+
"landsat:processing_level": "L1T",
176+
"landsat:product_id": null,
177+
"landsat:row": "10",
178+
"landsat:scene_id": "LC80100102015082LGN00"
179+
},
180+
"type": "Feature",
181+
"stac_extensions": [
182+
"https://landsat.usgs.gov/stac/landsat-extension/v1.1.1/schema.json",
183+
"https://stac-extensions.github.io/eo/v1.0.0/schema.json",
184+
"https://stac-extensions.github.io/view/v1.0.0/schema.json"
185+
],
186+
"stac_version": "1.0.0"
187+
}

tests/helpers/ingest.js

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { sns } from '../../src/lib/aws-clients.js'
1+
import { sns, sqs } from '../../src/lib/aws-clients.js'
22
import { handler } from '../../src/lambdas/ingest/index.js'
33
import { sqsTriggerLambda } from './sqs.js'
44
import { refreshIndices } from './database.js'
@@ -73,3 +73,31 @@ export const ingestFixtureC = (ingestTopicArn, ingestQueueUrl) =>
7373
filename,
7474
overrides
7575
})
76+
77+
export async function testPostIngestSNS(t, record) {
78+
// @ts-ignore
79+
process.env.POST_INGEST_TOPIC_ARN = t.context.postIngestTopicArn
80+
81+
await sns().publish({
82+
TopicArn: t.context.ingestTopicArn,
83+
Message: JSON.stringify(record)
84+
}).promise()
85+
86+
await sqsTriggerLambda(t.context.ingestQueueUrl, handler)
87+
88+
const { Messages } = await sqs().receiveMessage({
89+
QueueUrl: t.context.postIngestQueueUrl,
90+
WaitTimeSeconds: 1
91+
}).promise()
92+
93+
t.truthy(Messages, 'Post-ingest message not found in queue')
94+
t.false(Messages && Messages.length > 1, 'More than one message in post-ingest queue')
95+
96+
const message = Messages && Messages.length > 0 ? Messages[0] : undefined
97+
const messageBody = message && message.Body ? JSON.parse(message.Body) : undefined
98+
99+
return {
100+
message: messageBody && messageBody.Message ? JSON.parse(messageBody.Message) : undefined,
101+
attrs: messageBody ? messageBody.MessageAttributes : undefined
102+
}
103+
}

0 commit comments

Comments
 (0)