Skip to content

Commit

Permalink
(git cherry-pick -n 5a1f2f0) Support for connection load balancing ac…
Browse files Browse the repository at this point in the history
…ross read-replica cluster nodes (#22)

* Allow new values of load-balance: any, only-primary, only-rr, prefer-primary and prefer-rr
* Make load-balance property editable in YBClusterAwareDataSource
* Add missing lb properties to PGProperty
  • Loading branch information
ashetkar committed Oct 7, 2024
1 parent be031fc commit 2e6a7b4
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 104 deletions.
36 changes: 25 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,34 @@ This is similar to 'Cluster Awareness' but uses those servers which are part of
### Connection Properties added for load balancing

- _load-balance_ - It expects **true/false** as its possible values. In YBClusterAwareDataSource load balancing is true by default. However when using the DriverManager.getConnection() API the 'load-balance' property needs to be set to 'true'.
- _load-balance_ - Starting with version 42.3.5-yb-7, it expects one of **false, any (same as true), only-primary, only-rr, prefer-primary and prefer-rr** as its possible values. In `YBClusterAwareDataSource` load balancing is `true` by default. However, when using the `DriverManager.getConnection()` API the 'load-balance' property is considered to be `false` by default.
- _false_ - No connection load balancing. Behaviour is similar to vanilla PGJDBC driver
- _any_ - Same as value _true_. Distribute connections equally across all nodes in the cluster, irrespective of its type (`primary` or `read-replica`)
- _only-primary_ - Create connections equally across only the primary nodes of the cluster
- _only-rr_ - Create connections equally across only the read-replica nodes of the cluster
- _prefer-primary_ - Create connections equally across primary cluster nodes. If none available, on any available read replica node in the cluster
- _prefer-rr_ - Create connections equally across read replica nodes of the cluster. If none available, on any available primary cluster node
- _topology-keys_ - It takes a comma separated geo-location values. A single geo-location can be given as 'cloud.region.zone'. Multiple geo-locations too can be specified, separated by comma (`,`).
- _yb-servers-refresh-interval_ - Time interval, in seconds, between two attempts to refresh the information about cluster nodes. Default is 300. Valid values are integers between 0 and 600. Value 0 means refresh for each connection request. Any value outside this range is ignored and the default is used.
- _yb-servers-refresh-interval_ - Time interval, in seconds, between two attempts to refresh the information about cluster nodes. Default is 300 seconds. Valid values are integers between 0 and 600. Value 0 means refresh for each connection request. Any value outside this range is ignored and the default is used.
- _fallback-to-topology-keys-only_ - Decides if the driver can fall back to nodes outside of the given placements for new connections, if the nodes in the given placements are not available. Value `true` means stick to explicitly given placements for fallback, else fail. Value `false` means fall back to entire cluster nodes when nodes in the given placements are unavailable. Default is `false`. It is ignored if `topology-keys` is not specified or `load-balance` is set to either `prefer-primary` or `prefer-rr`.
- _failed-host-reconnect-delay-secs_ - When the driver cannot connect to a server, it marks it as _failed_ with a timestamp. Later, whenever it refreshes the server list via `yb_servers()`, if it sees the failed server in the response, it marks the server as UP only if the time specified via this property has elapsed since the time it was last marked as a failed host. Default is 5 seconds.

Please refer to the [Use the Driver](#Use the Driver) section for examples.
Please refer to the [Use the Driver](#use-the-driver) section for examples.

### Get the Driver

### From Maven

Either add the following lines to your maven project in pom.xml file (Use the latest version available),
Add the following lines to your maven project in pom.xml file (Use the latest version available),
```
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>jdbc-yugabytedb</artifactId>
<version>42.3.5-yb-6</version>
<version>${driver.version}</version>
</dependency>
```

or you can visit to this link for the latest version of dependency: https://search.maven.org/artifact/com.yugabyte/jdbc-yugabytedb
You can visit to this link for the latest version of the driver: https://search.maven.org/artifact/com.yugabyte/jdbc-yugabytedb

### Build locally

Expand Down Expand Up @@ -69,7 +77,7 @@ or you can visit to this link for the latest version of dependency: https://sear
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>jdbc-yugabytedb</artifactId>
<version>42.3.5-yb-6</version>
<version>${driver.version}</version>
</dependency>
```
> **Note:** You need to have installed 2.7.2.0-b0 or above version of YugabyteDB on your system for load balancing to work.
Expand All @@ -78,19 +86,25 @@ or you can visit to this link for the latest version of dependency: https://sear
- Passing new connection properties for load balancing in connection url or properties bag
For uniform load balancing across all the server you just need to specify the _load-balance=true_ property in the url.
For uniform load balancing across all the servers you just need to specify the _load-balance_ property in the url:
```
String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=true";
String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=any";
DriverManager.getConnection(yburl);
```
For specifying topology keys you need to set the additional property with a valid comma separated value.
For specifying topology keys you need to set the additional property with a valid comma separated value:
```
String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=true&topology-keys=cloud1.region1.zone1,cloud1.region1.zone2";
DriverManager.getConnection(yburl);
```
If you have a read-replica cluster in your universe and want to connect your app strictly to the read-replica nodes in the universe (for example, because its a read-only app and you don't want to affect primary nodes which are servicing write-workloads):
```
String yburl = "jdbc:yugabytedb://127.0.0.1:5433/yugabyte?user=yugabyte&password=yugabyte&load-balance=only-rr";
DriverManager.getConnection(yburl);
```
If no read-replica nodes are available above, the driver will attempt to connect to the endpoint(s) given in the url; `127.0.0.1` in this case.
### Specifying fallback zones
For topology-aware load balancing, you can now specify fallback placements too. This is not applicable for cluster-aware load balancing.
Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>jdbc-yugabytedb</artifactId>
<version>42.3.5-yb-6</version>
<version>42.3.5-yb-7</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.zaxxer/HikariCP -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void main(String[] args) {

System.out.println("Setting up the connection pool having 6 connections.......");

testUsingHikariPool("uniform_load_balance", "true", "simple",
testUsingHikariPool("uniform_load_balance", "true", "ignored",
controlHost, controlPort, numConnections, verbose, interactive);
}

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ kotlin.code.style=official
# This is version for PgJdbc itself
# Note: it should not include "-SNAPSHOT" as it is automatically added by build.gradle.kts
# Release version can be generated by using -Prelease or -Prc=<int> arguments
pgjdbc.version=42.3.5-yb-6
pgjdbc.version=42.3.5-yb-7

lastEditYear=2024
# The options below configures the use of local clone (e.g. testing development versions)
Expand Down
106 changes: 69 additions & 37 deletions pgjdbc/src/main/java/com/yugabyte/ysql/ClusterAwareLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@

package com.yugabyte.ysql;

import static com.yugabyte.ysql.LoadBalanceProperties.*;
import com.yugabyte.ysql.LoadBalanceService.LoadBalanceType;

import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

public class ClusterAwareLoadBalancer implements LoadBalancer {
protected static final Logger LOGGER = Logger.getLogger("org.postgresql." + ClusterAwareLoadBalancer.class.getName());

private static volatile ClusterAwareLoadBalancer instance;
List<String> attempted = new ArrayList<>();
private List<String> attempted = new ArrayList<>();
private final LoadBalanceService.LoadBalanceType loadBalance;
private byte requestFlags;

@Override
public int getRefreshListSeconds() {
Expand All @@ -32,66 +36,94 @@ public int getRefreshListSeconds() {

protected int refreshListSeconds = LoadBalanceProperties.DEFAULT_REFRESH_INTERVAL;

public ClusterAwareLoadBalancer() {
public ClusterAwareLoadBalancer(LoadBalanceService.LoadBalanceType lb, int refreshInterval) {
this.loadBalance = lb;
this.refreshListSeconds = refreshInterval;
}

public static ClusterAwareLoadBalancer getInstance(int refreshListSeconds) {
public static ClusterAwareLoadBalancer getInstance(LoadBalanceService.LoadBalanceType lb,
int refreshListSeconds) {
if (instance == null) {
synchronized (ClusterAwareLoadBalancer.class) {
if (instance == null) {
instance = new ClusterAwareLoadBalancer();
instance = new ClusterAwareLoadBalancer(lb, refreshListSeconds);
instance.refreshListSeconds =
refreshListSeconds >= 0 && refreshListSeconds <= LoadBalanceProperties.MAX_REFRESH_INTERVAL ?
refreshListSeconds : LoadBalanceProperties.DEFAULT_REFRESH_INTERVAL;
LOGGER.fine("Created a new cluster-aware LB instance with refresh" +
" interval " + instance.refreshListSeconds + " seconds");
LOGGER.fine("Created a new cluster-aware LB instance with loadbalance = " +
instance.loadBalance + " and refresh interval " + instance.refreshListSeconds + " seconds");
}
}
}
return instance;
}

public String toString() {
return this.getClass().getSimpleName() + ": loadBalance = " +
loadBalance + ", refreshInterval = " + refreshListSeconds;
}

@Override
public boolean isHostEligible(Map.Entry<String, LoadBalanceService.NodeInfo> e) {
return !attempted.contains(e.getKey()) && !e.getValue().isDown();
public boolean isHostEligible(Map.Entry<String, LoadBalanceService.NodeInfo> e,
Byte requestFlags) {
// e.getKey() is the hostname
return !attempted.contains(e.getKey()) && !e.getValue().isDown()
&& LoadBalanceService.isRightNodeType(loadBalance, e.getValue().getNodeType(), requestFlags);
}

public synchronized String getLeastLoadedServer(boolean newRequest, List<String> failedHosts, ArrayList<String> timedOutHosts) {
LOGGER.fine("failedHosts: " + failedHosts + ", timedOutHosts: " + timedOutHosts);
public synchronized String getLeastLoadedServer(boolean newRequest, List<String> failedHosts,
ArrayList<String> timedOutHosts) {
attempted = failedHosts;
if (timedOutHosts != null) {
attempted.addAll(timedOutHosts);
}
ArrayList<String> hosts = LoadBalanceService.getAllEligibleHosts(this);
requestFlags = newRequest ? LoadBalanceService.STRICT_PREFERENCE : requestFlags;
LOGGER.fine("newRequest: " + newRequest + ", failedHosts: " + failedHosts +
", timedOutHosts: " + timedOutHosts + ", requestFlags: " + requestFlags);
String chosenHost = null;

int min = Integer.MAX_VALUE;
ArrayList<String> minConnectionsHostList = new ArrayList<>();
for (String h : hosts) {
boolean wasTimedOutHost = timedOutHosts != null && timedOutHosts.contains(h);
if (failedHosts.contains(h) || wasTimedOutHost) {
LOGGER.fine("Skipping failed host " + h + "(was timed out host=" + wasTimedOutHost +")");
continue;
while (true) {
ArrayList<String> hosts = LoadBalanceService.getAllEligibleHosts(this, requestFlags);
int min = Integer.MAX_VALUE;
ArrayList<String> minConnectionsHostList = new ArrayList<>();
for (String h : hosts) {
boolean wasTimedOutHost = timedOutHosts != null && timedOutHosts.contains(h);
if (failedHosts.contains(h) || wasTimedOutHost) {
LOGGER.fine("Skipping failed host " + h + "(was timed out host=" + wasTimedOutHost + ")");
continue;
}
int currLoad = LoadBalanceService.getLoad(h);
LOGGER.fine("Number of connections to " + h + ": " + currLoad);
if (currLoad < min) {
min = currLoad;
minConnectionsHostList.clear();
minConnectionsHostList.add(h);
} else if (currLoad == min) {
minConnectionsHostList.add(h);
}
}
int currLoad = LoadBalanceService.getLoad(h);
LOGGER.fine("Number of connections to " + h + ": " + currLoad);
if (currLoad < min) {
min = currLoad;
minConnectionsHostList.clear();
minConnectionsHostList.add(h);
} else if (currLoad == min) {
minConnectionsHostList.add(h);
// Choose a random from the minimum list
if (!minConnectionsHostList.isEmpty()) {
int idx = ThreadLocalRandom.current().nextInt(0, minConnectionsHostList.size());
chosenHost = minConnectionsHostList.get(idx);
}
if (chosenHost != null) {
LoadBalanceService.incrementConnectionCount(chosenHost);
break; // We got a host
} else if (requestFlags == LoadBalanceService.STRICT_PREFERENCE) {
// Relax the STRICT_PREFERENCE condition and consider other node types
requestFlags = (byte) 0;
} else {
break; // No more hosts to try
}
}
// Choose a random from the minimum list
String chosenHost = null;
if (minConnectionsHostList.size() > 0) {
int idx = ThreadLocalRandom.current().nextInt(0, minConnectionsHostList.size());
chosenHost = minConnectionsHostList.get(idx);
}
if (chosenHost != null) {
LoadBalanceService.incrementConnectionCount(chosenHost);
}
LOGGER.fine("Host chosen for new connection: " + chosenHost);
if (chosenHost == null && (loadBalance == LoadBalanceType.ONLY_PRIMARY ||
loadBalance == LoadBalanceType.ONLY_RR)) {
throw new IllegalStateException("No node available in "
+ (loadBalance == LoadBalanceType.ONLY_PRIMARY ? "primary" : "read-replica")
+ " cluster to connect to.");
}
return chosenHost;
}

Expand Down
Loading

0 comments on commit 2e6a7b4

Please sign in to comment.