Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty:Auto adjust BDP ping frequency #9650

Merged
merged 13 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 51 additions & 19 deletions netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import io.grpc.ChannelLogger;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand All @@ -44,6 +45,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private boolean autoTuneFlowControlOn;
private ChannelHandlerContext ctx;
private boolean initialWindowSent = false;
private final Ticker ticker;

private static final long BDP_MEASUREMENT_PING = 1234;

Expand All @@ -55,19 +57,31 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
ChannelLogger negotiationLogger,
boolean autoFlowControl,
PingLimiter pingLimiter) {
this(channelUnused, decoder, encoder, initialSettings, negotiationLogger, autoFlowControl,
pingLimiter, Ticker.systemTicker());
}

AbstractNettyHandler(
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
ChannelPromise channelUnused,
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings,
ChannelLogger negotiationLogger,
boolean autoFlowControl,
PingLimiter pingLimiter,
Ticker ticker) {
super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);

// During a graceful shutdown, wait until all streams are closed.
gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);

// Extract the connection window from the settings if it was set.
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize();
initialSettings.initialWindowSize();
this.autoTuneFlowControlOn = autoFlowControl;
if (pingLimiter == null) {
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
pingLimiter = new AllowPingLimiter();
}
this.flowControlPing = new FlowControlPinger(pingLimiter);
PingLimiter activePingLimiter = (pingLimiter != null) ? pingLimiter : new AllowPingLimiter();
this.flowControlPing = new FlowControlPinger(activePingLimiter);
this.ticker = (ticker != null) ? ticker : Ticker.systemTicker();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -131,6 +145,7 @@ void setAutoTuneFlowControl(boolean isOn) {
final class FlowControlPinger {

private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
public static final int MAX_BACKOFF = 10;

private final PingLimiter pingLimiter;
private int pingCount;
Expand All @@ -139,6 +154,8 @@ final class FlowControlPinger {
private int dataSizeSincePing;
private float lastBandwidth; // bytes per second
private long lastPingTime;
private int lastTargetWindow;
private int pingFrequencyMultiplier = 1;

public FlowControlPinger(PingLimiter pingLimiter) {
Preconditions.checkNotNull(pingLimiter, "pingLimiter");
Expand All @@ -157,7 +174,12 @@ public void onDataRead(int dataLength, int paddingLength) {
if (!autoTuneFlowControlOn) {
return;
}
if (!isPinging() && pingLimiter.isPingAllowed()) {
if (lastTargetWindow == 0 && pingCount > 0) {
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
lastTargetWindow =
decoder().flowController().initialWindowSize(connection().connectionStream());
}
if (!isPinging() && pingLimiter.isPingAllowed()
&& getDataSincePing() * 2 >= lastTargetWindow * pingFrequencyMultiplier) {
setPinging(true);
sendPing(ctx());
}
Expand All @@ -169,25 +191,32 @@ public void updateWindow() throws Http2Exception {
return;
}
pingReturn++;
long elapsedTime = (System.nanoTime() - lastPingTime);
setPinging(false);

long elapsedTime = (ticker.read() - lastPingTime);
if (elapsedTime == 0) {
elapsedTime = 1;
}

long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
Http2LocalFlowController fc = decoder().flowController();
// Calculate new window size by doubling the observed BDP, but cap at max window
int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE);
setPinging(false);
Http2LocalFlowController fc = decoder().flowController();
int currentWindow = fc.initialWindowSize(connection().connectionStream());
if (targetWindow > currentWindow && bandwidth > lastBandwidth) {
lastBandwidth = bandwidth;
int increase = targetWindow - currentWindow;
fc.incrementWindowSize(connection().connectionStream(), increase);
fc.initialWindowSize(targetWindow);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
if (bandwidth <= lastBandwidth || targetWindow <= currentWindow) {
pingFrequencyMultiplier = Math.min(pingFrequencyMultiplier + 1, MAX_BACKOFF);
return;
}

pingFrequencyMultiplier = 1; // react more quickly when size is changing
lastBandwidth = bandwidth;
lastTargetWindow = targetWindow;
int increase = targetWindow - currentWindow;
fc.incrementWindowSize(connection().connectionStream(), increase);
fc.initialWindowSize(targetWindow);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
}

private boolean isPinging() {
Expand All @@ -200,7 +229,7 @@ private void setPinging(boolean pingOut) {

private void sendPing(ChannelHandlerContext ctx) {
setDataSizeSincePing(0);
lastPingTime = System.nanoTime();
lastPingTime = ticker.read();
encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise());
pingCount++;
}
Expand Down Expand Up @@ -232,7 +261,10 @@ private void setDataSizeSincePing(int dataSize) {
@VisibleForTesting
void setDataSizeAndSincePing(int dataSize) {
setDataSizeSincePing(dataSize);
lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1);
pingFrequencyMultiplier = 1;
lastPingTime = (ticker.read() > TimeUnit.SECONDS.toNanos(1))
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
? ticker.read() - TimeUnit.SECONDS.toNanos(1)
: ticker.read() - 1;
}
}

Expand Down
18 changes: 12 additions & 6 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.InternalChannelz;
Expand Down Expand Up @@ -143,7 +144,8 @@ static NettyClientHandler newHandler(
TransportTracer transportTracer,
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger) {
ChannelLogger negotiationLogger,
Ticker ticker) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
Expand All @@ -169,7 +171,8 @@ static NettyClientHandler newHandler(
transportTracer,
eagAttributes,
authority,
negotiationLogger);
negotiationLogger,
ticker);
}

@VisibleForTesting
Expand All @@ -187,7 +190,8 @@ static NettyClientHandler newHandler(
TransportTracer transportTracer,
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger) {
ChannelLogger negotiationLogger,
Ticker ticker) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
Expand Down Expand Up @@ -237,7 +241,8 @@ static NettyClientHandler newHandler(
eagAttributes,
authority,
autoFlowControl,
pingCounter);
pingCounter,
ticker);
}

private NettyClientHandler(
Expand All @@ -253,9 +258,10 @@ private NettyClientHandler(
Attributes eagAttributes,
String authority,
boolean autoFlowControl,
PingLimiter pingLimiter) {
PingLimiter pingLimiter,
Ticker ticker) {
super(/* channelUnused= */ null, decoder, encoder, settings,
negotiationLogger, autoFlowControl, pingLimiter);
negotiationLogger, autoFlowControl, pingLimiter, ticker);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
Expand Down
25 changes: 24 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;


larry-safran marked this conversation as resolved.
Show resolved Hide resolved
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
Expand Down Expand Up @@ -102,6 +104,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final LocalSocketPicker localSocketPicker;
private final ChannelLogger channelLogger;
private final boolean useGetForSafeMethods;
private final Ticker ticker;

NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Expand All @@ -113,6 +116,24 @@ class NettyClientTransport implements ConnectionClientTransport {
Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
LocalSocketPicker localSocketPicker, ChannelLogger channelLogger,
boolean useGetForSafeMethods) {
this(address,channelFactory, channelOptions, group, negotiator, autoFlowControl,
flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanos,
keepAliveTimeoutNanos, keepAliveWithoutCalls, authority, userAgent, tooManyPingsRunnable,
transportTracer, eagAttributes, localSocketPicker, channelLogger, useGetForSafeMethods,
Ticker.systemTicker());
}

NettyClientTransport(
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ProtocolNegotiator negotiator, boolean autoFlowControl, int flowControlWindow,
int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
LocalSocketPicker localSocketPicker, ChannelLogger channelLogger,
boolean useGetForSafeMethods, Ticker ticker) {

this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
Expand All @@ -137,6 +158,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.useGetForSafeMethods = useGetForSafeMethods;
this.ticker = (ticker != null) ? ticker : Ticker.systemTicker();
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -225,7 +247,8 @@ public Runnable start(Listener transportListener) {
transportTracer,
eagAttributes,
authorityString,
channelLogger);
channelLogger,
ticker);

ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Expand Down
42 changes: 38 additions & 4 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
Expand Down Expand Up @@ -213,6 +214,35 @@ static NettyServerHandler newHandler(
boolean permitKeepAliveWithoutCalls,
long permitKeepAliveTimeInNanos,
Attributes eagAttributes) {
return newHandler(channelUnused, frameReader,frameWriter, transportListener,
streamTracerFactories, transportTracer, maxStreams, autoFlowControl, flowControlWindow,
maxHeaderListSize, maxMessageSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, eagAttributes,
Ticker.systemTicker());
}

static NettyServerHandler newHandler(
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
ChannelPromise channelUnused,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
ServerTransportListener transportListener,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer,
int maxStreams,
boolean autoFlowControl,
int flowControlWindow,
int maxHeaderListSize,
int maxMessageSize,
long keepAliveTimeInNanos,
long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos,
long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls,
long permitKeepAliveTimeInNanos,
Attributes eagAttributes,
Ticker ticker) {
Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
flowControlWindow);
Expand Down Expand Up @@ -245,6 +275,10 @@ static NettyServerHandler newHandler(
settings.maxConcurrentStreams(maxStreams);
settings.maxHeaderListSize(maxHeaderListSize);

if (ticker == null) {
ticker = Ticker.systemTicker();
}

return new NettyServerHandler(
channelUnused,
connection,
Expand All @@ -258,7 +292,7 @@ static NettyServerHandler newHandler(
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
keepAliveEnforcer,
autoFlowControl,
eagAttributes);
eagAttributes, ticker);
}

private NettyServerHandler(
Expand All @@ -278,9 +312,10 @@ private NettyServerHandler(
long maxConnectionAgeGraceInNanos,
final KeepAliveEnforcer keepAliveEnforcer,
boolean autoFlowControl,
Attributes eagAttributes) {
Attributes eagAttributes,
Ticker ticker) {
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null);
autoFlowControl, null, ticker);

final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
Expand Down Expand Up @@ -325,7 +360,6 @@ public void onStreamClosed(Http2Stream stream) {
this.transportListener = checkNotNull(transportListener, "transportListener");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");

// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
}
Expand Down
Loading