Skip to content

Commit 938fc71

Browse files
authored
Merge pull request #582 from nicomoya123/master
Fix: buffer messages from external resource
2 parents 1775b7c + 8be58eb commit 938fc71

File tree

2 files changed

+40
-5
lines changed

2 files changed

+40
-5
lines changed

src/redis-pubsub.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,13 @@ export class RedisPubSub implements PubSubEngine {
8484
}
8585

8686
public async publish<T>(trigger: string, payload: T): Promise<void> {
87-
await this.redisPublisher.publish(trigger, this.serializer ? this.serializer(payload) : JSON.stringify(payload));
87+
if(this.serializer) {
88+
await this.redisPublisher.publish(trigger, this.serializer(payload));
89+
} else if (payload instanceof Buffer){
90+
await this.redisPublisher.publish(trigger, payload);
91+
} else {
92+
await this.redisPublisher.publish(trigger, JSON.stringify(payload));
93+
}
8894
}
8995

9096
public subscribe<T = any>(
@@ -169,17 +175,23 @@ export class RedisPubSub implements PubSubEngine {
169175
private readonly subsRefsMap: Map<string, Set<number>>;
170176
private currentSubscriptionId: number;
171177

172-
private onMessage(pattern: string, channel: string, message: string) {
178+
private onMessage(pattern: string, channel: string | Buffer, message: string | Buffer) {
179+
if(typeof channel === 'object') channel = channel.toString('utf8');
180+
173181
const subscribers = this.subsRefsMap.get(pattern || channel);
174182

175183
// Don't work for nothing..
176184
if (!subscribers?.size) return;
177185

178186
let parsedMessage;
179187
try {
180-
parsedMessage = this.deserializer
181-
? this.deserializer(message, { pattern, channel })
182-
: JSON.parse(message, this.reviver);
188+
if(this.deserializer){
189+
parsedMessage = this.deserializer(Buffer.from(message), { pattern, channel })
190+
} else if(typeof message === 'string'){
191+
parsedMessage = JSON.parse(message, this.reviver);
192+
} else {
193+
parsedMessage = message;
194+
}
183195
} catch (e) {
184196
parsedMessage = message;
185197
}

src/test/integration-tests.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,29 @@ describe('PubSubAsyncIterator', function() {
123123
}));
124124
});
125125

126+
describe('Subscribe to buffer', () => {
127+
it('can publish buffers as well' , done => {
128+
// when using messageBuffer, with redis instance the channel name is not a string but a buffer
129+
const pubSub = new RedisPubSub({ messageEventName: 'messageBuffer'});
130+
const payload = 'This is amazing';
131+
pubSub.subscribe('Posts', message => {
132+
try {
133+
expect(message).to.be.instanceOf(Buffer);
134+
expect(message.toString('utf-8')).to.be.equal(payload);
135+
done();
136+
} catch (e) {
137+
done(e);
138+
}
139+
}).then(async subId => {
140+
try {
141+
await pubSub.publish('Posts', Buffer.from(payload, 'utf-8'));
142+
pubSub.unsubscribe(subId);
143+
} catch (e) {
144+
done(e);
145+
}
146+
});
147+
});
148+
})
126149

127150
describe('PubSubCluster', () => {
128151
const nodes = [7006, 7001, 7002, 7003, 7004, 7005].map(port => ({ host: '127.0.0.1', port }));

0 commit comments

Comments
 (0)