From 15d8d39306aca14347019ddb1abc91aaac1cf9c4 Mon Sep 17 00:00:00 2001 From: Andy Edwards Date: Thu, 29 Apr 2021 05:27:04 -0500 Subject: [PATCH] fix(publish): emit correct pmessage/messageBuffer/pmessageBuffer events (#1074) fix #1072 fix #1073 --- src/commands-utils/channel-subscription.js | 7 ++++- src/commands-utils/emitMessage.js | 17 ++++++++++-- test/commands/psubscribe.js | 4 +-- test/commands/publish.js | 30 +++++++++++++++++++++- test/commands/punsubscribe.js | 2 +- 5 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/commands-utils/channel-subscription.js b/src/commands-utils/channel-subscription.js index eea1929ae..fa232ee4f 100644 --- a/src/commands-utils/channel-subscription.js +++ b/src/commands-utils/channel-subscription.js @@ -24,7 +24,12 @@ export function subscribeToChannel(instance, chan, channels, isPattern) { } // Pattern events include the channel, regular events do not, so we pass chan in directly const listener = (message, channel) => - emitMessage(instance, isPattern ? channel : chan, message); + emitMessage( + instance, + isPattern ? channel : chan, + message, + isPattern ? chan : undefined + ); channels.on(chan, listener); channels.instanceListeners.get(chan).set(instance, listener); } diff --git a/src/commands-utils/emitMessage.js b/src/commands-utils/emitMessage.js index c7a282813..5e34ddbdc 100644 --- a/src/commands-utils/emitMessage.js +++ b/src/commands-utils/emitMessage.js @@ -1,5 +1,18 @@ -export default function emitMessage(redisMock, channel, message) { +export default function emitMessage(redisMock, channel, message, pattern) { process.nextTick(() => { - redisMock.emit('message', channel, message); + if (pattern) { + redisMock.emit( + Buffer.isBuffer(message) ? 'pmessageBuffer' : 'pmessage', + pattern, + channel, + message + ); + } else { + redisMock.emit( + Buffer.isBuffer(message) ? 'messageBuffer' : 'message', + channel, + message + ); + } }); } diff --git a/test/commands/psubscribe.js b/test/commands/psubscribe.js index f9da89d34..ed5c5680c 100644 --- a/test/commands/psubscribe.js +++ b/test/commands/psubscribe.js @@ -63,8 +63,8 @@ describe('psubscribe', () => { PromiseTwoFulfill = f; }); - redisOne.on('message', promiseOneFulfill); - redisTwo.on('message', PromiseTwoFulfill); + redisOne.on('pmessage', promiseOneFulfill); + redisTwo.on('pmessage', PromiseTwoFulfill); redisOne.createConnectedClient().publish('first.test', 'blah'); diff --git a/test/commands/publish.js b/test/commands/publish.js index f69522217..a856af391 100644 --- a/test/commands/publish.js +++ b/test/commands/publish.js @@ -29,6 +29,19 @@ describe('publish', () => { redis2.publish('emails', 'clark@daily.planet'); }); + it('should emit messageBuffer event when a Buffer message is published on a subscribed channel', (done) => { + const redisPubSub = new MockRedis(); + const redis2 = redisPubSub.createConnectedClient(); + const buffer = Buffer.alloc(8); + redisPubSub.on('messageBuffer', (channel, message) => { + expect(channel).toBe('emails'); + expect(message).toBe(buffer); + done(); + }); + redisPubSub.subscribe('emails'); + redis2.publish('emails', buffer); + }); + it('should return 1 when publishing with a single pattern subscriber', () => { const redisPubSub = new MockRedis(); const redis2 = redisPubSub.createConnectedClient(); @@ -41,7 +54,8 @@ describe('publish', () => { it('should publish a message, which can be received by a previous psubscribe', (done) => { const redisPubSub = new MockRedis(); const redis2 = redisPubSub.createConnectedClient(); - redisPubSub.on('message', (channel, message) => { + redisPubSub.on('pmessage', (pattern, channel, message) => { + expect(pattern).toBe('emails.*'); expect(channel).toBe('emails.urgent'); expect(message).toBe('clark@daily.planet'); done(); @@ -49,4 +63,18 @@ describe('publish', () => { redisPubSub.psubscribe('emails.*'); redis2.publish('emails.urgent', 'clark@daily.planet'); }); + + it('should emit a pmessageBuffer event when a Buffer message is published matching a psubscribed pattern', (done) => { + const redisPubSub = new MockRedis(); + const redis2 = redisPubSub.createConnectedClient(); + const buffer = Buffer.alloc(0); + redisPubSub.on('pmessageBuffer', (pattern, channel, message) => { + expect(pattern).toBe('emails.*'); + expect(channel).toBe('emails.urgent'); + expect(message).toBe(buffer); + done(); + }); + redisPubSub.psubscribe('emails.*'); + redis2.publish('emails.urgent', buffer); + }); }); diff --git a/test/commands/punsubscribe.js b/test/commands/punsubscribe.js index a1cddb70c..508412a84 100644 --- a/test/commands/punsubscribe.js +++ b/test/commands/punsubscribe.js @@ -54,7 +54,7 @@ describe('punsubscribe', () => { promiseFulfill = f; }); - redisOne.on('message', promiseFulfill); + redisOne.on('pmessage', promiseFulfill); redisOne.createConnectedClient().publish('first.test', 'TEST');