Skip to content

Commit

Permalink
Load Shedding: TCP Vegas and priority load shedding
Browse files Browse the repository at this point in the history
The overload detector uses a TCP Vegas-based algorithm,
as implemented by the Netflix concurrency-limits library.

Priority load shedding uses 5 priority levels and 128 cohorts.
A simple cubic function is used to determine whether current CPU
load is over the limit for the current request.
  • Loading branch information
Ladicek committed May 17, 2024
1 parent 455ec6c commit 7a4c509
Show file tree
Hide file tree
Showing 17 changed files with 580 additions and 1 deletion.
10 changes: 10 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,16 @@
<artifactId>quarkus-smallrye-openapi-common-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-load-shedding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-load-shedding-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
Expand Down
52 changes: 52 additions & 0 deletions extensions/load-shedding/deployment/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Build-time (deployment) module of the load shedding extension.
Depends on the runtime module (quarkus-load-shedding) and on the deployment
module of the Vert.x HTTP extension; the version and parent settings are
inherited from quarkus-load-shedding-parent.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-load-shedding-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-load-shedding-deployment</artifactId>

<name>Quarkus - Load Shedding - Deployment</name>

<dependencies>
<!-- Runtime counterpart of this deployment module. -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-load-shedding</artifactId>
</dependency>

<!-- Deployment module matching the runtime module's quarkus-vertx-http dependency. -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-http-deployment</artifactId>
</dependency>

<!-- Test-scoped only. -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- Runs the Quarkus extension annotation processor during compilation. -->
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkus.load.shedding.deployment;

import java.util.ArrayList;
import java.util.List;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.load.shedding.runtime.HttpLoadShedding;
import io.quarkus.load.shedding.runtime.HttpRequestClassifier;
import io.quarkus.load.shedding.runtime.ManagementRequestPrioritizer;
import io.quarkus.load.shedding.runtime.OverloadDetector;
import io.quarkus.load.shedding.runtime.PriorityLoadShedding;

/**
 * Build steps of the load shedding extension: announces the feature and
 * registers the extension's runtime classes as additional beans.
 */
public class LoadSheddingProcessor {
    private static final String FEATURE = "load-shedding";

    /**
     * Announces the {@code load-shedding} feature.
     *
     * @return the feature build item carrying the feature name
     */
    @BuildStep
    FeatureBuildItem feature() {
        return new FeatureBuildItem(FEATURE);
    }

    /**
     * Registers the runtime classes of this extension as additional beans.
     *
     * @return a build item listing the bean class names, in declaration order
     */
    @BuildStep
    AdditionalBeanBuildItem beans() {
        Class<?>[] beanTypes = {
                OverloadDetector.class,
                HttpLoadShedding.class,
                PriorityLoadShedding.class,
                ManagementRequestPrioritizer.class,
                HttpRequestClassifier.class,
        };

        List<String> beanClassNames = new ArrayList<>(beanTypes.length);
        for (Class<?> beanType : beanTypes) {
            beanClassNames.add(beanType.getName());
        }

        return AdditionalBeanBuildItem.builder().addBeanClasses(beanClassNames).build();
    }
}
24 changes: 24 additions & 0 deletions extensions/load-shedding/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Aggregator POM of the load shedding extension. It produces no artifact of its
own (packaging "pom") and only groups the deployment and runtime modules.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>quarkus-extensions-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>quarkus-load-shedding-parent</artifactId>
<packaging>pom</packaging>

<name>Quarkus - Load Shedding</name>

<!-- Sub-modules built as part of this extension. -->
<modules>
<module>deployment</module>
<module>runtime</module>
</modules>

</project>
44 changes: 44 additions & 0 deletions extensions/load-shedding/runtime/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Runtime module of the load shedding extension.
Its only declared dependency is the Vert.x HTTP extension (quarkus-vertx-http).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-load-shedding-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-load-shedding</artifactId>

<name>Quarkus - Load Shedding - Runtime</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-http</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Quarkus extension plugin; no explicit configuration is given here. -->
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- Runs the Quarkus extension annotation processor during compilation. -->
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.quarkus.load.shedding;

/**
* Assigns a request of type {@code R} to a numbered cohort.
* <p>
* NOTE(review): the constants below suggest that cohort numbers are expected to lie in
* the range {@code MIN_COHORT..MAX_COHORT}, but nothing in this interface enforces it;
* confirm against the code that consumes {@link #cohort(Object)}.
*
* @param <R> the request type this classifier can classify
*/
public interface RequestClassifier<R> {
/** Smallest cohort number; presumably the inclusive lower bound of {@link #cohort(Object)}. */
int MIN_COHORT = 1;
/** Largest cohort number; presumably the inclusive upper bound of {@link #cohort(Object)}. */
int MAX_COHORT = 128;

/**
* Tells whether this classifier handles the given request object.
* The parameter is untyped, so implementations have to check the runtime type themselves.
*
* @param request the candidate request
* @return {@code true} if this classifier applies to {@code request}
*/
boolean appliesTo(Object request);

/**
* Computes the cohort of the given request.
* NOTE(review): presumably only called after {@link #appliesTo(Object)} returned {@code true} -- confirm.
*
* @param request the request to classify
* @return the cohort number of the request
*/
int cohort(R request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.load.shedding;

/**
* Assigns a {@link RequestPriority} to a request of type {@code R}.
*
* @param <R> the request type this prioritizer can prioritize
*/
public interface RequestPrioritizer<R> {
/**
* Tells whether this prioritizer handles the given request object.
* The parameter is untyped, so implementations have to check the runtime type themselves.
*
* @param request the candidate request
* @return {@code true} if this prioritizer applies to {@code request}
*/
boolean appliesTo(Object request);

/**
* Determines the priority of the given request.
* NOTE(review): presumably only called after {@link #appliesTo(Object)} returned {@code true} -- confirm.
*
* @param request the request to prioritize
* @return the priority of the request
*/
RequestPriority priority(R request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.load.shedding;

/**
 * The five priority levels a request can be assigned, each carrying a fixed
 * numeric value from {@code 0} ({@link #CRITICAL}) to {@code 4} ({@link #DEGRADED}).
 * <p>
 * NOTE(review): presumably a lower value means a more important request -- confirm
 * against the code that consumes {@link #value()}.
 */
public enum RequestPriority {
    CRITICAL(0),
    IMPORTANT(1),
    NORMAL(2),
    BACKGROUND(3),
    DEGRADED(4);

    /** Numeric level of this priority, fixed per constant. */
    private final int level;

    RequestPriority(int level) {
        this.level = level;
    }

    /**
     * Returns the numeric value associated with this priority.
     *
     * @return the value passed to the constant's constructor
     */
    public int value() {
        return this.level;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.quarkus.load.shedding.runtime;

import jakarta.annotation.Priority;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Singleton;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Router;

/**
 * Installs a route on the HTTP router that either rejects a request with
 * {@code 503 Service Unavailable} or lets it through while reporting its
 * start and duration to the {@link OverloadDetector}.
 */
@Singleton
public class HttpLoadShedding {
    /**
     * Observes the {@link Router} and, when load shedding is enabled in the configuration,
     * registers the load shedding route with order {@code -1_000_000_000}.
     *
     * @param router the router to register the route on
     * @param detector consulted for overload and notified about request begin/end
     * @param priorityLoadShedding decides whether a request is shed once the detector reports overload
     * @param config runtime configuration; nothing is registered when {@code enabled()} is false
     */
    public void init(@Observes @Priority(-1_000_000_000) Router router, OverloadDetector detector,
            PriorityLoadShedding priorityLoadShedding, LoadSheddingRuntimeConfig config) {

        if (config.enabled()) {
            router.route().order(-1_000_000_000).handler(ctx -> {
                // The priority check is only consulted when the detector reports overload.
                boolean accepted = !detector.isOverloaded() || !priorityLoadShedding.shedLoad(ctx.request());

                if (accepted) {
                    detector.requestBegin();
                    final long startedAt = System.nanoTime();
                    // Report the elapsed time divided by 1000 (nanoseconds -> microseconds).
                    ctx.addEndHandler(ignored -> {
                        long finishedAt = System.nanoTime();
                        detector.requestEnd((finishedAt - startedAt) / 1_000);
                    });
                    ctx.next();
                    return;
                }

                // Rejected: answer 503, ask the client to close, and close the connection once the response ended.
                HttpServerResponse rejection = ctx.response();
                rejection.setStatusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.code());
                rejection.headers().add(HttpHeaderNames.CONNECTION, "close");
                rejection.endHandler(ignored -> ctx.request().connection().close());
                rejection.end();
            });
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.quarkus.load.shedding.runtime;

import jakarta.inject.Singleton;

import io.quarkus.load.shedding.RequestClassifier;
import io.vertx.core.http.HttpServerRequest;

/**
 * Classifies HTTP requests into cohorts based on the remote host address and a
 * coarse time bucket, so that the cohort of a given host changes over time.
 */
@Singleton
public class HttpRequestClassifier implements RequestClassifier<HttpServerRequest> {
    /**
     * @param request the candidate request
     * @return {@code true} if {@code request} is an {@link HttpServerRequest}
     */
    @Override
    public boolean appliesTo(Object request) {
        return request instanceof HttpServerRequest;
    }

    /**
     * Computes the cohort of the request from its remote host address and the current time bucket.
     * <p>
     * The eight bytes of the combined hash are summed and the sum is folded into the range
     * {@code RequestClassifier.MIN_COHORT..RequestClassifier.MAX_COHORT} (inclusive).
     *
     * @param request the HTTP request to classify
     * @return a cohort number between {@code MIN_COHORT} and {@code MAX_COHORT}, inclusive
     */
    @Override
    public int cohort(HttpServerRequest request) {
        long timeBucket = System.currentTimeMillis() >> 23; // 2^23 ms, roughly 2.3 hours
        String host = request.remoteAddress().hostAddress(); // TODO proxying, load balancing, etc.?
        if (host == null) {
            host = "";
        }
        long hash = timeBucket + host.hashCode();

        // Sum the individual bytes of the hash. Each byte is masked explicitly before it is
        // added: `a & 0xFF + b & 0xFF` would parse as `a & (0xFF + b) & 0xFF` because `+`
        // binds tighter than `&`, which collapses the result into an AND of all terms.
        int byteSum = 0;
        for (int shift = 0; shift < Long.SIZE; shift += Byte.SIZE) {
            byteSum += (int) ((hash >>> shift) & 0xFF);
        }

        // The byte sum is in 0..2040; fold it into the cohort range declared by RequestClassifier.
        int cohortCount = RequestClassifier.MAX_COHORT - RequestClassifier.MIN_COHORT + 1;
        return RequestClassifier.MIN_COHORT + byteSum % cohortCount;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.quarkus.load.shedding.runtime;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;

/**
* Runtime configuration of the load shedding extension, mapped from
* configuration properties under the {@code quarkus.load-shedding} prefix.
*/
@ConfigMapping(prefix = "quarkus.load-shedding")
@ConfigRoot(phase = ConfigPhase.RUN_TIME)
public interface LoadSheddingRuntimeConfig {
/**
* Whether load shedding should be enabled.
* Currently, this only applies to incoming HTTP requests.
* Enabled by default.
*/
@WithDefault("true")
boolean enabled();

/**
* The maximum number of concurrent requests allowed.
* Defaults to 1000.
*/
@WithDefault("1000")
int maxLimit();

/**
* The {@code alpha} factor of the Vegas overload detection algorithm.
* Defaults to 3.
*/
@WithDefault("3")
int alphaFactor();

/**
* The {@code beta} factor of the Vegas overload detection algorithm.
* Defaults to 6.
*/
@WithDefault("6")
int betaFactor();

/**
* The probe factor of the Vegas overload detection algorithm.
* Defaults to 30.0.
*/
@WithDefault("30.0")
double probeFactor();

/**
* The initial limit of concurrent requests allowed.
* Defaults to 100.
*/
@WithDefault("100")
int initialLimit();

/**
* Configuration of priority load shedding.
*/
PriorityLoadShedding priority();

/**
* Configuration group for priority load shedding, nested under {@link #priority()}.
*/
@ConfigGroup
interface PriorityLoadShedding {
/**
* Whether priority load shedding should be enabled.
* Enabled by default.
*/
@WithDefault("true")
boolean enabled();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.quarkus.load.shedding.runtime;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import io.quarkus.load.shedding.RequestPrioritizer;
import io.quarkus.load.shedding.RequestPriority;
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.quarkus.vertx.http.runtime.management.ManagementInterfaceBuildTimeConfig;
import io.vertx.core.http.HttpServerRequest;

/**
 * Gives {@link RequestPriority#CRITICAL} priority to HTTP requests whose path lies
 * under the non-application root path, as long as the separate management interface
 * is not enabled.
 */
@Singleton
public class ManagementRequestPrioritizer implements RequestPrioritizer<HttpServerRequest> {
    /** Path prefix of the prioritized endpoints, or {@code null} when this prioritizer never applies. */
    private final String managementPath;

    /**
     * @param httpConfig supplies the root path and the non-application root path
     * @param managementInterfaceConfig tells whether the separate management interface is enabled
     */
    @Inject
    public ManagementRequestPrioritizer(HttpBuildTimeConfig httpConfig,
            ManagementInterfaceBuildTimeConfig managementInterfaceConfig) {
        this.managementPath = resolveManagementPath(httpConfig, managementInterfaceConfig);
    }

    /** Computes the path prefix to prioritize, or {@code null} if the management interface is enabled. */
    private static String resolveManagementPath(HttpBuildTimeConfig httpConfig,
            ManagementInterfaceBuildTimeConfig managementInterfaceConfig) {
        if (managementInterfaceConfig.enabled) {
            // NOTE(review): presumably management endpoints are then not served by this router -- confirm.
            return null;
        }
        String nonApplicationRoot = httpConfig.nonApplicationRootPath;
        if (nonApplicationRoot.startsWith("/")) {
            // An absolute non-application root path is used as is.
            return nonApplicationRoot;
        }
        // A relative non-application root path is resolved against the HTTP root path. Insert the
        // separator when the root path lacks a trailing slash, so "/app" + "q" becomes "/app/q"
        // rather than "/appq".
        String root = httpConfig.rootPath;
        return root.endsWith("/") ? root + nonApplicationRoot : root + "/" + nonApplicationRoot;
    }

    /** Tells whether {@code path} equals {@code prefix} or lies below it on a path segment boundary. */
    private static boolean isUnder(String path, String prefix) {
        if (path == null || !path.startsWith(prefix)) {
            return false;
        }
        // Without the boundary check, a prefix of "/q" would also match "/quotes".
        return prefix.endsWith("/")
                || path.length() == prefix.length()
                || path.charAt(prefix.length()) == '/';
    }

    /**
     * @param request the candidate request
     * @return {@code true} if {@code request} is an {@link HttpServerRequest} whose path is the
     *         management path or lies below it; always {@code false} when no management path is set
     */
    @Override
    public boolean appliesTo(Object request) {
        if (managementPath != null && request instanceof HttpServerRequest httpRequest) {
            return isUnder(httpRequest.path(), managementPath);
        }
        return false;
    }

    /**
     * @param request the request to prioritize (not inspected)
     * @return always {@link RequestPriority#CRITICAL}
     */
    @Override
    public RequestPriority priority(HttpServerRequest request) {
        return RequestPriority.CRITICAL;
    }
}
Loading

0 comments on commit 7a4c509

Please sign in to comment.