From 42c7fe81b7f9834e2b4fcc16109ca08042938e92 Mon Sep 17 00:00:00 2001 From: Ivan Babanin Date: Tue, 11 Jul 2023 02:07:18 +0500 Subject: [PATCH 1/5] Replace `synchronized` with `j.u.c.l.ReentrantLock` for Loom --- .../redis/clients/jedis/CommandObjects.java | 31 +++++++------ .../clients/jedis/JedisSentinelPool.java | 10 ++++- .../jedis/graph/GraphCommandObjects.java | 16 ++++--- .../jedis/mcf/CircuitBreakerFailoverBase.java | 45 +++++++++++-------- .../MultiClusterPooledConnectionProvider.java | 19 ++++++-- .../SentineledConnectionProvider.java | 10 ++++- 6 files changed, 86 insertions(+), 45 deletions(-) diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index cbf0e19723..c0c9014a5b 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -5,6 +5,8 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Collectors; import org.json.JSONArray; @@ -50,6 +52,7 @@ protected RedisProtocol getProtocol() { return protocol; } + private Lock mapperLock = new ReentrantLock(true); private volatile JsonObjectMapper jsonObjectMapper; private final AtomicInteger searchDialect = new AtomicInteger(0); @@ -4424,25 +4427,25 @@ public final CommandObject tFunctionCallAsync(String library, String fun /** * Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with * default implementation will be created and returned. - *

This process of checking whether or not - * the instance reference exists follows 'double-checked lock optimization' approach to reduce the overhead of - * acquiring a lock by testing the lock criteria (the "lock hint") before acquiring the lock.

+ * * @return the JsonObjectMapper instance reference * @see DefaultGsonObjectMapper */ private JsonObjectMapper getJsonObjectMapper() { - JsonObjectMapper localRef = this.jsonObjectMapper; - if (Objects.isNull(localRef)) { - synchronized (this) { - localRef = this.jsonObjectMapper; - if (Objects.isNull(localRef)) { - this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper(); - } + JsonObjectMapper localRef = this.jsonObjectMapper; + if (Objects.isNull(localRef)) { + mapperLock.lock(); + + try { + localRef = this.jsonObjectMapper; + if (Objects.isNull(localRef)) { + this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper(); + } + } finally { + mapperLock.unlock(); + } } - } - return localRef; + return localRef; } public void setJsonObjectMapper(JsonObjectMapper jsonObjectMapper) { diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java index 586750540c..f6a9ea705d 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -6,6 +6,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -28,7 +30,7 @@ public class JedisSentinelPool extends Pool { private volatile HostAndPort currentHostMaster; - private final Object initPoolLock = new Object(); + private final Lock initPoolLock = new ReentrantLock(true); public JedisSentinelPool(String masterName, Set sentinels, final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig) { @@ -213,7 +215,9 @@ public HostAndPort getCurrentHostMaster() { } private void initMaster(HostAndPort master) { - synchronized (initPoolLock) { + initPoolLock.lock(); + + try { if (!master.equals(currentHostMaster)) { currentHostMaster = master; factory.setHostAndPort(currentHostMaster); @@ -223,6 +227,8 @@ private void initMaster(HostAndPort master) { LOG.info("Created JedisSentinelPool to master at {}", master); } + } finally { + initPoolLock.unlock(); } } diff --git a/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java b/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java index a9a49c9081..ec778da0be 100644 --- a/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java +++ b/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java @@ -9,6 +9,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import redis.clients.jedis.Builder; @@ -106,9 +108,7 @@ private Builder getBuilder(String graphName) { } private void createBuilder(String graphName) { - synchronized (builders) { - builders.putIfAbsent(graphName, new ResultSetBuilder(new GraphCacheImpl(graphName))); - } + builders.computeIfAbsent(graphName, graphNameKey -> new ResultSetBuilder(new GraphCacheImpl(graphNameKey))); } private class GraphCacheImpl implements GraphCache { @@ -144,6 +144,8 @@ private class GraphCacheList { private final String name; private final String query; private final List data = new CopyOnWriteArrayList<>(); + + private final Lock dataLock = new ReentrantLock(true); /** * @@ -164,14 +166,18 @@ public GraphCacheList(String name, String procedure) { */ public String getCachedData(int index) { if (index >= data.size()) { - synchronized (data) { + dataLock.lock(); + + try { if (index >= data.size()) { getProcedureInfo(); } + } finally { + dataLock.unlock(); } } + return data.get(index); - } /** diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java index 4ef383e649..c192867ff3 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java @@ -6,6 +6,9 @@ import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.util.IOUtils; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * @author Allen Terleto (aterleto) *

@@ -17,6 +20,7 @@ */ @Experimental public class CircuitBreakerFailoverBase implements AutoCloseable { + private final Lock lock = new ReentrantLock(true); protected final MultiClusterPooledConnectionProvider provider; @@ -32,28 +36,33 @@ public void close() { /** * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios */ - protected synchronized void clusterFailover(CircuitBreaker circuitBreaker) { - - // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent - if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { + protected void clusterFailover(CircuitBreaker circuitBreaker) { + lock.lock(); + + try { + // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent + if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { - // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. - // To recover/transition from this forced state the user will need to manually failback - circuitBreaker.transitionToForcedOpenState(); + // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. + // To recover/transition from this forced state the user will need to manually failback + circuitBreaker.transitionToForcedOpenState(); - // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand() - // to use the next cluster's connection pool - according to the configuration's prioritization/order - int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); + // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand() + // to use the next cluster's connection pool - according to the configuration's prioritization/order + int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); - // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging - provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); - } + // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging + provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); + } - // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail - else if (provider.isLastClusterCircuitBreakerForcedOpen()) { - throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + - "provided with an additional cluster/database endpoint according to its prioritized sequence. " + - "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); + // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail + else if (provider.isLastClusterCircuitBreakerForcedOpen()) { + throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + + "provided with an additional cluster/database endpoint according to its prioritized sequence. " + + "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); + } + } finally { + lock.unlock(); } } diff --git a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java index 47b03c7773..9ddf8e810a 100644 --- a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -54,6 +56,8 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider * provided at startup via the MultiClusterClientConfig. All traffic will be routed according to this index. */ private volatile Integer activeMultiClusterIndex = 1; + + private final Lock activeClusterIndexLock = new ReentrantLock(true); /** * Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list @@ -162,8 +166,9 @@ public int incrementActiveMultiClusterIndex() { // Field-level synchronization is used to avoid the edge case in which // setActiveMultiClusterIndex(int multiClusterIndex) is called at the same time - synchronized (activeMultiClusterIndex) { - + activeClusterIndexLock.lock(); + + try { String originalClusterName = getClusterCircuitBreaker().getName(); // Only increment if it can pass this validation otherwise we will need to check for NULL in the data path @@ -185,6 +190,8 @@ public int incrementActiveMultiClusterIndex() { incrementActiveMultiClusterIndex(); else log.warn("Cluster/database endpoint successfully updated from '{}' to '{}'", originalClusterName, circuitBreaker.getName()); + } finally { + activeClusterIndexLock.unlock(); } return activeMultiClusterIndex; @@ -229,11 +236,13 @@ public void validateTargetConnection(int multiClusterIndex) { * Special care should be taken to confirm cluster/database availability AND * potentially cross-cluster replication BEFORE using this capability. */ - public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) { + public void setActiveMultiClusterIndex(int multiClusterIndex) { // Field-level synchronization is used to avoid the edge case in which // incrementActiveMultiClusterIndex() is called at the same time - synchronized (activeMultiClusterIndex) { + activeClusterIndexLock.lock(); + + try { // Allows an attempt to reset the current cluster from a FORCED_OPEN to CLOSED state in the event that no failover is possible if (activeMultiClusterIndex == multiClusterIndex && @@ -256,6 +265,8 @@ public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) { activeMultiClusterIndex = multiClusterIndex; lastClusterCircuitBreakerForcedOpen = false; + } finally { + activeClusterIndexLock.unlock(); } } diff --git a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java index 5058f07179..f2f0746460 100644 --- a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java @@ -5,6 +5,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public class SentineledConnectionProvider implements ConnectionProvider { private final long subscribeRetryWaitTimeMillis; - private final Object initPoolLock = new Object(); + private final Lock initPoolLock = new ReentrantLock(true); public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig, Set sentinels, final JedisClientConfig sentinelClientConfig) { @@ -95,7 +97,9 @@ public HostAndPort getCurrentMaster() { } private void initMaster(HostAndPort master) { - synchronized (initPoolLock) { + initPoolLock.lock(); + + try { if (!master.equals(currentMaster)) { currentMaster = master; @@ -114,6 +118,8 @@ private void initMaster(HostAndPort master) { existingPool.close(); } } + } finally { + initPoolLock.unlock(); } } From 8044698cc05389f6b3d3e6e826559d3ee2cdc666 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 16 Jul 2024 00:35:28 +0600 Subject: [PATCH 2/5] Format spaces --- .../redis/clients/jedis/CommandObjects.java | 26 +++++++++---------- .../jedis/graph/GraphCommandObjects.java | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index c0c9014a5b..d434c97cb1 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -4432,20 +4432,20 @@ public final CommandObject tFunctionCallAsync(String library, String fun * @see DefaultGsonObjectMapper */ private JsonObjectMapper getJsonObjectMapper() { - JsonObjectMapper localRef = this.jsonObjectMapper; - if (Objects.isNull(localRef)) { - mapperLock.lock(); - - try { - localRef = this.jsonObjectMapper; - if (Objects.isNull(localRef)) { - this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper(); - } - } finally { - mapperLock.unlock(); - } + JsonObjectMapper localRef = this.jsonObjectMapper; + if (Objects.isNull(localRef)) { + mapperLock.lock(); + + try { + localRef = this.jsonObjectMapper; + if (Objects.isNull(localRef)) { + this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper(); + } + } finally { + mapperLock.unlock(); + } } - return localRef; + return localRef; } public void setJsonObjectMapper(JsonObjectMapper jsonObjectMapper) { diff --git a/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java b/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java index ec778da0be..7496e6e928 100644 --- a/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java +++ b/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java @@ -108,7 +108,7 @@ private Builder getBuilder(String graphName) { } private void createBuilder(String graphName) { - builders.computeIfAbsent(graphName, graphNameKey -> new ResultSetBuilder(new GraphCacheImpl(graphNameKey))); + builders.computeIfAbsent(graphName, graphNameKey -> new ResultSetBuilder(new GraphCacheImpl(graphNameKey))); } private class GraphCacheImpl implements GraphCache { From 2313a90ff1825063acedad817ac65814e3d93a99 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 16 Jul 2024 00:37:01 +0600 Subject: [PATCH 3/5] Format spaces --- src/main/java/redis/clients/jedis/CommandObjects.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index d434c97cb1..3b1d06b039 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -4444,7 +4444,7 @@ private JsonObjectMapper getJsonObjectMapper() { } finally { mapperLock.unlock(); } - } + } return localRef; } From af154082d7ccc3e5da6f1486fe826eb86834a66f Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:42:00 +0600 Subject: [PATCH 4/5] Format imports --- .../clients/jedis/mcf/CircuitBreakerFailoverBase.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java index c192867ff3..91ad29d9a0 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java @@ -1,14 +1,13 @@ package redis.clients.jedis.mcf; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import redis.clients.jedis.annots.Experimental; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.util.IOUtils; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - /** * @author Allen Terleto (aterleto) *

@@ -66,4 +65,4 @@ else if (provider.isLastClusterCircuitBreakerForcedOpen()) { } } -} \ No newline at end of file +} From cf6590a2e4c4dee541b96b4f5c3ca67209210eea Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:43:15 +0600 Subject: [PATCH 5/5] Double check lock is back and so the respective comment --- src/main/java/redis/clients/jedis/CommandObjects.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 3b1d06b039..5c6af322dc 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -4427,7 +4427,11 @@ public final CommandObject tFunctionCallAsync(String library, String fun /** * Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with * default implementation will be created and returned. - * + *

This process of checking whether or not + * the instance reference exists follows 'double-checked lock optimization' approach to reduce the overhead of + * acquiring a lock by testing the lock criteria (the "lock hint") before acquiring the lock.

* @return the JsonObjectMapper instance reference * @see DefaultGsonObjectMapper */