Skip to content

Commit 3ed3c58

Browse files
committed
Update links in records published to post-ingest SNS topic
Gives subscribers to the post-ingest topic the full URIs of the ingested collections and items by repurposing the link generation fucntions used by the API.
1 parent 8cd7dba commit 3ed3c58

File tree

4 files changed

+104
-4
lines changed

4 files changed

+104
-4
lines changed

serverless.example.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ provider:
2222
# PRE_HOOK: ${self:service}-${self:provider.stage}-preHook
2323
# API_KEYS_SECRET_ID: ${self:service}-${self:provider.stage}-api-keys
2424
# POST_HOOK: ${self:service}-${self:provider.stage}-postHook
25+
# If you will be subscribing to post-ingest SNS notifications make
26+
# sure that STAC_API_URL is set so that links are updated correctly
27+
STAC_API_URL: "https://some-stac-server.com"
2528
iam:
2629
role:
2730
statements:

src/lib/api.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ export const parsePath = function (inpath) {
383383
}
384384

385385
// Impure - mutates results
386-
const addCollectionLinks = function (results, endpoint) {
386+
export const addCollectionLinks = function (results, endpoint) {
387387
results.forEach((result) => {
388388
const { id } = result
389389
let { links } = result
@@ -438,7 +438,7 @@ const addCollectionLinks = function (results, endpoint) {
438438
}
439439

440440
// Impure - mutates results
441-
const addItemLinks = function (results, endpoint) {
441+
export const addItemLinks = function (results, endpoint) {
442442
results.forEach((result) => {
443443
let { links } = result
444444
const { id, collection } = result

src/lib/ingest.js

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { getItemCreated } from './database.js'
2+
import { addItemLinks, addCollectionLinks } from './api.js'
23
import { dbClient, createIndex } from './databaseClient.js'
34
import logger from './logger.js'
45
import { publishRecordToSns } from './sns.js'
@@ -13,6 +14,8 @@ export class InvalidIngestError extends Error {
1314
}
1415
}
1516

17+
const hierarchyLinks = ['self', 'root', 'parent', 'child', 'collection', 'item', 'items']
18+
1619
export async function convertIngestObjectToDbObject(
1720
// eslint-disable-next-line max-len
1821
/** @type {{ hasOwnProperty: (arg0: string) => any; type: string, collection: string; links: any[]; id: any; }} */ data
@@ -30,12 +33,11 @@ export async function convertIngestObjectToDbObject(
3033
}
3134

3235
// remove any hierarchy links in a non-mutating way
33-
const hlinks = ['self', 'root', 'parent', 'child', 'collection', 'item', 'items']
3436
if (!data.links) {
3537
throw new InvalidIngestError('Expected a "links" proporty on the stac object')
3638
}
3739
const links = data.links.filter(
38-
(/** @type {{ rel: string; }} */ link) => !hlinks.includes(link.rel)
40+
(/** @type {{ rel: string; }} */ link) => !hierarchyLinks.includes(link.rel)
3941
)
4042
const dbDataObject = { ...data, links }
4143

@@ -162,8 +164,34 @@ export async function ingestItems(items) {
162164
return results
163165
}
164166

167+
// Impure - mutates record
168+
function updateLinksWithinRecord(record) {
169+
const endpoint = process.env['STAC_API_URL']
170+
if (!endpoint) {
171+
logger.info('STAC_API_URL not set, not updating links within ingested record')
172+
return record
173+
}
174+
if (!isItem(record) && !isCollection(record)) {
175+
logger.info('Record is not a collection or item, not updating links within ingested record')
176+
return record
177+
}
178+
179+
record.links = record.links.filter(
180+
(/** @type {{ rel: string; }} */ link) => !hierarchyLinks.includes(link.rel)
181+
)
182+
if (isItem(record)) {
183+
addItemLinks([record], endpoint)
184+
} else if (isCollection(record)) {
185+
addCollectionLinks([record], endpoint)
186+
}
187+
return record
188+
}
189+
165190
export async function publishResultsToSns(results, topicArn) {
166191
results.forEach((result) => {
192+
if (result.record && !result.error) {
193+
updateLinksWithinRecord(result.record)
194+
}
167195
publishRecordToSns(topicArn, result.record, result.error)
168196
})
169197
}

tests/system/test-ingest.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// @ts-nocheck
22

3+
import url from 'url'
34
import test from 'ava'
45
import nock from 'nock'
56
import { DateTime } from 'luxon'
@@ -349,6 +350,28 @@ test('Ingested collection is published to post-ingest SNS topic', async (t) => {
349350
t.is(attrs.recordType.Value, 'Collection')
350351
})
351352

353+
test('Ingested collection is published to post-ingest SNS topic with updated links', async (t) => {
354+
const envBeforeTest = { ...process.env }
355+
try {
356+
const hostname = 'some-stac-server.com'
357+
const endpoint = `https://${hostname}`
358+
process.env['STAC_API_URL'] = endpoint
359+
360+
const collection = await loadFixture(
361+
'landsat-8-l1-collection.json',
362+
{ id: randomId('collection') }
363+
)
364+
365+
const { message } = await testPostIngestSNS(t, collection)
366+
367+
t.truthy(message.record.links)
368+
t.true(message.record.links.every((/** @type {Link} */ link) => (
369+
link.href && url.parse(link.href).hostname === hostname)))
370+
} finally {
371+
process.env = envBeforeTest
372+
}
373+
})
374+
352375
test('Ingest collection failure is published to post-ingest SNS topic', async (t) => {
353376
const { message, attrs } = await testPostIngestSNS(t, {
354377
type: 'Collection',
@@ -407,3 +430,49 @@ test('Ingest item failure is published to post-ingest SNS topic', async (t) => {
407430
t.is(attrs.ingestStatus.Value, 'failed')
408431
t.is(attrs.recordType.Value, 'Item')
409432
})
433+
434+
test('Ingested item is published to post-ingest SNS topic with updated links', async (t) => {
435+
const envBeforeTest = { ...process.env }
436+
try {
437+
const hostname = 'some-stac-server.com'
438+
const endpoint = `https://${hostname}`
439+
process.env['STAC_API_URL'] = endpoint
440+
441+
const collection = await ingestCollectionAndPurgePostIngestQueue(t)
442+
443+
const item = await loadFixture(
444+
'stac/ingest-item.json',
445+
{ id: randomId('item'), collection: collection.id }
446+
)
447+
448+
const { message } = await testPostIngestSNS(t, item)
449+
450+
t.truthy(message.record.links)
451+
t.true(message.record.links.every((/** @type {Link} */ link) => (
452+
link.href && url.parse(link.href).hostname === hostname)))
453+
} finally {
454+
process.env = envBeforeTest
455+
}
456+
})
457+
458+
test('Ingested item facilure is published to post-ingest SNS topic without updated links', async (t) => {
459+
const envBeforeTest = { ...process.env }
460+
try {
461+
const hostname = 'some-stac-server.com'
462+
const endpoint = `https://${hostname}`
463+
process.env['STAC_API_URL'] = endpoint
464+
465+
const item = await loadFixture(
466+
'stac/ingest-item.json',
467+
{ id: randomId('item'), collection: 'INVALID COLLECTION' }
468+
)
469+
470+
const { message } = await testPostIngestSNS(t, item)
471+
472+
t.truthy(message.record.links)
473+
t.false(message.record.links.every((/** @type {Link} */ link) => (
474+
link.href && url.parse(link.href).hostname === hostname)))
475+
} finally {
476+
process.env = envBeforeTest
477+
}
478+
})

0 commit comments

Comments
 (0)