Skip to content

Commit 0e3b0c6

Browse files
authored
Merge pull request #494 from stac-utils/jcw/post-ingest-notification
2 parents 859d280 + d0a6ef5 commit 0e3b0c6

File tree

12 files changed

+562
-42
lines changed

12 files changed

+562
-42
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77

88
## [Unreleased] - TBD
99

10+
### Added
11+
12+
- Publish ingest results to a post-ingest SNS topic
13+
1014
### Changed
1115

1216
- Remove node streams-based ingest code to prepare for post-ingest notifications

README.md

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ subgraph ingest[Ingest]
9090
ingestSnsTopic[Ingest SNS Topic]
9191
ingestQueue[Ingest SQS Queue]
9292
ingestLambda[Ingest Lambda]
93+
postIngestSnsTopic[Post-Ingest SNS Topic]
9394
9495
ingestDeadLetterQueue[Ingest Dead Letter Queue]
95-
failedIngestLambda[Failed Ingest Lambda]
9696
end
9797
9898
users[Users]
@@ -109,9 +109,10 @@ opensearch[(OpenSearch)]
109109
itemsForIngest --> ingestSnsTopic
110110
ingestSnsTopic --> ingestQueue
111111
ingestQueue --> ingestLambda
112+
ingestQueue --> ingestDeadLetterQueue
112113
ingestLambda --> opensearch
114+
ingestLambda --> postIngestSnsTopic
113115
114-
ingestDeadLetterQueue --> failedIngestLambda
115116
116117
%% API workflow
117118
@@ -916,6 +917,17 @@ ingestion will either fail (in the case of a single Item ingest) or if auto-crea
916917

917918
If a collection or item is ingested, and an item with that id already exists in STAC, the new item will completely replace the old item.
918919

920+
After a collection or item is ingested, the status of the ingest (success or failure) along with details of the collection or item are sent to a post-ingest SNS topic. To take action on items after they are ingested, subscribe an endpoint to this topic.
921+
922+
Messages published to the post-ingest SNS topic include the following attributes that can be used for filtering:
923+
924+
| attribute | type | values |
925+
| ------------ | ------ | ------------------------ |
926+
| recordType | String | `Collection` or `Item` |
927+
| ingestStatus | String | `successful` or `failed` |
928+
| collection | String | |
929+
930+
919931
### Ingesting large items
920932

921933
There is a 256 KB limit on the size of SQS messages. Larger items can be ingested by publishing a message to the `stac-server-<stage>-ingest` SNS topic with the format:
@@ -936,7 +948,7 @@ Stac-server can also be subscribed to SNS Topics that publish complete STAC Item
936948

937949
### Ingest Errors
938950

939-
Errors that occur during ingest will end up in the dead letter processing queue, where they are processed by the `stac-server-<stage>-failed-ingest` Lambda function. Currently all the failed-ingest Lambda does is log the error, see the CloudWatch log `/aws/lambda/stac-server-<stage>-failed-ingest` for errors.
951+
Errors that occur while consuming items from the ingest queue will end up in the dead letter processing queue.
940952

941953
## Supporting Cross-cluster Search and Replication
942954

serverless.example.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ provider:
2222
# PRE_HOOK: ${self:service}-${self:provider.stage}-preHook
2323
# API_KEYS_SECRET_ID: ${self:service}-${self:provider.stage}-api-keys
2424
# POST_HOOK: ${self:service}-${self:provider.stage}-postHook
25+
# If you will be subscribing to post-ingest SNS notifications make
26+
# sure that STAC_API_URL is set so that links are updated correctly
27+
STAC_API_URL: "https://some-stac-server.com"
2528
iam:
2629
role:
2730
statements:
@@ -72,6 +75,8 @@ functions:
7275
handler: index.handler
7376
memorySize: 512
7477
timeout: 60
78+
environment:
79+
POST_INGEST_TOPIC_ARN: !Ref postIngestTopic
7580
package:
7681
artifact: dist/ingest/ingest.zip
7782
events:
@@ -101,6 +106,14 @@ resources:
101106
Type: "AWS::SNS::Topic"
102107
Properties:
103108
TopicName: ${self:service}-${self:provider.stage}-ingest
109+
postIngestTopic:
110+
# After a collection or item is ingested, the status of the ingest (success
111+
# or failure) along with details of the collection or item are sent to this
112+
# SNS topic. To take future action on items after they are ingested
113+
# subscribe an endpoint to this topic
114+
Type: AWS::SNS::Topic
115+
Properties:
116+
TopicName: ${self:service}-${self:provider.stage}-post-ingest
104117
deadLetterQueue:
105118
Type: AWS::SQS::Queue
106119
Properties:

src/lambdas/ingest/index.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable import/prefer-default-export */
22
import got from 'got' // eslint-disable-line import/no-unresolved
33
import { createIndex } from '../../lib/databaseClient.js'
4-
import { ingestItems } from '../../lib/ingest.js'
4+
import { ingestItems, publishResultsToSns } from '../../lib/ingest.js'
55
import getObjectJson from '../../lib/s3-utils.js'
66
import logger from '../../lib/logger.js'
77

@@ -60,8 +60,17 @@ export const handler = async (event, _context) => {
6060
: [event]
6161

6262
try {
63-
await ingestItems(stacItems)
63+
const results = await ingestItems(stacItems)
6464
logger.debug('Ingested %d items: %j', stacItems.length, stacItems)
65+
66+
const postIngestTopicArn = process.env['POST_INGEST_TOPIC_ARN']
67+
68+
if (postIngestTopicArn) {
69+
logger.debug('Publishing to post-ingest topic: %s', postIngestTopicArn)
70+
publishResultsToSns(results, postIngestTopicArn)
71+
} else {
72+
logger.debug('Skkipping post-ingest notification since no topic is configured')
73+
}
6574
} catch (error) {
6675
logger.error(error)
6776
throw (error)

src/lib/api.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ export const parsePath = function (inpath) {
383383
}
384384

385385
// Impure - mutates results
386-
const addCollectionLinks = function (results, endpoint) {
386+
export const addCollectionLinks = function (results, endpoint) {
387387
results.forEach((result) => {
388388
const { id } = result
389389
let { links } = result
@@ -438,7 +438,7 @@ const addCollectionLinks = function (results, endpoint) {
438438
}
439439

440440
// Impure - mutates results
441-
const addItemLinks = function (results, endpoint) {
441+
export const addItemLinks = function (results, endpoint) {
442442
results.forEach((result) => {
443443
let { links } = result
444444
const { id, collection } = result

src/lib/ingest.js

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,43 @@
11
import { getItemCreated } from './database.js'
2+
import { addItemLinks, addCollectionLinks } from './api.js'
23
import { dbClient, createIndex } from './databaseClient.js'
34
import logger from './logger.js'
5+
import { publishRecordToSns } from './sns.js'
6+
import { isCollection, isItem } from './stac-utils.js'
47

58
const COLLECTIONS_INDEX = process.env['COLLECTIONS_INDEX'] || 'collections'
69

10+
/**
 * Error raised when a record submitted for ingest cannot be accepted, as
 * opposed to a failure of the system itself. Having a dedicated class lets
 * callers tell the two apart with `instanceof`.
 */
export class InvalidIngestError extends Error {
  /**
   * @param {string} message - explanation of why the record was rejected
   */
  constructor(message) {
    super(message)
    // Override the inherited "Error" so logs and serialized output identify
    // the specific failure type.
    this.name = 'InvalidIngestError'
  }
}
16+
17+
const hierarchyLinks = ['self', 'root', 'parent', 'child', 'collection', 'item', 'items']
18+
719
export async function convertIngestObjectToDbObject(
820
// eslint-disable-next-line max-len
921
/** @type {{ hasOwnProperty: (arg0: string) => any; type: string, collection: string; links: any[]; id: any; }} */ data
1022
) {
1123
let index = ''
1224
logger.debug('data', data)
13-
if (data && data.type === 'Collection') {
25+
if (isCollection(data)) {
1426
index = COLLECTIONS_INDEX
15-
} else if (data && data.type === 'Feature') {
27+
} else if (isItem(data)) {
1628
index = data.collection
1729
} else {
18-
return null
30+
throw new InvalidIngestError(
31+
`Expeccted data.type to be "Collection" or "Feature" not ${data.type}`
32+
)
1933
}
2034

2135
// remove any hierarchy links in a non-mutating way
22-
const hlinks = ['self', 'root', 'parent', 'child', 'collection', 'item', 'items']
36+
if (!data.links) {
37+
throw new InvalidIngestError('Expected a "links" proporty on the stac object')
38+
}
2339
const links = data.links.filter(
24-
(/** @type {{ rel: string; }} */ link) => !hlinks.includes(link.rel)
40+
(/** @type {{ rel: string; }} */ link) => !hierarchyLinks.includes(link.rel)
2541
)
2642
const dbDataObject = { ...data, links }
2743

@@ -77,9 +93,7 @@ export async function writeRecordToDb(
7793
// if this isn't a collection check if index exists
7894
const exists = await client.indices.exists({ index })
7995
if (!exists.body) {
80-
const msg = `Index ${index} does not exist, add before ingesting items`
81-
logger.debug(msg)
82-
throw new Error(msg)
96+
throw new InvalidIngestError(`Index ${index} does not exist, add before ingesting items`)
8397
}
8498
}
8599

@@ -112,33 +126,72 @@ export async function writeRecordsInBulkToDb(records) {
112126
}
113127
}
114128

115-
async function asyncMapInSequence(objects, asyncFn) {
129+
/**
 * Write one log entry per ingest outcome, choosing the level by severity.
 *
 * @param {Array<{error?: any}>} results - outcomes collected while ingesting;
 *   an entry with a truthy `error` is treated as a failed ingest
 */
function logIngestItemsResults(results) {
  for (const outcome of results) {
    const { error } = outcome
    if (!error) {
      logger.debug('Ingested item %j', outcome)
    } else if (error instanceof InvalidIngestError) {
      // Attempting to ingest an invalid stac object is a problem with the
      // input, not with the system, so it is reported as info, not error
      logger.info('Invalid ingest item', error)
    } else {
      logger.error('Error while ingesting item', error)
    }
  }
}
144+
145+
/**
 * Convert and write each record to the database, one at a time and in order.
 *
 * A failure on one record does not stop the run: the thrown value is captured
 * in that record's outcome and processing continues with the next record.
 *
 * @param {Array<Object>} items - records to ingest
 * @returns {Promise<Array<{record: any, dbRecord: any, result: any, error: any}>>}
 *   one outcome per input record, in input order; `error` is set when either
 *   the conversion or the write threw, otherwise `result` holds the write result
 */
export async function ingestItems(items) {
  const outcomes = []
  for (const record of items) {
    const outcome = {
      record,
      dbRecord: undefined,
      result: undefined,
      error: undefined
    }
    try {
      // Records are deliberately written sequentially, so awaiting inside the
      // loop is intended
      // eslint-disable-next-line no-await-in-loop
      outcome.dbRecord = await convertIngestObjectToDbObject(record)
      // eslint-disable-next-line no-await-in-loop
      outcome.result = await writeRecordToDb(outcome.dbRecord)
    } catch (e) {
      outcome.error = e
    }
    outcomes.push(outcome)
  }
  logIngestItemsResults(outcomes)
  return outcomes
}
130166

131-
function logErrorResults(results) {
167+
// Impure - mutates record
168+
/**
 * Replace the hierarchy links of an ingested record with links generated for
 * the API endpoint configured in the STAC_API_URL environment variable.
 *
 * The record is modified in place. It is left untouched when STAC_API_URL is
 * not set or when the record is neither an item nor a collection.
 *
 * @param {Object} record - ingested collection or item
 * @returns {Object} the same record instance that was passed in
 */
function updateLinksWithinRecord(record) {
  const endpoint = process.env['STAC_API_URL']
  if (!endpoint) {
    logger.info('STAC_API_URL not set, not updating links within ingested record')
    return record
  }

  // Pick the link generator that matches the record type, if there is one
  let addLinks = null
  if (isItem(record)) {
    addLinks = addItemLinks
  } else if (isCollection(record)) {
    addLinks = addCollectionLinks
  }

  if (!addLinks) {
    logger.info('Record is not a collection or item, not updating links within ingested record')
    return record
  }

  // Drop the existing hierarchy links before the generator adds fresh ones
  const nonHierarchyLinks = record.links.filter(
    (/** @type {{ rel: string; }} */ link) => !hierarchyLinks.includes(link.rel)
  )
  record.links = nonHierarchyLinks
  addLinks([record], endpoint)
  return record
}
189+
190+
/**
 * Publish the outcome of every ingest attempt to an SNS topic.
 *
 * Records that were ingested without error have their links updated before
 * being published. The returned promise settles only after every publish
 * call has completed, so awaiting it guarantees the notifications were
 * attempted before the caller moves on.
 *
 * @param {Array<{record?: Object, error?: any}>} results - ingest outcomes
 * @param {string} topicArn - ARN of the SNS topic to publish to
 * @returns {Promise<void>}
 */
export async function publishResultsToSns(results, topicArn) {
  // Collect the publish promises instead of discarding them so that callers
  // awaiting this function actually wait for the messages to be sent.
  await Promise.all(results.map(async (result) => {
    if (result.record && !result.error) {
      updateLinksWithinRecord(result.record)
    }
    await publishRecordToSns(topicArn, result.record, result.error)
  }))
}
138-
139-
export async function ingestItems(items) {
140-
const records = await asyncMapInSequence(items, convertIngestObjectToDbObject)
141-
const results = await asyncMapInSequence(records, writeRecordToDb)
142-
logErrorResults(results)
143-
return results
144-
}

src/lib/sns.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { sns } from './aws-clients.js'
2+
import logger from './logger.js'
3+
import { isCollection, isItem } from './stac-utils.js'
4+
5+
/**
 * Build the SNS message attributes describing an ingest outcome.
 *
 * @param {{record: any, error: any}} payload - record and, if the ingest
 *   failed, the error that was raised
 * @returns {Object} `recordType`, `ingestStatus` and `collection` attributes,
 *   each with a `String` data type
 */
const attrsFromPayload = function (payload) {
  const { record, error } = payload
  const stringAttr = (value) => ({ DataType: 'String', StringValue: value })

  // Defaults used when the record is neither a collection nor an item
  let recordType = 'unknown'
  let collectionId = ''

  if (isCollection(record)) {
    recordType = 'Collection'
    collectionId = record.id || ''
  } else if (isItem(record)) {
    recordType = 'Item'
    collectionId = record.collection || ''
  }

  return {
    recordType: stringAttr(recordType),
    ingestStatus: stringAttr(error ? 'failed' : 'successful'),
    collection: stringAttr(collectionId)
  }
}
31+
32+
/**
 * Publish a record and its ingest error (if any) to an SNS topic.
 *
 * Publishing is best-effort: a failure to publish is logged and swallowed
 * rather than propagated to the caller.
 *
 * @param {string} topicArn - ARN of the SNS topic to publish to
 * @param {Object} [record] - the record that was ingested; may be absent
 * @param {any} [error] - the error raised while ingesting, if the ingest failed
 * @returns {Promise<void>}
 */
/* eslint-disable-next-line import/prefer-default-export */
export async function publishRecordToSns(topicArn, record, error) {
  const payload = { record, error }
  // The record may be null/undefined (e.g. an invalid ingest request), so the
  // id used in log messages is resolved defensively. Reading `record.id`
  // directly would throw after a successful publish and again in the catch.
  const recordId = record ? record.id : undefined
  try {
    // NOTE(review): JSON.stringify only serializes own enumerable properties,
    // so an Error's `message` may be missing from the published payload -
    // confirm this matches what subscribers expect.
    await sns().publish({
      Message: JSON.stringify(payload),
      TopicArn: topicArn,
      MessageAttributes: attrsFromPayload(payload)
    }).promise()
    logger.info(`Wrote record ${recordId} to ${topicArn}`)
  } catch (err) {
    logger.error(`Failed to write record ${recordId} to ${topicArn}: ${err}`)
  }
}

src/lib/stac-utils.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/**
 * Check whether a record is a STAC Collection.
 *
 * @param {any} record - value to test; may be null or undefined
 * @returns {boolean} true only when `record.type` is exactly `'Collection'`
 */
export function isCollection(record) {
  // Coerce explicitly so a falsy input yields `false` rather than leaking the
  // input value itself (null, undefined, 0, '') out of a predicate.
  return Boolean(record) && record.type === 'Collection'
}
4+
5+
/**
 * Check whether a record is a STAC Item.
 *
 * @param {any} record - value to test; may be null or undefined
 * @returns {boolean} true only when `record.type` is exactly `'Feature'`
 */
export function isItem(record) {
  // Coerce explicitly so a falsy input yields `false` rather than leaking the
  // input value itself (null, undefined, 0, '') out of a predicate.
  return Boolean(record) && record.type === 'Feature'
}

0 commit comments

Comments
 (0)