diff --git a/forge/ee/lib/teamBroker/index.js b/forge/ee/lib/teamBroker/index.js index e29f5ee9f4..dc8e82dea9 100644 --- a/forge/ee/lib/teamBroker/index.js +++ b/forge/ee/lib/teamBroker/index.js @@ -41,7 +41,7 @@ module.exports.init = async function (app) { */ async function addUsedTopic (topic, team) { const teamId = app.db.models.Team.decodeHashid(team) - const cacheHit = topicCache.get(`${teamId}#${topic}`) + const cacheHit = topicCache.get(`${teamId[0]}#${topic}`) if (!cacheHit) { await app.db.models.MQTTTopicSchema.upsert({ topic, @@ -76,7 +76,13 @@ module.exports.init = async function (app) { app.log.debug(`Error populating Team Broker Topic Cache ${err.toString()}`) } + function removeTopicFromCache (topic, team) { + const teamId = app.db.models.Team.decodeHashid(team) + topicCache.delete(`${teamId[0]}#${topic.topic}`) + } + app.decorate('teamBroker', { - addUsedTopic + addUsedTopic, + removeTopicFromCache }) } diff --git a/forge/ee/routes/teamBroker/3rdPartyBroker.js b/forge/ee/routes/teamBroker/3rdPartyBroker.js index 6d94b9d7f2..c5ce11dfe1 100644 --- a/forge/ee/routes/teamBroker/3rdPartyBroker.js +++ b/forge/ee/routes/teamBroker/3rdPartyBroker.js @@ -1,3 +1,5 @@ +const { LRUCache } = require('lru-cache') + module.exports = async function (app) { app.addHook('preHandler', async (request, reply) => { if (request.params.teamId !== undefined || request.params.teamSlug !== undefined) { @@ -545,6 +547,12 @@ module.exports = async function (app) { reply.send(clean) }) + // set up topic cache + const topicCache = new LRUCache({ + max: 5000, + ttl: 1000 * 60 * 30 // 30 min cache life + }) + /** * Store Topics from a 3rd Party Broker * @name /api/v1/teams/:teamId/brokers/:brokerId/topics @@ -626,10 +634,14 @@ module.exports = async function (app) { topicObj.metadata = topicInfo.metadata } try { - await app.db.models.MQTTTopicSchema.upsert(topicObj, { - fields: ['inferredSchema', 'metadata'], - conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId'] - }) + const cacheHit = topicCache.get(`${teamId}#${brokerId}#${topicInfo.topic}`) + if (!cacheHit) { + await app.db.models.MQTTTopicSchema.upsert(topicObj, { + fields: ['inferredSchema', 'metadata'], + conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId'] + }) + topicCache.set(`${teamId}#${brokerId}#${topicInfo.topic}`, true) + } } catch (err) { // reply.status(500).send({ error: 'unknown_erorr', message: err.toString() }) // return @@ -727,12 +739,21 @@ module.exports = async function (app) { } }, async (request, reply) => { let brokerId = request.params.brokerId + let teamBroker = false if (brokerId === 'team-broker') { + teamBroker = true brokerId = app.settings.get('team:broker:creds') } const topic = await app.db.models.MQTTTopicSchema.get(request.params.teamId, brokerId, request.params.topicId) if (topic) { await topic.destroy() + if (teamBroker) { + app.teamBroker.removeTopicFromCache(topic, request.params.teamId) + } else { + const team = app.db.models.Team.decodeHashid(request.params.teamId)[0] + const broker = brokerId = app.db.models.BrokerCredentials.decodeHashid(request.params.brokerId)[0] + topicCache.delete(`${team}#${broker}#${topic.topic}`) + } reply.status(201).send({}) } else { reply.status(404).send({ code: 'not_found', error: 'not found' }) diff --git a/test/unit/forge/comms/authRoutesV2_spec.js b/test/unit/forge/comms/authRoutesV2_spec.js index c21ff6f44d..2b727617eb 100644 --- a/test/unit/forge/comms/authRoutesV2_spec.js +++ b/test/unit/forge/comms/authRoutesV2_spec.js @@ -600,6 +600,11 @@ describe('Broker Auth v2 API', async function () { }) }) describe('Team Broker Topic Update Cache', async function () { + async function sleep (seconds) { + return new Promise((resolve) => { + setTimeout(resolve, (1000 * 3)) + }) + } it('should not update if multiple calls to the same topic', async function () { const topic = 'update/topic/timestamp' await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid) @@ -618,6 +623,26 @@ describe('Broker Auth v2 API', async function () { }) secondTopic[0].updatedAt.toISOString().should.equal(firstTopic[0].updatedAt.toISOString()) }) + it('should delete from cache', async function () { + const topic = 'update/topic/timestamp/2' + await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid) + const firstTopic = await app.db.models.MQTTTopicSchema.findAll({ + where: { + topic, + TeamId: TestObjects.ATeam.id + } + }) + app.teamBroker.removeTopicFromCache(firstTopic[0], TestObjects.ATeam.hashid) + await sleep(3) + await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid) + const secondTopic = await app.db.models.MQTTTopicSchema.findAll({ + where: { + topic, + TeamId: TestObjects.ATeam.id + } + }) + secondTopic[0].updatedAt.toISOString().should.not.equal(firstTopic[0].updatedAt.toISOString()) + }) }) }) }) diff --git a/test/unit/forge/ee/routes/teamBroker/3rdPartyBroker_spec.js b/test/unit/forge/ee/routes/teamBroker/3rdPartyBroker_spec.js index a2874ddfd5..02d690a3b3 100644 --- a/test/unit/forge/ee/routes/teamBroker/3rdPartyBroker_spec.js +++ b/test/unit/forge/ee/routes/teamBroker/3rdPartyBroker_spec.js @@ -344,7 +344,7 @@ describe('3rd Party Broker API', function () { cookies: { sid: TestObjects.tokens.bob }, body: [ { - topic: 'bar/baz/qux', + topic: 'bar/baz/qux/x', metadata: { description: 'a topic' } } ] @@ -359,18 +359,21 @@ describe('3rd Party Broker API', function () { }) response.statusCode.should.equal(200) const result = response.json() - result.topics.should.have.a.lengthOf(3) + result.topics.should.have.a.lengthOf(4) const topics = result.topics topics.sort((A, B) => A.topic.localeCompare(B.topic)) topics[0].should.have.property('topic', 'bar/baz/qux') - topics[0].should.have.property('metadata', { description: 'a topic' }) + topics[0].should.have.property('metadata', {}) - topics[1].should.have.property('topic', 'foo/bar/baz') - topics[1].should.have.property('metadata', {}) + topics[1].should.have.property('topic', 'bar/baz/qux/x') + topics[1].should.have.property('metadata', { description: 'a topic' }) - topics[2].should.have.property('topic', 'foo/bar/baz/qux') + topics[2].should.have.property('topic', 'foo/bar/baz') topics[2].should.have.property('metadata', {}) + + topics[3].should.have.property('topic', 'foo/bar/baz/qux') + topics[3].should.have.property('metadata', {}) }) it('Get Topics for 3rd Pary broker as a Team Owner', async function () { const response = await app.inject({ @@ -380,7 +383,7 @@ describe('3rd Party Broker API', function () { }) response.statusCode.should.equal(200) const result = response.json() - result.topics.should.have.a.lengthOf(3) + result.topics.should.have.a.lengthOf(4) }) it('Add Metadata to a Topic', async function () { let response = await app.inject({ @@ -390,7 +393,7 @@ describe('3rd Party Broker API', function () { }) response.statusCode.should.equal(200) let result = response.json() - result.topics.should.have.a.lengthOf(3) + result.topics.should.have.a.lengthOf(4) result.topics[0].should.have.property('id') result.topics[0].should.have.property('topic') const topicId = result.topics[0].id @@ -420,7 +423,7 @@ describe('3rd Party Broker API', function () { }) response.statusCode.should.equal(200) let result = response.json() - result.topics.should.have.a.lengthOf(3) + result.topics.should.have.a.lengthOf(4) result.topics[0].should.have.property('id') result.topics[0].should.have.property('topic') const topicId = result.topics[0].id @@ -439,8 +442,76 @@ describe('3rd Party Broker API', function () { }) response.statusCode.should.equal(200) result = response.json() - result.count.should.equal(2) + result.count.should.equal(3) }) + + it('Topic Cache', async function () { + const response = await app.inject({ + method: 'POST', + url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`, + cookies: { sid: TestObjects.tokens.bob }, + body: [ + { + topic: 'bar/baz/qux', + metadata: { description: 'a topic' } + } + ] + }) + response.statusCode.should.equal(201) + + const responseTopics = await app.inject({ + method: 'GET', + url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`, + cookies: { sid: TestObjects.tokens.bob } + }) + const result = responseTopics.json() + + const topic = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result.topics[0].id) + await app.inject({ + method: 'POST', + url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`, + cookies: { sid: TestObjects.tokens.bob }, + body: [ + { + topic: 'bar/baz/qux', + metadata: { description: 'a topic' } + } + ] + }) + + const topicSecond = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result.topics[0].id) + + topicSecond.updatedAt.toISOString().should.equal(topic.updatedAt.toISOString()) + const response2 = await app.inject({ + method: 'DELETE', + url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics/${result.topics[0].id}`, + cookies: { sid: TestObjects.tokens.bob } + }) + response2.statusCode.should.equal(201) + + await app.inject({ + method: 'POST', + url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`, + cookies: { sid: TestObjects.tokens.bob }, + body: [ + { + topic: 'bar/baz/qux', + metadata: { description: 'a topic' } + } + ] + }) + + const responseTopics2 = await app.inject({ + method: 'GET', + url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`, + cookies: { sid: TestObjects.tokens.bob } + }) + const result2 = responseTopics2.json() + + const topicThird = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result2.topics[0].id) + topicThird.updatedAt.toISOString().should.not.equal(topic.updatedAt.toISOString()) + }) + describe('Team Broker', function () { before(async function () { app.team2 = await app.factory.createTeam({ name: 'BTeam' })