Skip to content

Commit

Permalink
core: delay retriable stream master listener close until all sub stre…
Browse files Browse the repository at this point in the history
…ams are closed (grpc#9754)

This helps to prevent retryable stream from using calloptions.executor when it shouldn't, e.g. call is already notified closed. It is done by delaying notifying upper layer (through masterListener).
  • Loading branch information
YifeiZhuang authored Jan 12, 2023
1 parent ce86090 commit 2b9bd6c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
57 changes: 40 additions & 17 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
private final AtomicInteger inFlightSubStreams = new AtomicInteger();
private Status savedCancellationReason;
private SavedCloseMasterListenerReason savedCloseMasterListenerReason;

// Used for recording the share of buffer used for the current call out of the channel buffer.
// This field would not be necessary if there is no channel buffer limit.
Expand Down Expand Up @@ -222,9 +222,10 @@ private void commitAndRun(Substream winningSubstream) {
}
}

@Nullable // returns null when cancelled
// returns null means we should not create new sub streams, e.g. cancelled or
// other close condition is met for retriableStream.
@Nullable
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
// increment only when >= 0, i.e. not cancelled
int inFlight;
do {
inFlight = inFlightSubStreams.get();
Expand Down Expand Up @@ -506,11 +507,8 @@ public final void cancel(final Status reason) {
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
savedCancellationReason = reason;
runnable.run();
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
}
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}

Expand Down Expand Up @@ -816,14 +814,30 @@ private void freezeHedging() {
}

private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, progress, metadata);
}
});
savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
metadata);
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, progress, metadata);
}
});
}
}

private static final class SavedCloseMasterListenerReason {
private final Status status;
private final RpcProgress progress;
private final Metadata metadata;

SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
this.status = status;
this.progress = progress;
this.metadata = metadata;
}
}

private interface BufferEntry {
Expand Down Expand Up @@ -864,8 +878,17 @@ public void closed(
}

if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
assert savedCancellationReason != null;
safeCloseMasterListener(savedCancellationReason, RpcProgress.PROCESSED, new Metadata());
assert savedCloseMasterListenerReason != null;
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(savedCloseMasterListenerReason.status,
savedCloseMasterListenerReason.progress,
savedCloseMasterListenerReason.metadata);
}
});
return;
}

Expand Down
13 changes: 12 additions & 1 deletion core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2151,14 +2151,19 @@ public Void answer(InvocationOnMock in) {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
inOrder.verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue().closed(
Status.CANCELLED, PROCESSED, new Metadata());
sublistenerCaptor4.getValue().closed(
Status.CANCELLED, PROCESSED, new Metadata());
inOrder.verify(masterListener).closed(
any(Status.class), any(RpcProgress.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions();

insight = new InsightBuilder();
hedgingStream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo(
"[closed=[UNAVAILABLE, INTERNAL], committed=[remote_addr=2.2.2.2:81]]");
"[closed=[UNAVAILABLE, INTERNAL, CANCELLED, CANCELLED], "
+ "committed=[remote_addr=2.2.2.2:81]]");
}

@Test
Expand Down Expand Up @@ -2425,6 +2430,7 @@ public void hedging_pushback_positive() {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
inOrder.verify(retriableStreamRecorder).postCommit();
sublistenerCaptor3.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
inOrder.verify(masterListener).closed(fatal, PROCESSED, metadata);
inOrder.verifyNoMoreInteractions();
}
Expand Down Expand Up @@ -2605,6 +2611,8 @@ public void hedging_transparentRetry() {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
sublistenerCaptor4.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
verify(masterListener).closed(status, REFUSED, metadata);
}

Expand Down Expand Up @@ -2645,6 +2653,9 @@ public void hedging_transparentRetryNotAllowed() {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue()
.closed(Status.CANCELLED, REFUSED, new Metadata());
//master listener close should wait until all substreams are closed
verify(masterListener).closed(status, REFUSED, metadata);
}

Expand Down

0 comments on commit 2b9bd6c

Please sign in to comment.