Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions forge/ee/lib/teamBroker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module.exports.init = async function (app) {
*/
async function addUsedTopic (topic, team) {
const teamId = app.db.models.Team.decodeHashid(team)
const cacheHit = topicCache.get(`${teamId}#${topic}`)
const cacheHit = topicCache.get(`${teamId[0]}#${topic}`)
if (!cacheHit) {
await app.db.models.MQTTTopicSchema.upsert({
topic,
Expand Down Expand Up @@ -76,7 +76,13 @@ module.exports.init = async function (app) {
app.log.debug(`Error populating Team Broker Topic Cache ${err.toString()}`)
}

function removeTopicFromCache (topic, team) {
const teamId = app.db.models.Team.decodeHashid(team)
topicCache.delete(`${teamId[0]}#${topic.topic}`)
}

app.decorate('teamBroker', {
addUsedTopic
addUsedTopic,
removeTopicFromCache
})
}
29 changes: 25 additions & 4 deletions forge/ee/routes/teamBroker/3rdPartyBroker.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const { LRUCache } = require('lru-cache')

module.exports = async function (app) {
app.addHook('preHandler', async (request, reply) => {
if (request.params.teamId !== undefined || request.params.teamSlug !== undefined) {
Expand Down Expand Up @@ -545,6 +547,12 @@ module.exports = async function (app) {
reply.send(clean)
})

// set up topic cache
const topicCache = new LRUCache({
max: 5000,
ttl: 1000 * 60 * 30 // 30 min cache life
})

/**
* Store Topics from a 3rd Party Broker
* @name /api/v1/teams/:teamId/brokers/:brokerId/topics
Expand Down Expand Up @@ -626,10 +634,14 @@ module.exports = async function (app) {
topicObj.metadata = topicInfo.metadata
}
try {
await app.db.models.MQTTTopicSchema.upsert(topicObj, {
fields: ['inferredSchema', 'metadata'],
conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId']
})
const cacheHit = topicCache.get(`${teamId}#${brokerId}#${topicInfo.topic}`)
if (!cacheHit) {
await app.db.models.MQTTTopicSchema.upsert(topicObj, {
fields: ['inferredSchema', 'metadata'],
conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId']
})
topicCache.set(`${teamId}#${brokerId}#${topicInfo.topic}`, true)
}
} catch (err) {
// reply.status(500).send({ error: 'unknown_erorr', message: err.toString() })
// return
Expand Down Expand Up @@ -727,12 +739,21 @@ module.exports = async function (app) {
}
}, async (request, reply) => {
let brokerId = request.params.brokerId
let teamBroker = false
if (brokerId === 'team-broker') {
teamBroker = true
brokerId = app.settings.get('team:broker:creds')
}
const topic = await app.db.models.MQTTTopicSchema.get(request.params.teamId, brokerId, request.params.topicId)
if (topic) {
await topic.destroy()
if (teamBroker) {
app.teamBroker.removeTopicFromCache(topic, request.params.teamId)
} else {
const team = app.db.models.Team.decodeHashid(request.params.teamId)[0]
const broker = brokerId = app.db.models.BrokerCredentials.decodeHashid(request.params.brokerId)[0]
topicCache.delete(`${team}#${broker}#${topic.topic}`)
}
reply.status(201).send({})
} else {
reply.status(404).send({ code: 'not_found', error: 'not found' })
Expand Down
25 changes: 25 additions & 0 deletions test/unit/forge/comms/authRoutesV2_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ describe('Broker Auth v2 API', async function () {
})
})
describe('Team Broker Topic Update Cache', async function () {
async function sleep (seconds) {
return new Promise((resolve) => {
setTimeout(resolve, (1000 * 3))
})
}
it('should not update if multiple calls to the same topic', async function () {
const topic = 'update/topic/timestamp'
await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid)
Expand All @@ -618,6 +623,26 @@ describe('Broker Auth v2 API', async function () {
})
secondTopic[0].updatedAt.toISOString().should.equal(firstTopic[0].updatedAt.toISOString())
})
it('should delete from cache', async function () {
const topic = 'update/topic/timestamp/2'
await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid)
const firstTopic = await app.db.models.MQTTTopicSchema.findAll({
where: {
topic,
TeamId: TestObjects.ATeam.id
}
})
app.teamBroker.removeTopicFromCache(firstTopic[0], TestObjects.ATeam.hashid)
await sleep(3)
await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid)
const secondTopic = await app.db.models.MQTTTopicSchema.findAll({
where: {
topic,
TeamId: TestObjects.ATeam.id
}
})
secondTopic[0].updatedAt.toISOString().should.not.equal(firstTopic[0].updatedAt.toISOString())
})
})
})
})
Expand Down
91 changes: 81 additions & 10 deletions test/unit/forge/ee/routes/teamBroker/3rdPartyBroker_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ describe('3rd Party Broker API', function () {
cookies: { sid: TestObjects.tokens.bob },
body: [
{
topic: 'bar/baz/qux',
topic: 'bar/baz/qux/x',
metadata: { description: 'a topic' }
}
]
Expand All @@ -359,18 +359,21 @@ describe('3rd Party Broker API', function () {
})
response.statusCode.should.equal(200)
const result = response.json()
result.topics.should.have.a.lengthOf(3)
result.topics.should.have.a.lengthOf(4)
const topics = result.topics
topics.sort((A, B) => A.topic.localeCompare(B.topic))

topics[0].should.have.property('topic', 'bar/baz/qux')
topics[0].should.have.property('metadata', { description: 'a topic' })
topics[0].should.have.property('metadata', {})

topics[1].should.have.property('topic', 'foo/bar/baz')
topics[1].should.have.property('metadata', {})
topics[1].should.have.property('topic', 'bar/baz/qux/x')
topics[1].should.have.property('metadata', { description: 'a topic' })

topics[2].should.have.property('topic', 'foo/bar/baz/qux')
topics[2].should.have.property('topic', 'foo/bar/baz')
topics[2].should.have.property('metadata', {})

topics[3].should.have.property('topic', 'foo/bar/baz/qux')
topics[3].should.have.property('metadata', {})
})
it('Get Topics for 3rd Pary broker as a Team Owner', async function () {
const response = await app.inject({
Expand All @@ -380,7 +383,7 @@ describe('3rd Party Broker API', function () {
})
response.statusCode.should.equal(200)
const result = response.json()
result.topics.should.have.a.lengthOf(3)
result.topics.should.have.a.lengthOf(4)
})
it('Add Metadata to a Topic', async function () {
let response = await app.inject({
Expand All @@ -390,7 +393,7 @@ describe('3rd Party Broker API', function () {
})
response.statusCode.should.equal(200)
let result = response.json()
result.topics.should.have.a.lengthOf(3)
result.topics.should.have.a.lengthOf(4)
result.topics[0].should.have.property('id')
result.topics[0].should.have.property('topic')
const topicId = result.topics[0].id
Expand Down Expand Up @@ -420,7 +423,7 @@ describe('3rd Party Broker API', function () {
})
response.statusCode.should.equal(200)
let result = response.json()
result.topics.should.have.a.lengthOf(3)
result.topics.should.have.a.lengthOf(4)
result.topics[0].should.have.property('id')
result.topics[0].should.have.property('topic')
const topicId = result.topics[0].id
Expand All @@ -439,8 +442,76 @@ describe('3rd Party Broker API', function () {
})
response.statusCode.should.equal(200)
result = response.json()
result.count.should.equal(2)
result.count.should.equal(3)
})

it('Topic Cache', async function () {
const response = await app.inject({
method: 'POST',
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
cookies: { sid: TestObjects.tokens.bob },
body: [
{
topic: 'bar/baz/qux',
metadata: { description: 'a topic' }
}
]
})
response.statusCode.should.equal(201)

const responseTopics = await app.inject({
method: 'GET',
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
cookies: { sid: TestObjects.tokens.bob }
})
const result = responseTopics.json()

const topic = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result.topics[0].id)
await app.inject({
method: 'POST',
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
cookies: { sid: TestObjects.tokens.bob },
body: [
{
topic: 'bar/baz/qux',
metadata: { description: 'a topic' }
}
]
})

const topicSecond = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result.topics[0].id)

topicSecond.updatedAt.toISOString().should.equal(topic.updatedAt.toISOString())
const response2 = await app.inject({
method: 'DELETE',
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics/${result.topics[0].id}`,
cookies: { sid: TestObjects.tokens.bob }
})
response2.statusCode.should.equal(201)

await app.inject({
method: 'POST',
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
cookies: { sid: TestObjects.tokens.bob },
body: [
{
topic: 'bar/baz/qux',
metadata: { description: 'a topic' }
}
]
})

const responseTopics2 = await app.inject({
method: 'GET',
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
cookies: { sid: TestObjects.tokens.bob }
})
const result2 = responseTopics2.json()

const topicThird = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result2.topics[0].id)
topicThird.updatedAt.toISOString().should.not.equal(topic.updatedAt.toISOString())
})

describe('Team Broker', function () {
before(async function () {
app.team2 = await app.factory.createTeam({ name: 'BTeam' })
Expand Down
Loading