Skip to content

Commit

Permalink
Restore "netty:Auto adjust BDP ping frequency" with fix (grpc#9847)
Browse files Browse the repository at this point in the history
* Revert "Revert "netty:Auto adjust BDP ping frequency (grpc#9650)" (grpc#9821)"

This reverts commit a2bbe84.

* Eliminate half RTT delay in sending BDP Pings when starting up.
* Eliminate delay for bdp ping when current read would push us over the threshold.
  • Loading branch information
larry-safran authored Jan 19, 2023
1 parent ecc7cf3 commit 3cbd948
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 86 deletions.
65 changes: 48 additions & 17 deletions netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package io.grpc.netty;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import io.grpc.ChannelLogger;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand All @@ -44,6 +46,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private boolean autoTuneFlowControlOn;
private ChannelHandlerContext ctx;
private boolean initialWindowSent = false;
private final Ticker ticker;

private static final long BDP_MEASUREMENT_PING = 1234;

Expand All @@ -54,20 +57,22 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
Http2Settings initialSettings,
ChannelLogger negotiationLogger,
boolean autoFlowControl,
PingLimiter pingLimiter) {
PingLimiter pingLimiter,
Ticker ticker) {
super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);

// During a graceful shutdown, wait until all streams are closed.
gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);

// Extract the connection window from the settings if it was set.
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize();
initialSettings.initialWindowSize();
this.autoTuneFlowControlOn = autoFlowControl;
if (pingLimiter == null) {
pingLimiter = new AllowPingLimiter();
}
this.flowControlPing = new FlowControlPinger(pingLimiter);
this.ticker = checkNotNull(ticker, "ticker");
}

@Override
Expand Down Expand Up @@ -131,14 +136,17 @@ void setAutoTuneFlowControl(boolean isOn) {
final class FlowControlPinger {

private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
public static final int MAX_BACKOFF = 10;

private final PingLimiter pingLimiter;
private int pingCount;
private int pingReturn;
private boolean pinging;
private int dataSizeSincePing;
private float lastBandwidth; // bytes per second
private long lastBandwidth; // bytes per nanosecond
private long lastPingTime;
private int lastTargetWindow;
private int pingFrequencyMultiplier;

public FlowControlPinger(PingLimiter pingLimiter) {
Preconditions.checkNotNull(pingLimiter, "pingLimiter");
Expand All @@ -157,10 +165,24 @@ public void onDataRead(int dataLength, int paddingLength) {
if (!autoTuneFlowControlOn) {
return;
}
if (!isPinging() && pingLimiter.isPingAllowed()) {

// Note that we are double counting around the ping initiation as the current data will be
// added at the end of this method, so will be available in the next check. This at worst
// causes us to send a ping one data packet earlier, but makes startup faster if there are
// small packets before big ones.
int dataForCheck = getDataSincePing() + dataLength + paddingLength;
// Need to double the data here to account for targetWindow being set to twice the data below
if (!isPinging() && pingLimiter.isPingAllowed()
&& dataForCheck * 2 >= lastTargetWindow * pingFrequencyMultiplier) {
setPinging(true);
sendPing(ctx());
}

if (lastTargetWindow == 0) {
lastTargetWindow =
decoder().flowController().initialWindowSize(connection().connectionStream());
}

incrementDataSincePing(dataLength + paddingLength);
}

Expand All @@ -169,25 +191,32 @@ public void updateWindow() throws Http2Exception {
return;
}
pingReturn++;
long elapsedTime = (System.nanoTime() - lastPingTime);
setPinging(false);

long elapsedTime = (ticker.read() - lastPingTime);
if (elapsedTime == 0) {
elapsedTime = 1;
}

long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
Http2LocalFlowController fc = decoder().flowController();
// Calculate new window size by doubling the observed BDP, but cap at max window
int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE);
setPinging(false);
Http2LocalFlowController fc = decoder().flowController();
int currentWindow = fc.initialWindowSize(connection().connectionStream());
if (targetWindow > currentWindow && bandwidth > lastBandwidth) {
lastBandwidth = bandwidth;
int increase = targetWindow - currentWindow;
fc.incrementWindowSize(connection().connectionStream(), increase);
fc.initialWindowSize(targetWindow);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
if (bandwidth <= lastBandwidth || targetWindow <= currentWindow) {
pingFrequencyMultiplier = Math.min(pingFrequencyMultiplier + 1, MAX_BACKOFF);
return;
}

pingFrequencyMultiplier = 0; // react quickly when size is changing
lastBandwidth = bandwidth;
lastTargetWindow = targetWindow;
int increase = targetWindow - currentWindow;
fc.incrementWindowSize(connection().connectionStream(), increase);
fc.initialWindowSize(targetWindow);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
}

private boolean isPinging() {
Expand All @@ -200,7 +229,7 @@ private void setPinging(boolean pingOut) {

private void sendPing(ChannelHandlerContext ctx) {
setDataSizeSincePing(0);
lastPingTime = System.nanoTime();
lastPingTime = ticker.read();
encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise());
pingCount++;
}
Expand Down Expand Up @@ -229,10 +258,12 @@ private void setDataSizeSincePing(int dataSize) {
dataSizeSincePing = dataSize;
}

// Only used in testing
@VisibleForTesting
void setDataSizeAndSincePing(int dataSize) {
setDataSizeSincePing(dataSize);
lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1);
pingFrequencyMultiplier = 1;
lastPingTime = ticker.read() ;
}
}

Expand Down
3 changes: 2 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.CheckReturnValue;
import com.google.errorprone.annotations.InlineMe;
Expand Down Expand Up @@ -738,7 +739,7 @@ public void run() {
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(),
localSocketPicker, channelLogger, useGetForSafeMethods);
localSocketPicker, channelLogger, useGetForSafeMethods, Ticker.systemTicker());
return transport;
}

Expand Down
18 changes: 12 additions & 6 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.InternalChannelz;
Expand Down Expand Up @@ -143,7 +144,8 @@ static NettyClientHandler newHandler(
TransportTracer transportTracer,
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger) {
ChannelLogger negotiationLogger,
Ticker ticker) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
Expand All @@ -169,7 +171,8 @@ static NettyClientHandler newHandler(
transportTracer,
eagAttributes,
authority,
negotiationLogger);
negotiationLogger,
ticker);
}

@VisibleForTesting
Expand All @@ -187,7 +190,8 @@ static NettyClientHandler newHandler(
TransportTracer transportTracer,
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger) {
ChannelLogger negotiationLogger,
Ticker ticker) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
Expand Down Expand Up @@ -237,7 +241,8 @@ static NettyClientHandler newHandler(
eagAttributes,
authority,
autoFlowControl,
pingCounter);
pingCounter,
ticker);
}

private NettyClientHandler(
Expand All @@ -253,9 +258,10 @@ private NettyClientHandler(
Attributes eagAttributes,
String authority,
boolean autoFlowControl,
PingLimiter pingLimiter) {
PingLimiter pingLimiter,
Ticker ticker) {
super(/* channelUnused= */ null, decoder, encoder, settings,
negotiationLogger, autoFlowControl, pingLimiter);
negotiationLogger, autoFlowControl, pingLimiter, ticker);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
Expand Down
9 changes: 7 additions & 2 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
Expand Down Expand Up @@ -102,6 +103,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final LocalSocketPicker localSocketPicker;
private final ChannelLogger channelLogger;
private final boolean useGetForSafeMethods;
private final Ticker ticker;

NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Expand All @@ -112,7 +114,8 @@ class NettyClientTransport implements ConnectionClientTransport {
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
LocalSocketPicker localSocketPicker, ChannelLogger channelLogger,
boolean useGetForSafeMethods) {
boolean useGetForSafeMethods, Ticker ticker) {

this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
Expand All @@ -137,6 +140,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.useGetForSafeMethods = useGetForSafeMethods;
this.ticker = Preconditions.checkNotNull(ticker, "ticker");
}

@Override
Expand Down Expand Up @@ -225,7 +229,8 @@ public Runnable start(Listener transportListener) {
transportTracer,
eagAttributes,
authorityString,
channelLogger);
channelLogger,
ticker);

ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Expand Down
19 changes: 13 additions & 6 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
Expand Down Expand Up @@ -190,7 +191,8 @@ static NettyServerHandler newHandler(
maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls,
permitKeepAliveTimeInNanos,
eagAttributes);
eagAttributes,
Ticker.systemTicker());
}

static NettyServerHandler newHandler(
Expand All @@ -212,7 +214,8 @@ static NettyServerHandler newHandler(
long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls,
long permitKeepAliveTimeInNanos,
Attributes eagAttributes) {
Attributes eagAttributes,
Ticker ticker) {
Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
flowControlWindow);
Expand Down Expand Up @@ -245,6 +248,10 @@ static NettyServerHandler newHandler(
settings.maxConcurrentStreams(maxStreams);
settings.maxHeaderListSize(maxHeaderListSize);

if (ticker == null) {
ticker = Ticker.systemTicker();
}

return new NettyServerHandler(
channelUnused,
connection,
Expand All @@ -258,7 +265,7 @@ static NettyServerHandler newHandler(
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
keepAliveEnforcer,
autoFlowControl,
eagAttributes);
eagAttributes, ticker);
}

private NettyServerHandler(
Expand All @@ -278,9 +285,10 @@ private NettyServerHandler(
long maxConnectionAgeGraceInNanos,
final KeepAliveEnforcer keepAliveEnforcer,
boolean autoFlowControl,
Attributes eagAttributes) {
Attributes eagAttributes,
Ticker ticker) {
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null);
autoFlowControl, null, ticker);

final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
Expand Down Expand Up @@ -325,7 +333,6 @@ public void onStreamClosed(Http2Stream stream) {
this.transportListener = checkNotNull(transportListener, "transportListener");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");

// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
}
Expand Down
Loading

0 comments on commit 3cbd948

Please sign in to comment.