Skip to content

Commit

Permalink
fix(publish): emit correct pmessage/messageBuffer/pmessageBuffer even…
Browse files Browse the repository at this point in the history
…ts (#1074)

fix #1072
fix #1073
  • Loading branch information
jedwards1211 authored Apr 29, 2021
1 parent a668afb commit 15d8d39
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 7 deletions.
7 changes: 6 additions & 1 deletion src/commands-utils/channel-subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ export function subscribeToChannel(instance, chan, channels, isPattern) {
}
// Pattern events include the channel, regular events do not, so we pass chan in directly
const listener = (message, channel) =>
emitMessage(instance, isPattern ? channel : chan, message);
emitMessage(
instance,
isPattern ? channel : chan,
message,
isPattern ? chan : undefined
);
channels.on(chan, listener);
channels.instanceListeners.get(chan).set(instance, listener);
}
Expand Down
17 changes: 15 additions & 2 deletions src/commands-utils/emitMessage.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
export default function emitMessage(redisMock, channel, message) {
export default function emitMessage(redisMock, channel, message, pattern) {
process.nextTick(() => {
redisMock.emit('message', channel, message);
if (pattern) {
redisMock.emit(
Buffer.isBuffer(message) ? 'pmessageBuffer' : 'pmessage',
pattern,
channel,
message
);
} else {
redisMock.emit(
Buffer.isBuffer(message) ? 'messageBuffer' : 'message',
channel,
message
);
}
});
}
4 changes: 2 additions & 2 deletions test/commands/psubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ describe('psubscribe', () => {
PromiseTwoFulfill = f;
});

redisOne.on('message', promiseOneFulfill);
redisTwo.on('message', PromiseTwoFulfill);
redisOne.on('pmessage', promiseOneFulfill);
redisTwo.on('pmessage', PromiseTwoFulfill);

redisOne.createConnectedClient().publish('first.test', 'blah');

Expand Down
30 changes: 29 additions & 1 deletion test/commands/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ describe('publish', () => {
redis2.publish('emails', 'clark@daily.planet');
});

it('should emit messageBuffer event when a Buffer message is published on a subscribed channel', (done) => {
const redisPubSub = new MockRedis();
const redis2 = redisPubSub.createConnectedClient();
const buffer = Buffer.alloc(8);
redisPubSub.on('messageBuffer', (channel, message) => {
expect(channel).toBe('emails');
expect(message).toBe(buffer);
done();
});
redisPubSub.subscribe('emails');
redis2.publish('emails', buffer);
});

it('should return 1 when publishing with a single pattern subscriber', () => {
const redisPubSub = new MockRedis();
const redis2 = redisPubSub.createConnectedClient();
Expand All @@ -41,12 +54,27 @@ describe('publish', () => {
it('should publish a message, which can be received by a previous psubscribe', (done) => {
const redisPubSub = new MockRedis();
const redis2 = redisPubSub.createConnectedClient();
redisPubSub.on('message', (channel, message) => {
redisPubSub.on('pmessage', (pattern, channel, message) => {
expect(pattern).toBe('emails.*');
expect(channel).toBe('emails.urgent');
expect(message).toBe('clark@daily.planet');
done();
});
redisPubSub.psubscribe('emails.*');
redis2.publish('emails.urgent', 'clark@daily.planet');
});

it('should emit a pmessageBuffer event when a Buffer message is published matching a psubscribed pattern', (done) => {
const redisPubSub = new MockRedis();
const redis2 = redisPubSub.createConnectedClient();
const buffer = Buffer.alloc(0);
redisPubSub.on('pmessageBuffer', (pattern, channel, message) => {
expect(pattern).toBe('emails.*');
expect(channel).toBe('emails.urgent');
expect(message).toBe(buffer);
done();
});
redisPubSub.psubscribe('emails.*');
redis2.publish('emails.urgent', buffer);
});
});
2 changes: 1 addition & 1 deletion test/commands/punsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ describe('punsubscribe', () => {
promiseFulfill = f;
});

redisOne.on('message', promiseFulfill);
redisOne.on('pmessage', promiseFulfill);

redisOne.createConnectedClient().publish('first.test', 'TEST');

Expand Down

0 comments on commit 15d8d39

Please sign in to comment.