From 2e6a7b4d6146611fbf5da1a1129968cfd22d7fa4 Mon Sep 17 00:00:00 2001 From: Amogh Shetkar Date: Mon, 7 Oct 2024 08:46:30 +0000 Subject: [PATCH] (git cherry-pick -n 5a1f2f0f) Support for connection load balancing across read-replica cluster nodes (#22) * Allow new values of load-balance: any, only-primary, only-rr, prefer-primary and prefer-rr * Make load-balance property editable in YBClusterAwareDataSource * Add missing lb properties to PGProperty --- README.md | 36 ++++-- examples/pom.xml | 2 +- .../examples/UniformLoadBalanceExample.java | 2 +- gradle.properties | 2 +- .../ysql/ClusterAwareLoadBalancer.java | 106 ++++++++++++------ .../yugabyte/ysql/LoadBalanceProperties.java | 68 +++++++---- .../com/yugabyte/ysql/LoadBalanceService.java | 69 ++++++++++-- .../java/com/yugabyte/ysql/LoadBalancer.java | 7 +- .../ysql/TopologyAwareLoadBalancer.java | 68 ++++++++--- .../ysql/YBClusterAwareDataSource.java | 58 +++++++++- .../src/main/java/org/postgresql/Driver.java | 2 +- .../main/java/org/postgresql/PGProperty.java | 11 ++ 12 files changed, 327 insertions(+), 104 deletions(-) diff --git a/README.md b/README.md index 3b138cb633..2e80d88d8a 100644 --- a/README.md +++ b/README.md @@ -20,26 +20,34 @@ This is similar to 'Cluster Awareness' but uses those servers which are part of ### Connection Properties added for load balancing -- _load-balance_ - It expects **true/false** as its possible values. In YBClusterAwareDataSource load balancing is true by default. However when using the DriverManager.getConnection() API the 'load-balance' property needs to be set to 'true'. +- _load-balance_ - Starting with version 42.3.5-yb-7, it expects one of **false, any (same as true), only-primary, only-rr, prefer-primary and prefer-rr** as its possible values. In `YBClusterAwareDataSource` load balancing is `true` by default. However, when using the `DriverManager.getConnection()` API the 'load-balance' property is considered to be `false` by default. 
+ - _false_ - No connection load balancing. Behaviour is similar to vanilla PGJDBC driver + - _any_ - Same as value _true_. Distribute connections equally across all nodes in the cluster, irrespective of its type (`primary` or `read-replica`) + - _only-primary_ - Create connections equally across only the primary nodes of the cluster + - _only-rr_ - Create connections equally across only the read-replica nodes of the cluster + - _prefer-primary_ - Create connections equally across primary cluster nodes. If none available, on any available read replica node in the cluster + - _prefer-rr_ - Create connections equally across read replica nodes of the cluster. If none available, on any available primary cluster node - _topology-keys_ - It takes a comma separated geo-location values. A single geo-location can be given as 'cloud.region.zone'. Multiple geo-locations too can be specified, separated by comma (`,`). -- _yb-servers-refresh-interval_ - Time interval, in seconds, between two attempts to refresh the information about cluster nodes. Default is 300. Valid values are integers between 0 and 600. Value 0 means refresh for each connection request. Any value outside this range is ignored and the default is used. +- _yb-servers-refresh-interval_ - Time interval, in seconds, between two attempts to refresh the information about cluster nodes. Default is 300 seconds. Valid values are integers between 0 and 600. Value 0 means refresh for each connection request. Any value outside this range is ignored and the default is used. +- _fallback-to-topology-keys-only_ - Decides if the driver can fall back to nodes outside of the given placements for new connections, if the nodes in the given placements are not available. Value `true` means stick to explicitly given placements for fallback, else fail. Value `false` means fall back to entire cluster nodes when nodes in the given placements are unavailable. Default is `false`. 
It is ignored if `topology-keys` is not specified or `load-balance` is set to either `prefer-primary` or `prefer-rr`. +- _failed-host-reconnect-delay-secs_ - When the driver cannot connect to a server, it marks it as _failed_ with a timestamp. Later, whenever it refreshes the server list via `yb_servers()`, if it sees the failed server in the response, it marks the server as UP only if the time specified via this property has elapsed since the time it was last marked as a failed host. Default is 5 seconds. -Please refer to the [Use the Driver](#Use the Driver) section for examples. +Please refer to the [Use the Driver](#use-the-driver) section for examples. ### Get the Driver ### From Maven -Either add the following lines to your maven project in pom.xml file (Use the latest version available), +Add the following lines to your maven project in pom.xml file (Use the latest version available), ``` com.yugabyte jdbc-yugabytedb - 42.3.5-yb-6 + ${driver.version} ``` -or you can visit to this link for the latest version of dependency: https://search.maven.org/artifact/com.yugabyte/jdbc-yugabytedb +You can visit this link for the latest version of the driver: https://search.maven.org/artifact/com.yugabyte/jdbc-yugabytedb ### Build locally @@ -69,7 +77,7 @@ or you can visit to this link for the latest version of dependency: https://sear com.yugabyte jdbc-yugabytedb - 42.3.5-yb-6 + ${driver.version} ``` > **Note:** You need to have installed 2.7.2.0-b0 or above version of YugabyteDB on your system for load balancing to work. @@ -78,19 +86,25 @@ or you can visit to this link for the latest version of dependency: https://sear - Passing new connection properties for load balancing in connection url or properties bag - For uniform load balancing across all the server you just need to specify the _load-balance=true_ property in the url. 
+ For uniform load balancing across all the servers you just need to specify the _load-balance_ property in the url: ``` - String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=true"; + String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=any"; DriverManager.getConnection(yburl); ``` - For specifying topology keys you need to set the additional property with a valid comma separated value. - + For specifying topology keys you need to set the additional property with a valid comma separated value: ``` String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=true&topology-keys=cloud1.region1.zone1,cloud1.region1.zone2"; DriverManager.getConnection(yburl); ``` + If you have a read-replica cluster in your universe and want to connect your app strictly to the read-replica nodes in the universe (for example, because it's a read-only app and you don't want to affect primary nodes which are servicing write-workloads): + ``` + String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=only-rr"; + DriverManager.getConnection(yburl); + ``` + If no read-replica nodes are available above, the driver will attempt to connect to the endpoint(s) given in the url; `127.0.0.1` in this case. + ### Specifying fallback zones For topology-aware load balancing, you can now specify fallback placements too. This is not applicable for cluster-aware load balancing. 
diff --git a/examples/pom.xml b/examples/pom.xml index e10e45fbec..c1f8e2c8f2 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -11,7 +11,7 @@ com.yugabyte jdbc-yugabytedb - 42.3.5-yb-6 + 42.3.5-yb-7 diff --git a/examples/src/main/java/com/yugabyte/examples/UniformLoadBalanceExample.java b/examples/src/main/java/com/yugabyte/examples/UniformLoadBalanceExample.java index c2047f7021..0b4f3749d2 100644 --- a/examples/src/main/java/com/yugabyte/examples/UniformLoadBalanceExample.java +++ b/examples/src/main/java/com/yugabyte/examples/UniformLoadBalanceExample.java @@ -51,7 +51,7 @@ public static void main(String[] args) { System.out.println("Setting up the connection pool having 6 connections......."); - testUsingHikariPool("uniform_load_balance", "true", "simple", + testUsingHikariPool("uniform_load_balance", "true", "ignored", controlHost, controlPort, numConnections, verbose, interactive); } diff --git a/gradle.properties b/gradle.properties index abecab04b5..c6ac771689 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,7 +14,7 @@ kotlin.code.style=official # This is version for PgJdbc itself # Note: it should not include "-SNAPSHOT" as it is automatically added by build.gradle.kts # Release version can be generated by using -Prelease or -Prc= arguments -pgjdbc.version=42.3.5-yb-6 +pgjdbc.version=42.3.5-yb-7 lastEditYear=2024 # The options below configures the use of local clone (e.g. 
testing development versions) diff --git a/pgjdbc/src/main/java/com/yugabyte/ysql/ClusterAwareLoadBalancer.java b/pgjdbc/src/main/java/com/yugabyte/ysql/ClusterAwareLoadBalancer.java index 64d3a85591..eeaa6e4dee 100644 --- a/pgjdbc/src/main/java/com/yugabyte/ysql/ClusterAwareLoadBalancer.java +++ b/pgjdbc/src/main/java/com/yugabyte/ysql/ClusterAwareLoadBalancer.java @@ -13,9 +13,11 @@ package com.yugabyte.ysql; -import static com.yugabyte.ysql.LoadBalanceProperties.*; +import com.yugabyte.ysql.LoadBalanceService.LoadBalanceType; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; @@ -23,7 +25,9 @@ public class ClusterAwareLoadBalancer implements LoadBalancer { protected static final Logger LOGGER = Logger.getLogger("org.postgresql." + ClusterAwareLoadBalancer.class.getName()); private static volatile ClusterAwareLoadBalancer instance; - List attempted = new ArrayList<>(); + private List attempted = new ArrayList<>(); + private final LoadBalanceService.LoadBalanceType loadBalance; + private byte requestFlags; @Override public int getRefreshListSeconds() { @@ -32,66 +36,94 @@ public int getRefreshListSeconds() { protected int refreshListSeconds = LoadBalanceProperties.DEFAULT_REFRESH_INTERVAL; - public ClusterAwareLoadBalancer() { + public ClusterAwareLoadBalancer(LoadBalanceService.LoadBalanceType lb, int refreshInterval) { + this.loadBalance = lb; + this.refreshListSeconds = refreshInterval; } - public static ClusterAwareLoadBalancer getInstance(int refreshListSeconds) { + public static ClusterAwareLoadBalancer getInstance(LoadBalanceService.LoadBalanceType lb, + int refreshListSeconds) { if (instance == null) { synchronized (ClusterAwareLoadBalancer.class) { if (instance == null) { - instance = new ClusterAwareLoadBalancer(); + instance = new ClusterAwareLoadBalancer(lb, refreshListSeconds); instance.refreshListSeconds = refreshListSeconds 
>= 0 && refreshListSeconds <= LoadBalanceProperties.MAX_REFRESH_INTERVAL ? refreshListSeconds : LoadBalanceProperties.DEFAULT_REFRESH_INTERVAL; - LOGGER.fine("Created a new cluster-aware LB instance with refresh" + - " interval " + instance.refreshListSeconds + " seconds"); + LOGGER.fine("Created a new cluster-aware LB instance with loadbalance = " + + instance.loadBalance + " and refresh interval " + instance.refreshListSeconds + " seconds"); } } } return instance; } + public String toString() { + return this.getClass().getSimpleName() + ": loadBalance = " + + loadBalance + ", refreshInterval = " + refreshListSeconds; + } + @Override - public boolean isHostEligible(Map.Entry e) { - return !attempted.contains(e.getKey()) && !e.getValue().isDown(); + public boolean isHostEligible(Map.Entry e, + Byte requestFlags) { + // e.getKey() is the hostname + return !attempted.contains(e.getKey()) && !e.getValue().isDown() + && LoadBalanceService.isRightNodeType(loadBalance, e.getValue().getNodeType(), requestFlags); } - public synchronized String getLeastLoadedServer(boolean newRequest, List failedHosts, ArrayList timedOutHosts) { - LOGGER.fine("failedHosts: " + failedHosts + ", timedOutHosts: " + timedOutHosts); + public synchronized String getLeastLoadedServer(boolean newRequest, List failedHosts, + ArrayList timedOutHosts) { attempted = failedHosts; if (timedOutHosts != null) { attempted.addAll(timedOutHosts); } - ArrayList hosts = LoadBalanceService.getAllEligibleHosts(this); + requestFlags = newRequest ? 
LoadBalanceService.STRICT_PREFERENCE : requestFlags; + LOGGER.fine("newRequest: " + newRequest + ", failedHosts: " + failedHosts + + ", timedOutHosts: " + timedOutHosts + ", requestFlags: " + requestFlags); + String chosenHost = null; - int min = Integer.MAX_VALUE; - ArrayList minConnectionsHostList = new ArrayList<>(); - for (String h : hosts) { - boolean wasTimedOutHost = timedOutHosts != null && timedOutHosts.contains(h); - if (failedHosts.contains(h) || wasTimedOutHost) { - LOGGER.fine("Skipping failed host " + h + "(was timed out host=" + wasTimedOutHost +")"); - continue; + while (true) { + ArrayList hosts = LoadBalanceService.getAllEligibleHosts(this, requestFlags); + int min = Integer.MAX_VALUE; + ArrayList minConnectionsHostList = new ArrayList<>(); + for (String h : hosts) { + boolean wasTimedOutHost = timedOutHosts != null && timedOutHosts.contains(h); + if (failedHosts.contains(h) || wasTimedOutHost) { + LOGGER.fine("Skipping failed host " + h + "(was timed out host=" + wasTimedOutHost + ")"); + continue; + } + int currLoad = LoadBalanceService.getLoad(h); + LOGGER.fine("Number of connections to " + h + ": " + currLoad); + if (currLoad < min) { + min = currLoad; + minConnectionsHostList.clear(); + minConnectionsHostList.add(h); + } else if (currLoad == min) { + minConnectionsHostList.add(h); + } } - int currLoad = LoadBalanceService.getLoad(h); - LOGGER.fine("Number of connections to " + h + ": " + currLoad); - if (currLoad < min) { - min = currLoad; - minConnectionsHostList.clear(); - minConnectionsHostList.add(h); - } else if (currLoad == min) { - minConnectionsHostList.add(h); + // Choose a random from the minimum list + if (!minConnectionsHostList.isEmpty()) { + int idx = ThreadLocalRandom.current().nextInt(0, minConnectionsHostList.size()); + chosenHost = minConnectionsHostList.get(idx); + } + if (chosenHost != null) { + LoadBalanceService.incrementConnectionCount(chosenHost); + break; // We got a host + } else if (requestFlags == 
LoadBalanceService.STRICT_PREFERENCE) { + // Relax the STRICT_PREFERENCE condition and consider other node types + requestFlags = (byte) 0; + } else { + break; // No more hosts to try } - } - // Choose a random from the minimum list - String chosenHost = null; - if (minConnectionsHostList.size() > 0) { - int idx = ThreadLocalRandom.current().nextInt(0, minConnectionsHostList.size()); - chosenHost = minConnectionsHostList.get(idx); - } - if (chosenHost != null) { - LoadBalanceService.incrementConnectionCount(chosenHost); } LOGGER.fine("Host chosen for new connection: " + chosenHost); + if (chosenHost == null && (loadBalance == LoadBalanceType.ONLY_PRIMARY || + loadBalance == LoadBalanceType.ONLY_RR)) { + throw new IllegalStateException("No node available in " + + (loadBalance == LoadBalanceType.ONLY_PRIMARY ? "primary" : "read-replica") + + " cluster to connect to."); + } return chosenHost; } diff --git a/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceProperties.java b/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceProperties.java index 9e322f3372..f3d19a829e 100644 --- a/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceProperties.java +++ b/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceProperties.java @@ -21,7 +21,6 @@ import java.util.logging.Logger; public class LoadBalanceProperties { - private static final String SIMPLE_LB = "simple"; public static final String LOAD_BALANCE_PROPERTY_KEY = "load-balance"; public static final String TOPOLOGY_AWARE_PROPERTY_KEY = "topology-keys"; public static final String REFRESH_INTERVAL_KEY = "yb-servers-refresh-interval"; @@ -62,11 +61,11 @@ public class LoadBalanceProperties { private static final Map CONNECTION_MANAGER_MAP = new HashMap<>(); - private static Map loadBalancePropertiesMap = - new ConcurrentHashMap(); + private static final Map loadBalancePropertiesMap = + new ConcurrentHashMap<>(); private final String originalUrl; private final Properties originalProperties; - private boolean hasLoadBalance; + 
private LoadBalanceService.LoadBalanceType loadBalance; private final String ybURL; private String placements = null; private int refreshInterval = -1; @@ -114,7 +113,7 @@ public String processURLAndProperties() { String loadBalancerKey = LOAD_BALANCE_PROPERTY_KEY + EQUALS; String topologyKey = TOPOLOGY_AWARE_PROPERTY_KEY + EQUALS; String refreshIntervalKey = REFRESH_INTERVAL_KEY + EQUALS; - String explicitFallbackOnlyKey = EXPLICIT_FALLBACK_ONLY_KEY; + String explicitFallbackOnlyKey = EXPLICIT_FALLBACK_ONLY_KEY + EQUALS; String failedHostReconnectDelayKey = FAILED_HOST_RECONNECT_DELAY_SECS_KEY + EQUALS; for (String part : urlParts) { if (part.startsWith(loadBalancerKey)) { @@ -124,9 +123,7 @@ public String processURLAndProperties() { continue; } String propValue = lbParts[1]; - if (propValue.equalsIgnoreCase("true")) { - this.hasLoadBalance = true; - } + setLoadBalanceValue(propValue); } else if (part.startsWith(topologyKey)) { String[] lbParts = part.split(EQUALS); if (lbParts.length != 2) { @@ -178,9 +175,7 @@ public String processURLAndProperties() { if (originalProperties != null) { if (originalProperties.containsKey(LOAD_BALANCE_PROPERTY_KEY)) { String propValue = originalProperties.getProperty(LOAD_BALANCE_PROPERTY_KEY); - if (propValue.equalsIgnoreCase("true")) { - hasLoadBalance = true; - } + setLoadBalanceValue(propValue); } if (originalProperties.containsKey(TOPOLOGY_AWARE_PROPERTY_KEY)) { String propValue = originalProperties.getProperty(TOPOLOGY_AWARE_PROPERTY_KEY); @@ -207,6 +202,33 @@ public String processURLAndProperties() { return sb.toString(); } + private void setLoadBalanceValue(String value) { + switch (value.toLowerCase(Locale.ROOT)) { + case "true": + case "any": + this.loadBalance = LoadBalanceService.LoadBalanceType.ANY; + break; + case "prefer-primary": + this.loadBalance = LoadBalanceService.LoadBalanceType.PREFER_PRIMARY; + break; + case "prefer-rr": + this.loadBalance = LoadBalanceService.LoadBalanceType.PREFER_RR; + break; + case 
"only-primary": + this.loadBalance = LoadBalanceService.LoadBalanceType.ONLY_PRIMARY; + break; + case "only-rr": + this.loadBalance = LoadBalanceService.LoadBalanceType.ONLY_RR; + break; + case "false": + this.loadBalance = LoadBalanceService.LoadBalanceType.FALSE; + break; + default: + LOGGER.warning("Invalid value for load-balance: " + value + ", ignoring it."); + } + LOGGER.fine("loadbalance value set to " + this.loadBalance); + } + private int parseAndGetValue(String propValue, int defaultValue, int maxValue) { try { int value = Integer.parseInt(propValue); @@ -230,8 +252,8 @@ public Properties getOriginalProperties() { return originalProperties; } - public boolean hasLoadBalance() { - return hasLoadBalance; + public boolean isLoadBalanceEnabled() { + return this.loadBalance != LoadBalanceService.LoadBalanceType.FALSE; } public String getPlacements() { @@ -243,7 +265,7 @@ public String getStrippedURL() { } public LoadBalancer getAppropriateLoadBalancer() { - if (!hasLoadBalance) { + if (!isLoadBalanceEnabled()) { throw new IllegalStateException( "This method is expected to be called only when load-balance is true"); } @@ -258,27 +280,33 @@ public LoadBalancer getAppropriateLoadBalancer() { LoadBalancer ld = null; if (placements == null) { // return base class conn manager. 
- ld = CONNECTION_MANAGER_MAP.get(SIMPLE_LB); + ld = CONNECTION_MANAGER_MAP.get(this.loadBalance.name()); if (ld == null) { + LOGGER.fine("No LB found for " + this.loadBalance + ", creating one ..."); synchronized (CONNECTION_MANAGER_MAP) { - ld = CONNECTION_MANAGER_MAP.get(SIMPLE_LB); + ld = CONNECTION_MANAGER_MAP.get(this.loadBalance.name()); if (ld == null) { - ld = ClusterAwareLoadBalancer.getInstance(refreshInterval); - CONNECTION_MANAGER_MAP.put(SIMPLE_LB, ld); + ld = new ClusterAwareLoadBalancer(this.loadBalance, refreshInterval); + CONNECTION_MANAGER_MAP.put(this.loadBalance.name(), ld); } } + } else { + LOGGER.fine("LB found for " + this.loadBalance + ": " + ld); } } else { - String key = placements + "&" + String.valueOf(explicitFallbackOnly).toLowerCase(Locale.ROOT); + String key = this.loadBalance.name() + "&" + placements + "&" + String.valueOf(explicitFallbackOnly).toLowerCase(Locale.ROOT); ld = CONNECTION_MANAGER_MAP.get(key); if (ld == null) { + LOGGER.fine("No LB found for " + this.loadBalance + " and placements " + placements + " and fallback? 
" + explicitFallbackOnly + ", creating one ..."); synchronized (CONNECTION_MANAGER_MAP) { ld = CONNECTION_MANAGER_MAP.get(key); if (ld == null) { - ld = new TopologyAwareLoadBalancer(placements, explicitFallbackOnly); + ld = new TopologyAwareLoadBalancer(loadBalance, placements, explicitFallbackOnly); CONNECTION_MANAGER_MAP.put(key, ld); } } + } else { + LOGGER.fine("LB found for " + this.loadBalance + " and placements " + placements + ": " + ld); } } return ld; diff --git a/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceService.java b/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceService.java index 0bb5deab87..56e6f7c388 100644 --- a/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceService.java +++ b/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceService.java @@ -28,6 +28,7 @@ public class LoadBalanceService { private static final ConcurrentHashMap clusterInfoMap = new ConcurrentHashMap<>(); + static final byte STRICT_PREFERENCE = 0b00000001; private static Connection controlConnection = null; protected static final String GET_SERVERS_QUERY = "select * from yb_servers()"; protected static final Logger LOGGER = Logger.getLogger("org.postgresql." 
+ LoadBalanceService.class.getName()); @@ -49,10 +50,13 @@ public static void printHostToConnectionMap() { /** * FOR TEST PURPOSE ONLY */ - static synchronized void clear() { + static synchronized void clear() throws SQLException { LOGGER.warning("Clearing LoadBalanceService state for testing purposes"); clusterInfoMap.clear(); - controlConnection = null; + if (controlConnection != null) { + controlConnection.close(); + controlConnection = null; + } lastRefreshTime = 0; forceRefreshOnce = false; useHostColumn = null; @@ -94,12 +98,15 @@ private static synchronized boolean refresh(Connection conn, long refreshInterva String cloud = rs.getString("cloud"); String region = rs.getString("region"); String zone = rs.getString("zone"); + String nodeType = rs.getString("node_type"); NodeInfo nodeInfo = clusterInfoMap.containsKey(host) ? clusterInfoMap.get(host) : new NodeInfo(); synchronized (nodeInfo) { nodeInfo.host = host; nodeInfo.publicIP = publicHost; publicIPsGivenForAll = !publicHost.isEmpty(); nodeInfo.placement = new CloudPlacement(cloud, region, zone); + LOGGER.fine("Setting node_type to " + nodeType + " for host " + host); + nodeInfo.nodeType = nodeType; try { nodeInfo.port = Integer.valueOf(port); } catch (NumberFormatException nfe) { @@ -176,11 +183,11 @@ static int getLoad(String host) { return info == null ? 
0 : info.connectionCount; } - static ArrayList getAllEligibleHosts(LoadBalancer policy) { + static ArrayList getAllEligibleHosts(LoadBalancer policy, Byte requestFlags) { ArrayList list = new ArrayList<>(); Set> set = clusterInfoMap.entrySet(); for (Map.Entry e : set) { - if (policy.isHostEligible(e)) { + if (policy.isHostEligible(e, requestFlags)) { list.add(e.getKey()); } else { LOGGER.finest("Skipping " + e + " because it is not eligible."); @@ -211,8 +218,8 @@ static boolean incrementConnectionCount(String host) { if (info != null) { synchronized (info) { if (info.connectionCount < 0) { - info.connectionCount = 0; LOGGER.fine("Resetting connection count for " + host + " to zero from " + info.connectionCount); + info.connectionCount = 0; } info.connectionCount += 1; return true; @@ -240,8 +247,8 @@ public static boolean decrementConnectionCount(String host) { public static Connection getConnection(String url, Properties properties, LoadBalanceProperties lbProperties, ArrayList timedOutHosts) { // Cleanup extra properties used for load balancing? 
- if (lbProperties.hasLoadBalance()) { - Connection conn = getConnection(lbProperties, properties, timedOutHosts); + if (lbProperties.isLoadBalanceEnabled()) { + Connection conn = getConnection(lbProperties, properties, user, database, timedOutHosts); if (conn != null) { return conn; } @@ -307,7 +314,6 @@ private static Connection getConnection(LoadBalanceProperties loadBalancePropert } /** - * * @param loadBalanceProperties * @param lb LoadBalancer instance * @param user @@ -319,14 +325,16 @@ private static synchronized boolean checkAndRefresh(LoadBalanceProperties loadBa if (needsRefresh(lb.getRefreshListSeconds())) { Properties props = loadBalanceProperties.getOriginalProperties(); String url = loadBalanceProperties.getStrippedURL(); - HostSpec[] hspec = hostSpecs(props); + Properties properties = new Properties(props); + properties.setProperty("socketTimeout", "15"); + HostSpec[] hspec = hostSpecs(properties); ArrayList hosts = getAllAvailableHosts(new ArrayList<>()); while (true) { boolean refreshFailed = false; try { if (controlConnection == null || controlConnection.isClosed()) { - controlConnection = new PgConnection(hspec, props, url); + controlConnection = new PgConnection(hspec, user, dbName, properties, url); } try { refresh(controlConnection, lb.getRefreshListSeconds()); @@ -392,12 +400,39 @@ private static InetAddress getConnectedInetAddress(Connection conn) throws SQLEx return hostConnectedInetAddr; } - static class NodeInfo { + static boolean isRightNodeType(LoadBalanceType loadBalance, String nodeType, byte requestFlags) { + LOGGER.fine("loadBalance " + loadBalance + ", nodeType: " + nodeType + ", requestFlags: " + requestFlags); + switch (loadBalance) { + case ANY: + return true; + case ONLY_PRIMARY: + return nodeType.equalsIgnoreCase("primary"); + case ONLY_RR: + return nodeType.equalsIgnoreCase("read_replica"); + case PREFER_PRIMARY: + if (requestFlags == LoadBalanceService.STRICT_PREFERENCE) { + return nodeType.equalsIgnoreCase("primary"); + } 
else { + return nodeType.equalsIgnoreCase("primary") || nodeType.equalsIgnoreCase("read_replica"); + } + case PREFER_RR: + if (requestFlags == LoadBalanceService.STRICT_PREFERENCE) { + return nodeType.equalsIgnoreCase("read_replica"); + } else { + return nodeType.equalsIgnoreCase("primary") || nodeType.equalsIgnoreCase("read_replica"); + } + default: + return false; + } + } + + public static class NodeInfo { private String host; private int port; private CloudPlacement placement; private String publicIP; + private String nodeType; private int connectionCount; private boolean isDown; private long isDownSince; @@ -429,6 +464,14 @@ public int getConnectionCount() { public long getIsDownSince() { return isDownSince; } + + public String getNodeType() { + return nodeType; + } + + public void setNodeType(String nodeType) { + this.nodeType = nodeType; + } } static class CloudPlacement { @@ -482,4 +525,8 @@ public String toString() { return "CloudPlacement: " + cloud + "." + region + "." + zone; } } + + public enum LoadBalanceType { + FALSE, ANY, PREFER_PRIMARY, PREFER_RR, ONLY_PRIMARY, ONLY_RR + } } diff --git a/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalancer.java b/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalancer.java index 762f33b068..3a7e902eb2 100644 --- a/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalancer.java +++ b/pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalancer.java @@ -16,13 +16,14 @@ public interface LoadBalancer { /** * @param e The {@link LoadBalanceService.NodeInfo} object for the host + * @param requestFlags The attributes for the load balancer to make use of * @return true, if a host is eligible to be considered for a connection request */ - boolean isHostEligible(Map.Entry e); + boolean isHostEligible(Map.Entry e, Byte requestFlags); /** - * @param newRequest whether this invocation is first for a new connection request - * @param failedHosts list of host names which have been known to be down + * @param newRequest whether this invocation is 
first for a new connection request + * @param failedHosts list of host names which have been known to be down * @param timedOutHosts list of host names where connections were attempted but timed out * @return the name of a host with the least number of connections, as per the driver's stats */ diff --git a/pgjdbc/src/main/java/com/yugabyte/ysql/TopologyAwareLoadBalancer.java b/pgjdbc/src/main/java/com/yugabyte/ysql/TopologyAwareLoadBalancer.java index a382bf25c3..598887c794 100644 --- a/pgjdbc/src/main/java/com/yugabyte/ysql/TopologyAwareLoadBalancer.java +++ b/pgjdbc/src/main/java/com/yugabyte/ysql/TopologyAwareLoadBalancer.java @@ -20,6 +20,8 @@ import static com.yugabyte.ysql.LoadBalanceProperties.REFRESH_INTERVAL_KEY; import static com.yugabyte.ysql.LoadBalanceProperties.TOPOLOGY_AWARE_PROPERTY_KEY; +import com.yugabyte.ysql.LoadBalanceService.LoadBalanceType; + import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -35,6 +37,7 @@ public class TopologyAwareLoadBalancer implements LoadBalancer { * Holds the value of topology-keys specified. */ private final String placements; + private final LoadBalanceService.LoadBalanceType loadBalance; private long lastRequestTime; /** @@ -48,11 +51,13 @@ public class TopologyAwareLoadBalancer implements LoadBalancer { * reset to zero for a new connection request. 
*/ private int currentPlacementIndex = 1; - List attempted = new ArrayList<>(); - private int refreshIntervalSeconds; + private List attempted = new ArrayList<>(); + private final int refreshIntervalSeconds; private boolean explicitFallbackOnly = false; + private byte requestFlags; - public TopologyAwareLoadBalancer(String placementValues, boolean onlyExplicitFallback) { + public TopologyAwareLoadBalancer(LoadBalanceType lb, String placementValues, boolean onlyExplicitFallback) { + loadBalance = lb; placements = placementValues; explicitFallbackOnly = onlyExplicitFallback; refreshIntervalSeconds = Integer.getInteger(REFRESH_INTERVAL_KEY, DEFAULT_REFRESH_INTERVAL); @@ -92,7 +97,7 @@ private void parseGeoLocations() { allowedPlacements.computeIfAbsent(PRIMARY_PLACEMENTS_INDEX, k -> new HashSet<>()); populatePlacementSet(v[0], primary); } else { - int pref = Integer.valueOf(v[1]); + int pref = Integer.parseInt(v[1]); if (pref > 0 && pref <= MAX_PREFERENCE_VALUE) { Set cpSet = allowedPlacements.computeIfAbsent(pref, k -> new HashSet<>()); populatePlacementSet(v[0], cpSet); @@ -101,6 +106,7 @@ private void parseGeoLocations() { } } } + LOGGER.fine("allowedPlacements: " + allowedPlacements); } @Override @@ -113,17 +119,27 @@ public boolean isExplicitFallbackOnly() { } @Override - public boolean isHostEligible(Map.Entry e) { + public boolean isHostEligible(Map.Entry e, + Byte requestFlags) { Set set = allowedPlacements.get(currentPlacementIndex); - boolean found = (currentPlacementIndex == REST_OF_CLUSTER_INDEX && !explicitFallbackOnly) - || (set != null && e.getValue().getPlacement().isContainedIn(set)); + // found is true when: + // we are searching for nodes in entire cluster AND fallback-to-topology-keys-only is false + // OR + // we are searching for nodes in entire cluster AND load-balance is set to prefer-* + // OR + // allowed placements contain the node's placement + boolean found = (currentPlacementIndex == REST_OF_CLUSTER_INDEX + && (!explicitFallbackOnly || 
loadBalance == LoadBalanceType.PREFER_PRIMARY || loadBalance == LoadBalanceType.PREFER_RR)) + || (set != null && e.getValue().getPlacement().isContainedIn(set)); + boolean isRightNode = LoadBalanceService.isRightNodeType(loadBalance, e.getValue().getNodeType(), requestFlags); boolean isAttempted = attempted.contains(e.getKey()); boolean isDown = e.getValue().isDown(); - LOGGER.fine(e.getKey() + " has required placement? " + found + ", isDown? " - + isDown + ", attempted? " + isAttempted); + LOGGER.fine(e.getKey() + " has currentPlacementIndex " + currentPlacementIndex + ", required placement? " + + found + ", isDown? " + isDown + ", attempted? " + isAttempted + ", isRightNodeType? " + isRightNode); return found && !isAttempted - && !isDown; + && !isDown + && isRightNode; } public synchronized String getLeastLoadedServer(boolean newRequest, List failedHosts, ArrayList timedOutHosts) { @@ -135,16 +151,17 @@ public synchronized String getLeastLoadedServer(boolean newRequest, List } else { LOGGER.fine("Placements: [" + placements + "]. Attempting to connect to servers in fallback level-" - + (currentPlacementIndex-1) + " ..."); + + (currentPlacementIndex - 1) + " ..."); } ArrayList hosts; String chosenHost = null; + requestFlags = newRequest ? 
LoadBalanceService.STRICT_PREFERENCE : requestFlags; while (chosenHost == null && currentPlacementIndex <= MAX_PREFERENCE_VALUE) { attempted = failedHosts; if (timedOutHosts != null) { attempted.addAll(timedOutHosts); } - hosts = LoadBalanceService.getAllEligibleHosts(this); + hosts = LoadBalanceService.getAllEligibleHosts(this, requestFlags); int min = Integer.MAX_VALUE; ArrayList minConnectionsHostList = new ArrayList<>(); @@ -164,7 +181,7 @@ public synchronized String getLeastLoadedServer(boolean newRequest, List } } // Choose a random from the minimum list - if (minConnectionsHostList.size() > 0) { + if (!minConnectionsHostList.isEmpty()) { int idx = ThreadLocalRandom.current().nextInt(0, minConnectionsHostList.size()); chosenHost = minConnectionsHostList.get(idx); } @@ -173,25 +190,42 @@ public synchronized String getLeastLoadedServer(boolean newRequest, List } else { LOGGER.fine("chosenHost is null for placement level " + currentPlacementIndex + ", allowedPlacements: " + allowedPlacements); + // No host found. Go to the next placement level. currentPlacementIndex += 1; while (allowedPlacements.get(currentPlacementIndex) == null && currentPlacementIndex > 0) { currentPlacementIndex += 1; if (currentPlacementIndex > MAX_PREFERENCE_VALUE) { // All explicit fallbacks are done with no luck. Now try rest-of-cluster currentPlacementIndex = REST_OF_CLUSTER_INDEX; - } else if (currentPlacementIndex == 0) { - // Even rest-of-cluster did not help. Quit - break; } } if (currentPlacementIndex == 0) { - break; + // No host found in entire cluster. Relax the STRICT_PREFERENCE if load-balance is prefer-* + if (requestFlags == LoadBalanceService.STRICT_PREFERENCE && + (loadBalance == LoadBalanceType.PREFER_PRIMARY || loadBalance == LoadBalanceType.PREFER_RR)) { + LOGGER.fine("Even rest of cluster did not have a host for us." 
+ + " So relax the node type condition for prefer-* and try again once"); + currentPlacementIndex = REST_OF_CLUSTER_INDEX; + requestFlags = (byte) 0; + } else { + break; + } } LOGGER.fine("Next, attempting to connect to hosts from placement level " + currentPlacementIndex); } } lastRequestTime = System.currentTimeMillis(); LOGGER.fine("Host chosen for new connection: " + chosenHost); + // Throw error if no host is found AND load-balance=only-* OR + // load-balance=any AND fallback-to-topology-keys-only is true + if (chosenHost == null && + (loadBalance == LoadBalanceType.ONLY_PRIMARY || loadBalance == LoadBalanceType.ONLY_RR || + (loadBalance == LoadBalanceType.ANY && explicitFallbackOnly))) { + throw new IllegalStateException("No node available in the given placements for the " + + (loadBalance == LoadBalanceType.ONLY_PRIMARY ? "primary" : + (loadBalance == LoadBalanceType.ONLY_RR ? "read-replica" : "entire")) + + " cluster to connect to."); + } return chosenHost; } diff --git a/pgjdbc/src/main/java/com/yugabyte/ysql/YBClusterAwareDataSource.java b/pgjdbc/src/main/java/com/yugabyte/ysql/YBClusterAwareDataSource.java index 6d47ea3642..567889ce61 100644 --- a/pgjdbc/src/main/java/com/yugabyte/ysql/YBClusterAwareDataSource.java +++ b/pgjdbc/src/main/java/com/yugabyte/ysql/YBClusterAwareDataSource.java @@ -24,18 +24,74 @@ public YBClusterAwareDataSource() { private String additionalEndPoints; - private void setLoadBalance(String value) { + /** + * @param value load balance value + * @see PGProperty#YB_LOAD_BALANCE + */ + public void setLoadBalance(String value) { PGProperty.YB_LOAD_BALANCE.set(properties, value); } + /** + * @return load balance value + * @see PGProperty#YB_LOAD_BALANCE + */ + public boolean getLoadBalance() { + return PGProperty.YB_LOAD_BALANCE.getBoolean(properties); + } + public void setYbServersRefreshInterval(String value) { PGProperty.YB_SERVERS_REFRESH_INTERVAL.set(properties, value); } + /** + * @return yb_servers() refresh interval in seconds 
+ * @see PGProperty#YB_SERVERS_REFRESH_INTERVAL + */ + public int getYbServersRefreshInterval() { + return PGProperty.YB_SERVERS_REFRESH_INTERVAL.getIntNoCheck(properties); + } + public void setTopologyKeys(String value) { PGProperty.YB_TOPOLOGY_KEYS.set(properties, value); } + /** + * @return topology keys + * @see PGProperty#YB_TOPOLOGY_KEYS + */ + public String getTopologyKeys() { + return PGProperty.YB_TOPOLOGY_KEYS.get(properties); + } + + /** + * @param value limit fallback to nodes in topology keys only + * @see PGProperty#YB_FALLBACK_TO_TOPOLOGY_KEYS_ONLY + */ + public void setFallbackToTopologyKeysOnly(String value) { + PGProperty.YB_FALLBACK_TO_TOPOLOGY_KEYS_ONLY.set(properties, value); + } + + /** + * @return boolean + * @see PGProperty#YB_FALLBACK_TO_TOPOLOGY_KEYS_ONLY + */ + public boolean isFallbackToTopologyKeysOnly() { + return PGProperty.YB_FALLBACK_TO_TOPOLOGY_KEYS_ONLY.getBoolean(properties); + } + + public void setFailedHostReconnectDelaySecs(String value) { + PGProperty.YB_FAILED_HOST_RECONNECT_DELAY_SECS.set(properties, value); + } + + /** + * @return delay in seconds + * @see PGProperty#YB_FAILED_HOST_RECONNECT_DELAY_SECS + */ + public int getFailedHostReconnectDelaySecs() { + return PGProperty.YB_FAILED_HOST_RECONNECT_DELAY_SECS.getIntNoCheck(properties); + } + // additionalEndpoints public void setAdditionalEndpoints(String value) { this.additionalEndPoints = value; diff --git a/pgjdbc/src/main/java/org/postgresql/Driver.java b/pgjdbc/src/main/java/org/postgresql/Driver.java index bbee0bcf87..acf19a316a 100644 --- a/pgjdbc/src/main/java/org/postgresql/Driver.java +++ b/pgjdbc/src/main/java/org/postgresql/Driver.java @@ -331,7 +331,7 @@ private Properties loadDefaultProperties() throws IOException { return ct.getResult(timeout); } catch (PSQLException ex1) { LOGGER.log(Level.INFO, "got exception state: " + ex1.getSQLState()); - if (lbprops.hasLoadBalance() && !prevTimedOutServers.isEmpty() && tries++ < maxRetries && + if 
(lbprops.isLoadBalanceEnabled() && !prevTimedOutServers.isEmpty() && tries++ < maxRetries && ex1.getSQLState().equals(PSQLState.CONNECTION_UNABLE_TO_CONNECT.getState())) { LOGGER.log(Level.INFO, "Connection timeout error occurred with server: " + prevTimedOutServers.get(prevTimedOutServers.size() - 1) + diff --git a/pgjdbc/src/main/java/org/postgresql/PGProperty.java b/pgjdbc/src/main/java/org/postgresql/PGProperty.java index 43d04c6486..5f929a33cf 100644 --- a/pgjdbc/src/main/java/org/postgresql/PGProperty.java +++ b/pgjdbc/src/main/java/org/postgresql/PGProperty.java @@ -576,6 +576,17 @@ public enum PGProperty { "300", "Refresh the list of nodes after given interval in seconds"), + YB_FALLBACK_TO_TOPOLOGY_KEYS_ONLY( + "fallback-to-topology-keys-only", + "false", + "Limit the connection attempts to the nodes in the given topology only"), + + YB_FAILED_HOST_RECONNECT_DELAY_SECS( + "failed-host-reconnect-delay-secs", + "5", + "Delay the attempt to create a connection to a node from yb_servers()" + + " response till this time elapses from the time it was marked as down"), + /** * Configure optimization to enable batch insert re-writing. */