Skip to content

Commit

Permalink
Fix RealGrpcCall timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
JGulbronson committed Feb 1, 2024
1 parent 0dc11fb commit 62e9398
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 34 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.Callback
import okhttp3.Response
import okio.ForwardingTimeout
import okio.IOException
import okio.Timeout

Expand All @@ -37,7 +38,7 @@ internal class RealGrpcCall<S : Any, R : Any>(
private var call: Call? = null
private var canceled = false

override val timeout: Timeout = LateInitTimeout()
override val timeout: Timeout = ForwardingTimeout(Timeout())

override var requestMetadata: Map<String, String> = mapOf()

Expand Down Expand Up @@ -142,7 +143,7 @@ internal class RealGrpcCall<S : Any, R : Any>(
val result = grpcClient.newCall(method, requestMetadata, requestBody)
this.call = result
if (canceled) result.cancel()
(timeout as LateInitTimeout).init(result.timeout())
(timeout as ForwardingTimeout).setDelegate(result.timeout())
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import okio.ForwardingTimeout
import okio.Timeout

internal class RealGrpcStreamingCall<S : Any, R : Any>(
Expand All @@ -42,7 +43,7 @@ internal class RealGrpcStreamingCall<S : Any, R : Any>(
private var call: Call? = null
private var canceled = false

override val timeout: Timeout = LateInitTimeout()
override val timeout: Timeout = ForwardingTimeout(Timeout())

init {
timeout.clearTimeout()
Expand Down Expand Up @@ -132,7 +133,7 @@ internal class RealGrpcStreamingCall<S : Any, R : Any>(
val result = grpcClient.newCall(method, requestMetadata, requestBody)
this.call = result
if (canceled) result.cancel()
(timeout as LateInitTimeout).init(result.timeout())
(timeout as ForwardingTimeout).setDelegate(result.timeout())
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.squareup.wire

import com.google.common.util.concurrent.SettableFuture
import com.squareup.wire.MockRouteGuideService.Action.Delay
import com.squareup.wire.MockRouteGuideService.Action.ReceiveCall
import com.squareup.wire.MockRouteGuideService.Action.ReceiveComplete
Expand All @@ -26,7 +27,6 @@ import com.squareup.wire.MockRouteGuideService.Action.SendMessage
import io.grpc.Metadata
import io.grpc.Status
import io.grpc.StatusException
import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -53,6 +53,7 @@ import okhttp3.ResponseBody
import okhttp3.ResponseBody.Companion.toResponseBody
import okio.Buffer
import okio.ByteString
import okio.IOException
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Assert.assertThrows
Expand Down Expand Up @@ -85,6 +86,9 @@ class GrpcClientTest {
private lateinit var incompatibleRouteGuideService: IncompatibleRouteGuideClient
private var callReference = AtomicReference<Call>()

// Just set to a large enough value we won't timeout in tests
private val okHttpClientTimeout = Duration.ofSeconds(117)

/** This is a pass through interceptor that tests can replace without extra plumbing. */
private var interceptor: Interceptor = object : Interceptor {
override fun intercept(chain: Chain) = chain.proceed(chain.request())
Expand All @@ -98,6 +102,7 @@ class GrpcClientTest {
interceptor.intercept(chain)
}
.protocols(listOf(H2_PRIOR_KNOWLEDGE))
.callTimeout(okHttpClientTimeout)
.build()
grpcClient = GrpcClient.Builder()
.client(okhttpClient)
Expand Down Expand Up @@ -1656,6 +1661,34 @@ class GrpcClientTest {
}
}

@Test
fun grpcCallCallTimeoutIsPreservedFromHttpClient() {
val call = routeGuideService.GetFeature()

val callbackCallFuture = SettableFuture.create<GrpcCall<Point, Feature>>()

call.enqueue(
Point(1, 1),
object : GrpcCall.Callback<Point, Feature> {
override fun onFailure(
call: GrpcCall<Point, Feature>,
exception: IOException,
) {
callbackCallFuture.set(call)
}

override fun onSuccess(
call: GrpcCall<Point, Feature>,
response: Feature,
) {
callbackCallFuture.set(call)
}
},
)

assertThat(callbackCallFuture.get().timeout.timeoutNanos()).isEqualTo(okHttpClientTimeout.toNanos())
}

private fun removeGrpcStatusInterceptor(): Interceptor {
val noTrailersResponse = noTrailersResponse()
assertThat(noTrailersResponse.trailers().size).isEqualTo(0)
Expand Down

0 comments on commit 62e9398

Please sign in to comment.