From 62e939884a5e3a2258df0c09e0bcd58195191f1e Mon Sep 17 00:00:00 2001 From: Jeff Gulbronson Date: Thu, 1 Feb 2024 15:40:24 -0500 Subject: [PATCH] Fix RealGrpcCall timeout --- .../squareup/wire/internal/LateInitTimeout.kt | 29 --------------- .../squareup/wire/internal/RealGrpcCall.kt | 5 +-- .../wire/internal/RealGrpcStreamingCall.kt | 5 +-- .../java/com/squareup/wire/GrpcClientTest.kt | 35 ++++++++++++++++++- 4 files changed, 40 insertions(+), 34 deletions(-) delete mode 100644 wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/LateInitTimeout.kt diff --git a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/LateInitTimeout.kt b/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/LateInitTimeout.kt deleted file mode 100644 index cb9417058e..0000000000 --- a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/LateInitTimeout.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (C) 2019 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.squareup.wire.internal - -import java.util.concurrent.TimeUnit -import okio.ForwardingTimeout -import okio.Timeout - -internal class LateInitTimeout : ForwardingTimeout(Timeout()) { - fun init(newDelegate: Timeout) { - val oldDelegate = this.delegate - newDelegate.timeout(oldDelegate.timeoutNanos(), TimeUnit.NANOSECONDS) - if (oldDelegate.hasDeadline()) newDelegate.deadlineNanoTime(oldDelegate.deadlineNanoTime()) - delegate = newDelegate - } -} diff --git a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcCall.kt b/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcCall.kt index b3b146de05..475c84c1f4 100644 --- a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcCall.kt +++ b/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcCall.kt @@ -26,6 +26,7 @@ import kotlin.coroutines.resumeWithException import kotlinx.coroutines.suspendCancellableCoroutine import okhttp3.Callback import okhttp3.Response +import okio.ForwardingTimeout import okio.IOException import okio.Timeout @@ -37,7 +38,7 @@ internal class RealGrpcCall( private var call: Call? = null private var canceled = false - override val timeout: Timeout = LateInitTimeout() + override val timeout: Timeout = ForwardingTimeout(Timeout()) override var requestMetadata: Map = mapOf() @@ -142,7 +143,7 @@ internal class RealGrpcCall( val result = grpcClient.newCall(method, requestMetadata, requestBody) this.call = result if (canceled) result.cancel() - (timeout as LateInitTimeout).init(result.timeout()) + (timeout as ForwardingTimeout).setDelegate(result.timeout()) return result } } diff --git a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcStreamingCall.kt b/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcStreamingCall.kt index e69dcc3e02..c84623bbc1 100644 --- a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcStreamingCall.kt +++ b/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/internal/RealGrpcStreamingCall.kt @@ -30,6 +30,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.launch +import okio.ForwardingTimeout import okio.Timeout internal class RealGrpcStreamingCall( @@ -42,7 +43,7 @@ internal class RealGrpcStreamingCall( private var call: Call? = null private var canceled = false - override val timeout: Timeout = LateInitTimeout() + override val timeout: Timeout = ForwardingTimeout(Timeout()) init { timeout.clearTimeout() @@ -132,7 +133,7 @@ internal class RealGrpcStreamingCall( val result = grpcClient.newCall(method, requestMetadata, requestBody) this.call = result if (canceled) result.cancel() - (timeout as LateInitTimeout).init(result.timeout()) + (timeout as ForwardingTimeout).setDelegate(result.timeout()) return result } } diff --git a/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcClientTest.kt b/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcClientTest.kt index c755dab0ca..8685c4f25e 100644 --- a/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcClientTest.kt +++ b/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcClientTest.kt @@ -17,6 +17,7 @@ package com.squareup.wire +import com.google.common.util.concurrent.SettableFuture import com.squareup.wire.MockRouteGuideService.Action.Delay import com.squareup.wire.MockRouteGuideService.Action.ReceiveCall import com.squareup.wire.MockRouteGuideService.Action.ReceiveComplete @@ -26,7 +27,6 @@ import com.squareup.wire.MockRouteGuideService.Action.SendMessage import io.grpc.Metadata import io.grpc.Status import io.grpc.StatusException -import java.io.IOException import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference @@ -53,6 +53,7 @@ import okhttp3.ResponseBody import okhttp3.ResponseBody.Companion.toResponseBody import okio.Buffer import okio.ByteString +import okio.IOException import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Assert.assertThrows @@ -85,6 +86,9 @@ class GrpcClientTest { private lateinit var incompatibleRouteGuideService: IncompatibleRouteGuideClient private var callReference = AtomicReference() + // Just set to a large enough value we won't timeout in tests + private val okHttpClientTimeout = Duration.ofSeconds(117) + /** This is a pass through interceptor that tests can replace without extra plumbing. */ private var interceptor: Interceptor = object : Interceptor { override fun intercept(chain: Chain) = chain.proceed(chain.request()) @@ -98,6 +102,7 @@ class GrpcClientTest { interceptor.intercept(chain) } .protocols(listOf(H2_PRIOR_KNOWLEDGE)) + .callTimeout(okHttpClientTimeout) .build() grpcClient = GrpcClient.Builder() .client(okhttpClient) @@ -1656,6 +1661,34 @@ class GrpcClientTest { } } + @Test + fun grpcCallCallTimeoutIsPreservedFromHttpClient() { + val call = routeGuideService.GetFeature() + + val callbackCallFuture = SettableFuture.create>() + + call.enqueue( + Point(1, 1), + object : GrpcCall.Callback { + override fun onFailure( + call: GrpcCall, + exception: IOException, + ) { + callbackCallFuture.set(call) + } + + override fun onSuccess( + call: GrpcCall, + response: Feature, + ) { + callbackCallFuture.set(call) + } + }, + ) + + assertThat(callbackCallFuture.get().timeout.timeoutNanos()).isEqualTo(okHttpClientTimeout.toNanos()) + } + private fun removeGrpcStatusInterceptor(): Interceptor { val noTrailersResponse = noTrailersResponse() assertThat(noTrailersResponse.trailers().size).isEqualTo(0)