From 411f50e07b627fd6da8fe3a31d46b29a6c7c21fd Mon Sep 17 00:00:00 2001 From: "charles.chang" Date: Mon, 19 Feb 2024 14:28:23 +0800 Subject: [PATCH 1/2] check for thread interrupt in process --- .../redis/clients/jedis/JedisPubSubBase.java | 2 +- .../clients/jedis/JedisShardedPubSubBase.java | 2 +- .../clients/jedis/JedisPubSubBaseTest.java | 58 +++++++++++++++++++ .../jedis/JedisShardedPubSubBaseTest.java | 56 ++++++++++++++++++ 4 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java create mode 100644 src/test/java/redis/clients/jedis/JedisShardedPubSubBaseTest.java diff --git a/src/main/java/redis/clients/jedis/JedisPubSubBase.java b/src/main/java/redis/clients/jedis/JedisPubSubBase.java index 7092680e33..552310e4de 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSubBase.java +++ b/src/main/java/redis/clients/jedis/JedisPubSubBase.java @@ -172,7 +172,7 @@ private void process() { } else { throw new JedisException("Unknown message type: " + reply); } - } while (isSubscribed()); + } while (!Thread.currentThread().isInterrupted() && isSubscribed()); // /* Invalidate instance since this thread is no longer listening */ // this.client = null; diff --git a/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java b/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java index f0a251f61f..2b2ce944fe 100644 --- a/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java +++ b/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java @@ -99,7 +99,7 @@ private void process() { } else { throw new JedisException("Unknown message type: " + reply); } - } while (isSubscribed()); + } while (!Thread.currentThread().isInterrupted() && isSubscribed()); // /* Invalidate instance since this thread is no longer listening */ // this.client = null; diff --git a/src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java b/src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java new file mode 100644 index 0000000000..75409979b1 --- /dev/null +++ b/src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java @@ -0,0 +1,58 @@ +package redis.clients.jedis; + +import junit.framework.TestCase; +import redis.clients.jedis.util.SafeEncoder; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static redis.clients.jedis.Protocol.ResponseKeyword.*; + +public class JedisPubSubBaseTest extends TestCase { + + public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException { + // setup + final JedisPubSubBase pubSub = new JedisPubSubBase() { + + @Override + public void onMessage(String channel, String message) { + fail("this should not happen when thread is interrupted"); + } + + @Override + protected String encode(byte[] raw) { + return SafeEncoder.encode(raw); + } + }; + + final Connection mockConnection = mock(Connection.class); + final List mockSubscribe = Arrays.asList( + SUBSCRIBE.getRaw(), "channel".getBytes(), 1L + ); + final List mockResponse = Arrays.asList( + MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes() + ); + + when(mockConnection.getUnflushedObject()). + + thenReturn(mockSubscribe, mockResponse); + + + final CountDownLatch countDownLatch = new CountDownLatch(1); + // action + final Thread thread = new Thread(() -> { + Thread.currentThread().interrupt(); + pubSub.proceed(mockConnection, "channel"); + + countDownLatch.countDown(); + }); + thread.start(); + + assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); + + } +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/JedisShardedPubSubBaseTest.java b/src/test/java/redis/clients/jedis/JedisShardedPubSubBaseTest.java new file mode 100644 index 0000000000..fb1ecdd87a --- /dev/null +++ b/src/test/java/redis/clients/jedis/JedisShardedPubSubBaseTest.java @@ -0,0 +1,56 @@ +package redis.clients.jedis; + +import junit.framework.TestCase; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static redis.clients.jedis.Protocol.ResponseKeyword.SMESSAGE; +import static redis.clients.jedis.Protocol.ResponseKeyword.SSUBSCRIBE; + +public class JedisShardedPubSubBaseTest extends TestCase { + + public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException { + // setup + final JedisShardedPubSubBase pubSub = new JedisShardedPubSubBase() { + + @Override + public void onSMessage(String channel, String message) { + fail("this should not happen when thread is interrupted"); + } + + @Override + protected String encode(byte[] raw) { + return new String(raw); + } + + }; + + final Connection mockConnection = mock(Connection.class); + final List mockSubscribe = Arrays.asList( + SSUBSCRIBE.getRaw(), "channel".getBytes(), 1L + ); + final List mockResponse = Arrays.asList( + SMESSAGE.getRaw(), "channel".getBytes(), "message".getBytes() + ); + when(mockConnection.getUnflushedObject()).thenReturn(mockSubscribe, mockResponse); + + + final CountDownLatch countDownLatch = new CountDownLatch(1); + // action + final Thread thread = new Thread(() -> { + Thread.currentThread().interrupt(); + pubSub.proceed(mockConnection, "channel"); + + countDownLatch.countDown(); + }); + thread.start(); + + assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); + + } +} \ No newline at end of file From a6861add29e396dca032d4a64998415b03ce727d Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Mon, 19 Feb 2024 16:03:12 +0600 Subject: [PATCH 2/2] Update JedisPubSubBaseTest.java --- src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java b/src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java index 75409979b1..a7910bd6a3 100644 --- a/src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java +++ b/src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java @@ -10,7 +10,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static redis.clients.jedis.Protocol.ResponseKeyword.*; +import static redis.clients.jedis.Protocol.ResponseKeyword.MESSAGE; +import static redis.clients.jedis.Protocol.ResponseKeyword.SUBSCRIBE; public class JedisPubSubBaseTest extends TestCase { @@ -55,4 +56,4 @@ protected String encode(byte[] raw) { assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); } -} \ No newline at end of file +}