|
| 1 | +/* eslint-disable no-param-reassign, no-nested-ternary, no-undef */ |
| 2 | +const grpc = require('@grpc/grpc-js'); |
| 3 | +const protoLoader = require('@grpc/proto-loader'); |
| 4 | +const certifi = require('certifi'); |
| 5 | +const avro = require('avro-js'); |
| 6 | +const { EventEmitter } = require('events'); |
| 7 | +const fs = require('fs').promises; |
| 8 | + |
| 9 | +const DEFAULT_PUBSUB_API_ENDPOINT = 'api.pubsub.salesforce.com:7443'; |
| 10 | +const CUSTOM_LONG_AVRO_TYPE = avro.types.LongType.using({ |
| 11 | + fromBuffer: (buf) => { |
| 12 | + const big = buf.readBigInt64LE(); |
| 13 | + if (big < Number.MIN_SAFE_INTEGER || big > Number.MAX_SAFE_INTEGER) { |
| 14 | + return big; |
| 15 | + } |
| 16 | + return Number(BigInt.asIntN(64, big)); |
| 17 | + }, |
| 18 | + toBuffer: (n) => { |
| 19 | + const buf = Buffer.allocUnsafe(8); |
| 20 | + if (n instanceof BigInt) { |
| 21 | + buf.writeBigInt64LE(n); |
| 22 | + } else { |
| 23 | + buf.writeBigInt64LE(BigInt(n)); |
| 24 | + } |
| 25 | + return buf; |
| 26 | + }, |
| 27 | + fromJSON: BigInt, |
| 28 | + toJSON: Number, |
| 29 | + isValid: (n) => { |
| 30 | + const type = typeof n; |
| 31 | + return (type === 'number' && n % 1 === 0) || type === 'bigint'; |
| 32 | + }, |
| 33 | + compare: (n1, n2) => (n1 === n2 ? 0 : n1 < n2 ? -1 : 1), |
| 34 | +}); |
| 35 | + |
| 36 | +class PubSubClient { |
| 37 | + logger; |
| 38 | + |
| 39 | + cfg; |
| 40 | + |
| 41 | + topicName; |
| 42 | + |
| 43 | + client; |
| 44 | + |
| 45 | + schema; |
| 46 | + |
| 47 | + lastKeepAlive; |
| 48 | + |
| 49 | + subscription; |
| 50 | + |
| 51 | + eventEmitter; |
| 52 | + |
| 53 | + constructor(logger, cfg) { |
| 54 | + this.logger = logger; |
| 55 | + this.cfg = cfg; |
| 56 | + this.topicName = `/event/${this.cfg.object}`; |
| 57 | + } |
| 58 | + |
| 59 | + async connect(accessToken, instanceUrl) { |
| 60 | + const rootCert = await fs.readFile(certifi); |
| 61 | + const tenantId = accessToken.split('!')[0]; |
| 62 | + const packageDefinition = await protoLoader.load(`${__dirname}/helpers/pubsub_api.proto`); |
| 63 | + const grpcObj = grpc.loadPackageDefinition(packageDefinition); |
| 64 | + const sfdcPackage = grpcObj.eventbus.v1; |
| 65 | + const metaCallback = (_params, callback) => { |
| 66 | + const meta = new grpc.Metadata(); |
| 67 | + meta.add('accesstoken', accessToken); |
| 68 | + meta.add('instanceurl', instanceUrl); |
| 69 | + meta.add('tenantid', tenantId); |
| 70 | + callback(null, meta); |
| 71 | + }; |
| 72 | + const callCreds = grpc.credentials.createFromMetadataGenerator(metaCallback); |
| 73 | + const combCreds = grpc.credentials.combineChannelCredentials(grpc.credentials.createSsl(rootCert), callCreds); |
| 74 | + this.client = new sfdcPackage.PubSub( |
| 75 | + this.cfg.pubsubEndpoint || DEFAULT_PUBSUB_API_ENDPOINT, |
| 76 | + combCreds, |
| 77 | + ); |
| 78 | + this.logger.info('Connected to Pub/Sub API endpoint'); |
| 79 | + try { |
| 80 | + await this.loadSchema(); |
| 81 | + this.logger.info('Schema loaded'); |
| 82 | + } catch (err) { |
| 83 | + throw new Error(`Failed to load schema: ${err.message}`); |
| 84 | + } |
| 85 | + } |
| 86 | + |
| 87 | + setLogger(logger) { this.logger = logger; } |
| 88 | + |
| 89 | + async loadSchema() { |
| 90 | + if (this.schema) return; |
| 91 | + this.schema = await new Promise((resolve, reject) => { |
| 92 | + this.client.GetTopic({ topicName: this.topicName }, (topicError, response) => { |
| 93 | + if (topicError) { |
| 94 | + reject(topicError); |
| 95 | + } else { |
| 96 | + const { schemaId } = response; |
| 97 | + this.client.GetSchema({ schemaId }, (schemaError, res) => { |
| 98 | + if (schemaError) { |
| 99 | + reject(schemaError); |
| 100 | + } else { |
| 101 | + const schemaType = avro.parse(res.schemaJson, { registry: { long: CUSTOM_LONG_AVRO_TYPE } }); |
| 102 | + resolve({ |
| 103 | + id: schemaId, |
| 104 | + type: schemaType, |
| 105 | + }); |
| 106 | + } |
| 107 | + }); |
| 108 | + } |
| 109 | + }); |
| 110 | + }); |
| 111 | + } |
| 112 | + |
| 113 | + flattenSinglePropertyObjects(theObject) { |
| 114 | + Object.entries(theObject).forEach(([key, value]) => { |
| 115 | + if (key !== 'ChangeEventHeader' && value && typeof value === 'object') { |
| 116 | + const subKeys = Object.keys(value); |
| 117 | + if (subKeys.length === 1) { |
| 118 | + const subValue = value[subKeys[0]]; |
| 119 | + theObject[key] = subValue; |
| 120 | + if (subValue && typeof subValue === 'object') { |
| 121 | + this.flattenSinglePropertyObjects(theObject[key]); |
| 122 | + } |
| 123 | + } |
| 124 | + } |
| 125 | + }); |
| 126 | + } |
| 127 | + |
| 128 | + async subscribe(numRequested, fromReplayId) { |
| 129 | + const eventEmitter = new EventEmitter(); |
| 130 | + this.subscription = this.client.Subscribe(); |
| 131 | + const writeOptions = { topicName: this.topicName, numRequested }; |
| 132 | + |
| 133 | + if (fromReplayId) { |
| 134 | + const buf = Buffer.allocUnsafe(8); |
| 135 | + buf.writeBigUInt64BE(BigInt(fromReplayId), 0); |
| 136 | + writeOptions.replayPreset = 2; |
| 137 | + writeOptions.replayId = buf; |
| 138 | + } |
| 139 | + this.logger.info(`Requesting first ${numRequested} records`); |
| 140 | + this.subscription.write(writeOptions); |
| 141 | + |
| 142 | + this.subscription.on('data', (data) => { |
| 143 | + const latestReplayId = Number(data.latestReplayId.readBigUInt64BE()); |
| 144 | + if (data.events) { |
| 145 | + for (const event of data.events) { |
| 146 | + const replayId = Number(event.replayId.readBigUInt64BE()); |
| 147 | + const payload = this.schema.type.fromBuffer(event.event.payload); |
| 148 | + this.flattenSinglePropertyObjects(payload); |
| 149 | + eventEmitter.emit('data', { event: { replayId }, payload }); |
| 150 | + } |
| 151 | + if (!data.pendingNumRequested) { |
| 152 | + this.logger.info(`requesting ${numRequested} more records`); |
| 153 | + this.subscription.write({ topicName: this.topicName, numRequested }); |
| 154 | + } |
| 155 | + } else { |
| 156 | + this.logger.debug(`Received keepalive message. Latest replay ID: ${latestReplayId}`); |
| 157 | + data.latestReplayId = latestReplayId; |
| 158 | + this.lastKeepAlive = new Date().getTime(); |
| 159 | + eventEmitter.emit('keepalive', data); |
| 160 | + } |
| 161 | + }); |
| 162 | + this.subscription.on('end', () => { |
| 163 | + this.logger.warn('gRPC stream ended'); |
| 164 | + eventEmitter.emit('end'); |
| 165 | + }); |
| 166 | + this.subscription.on('error', (error) => { |
| 167 | + this.logger.error(`gRPC stream error: ${JSON.stringify(error)}`); |
| 168 | + eventEmitter.emit('error', error); |
| 169 | + }); |
| 170 | + this.subscription.on('status', (status) => { |
| 171 | + this.logger.warn(`gRPC stream status: ${JSON.stringify(status)}`); |
| 172 | + eventEmitter.emit('status', status); |
| 173 | + }); |
| 174 | + this.eventEmitter = eventEmitter; |
| 175 | + |
| 176 | + return this.eventEmitter; |
| 177 | + } |
| 178 | + |
| 179 | + getLastKeepAlive() { |
| 180 | + return this.lastKeepAlive; |
| 181 | + } |
| 182 | + |
| 183 | + close() { |
| 184 | + try { |
| 185 | + this.eventEmitter.removeAllListeners(); |
| 186 | + this.eventEmitter = null; |
| 187 | + this.subscription.removeAllListeners(); |
| 188 | + this.subscription = null; |
| 189 | + this.client.close(); |
| 190 | + this.client = null; |
| 191 | + // eslint-disable-next-line no-empty |
| 192 | + } catch (_e) {} |
| 193 | + } |
| 194 | +} |
| 195 | + |
| 196 | +module.exports.PubSubClient = PubSubClient; |
0 commit comments