Skip to content

Commit

Permalink
Revert "netty:Auto adjust BDP ping frequency (grpc#9650)" (grpc#9821)
Browse files Browse the repository at this point in the history
This reverts commit f5e8459.
  • Loading branch information
larry-safran authored Jan 13, 2023
1 parent 2b9bd6c commit a2bbe84
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 290 deletions.
57 changes: 16 additions & 41 deletions netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package io.grpc.netty;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import io.grpc.ChannelLogger;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand All @@ -46,7 +44,6 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private boolean autoTuneFlowControlOn;
private ChannelHandlerContext ctx;
private boolean initialWindowSent = false;
private final Ticker ticker;

private static final long BDP_MEASUREMENT_PING = 1234;

Expand All @@ -57,22 +54,20 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
Http2Settings initialSettings,
ChannelLogger negotiationLogger,
boolean autoFlowControl,
PingLimiter pingLimiter,
Ticker ticker) {
PingLimiter pingLimiter) {
super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);

// During a graceful shutdown, wait until all streams are closed.
gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);

// Extract the connection window from the settings if it was set.
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize();
initialSettings.initialWindowSize();
this.autoTuneFlowControlOn = autoFlowControl;
if (pingLimiter == null) {
pingLimiter = new AllowPingLimiter();
}
this.flowControlPing = new FlowControlPinger(pingLimiter);
this.ticker = checkNotNull(ticker, "ticker");
}

@Override
Expand Down Expand Up @@ -136,7 +131,6 @@ void setAutoTuneFlowControl(boolean isOn) {
final class FlowControlPinger {

private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
public static final int MAX_BACKOFF = 10;

private final PingLimiter pingLimiter;
private int pingCount;
Expand All @@ -145,8 +139,6 @@ final class FlowControlPinger {
private int dataSizeSincePing;
private float lastBandwidth; // bytes per second
private long lastPingTime;
private int lastTargetWindow;
private int pingFrequencyMultiplier = 1;

public FlowControlPinger(PingLimiter pingLimiter) {
Preconditions.checkNotNull(pingLimiter, "pingLimiter");
Expand All @@ -165,18 +157,10 @@ public void onDataRead(int dataLength, int paddingLength) {
if (!autoTuneFlowControlOn) {
return;
}

if (!isPinging() && pingLimiter.isPingAllowed()
&& getDataSincePing() * 2 >= lastTargetWindow * pingFrequencyMultiplier) {
if (!isPinging() && pingLimiter.isPingAllowed()) {
setPinging(true);
sendPing(ctx());
}

if (lastTargetWindow == 0) {
lastTargetWindow =
decoder().flowController().initialWindowSize(connection().connectionStream());
}

incrementDataSincePing(dataLength + paddingLength);
}

Expand All @@ -185,32 +169,25 @@ public void updateWindow() throws Http2Exception {
return;
}
pingReturn++;
setPinging(false);

long elapsedTime = (ticker.read() - lastPingTime);
long elapsedTime = (System.nanoTime() - lastPingTime);
if (elapsedTime == 0) {
elapsedTime = 1;
}

long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
Http2LocalFlowController fc = decoder().flowController();
// Calculate new window size by doubling the observed BDP, but cap at max window
int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE);
Http2LocalFlowController fc = decoder().flowController();
setPinging(false);
int currentWindow = fc.initialWindowSize(connection().connectionStream());
if (bandwidth <= lastBandwidth || targetWindow <= currentWindow) {
pingFrequencyMultiplier = Math.min(pingFrequencyMultiplier + 1, MAX_BACKOFF);
return;
if (targetWindow > currentWindow && bandwidth > lastBandwidth) {
lastBandwidth = bandwidth;
int increase = targetWindow - currentWindow;
fc.incrementWindowSize(connection().connectionStream(), increase);
fc.initialWindowSize(targetWindow);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
}

pingFrequencyMultiplier = 1; // react more quickly when size is changing
lastBandwidth = bandwidth;
lastTargetWindow = targetWindow;
int increase = targetWindow - currentWindow;
fc.incrementWindowSize(connection().connectionStream(), increase);
fc.initialWindowSize(targetWindow);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
}

private boolean isPinging() {
Expand All @@ -223,7 +200,7 @@ private void setPinging(boolean pingOut) {

private void sendPing(ChannelHandlerContext ctx) {
setDataSizeSincePing(0);
lastPingTime = ticker.read();
lastPingTime = System.nanoTime();
encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise());
pingCount++;
}
Expand Down Expand Up @@ -252,12 +229,10 @@ private void setDataSizeSincePing(int dataSize) {
dataSizeSincePing = dataSize;
}

// Only used in testing
@VisibleForTesting
void setDataSizeAndSincePing(int dataSize) {
setDataSizeSincePing(dataSize);
pingFrequencyMultiplier = 1;
lastPingTime = ticker.read() ;
lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1);
}
}

Expand Down
3 changes: 1 addition & 2 deletions netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.CheckReturnValue;
import com.google.errorprone.annotations.InlineMe;
Expand Down Expand Up @@ -739,7 +738,7 @@ public void run() {
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(),
localSocketPicker, channelLogger, useGetForSafeMethods, Ticker.systemTicker());
localSocketPicker, channelLogger, useGetForSafeMethods);
return transport;
}

Expand Down
18 changes: 6 additions & 12 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.InternalChannelz;
Expand Down Expand Up @@ -144,8 +143,7 @@ static NettyClientHandler newHandler(
TransportTracer transportTracer,
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger,
Ticker ticker) {
ChannelLogger negotiationLogger) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
Expand All @@ -171,8 +169,7 @@ static NettyClientHandler newHandler(
transportTracer,
eagAttributes,
authority,
negotiationLogger,
ticker);
negotiationLogger);
}

@VisibleForTesting
Expand All @@ -190,8 +187,7 @@ static NettyClientHandler newHandler(
TransportTracer transportTracer,
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger,
Ticker ticker) {
ChannelLogger negotiationLogger) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
Expand Down Expand Up @@ -241,8 +237,7 @@ static NettyClientHandler newHandler(
eagAttributes,
authority,
autoFlowControl,
pingCounter,
ticker);
pingCounter);
}

private NettyClientHandler(
Expand All @@ -258,10 +253,9 @@ private NettyClientHandler(
Attributes eagAttributes,
String authority,
boolean autoFlowControl,
PingLimiter pingLimiter,
Ticker ticker) {
PingLimiter pingLimiter) {
super(/* channelUnused= */ null, decoder, encoder, settings,
negotiationLogger, autoFlowControl, pingLimiter, ticker);
negotiationLogger, autoFlowControl, pingLimiter);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
Expand Down
9 changes: 2 additions & 7 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
Expand Down Expand Up @@ -103,7 +102,6 @@ class NettyClientTransport implements ConnectionClientTransport {
private final LocalSocketPicker localSocketPicker;
private final ChannelLogger channelLogger;
private final boolean useGetForSafeMethods;
private final Ticker ticker;

NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Expand All @@ -114,8 +112,7 @@ class NettyClientTransport implements ConnectionClientTransport {
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
LocalSocketPicker localSocketPicker, ChannelLogger channelLogger,
boolean useGetForSafeMethods, Ticker ticker) {

boolean useGetForSafeMethods) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
Expand All @@ -140,7 +137,6 @@ class NettyClientTransport implements ConnectionClientTransport {
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.useGetForSafeMethods = useGetForSafeMethods;
this.ticker = Preconditions.checkNotNull(ticker, "ticker");
}

@Override
Expand Down Expand Up @@ -229,8 +225,7 @@ public Runnable start(Listener transportListener) {
transportTracer,
eagAttributes,
authorityString,
channelLogger,
ticker);
channelLogger);

ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Expand Down
19 changes: 6 additions & 13 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
Expand Down Expand Up @@ -191,8 +190,7 @@ static NettyServerHandler newHandler(
maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls,
permitKeepAliveTimeInNanos,
eagAttributes,
Ticker.systemTicker());
eagAttributes);
}

static NettyServerHandler newHandler(
Expand All @@ -214,8 +212,7 @@ static NettyServerHandler newHandler(
long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls,
long permitKeepAliveTimeInNanos,
Attributes eagAttributes,
Ticker ticker) {
Attributes eagAttributes) {
Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
flowControlWindow);
Expand Down Expand Up @@ -248,10 +245,6 @@ static NettyServerHandler newHandler(
settings.maxConcurrentStreams(maxStreams);
settings.maxHeaderListSize(maxHeaderListSize);

if (ticker == null) {
ticker = Ticker.systemTicker();
}

return new NettyServerHandler(
channelUnused,
connection,
Expand All @@ -265,7 +258,7 @@ static NettyServerHandler newHandler(
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
keepAliveEnforcer,
autoFlowControl,
eagAttributes, ticker);
eagAttributes);
}

private NettyServerHandler(
Expand All @@ -285,10 +278,9 @@ private NettyServerHandler(
long maxConnectionAgeGraceInNanos,
final KeepAliveEnforcer keepAliveEnforcer,
boolean autoFlowControl,
Attributes eagAttributes,
Ticker ticker) {
Attributes eagAttributes) {
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null, ticker);
autoFlowControl, null);

final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
Expand Down Expand Up @@ -333,6 +325,7 @@ public void onStreamClosed(Http2Stream stream) {
this.transportListener = checkNotNull(transportListener, "transportListener");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");

// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
}
Expand Down
Loading

0 comments on commit a2bbe84

Please sign in to comment.