diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java index 5eed2e1038169..7b265df20de7d 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java @@ -12,8 +12,6 @@ import com.azure.core.amqp.exception.AmqpException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.parallel.Execution; -import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.reactivestreams.Subscription; @@ -37,7 +35,6 @@ /** * Tests for {@link AmqpChannelProcessor}. */ -@Execution(ExecutionMode.SAME_THREAD) class AmqpChannelProcessorTest { private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(30); private final TestObject connection1 = new TestObject(); @@ -55,15 +52,19 @@ void createsNewConnection() { final AmqpChannelProcessor processor = publisher.flux() .subscribeWith(createChannelProcessor()); - // Act & Assert - StepVerifier.create(processor) - .then(() -> publisher.next(connection1)) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - publisher.assertMaxRequested(1L); + try { + // Act & Assert + StepVerifier.create(processor) + .then(() -> publisher.next(connection1)) + .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + publisher.assertMaxRequested(1L); + } finally { + processor.dispose(); + } } /** @@ -76,17 +77,21 @@ void sameConnectionReturned() { final AmqpChannelProcessor processor = publisher.next(connection1).flux() .subscribeWith(createChannelProcessor()); - // Act & Assert - StepVerifier.create(processor) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - StepVerifier.create(processor) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); + try { + // Act & Assert + StepVerifier.create(processor) + .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + StepVerifier.create(processor) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + } finally { + processor.dispose(); + } } /** @@ -99,45 +104,49 @@ void newConnectionOnClose() { final AmqpChannelProcessor processor = publisher.next(connection1).flux() .subscribeWith(createChannelProcessor()); - // Act & Assert - // Verify that we get the first connection. - StepVerifier.create(processor) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - // Close that connection. - connection1.getSink().complete(); - - // Expect that the next connection is returned to us. - StepVerifier.create(processor) - .then(() -> { - publisher.next(connection2); - connection2.getSink().next(AmqpEndpointState.ACTIVE); - }) - .expectNext(connection2) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - // Close connection 2 - connection2.getSink().complete(); - - // Expect that the new connection is returned again. - StepVerifier.create(processor) - .then(() -> { - publisher.next(connection3); - connection3.getSink().next(AmqpEndpointState.ACTIVE); - }) - .expectNext(connection3) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - // Expect that the new connection is returned again. - StepVerifier.create(processor) - .expectNext(connection3) - .expectComplete() - .verify(VERIFY_TIMEOUT); + try { + // Act & Assert + // Verify that we get the first connection. + StepVerifier.create(processor) + .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // Close that connection. + connection1.getSink().complete(); + + // Expect that the next connection is returned to us. + StepVerifier.create(processor) + .then(() -> { + publisher.next(connection2); + connection2.getSink().next(AmqpEndpointState.ACTIVE); + }) + .expectNext(connection2) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // Close connection 2 + connection2.getSink().complete(); + + // Expect that the new connection is returned again. + StepVerifier.create(processor) + .then(() -> { + publisher.next(connection3); + connection3.getSink().next(AmqpEndpointState.ACTIVE); + }) + .expectNext(connection3) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // Expect that the new connection is returned again. + StepVerifier.create(processor) + .expectNext(connection3) + .expectComplete() + .verify(VERIFY_TIMEOUT); + } finally { + processor.dispose(); + } } public static Stream newConnectionOnRetriableError() { @@ -152,19 +161,18 @@ public static Stream newConnectionOnRetriableError() { /** * Verifies that we can get the next connection when the first one encounters a retryable error. */ - @MethodSource + @MethodSource("newConnectionOnRetriableError") @ParameterizedTest - void newConnectionOnRetriableError(Throwable exception) { + void newConnectionOnRetriableErrorTest(Throwable exception) { // Arrange - final TestPublisher publisher = TestPublisher.createCold(); - publisher.next(connection1); - publisher.next(connection2); + final AtomicInteger failedTries = new AtomicInteger(); AmqpRetryPolicy retryPolicy = new AmqpRetryPolicy(new AmqpRetryOptions()) { @Override protected Duration calculateRetryDelay(int retryCount, Duration baseDelay, Duration baseJitter, - ThreadLocalRandom random) { - return null; + ThreadLocalRandom random) { + failedTries.incrementAndGet(); + throw new RuntimeException("Unexpected call to calculateRetryDelay"); } @Override @@ -177,6 +185,7 @@ public Duration calculateRetryDelay(Throwable lastException, int retryCount) { // that wasn't expected. Previously, this was returning a delay longer than StepVerifer's timeout which // failed the test but didn't provide a clear indication of why it failed (specifically why or what // caused an additional retry). + failedTries.getAndIncrement(); if ((lastException == exception || lastException.getCause() == exception) && retryCount == 0) { return Duration.ofMillis(1); } else { @@ -185,34 +194,43 @@ public Duration calculateRetryDelay(Throwable lastException, int retryCount) { } }; + final TestPublisher publisher = TestPublisher.createCold(); final AmqpChannelProcessor processor = publisher.flux() .subscribeWith(createChannelProcessor(retryPolicy)); - final long request = 1; - - // Act & Assert - // Verify that we get the first connection. - StepVerifier.create(processor, request) - .thenAwait(Duration.ofMillis(400)) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - connection1.getSink().error(exception); - - // Expect that the next connection is returned to us. - StepVerifier.create(processor, request) - .thenAwait(Duration.ofMillis(400)) - .then(() -> connection2.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection2) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - // Expect that the next connection is returned to us. - StepVerifier.create(processor, request) - .expectNext(connection2) - .expectComplete() - .verify(VERIFY_TIMEOUT); + + try { + // Act & Assert + // Verify that we get the first connection even though endpoints states are failing. + StepVerifier.create(processor, 1) + .then(() -> { + publisher.next(connection1); + connection1.getSink().error(exception); + }) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // Expect that the next connection is returned to us. + StepVerifier.create(processor, 1) + .then(() -> { + publisher.next(connection2); + connection2.getSink().next(AmqpEndpointState.ACTIVE); + }) + .expectNext(connection2) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // Expect that the next connection is returned to us. + StepVerifier.create(processor, 1) + .expectNext(connection2) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // sanity check - we had one failure and should have called retry policy exactly once. + assertEquals(1, failedTries.get()); + } finally { + processor.dispose(); + } } public static Stream nonRetriableError() { @@ -246,25 +264,29 @@ public Duration calculateRetryDelay(Throwable lastException, int retryCount) { final AmqpChannelProcessor processor = publisher.next(connection1).flux() .subscribeWith(createChannelProcessor(retryPolicy)); - // Act & Assert - // Verify that we get the first connection. - StepVerifier.create(processor) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - connection1.getSink().error(exception); - - // Expect that the error is returned to us. - StepVerifier.create(processor) - .expectErrorMatches(error -> Objects.equals(exception, error)) - .verify(VERIFY_TIMEOUT); - - // Expect that the error is returned to us again. - StepVerifier.create(processor) - .expectErrorMatches(error -> Objects.equals(exception, error)) - .verify(VERIFY_TIMEOUT); + try { + // Act & Assert + // Verify that we get the first connection. + StepVerifier.create(processor) + .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + connection1.getSink().error(exception); + + // Expect that the error is returned to us. + StepVerifier.create(processor) + .expectErrorMatches(error -> Objects.equals(exception, error)) + .verify(VERIFY_TIMEOUT); + + // Expect that the error is returned to us again. + StepVerifier.create(processor) + .expectErrorMatches(error -> Objects.equals(exception, error)) + .verify(VERIFY_TIMEOUT); + } finally { + processor.dispose(); + } } /** @@ -293,11 +315,17 @@ public void cancel() { } }; - // Act - createChannelProcessor().onSubscribe(subscription); + final AmqpChannelProcessor processor = createChannelProcessor(); + try { + // Act + processor.onSubscribe(subscription); - // Assert - assertEquals(1, requests.get(1L).get()); + // Assert + assertEquals(1, requests.get(1L).get()); + } finally { + subscription.cancel(); + processor.dispose(); + } } /** @@ -312,32 +340,46 @@ void errorsWhenResubscribingOnTerminated() { final AmqpChannelProcessor processor = publisher.next(connection1).flux() .subscribeWith(channelProcessor); - // Act & Assert - // Verify that we get the first connection. - StepVerifier.create(processor) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - processor.dispose(); - - // Verify that it errors without emitting a connection. - StepVerifier.create(processor) - .expectError(IllegalStateException.class) - .verify(VERIFY_TIMEOUT); - - assertTrue(channelProcessor.isChannelClosed()); + try { + // Act & Assert + // Verify that we get the first connection. + StepVerifier.create(processor) + .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + processor.dispose(); + + // Verify that it errors without emitting a connection. + StepVerifier.create(processor) + .expectError(IllegalStateException.class) + .verify(VERIFY_TIMEOUT); + + assertTrue(channelProcessor.isChannelClosed()); + } finally { + processor.dispose(); + } } @Test void requiresNonNullNext() { - Assertions.assertThrows(NullPointerException.class, () -> createChannelProcessor().onNext(null)); + final AmqpChannelProcessor channelProcessor = createChannelProcessor(); + try { + Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onNext(null)); + } finally { + channelProcessor.dispose(); + } } @Test void requiresNonNullError() { - Assertions.assertThrows(NullPointerException.class, () -> createChannelProcessor().onError(null)); + final AmqpChannelProcessor channelProcessor = createChannelProcessor(); + try { + Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onError(null)); + } finally { + channelProcessor.dispose(); + } } private static AmqpChannelProcessor createChannelProcessor() {