From c03b8a4245beab7258b8ed8ec76153e6a0275211 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 7 Oct 2022 12:59:46 -0400 Subject: [PATCH] fix: catch all throwables so version mismatch won't hang the client (#1402) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: catch all throwables so version mismatch won't hang the client * create a SafeResponseObserver * format * extend SafeResponseObserver * catch stream cancellation * update error log * update * throw on onStart * 🦉 Updates from OwlBot post-processor See /~https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix version Co-authored-by: Owl Bot --- .../clirr-ignored-differences.xml | 7 +- .../v2/stub/ConvertExceptionCallable.java | 11 +- .../data/v2/stub/SafeResponseObserver.java | 123 ++++++++++++++++++ .../BigtableTracerStreamingCallable.java | 17 ++- .../readrows/FilterMarkerRowsCallable.java | 12 +- .../reframing/ReframingResponseObserver.java | 6 +- 6 files changed, 157 insertions(+), 19 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 8a3edd69c0..2f7631c873 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -49,7 +49,7 @@ 8001 com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer$Builder - change method args is ok because HeaderTracerStreamingCallable is InternalApi + 7004 com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable @@ -76,4 +76,9 @@ 8001 com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable + + 5001 + com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver + com/google/api/gax/rpc/StateCheckingResponseObserver + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java index ed50532fae..d3ff88af7e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java @@ -42,31 +42,32 @@ public void call( innerCallable.call(request, observer, context); } - private class ReadRowsConvertExceptionResponseObserver implements ResponseObserver { + private class ReadRowsConvertExceptionResponseObserver extends SafeResponseObserver { private final ResponseObserver outerObserver; ReadRowsConvertExceptionResponseObserver(ResponseObserver outerObserver) { + super(outerObserver); this.outerObserver = outerObserver; } @Override - public void onStart(StreamController controller) { + protected void onStartImpl(StreamController controller) { outerObserver.onStart(controller); } @Override - public void onResponse(RowT response) { + protected void onResponseImpl(RowT response) { outerObserver.onResponse(response); } @Override - public void onError(Throwable t) { + protected void onErrorImpl(Throwable t) { outerObserver.onError(convertException(t)); } @Override - public void onComplete() { + protected void onCompleteImpl() { outerObserver.onComplete(); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java new file mode 100644 index 0000000000..7c65bdf95a --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java @@ -0,0 +1,123 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Base implementation of {@link ResponseObserver} that checks the state and catches all the + * throwables. + */ +@InternalApi +public abstract class SafeResponseObserver implements ResponseObserver { + + private static final Logger LOGGER = Logger.getLogger(SafeResponseObserver.class.getName()); + private AtomicBoolean isStarted = new AtomicBoolean(false); + private AtomicBoolean isClosed = new AtomicBoolean(false); + private StreamController streamController; + private ResponseObserver outerObserver; + + public SafeResponseObserver(ResponseObserver outerObserver) { + this.outerObserver = outerObserver; + } + + @Override + public final void onStart(StreamController streamController) { + if (!isStarted.compareAndSet(false, true)) { + throw new IllegalStateException("A stream is already started"); + } + + this.streamController = streamController; + try { + onStartImpl(streamController); + } catch (Throwable t) { + if (!isClosed.compareAndSet(false, true)) { + logException("Tried to cancel a closed stream"); + return; + } + streamController.cancel(); + outerObserver.onError(t); + } + } + + @Override + public final void onResponse(ResponseT response) { + if (isClosed.get()) { + logException("Received a response after the stream is closed"); + return; + } + try { + onResponseImpl(response); + } catch (Throwable t1) { + try { + if (!isClosed.compareAndSet(false, true)) { + logException("Tried to cancel a closed stream"); + return; + } + streamController.cancel(); + } catch (Throwable t2) { + t1.addSuppressed(t2); + } + outerObserver.onError(t1); + } + } + + @Override + public final void onError(Throwable throwable) { + if (!isClosed.compareAndSet(false, true)) { + logException("Received error after the stream is closed"); + return; + } + + try { + onErrorImpl(throwable); + } catch (Throwable t) { + throwable.addSuppressed(t); + outerObserver.onError(throwable); + } + } + + @Override + public final void onComplete() { + if (!isClosed.compareAndSet(false, true)) { + logException("Tried to double close the stream"); + return; + } + + try { + onCompleteImpl(); + } catch (Throwable t) { + outerObserver.onError(t); + } + } + + private void logException(String message) { + LOGGER.log(Level.WARNING, message, new IllegalStateException(message)); + } + + protected abstract void onStartImpl(StreamController streamController); + + protected abstract void onResponseImpl(ResponseT response); + + protected abstract void onErrorImpl(Throwable throwable); + + protected abstract void onCompleteImpl(); +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index c7f09c4db1..5ec4c726e4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -22,6 +22,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StreamController; import com.google.bigtable.v2.ResponseParams; +import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.protobuf.InvalidProtocolBufferException; @@ -68,7 +69,7 @@ public void call( } } - private class BigtableTracerResponseObserver implements ResponseObserver { + private class BigtableTracerResponseObserver extends SafeResponseObserver { private final BigtableTracer tracer; private final ResponseObserver outerObserver; @@ -78,26 +79,28 @@ private class BigtableTracerResponseObserver implements ResponseObser ResponseObserver observer, BigtableTracer tracer, GrpcResponseMetadata metadata) { + super(observer); + this.tracer = tracer; this.outerObserver = observer; this.responseMetadata = metadata; } @Override - public void onStart(final StreamController controller) { + protected void onStartImpl(final StreamController controller) { TracedStreamController tracedController = new TracedStreamController(controller, tracer); outerObserver.onStart(tracedController); } @Override - public void onResponse(ResponseT response) { + protected void onResponseImpl(ResponseT response) { Stopwatch stopwatch = Stopwatch.createStarted(); outerObserver.onResponse(response); tracer.afterResponse(stopwatch.elapsed(TimeUnit.MILLISECONDS)); } @Override - public void onError(Throwable t) { + protected void onErrorImpl(Throwable t) { // server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata), // so it's not checking trailing metadata here. Metadata metadata = responseMetadata.getMetadata(); @@ -122,13 +125,14 @@ public void onError(Throwable t) { } } } catch (InvalidProtocolBufferException e) { + t.addSuppressed(t); } outerObserver.onError(t); } @Override - public void onComplete() { + protected void onCompleteImpl() { Metadata metadata = responseMetadata.getMetadata(); Long latency = Util.getGfeLatency(metadata); tracer.recordGfeMetadata(latency, null); @@ -151,6 +155,9 @@ public void onComplete() { } } } catch (InvalidProtocolBufferException e) { + // InvalidProtocolBufferException will only throw if something changed on + // the server side. Location info won't be populated as a result. Ignore + // this error and don't bubble it up to user. } outerObserver.onComplete(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/FilterMarkerRowsCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/FilterMarkerRowsCallable.java index 57f987fb7c..181006b6c3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/FilterMarkerRowsCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/FilterMarkerRowsCallable.java @@ -22,6 +22,7 @@ import com.google.api.gax.rpc.StreamController; import com.google.bigtable.v2.ReadRowsRequest; import com.google.cloud.bigtable.data.v2.models.RowAdapter; +import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver; /** * Remove the special marker rows generated by {@link RowMergingCallable}. @@ -47,17 +48,18 @@ public void call( innerCallable.call(request, innerObserver, context); } - private class FilteringResponseObserver implements ResponseObserver { + private class FilteringResponseObserver extends SafeResponseObserver { private final ResponseObserver outerObserver; private StreamController innerController; private boolean autoFlowControl = true; FilteringResponseObserver(ResponseObserver outerObserver) { + super(outerObserver); this.outerObserver = outerObserver; } @Override - public void onStart(final StreamController controller) { + protected void onStartImpl(final StreamController controller) { innerController = controller; outerObserver.onStart( @@ -81,7 +83,7 @@ public void request(int count) { } @Override - public void onResponse(RowT response) { + protected void onResponseImpl(RowT response) { if (rowAdapter.isScanMarkerRow(response)) { if (!autoFlowControl) { innerController.request(1); @@ -92,12 +94,12 @@ public void onResponse(RowT response) { } @Override - public void onError(Throwable t) { + protected void onErrorImpl(Throwable t) { outerObserver.onError(t); } @Override - public void onComplete() { + protected void onCompleteImpl() { outerObserver.onComplete(); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java index c46eb55ff9..6f2440fff7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java @@ -17,8 +17,8 @@ import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.StateCheckingResponseObserver; import com.google.api.gax.rpc.StreamController; +import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver; import com.google.common.base.Preconditions; import com.google.common.math.IntMath; import java.util.concurrent.CancellationException; @@ -56,8 +56,7 @@ * } */ @InternalApi -public class ReframingResponseObserver - extends StateCheckingResponseObserver { +public class ReframingResponseObserver extends SafeResponseObserver { // Used as a nonblocking mutex for deliver(). // 0 means unlocked // 1 means locked without contention @@ -97,6 +96,7 @@ public class ReframingResponseObserver public ReframingResponseObserver( ResponseObserver observer, Reframer reframer) { + super(observer); this.outerResponseObserver = observer; this.reframer = reframer; }