From d1f188a4da1a99f6d07967ca3a701b5cf4375b8f Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 24 Jun 2019 21:05:09 -0700 Subject: [PATCH 01/13] Adding unit test case for record delivery validation --- .../fanout/FanOutRecordsPublisher.java | 4 +- .../fanout/FanOutRecordsPublisherTest.java | 118 ++++++++++++++++++ 2 files changed, 121 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index e738f0c64..b5105dac3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -65,7 +67,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final AtomicInteger subscribeToShardId = new AtomicInteger(0); private RecordFlow flow; - + @Getter @VisibleForTesting private String currentSequenceNumber; private InitialPositionInStreamExtended initialPositionInStreamExtended; private boolean isFirstConnection = true; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index b12fc1a51..631f2e030 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -5,9 +5,11 @@ 
import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -18,10 +20,17 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.reactivex.Flowable; +import io.reactivex.Scheduler; +import io.reactivex.schedulers.Schedulers; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -29,7 +38,10 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -142,6 +154,108 @@ public void onComplete() { } + @Test + public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + 
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = + new Subscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override + public void onComplete() { + fail("OnComplete called when not expected"); + } + }; + + Scheduler testScheduler = getScheduler(getBlockingExecutor(getSpiedExecutor())); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + Stream.of("1000", "2000", "3000") + .map(contSeqNum -> + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum) + .records(records).build()) + .forEach(batchEvent -> captor.getValue().onNext(batchEvent)); + + verify(subscription, times(4)).request(1); + assertThat(receivedInput.size(), equalTo(3)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + 
assertThat(source.getCurrentSequenceNumber(), equalTo("3000")); + + } + + private Scheduler getScheduler(ExecutorService executorService) { + return Schedulers.from(executorService); + } + + private ExecutorService getSpiedExecutor() { + ExecutorService executorService = Executors.newFixedThreadPool(8, + new ThreadFactoryBuilder().setNameFormat("test-fanout-record-publisher-%04d").setDaemon(true).build()); + return spy(executorService); + } + + private ExecutorService getBlockingExecutor(ExecutorService executorService) { + doAnswer(invocation -> directlyExecuteRunnable(invocation)).when(executorService).execute(any()); + return executorService; + } + + private Object directlyExecuteRunnable(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + @Test public void largeRequestTest() throws Exception { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); @@ -477,6 +591,10 @@ public void onComplete() { } } + private Record makeRecord(String sequenceNumber) { + return makeRecord(Integer.parseInt(sequenceNumber)); + } + private Record makeRecord(int sequenceNumber) { SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 }); return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now()) From 9f7cec6b5439fd6610d4c3668055abcc9bce89d1 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 10 Jul 2019 12:34:54 -0700 Subject: [PATCH 02/13] Initial prototype for notification mechanism between ShardConsumerSubscriber and FanoutPublisher. 
The SDK Threads are made to block wait on the ack from the ShardConsumerSubscriber --- .../lifecycle/NotifyingSubscriber.java | 53 +++++++++++ .../lifecycle/ShardConsumerSubscriber.java | 25 ++++- .../kinesis/retrieval/RecordsPublisher.java | 8 ++ .../kinesis/retrieval/RecordsRetrieved.java | 9 ++ .../retrieval/RecordsRetrievedAck.java | 11 +++ .../fanout/FanOutRecordsPublisher.java | 46 +++++++++- .../fanout/FanOutRecordsPublisherTest.java | 92 +++++++++++++++++++ 7 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java new file mode 100644 index 000000000..b5829ca0d --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java @@ -0,0 +1,53 @@ +package software.amazon.kinesis.lifecycle; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RecordsRetrievedAck; + +/** + * Subscriber that notifies its publisher on receipt of the onNext event. + */ +public interface NotifyingSubscriber extends Subscriber { + + /** + * Return the actual subscriber to which the events needs to be delegated. + * @return Subscriber to be delegated + */ + Subscriber getDelegateSubscriber(); + + /** + * Return the publisher to be notified + * @return RecordsPublisher to be notified. 
+ */ + RecordsPublisher getWaitingRecordsPublisher(); + + /** + * Construct RecordsRetrievedAck object from the incoming data and return it + * @param recordsRetrieved the delivered batch from which the ack is constructed + * @return RecordsRetrievedAck + */ + RecordsRetrievedAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved); + + @Override + default void onSubscribe(Subscription subscription) { + getDelegateSubscriber().onSubscribe(subscription); + } + + @Override + default void onNext(RecordsRetrieved recordsRetrieved) { + getWaitingRecordsPublisher().notify(getRecordsRetrievedAck(recordsRetrieved)); + getDelegateSubscriber().onNext(recordsRetrieved); + } + + @Override + default void onError(Throwable throwable) { + getDelegateSubscriber().onError(throwable); + } + + @Override + default void onComplete() { + getDelegateSubscriber().onComplete(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index afc22f705..d11674dbb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -19,6 +19,7 @@ import io.reactivex.Scheduler; import io.reactivex.schedulers.Schedulers; import lombok.AccessLevel; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -26,6 +27,7 @@ import org.reactivestreams.Subscription; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RecordsRetrievedAck; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import java.time.Duration; @@ -77,7 +79,7 @@ void startSubscriptions() { recordsPublisher.restartFrom(lastAccepted); } 
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) - .subscribe(this); + .subscribe(new ShardConsumerNotifyingSubscriber(this)); } } @@ -215,4 +217,25 @@ public void cancel() { subscription.cancel(); } } + + @AllArgsConstructor + private class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber { + + private final Subscriber delegate; + + @Override + public Subscriber getDelegateSubscriber() { + return delegate; + } + + @Override + public RecordsPublisher getWaitingRecordsPublisher() { + return recordsPublisher; + } + + @Override + public RecordsRetrievedAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved) { + return () -> recordsRetrieved.batchSequenceNumber(); + } + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index fffeee08f..d04c12a5c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -47,4 +47,12 @@ public interface RecordsPublisher extends Publisher { * Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. */ void shutdown(); + + /** + * Notify the publisher on receipt of a data event. 
+ * @param ack acknowledgement for the record batch that was delivered to the subscriber + */ + default void notify(RecordsRetrievedAck ack) { + throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java index 7db9d05f1..eee55fe40 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java @@ -24,4 +24,13 @@ public interface RecordsRetrieved { * @return the processRecordsInput received */ ProcessRecordsInput processRecordsInput(); + + /** + * Returns the sequence number that can be used as a pagination token for the next batch of records. + * + * @return sequenceNumber to checkpoint + */ + default String batchSequenceNumber() { + throw new UnsupportedOperationException("Retrieval of batch sequence number is not supported"); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java new file mode 100644 index 000000000..9761192f7 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java @@ -0,0 +1,11 @@ +package software.amazon.kinesis.retrieval; + +public interface RecordsRetrievedAck { + + /** + * Sequence Number of the record batch that was delivered to the Subscriber/Observer. 
+ * @return deliveredSequenceNumber + */ + String deliveredSequenceNumber(); + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index b5105dac3..0a0d73519 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -18,6 +18,11 @@ import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -47,6 +52,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RecordsRetrievedAck; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -75,6 +81,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private Subscriber subscriber; private long availableQueueSpace = 0; + private BlockingQueue> recordsDeliveryQueue = new LinkedBlockingQueue<>(1); + @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { @@ -112,10 +120,38 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { throw new IllegalArgumentException( "Provided ProcessRecordsInput not created from the FanOutRecordsPublisher"); } - currentSequenceNumber = ((FanoutRecordsRetrieved) 
recordsRetrieved).continuationSequenceNumber(); + currentSequenceNumber = recordsRetrieved.batchSequenceNumber(); + } + } + + @Override + public void notify(RecordsRetrievedAck ack) { + final CompletableFuture future = recordsDeliveryQueue.poll(); + if(future != null) { + future.complete(ack); + } else { + log.warn("{}: Received records delivery notification for not waiting publisher. " + + "SequenceNumber - {}", shardId, ack.deliveredSequenceNumber()); } } + private Optional> tryDeliver(Runnable onNext) { + if (recordsDeliveryQueue.remainingCapacity() > 0) { + final CompletableFuture future = new CompletableFuture<>(); + if(recordsDeliveryQueue.offer(future)) { + onNext.run(); + return Optional.of(future); + } + } + return Optional.empty(); + } + + private RecordsRetrievedAck blockOnAck(Runnable onNext) throws Exception { + return tryDeliver(onNext) + .orElseThrow(() -> new RuntimeException("Attempted to deliver an event while previous event delivery is still pending")) + .get(1000, TimeUnit.MILLISECONDS); + } + private boolean hasValidSubscriber() { return subscriber != null; } @@ -299,11 +335,10 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re recordBatchEvent.continuationSequenceNumber()); try { - subscriber.onNext(recordsRetrieved); // // Only advance the currentSequenceNumber if we successfully dispatch the last received input // - currentSequenceNumber = recordBatchEvent.continuationSequenceNumber(); + currentSequenceNumber = blockOnAck(() -> subscriber.onNext(recordsRetrieved)).deliveredSequenceNumber(); } catch (Throwable t) { log.warn("{}: Unable to call onNext for subscriber. 
Failing publisher.", shardId); errorOccurred(triggeringFlow, t); @@ -496,6 +531,11 @@ static class FanoutRecordsRetrieved implements RecordsRetrieved { public ProcessRecordsInput processRecordsInput() { return processRecordsInput; } + + @Override + public String batchSequenceNumber() { + return continuationSequenceNumber; + } } @RequiredArgsConstructor diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 631f2e030..22da2b757 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -31,6 +32,7 @@ import io.reactivex.Flowable; import io.reactivex.Scheduler; import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.SafeSubscriber; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -234,6 +236,86 @@ public void onComplete() { } + @Test + public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + doNothing().when(publisher).subscribe(captor.capture()); + + 
source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = + new Subscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override + public void onComplete() { + fail("OnComplete called when not expected"); + } + }; + + Scheduler testScheduler = getScheduler(getOverwhelmedExecutor(getSpiedExecutor())); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(new SafeSubscriber<>(shardConsumerSubscriber)); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + Stream.of("1000", "2000", "3000") + .map(contSeqNum -> + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum) + .records(records).build()) + .forEach(batchEvent -> captor.getValue().onNext(batchEvent)); + + verify(subscription, times(4)).request(1); + assertThat(receivedInput.size(), equalTo(3)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + 
assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo("3000")); + + } + private Scheduler getScheduler(ExecutorService executorService) { return Schedulers.from(executorService); } @@ -249,6 +331,16 @@ private ExecutorService getBlockingExecutor(ExecutorService executorService) { return executorService; } + private ExecutorService getOverwhelmedExecutor(ExecutorService executorService) { + doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doThrow(new RejectedExecutionException()) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .when(executorService).execute(any()); + return executorService; + } + private Object directlyExecuteRunnable(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); Runnable runnable = (Runnable) args[0]; From cd8307b921a561493e71d7ec25cb7b52c33c2a1a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jul 2019 18:06:46 -0700 Subject: [PATCH 03/13] initial non blocking prototype --- .../lifecycle/ShardConsumerSubscriber.java | 11 +- .../kinesis/retrieval/RecordsRetrieved.java | 11 ++ .../retrieval/RecordsRetrievedAck.java | 8 + .../fanout/FanOutRecordsPublisher.java | 146 +++++++++++++----- 4 files changed, 140 insertions(+), 36 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index d11674dbb..9de9e7980 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -32,6 +32,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.UUID; import 
java.util.concurrent.ExecutorService; @Slf4j @@ -235,7 +236,15 @@ public RecordsPublisher getWaitingRecordsPublisher() { @Override public RecordsRetrievedAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved) { - return () -> recordsRetrieved.batchSequenceNumber(); + return new RecordsRetrievedAck() { + @Override public String deliveredSequenceNumber() { + return recordsRetrieved.batchSequenceNumber(); + } + + @Override public UUID batchUniqueIdentifier() { + return recordsRetrieved.batchUniqueIdentifier(); + } + }; } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java index eee55fe40..b30a7b838 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java @@ -16,6 +16,8 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import java.util.UUID; + public interface RecordsRetrieved { /** @@ -33,4 +35,13 @@ public interface RecordsRetrieved { default String batchSequenceNumber() { throw new UnsupportedOperationException("Retrieval of batch sequence number is not supported"); } + + /** + * Returns the identifier that uniquely identifies this batch. 
+ * + * @return UUID + */ + default UUID batchUniqueIdentifier() { + throw new UnsupportedOperationException("Retrieval of batch unique identifier is not supported"); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java index 9761192f7..fc74f27ed 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java @@ -1,5 +1,7 @@ package software.amazon.kinesis.retrieval; +import java.util.UUID; + public interface RecordsRetrievedAck { /** @@ -8,4 +10,10 @@ public interface RecordsRetrievedAck { */ String deliveredSequenceNumber(); + /** + * Unique record batch identifier used to validate the ordering guarantees. + * @return UUID + */ + UUID batchUniqueIdentifier(); + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 0a0d73519..28b61f848 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -19,10 +19,10 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -63,6 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new 
ThrowableCategory( ThrowableType.ACQUIRE_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); + private static final int MAX_EVENT_BURST_FROM_SERVICE = 10; private final KinesisAsyncClient kinesis; private final String shardId; @@ -81,7 +82,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private Subscriber subscriber; private long availableQueueSpace = 0; - private BlockingQueue> recordsDeliveryQueue = new LinkedBlockingQueue<>(1); + private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -125,33 +126,97 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { } @Override - public void notify(RecordsRetrievedAck ack) { - final CompletableFuture future = recordsDeliveryQueue.poll(); - if(future != null) { - future.complete(ack); - } else { - log.warn("{}: Received records delivery notification for not waiting publisher. 
" - + "SequenceNumber - {}", shardId, ack.deliveredSequenceNumber()); + public void notify(RecordsRetrievedAck recordsRetrievedAck) { + boolean isNextEventScheduled = false; + CompletableFuture triggeringFlowFuture = new CompletableFuture<>(); + try { + isNextEventScheduled = evictAckedEventAndScheduleNextEvent(recordsRetrievedAck, triggeringFlowFuture); + } catch (Throwable t) { + // TODO : Need to maintain the flow information + errorOccurred(triggeringFlowFuture.getNow(null), t); + } + if (isNextEventScheduled) { + final RecordFlow triggeringFlow = triggeringFlowFuture.getNow(null); + if (triggeringFlow != null) { + updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); + } } } - private Optional> tryDeliver(Runnable onNext) { - if (recordsDeliveryQueue.remainingCapacity() > 0) { - final CompletableFuture future = new CompletableFuture<>(); - if(recordsDeliveryQueue.offer(future)) { - onNext.run(); - return Optional.of(future); + private boolean evictAckedEventAndScheduleNextEvent(RecordsRetrievedAck recordsRetrievedAck, + CompletableFuture triggeringFlowFuture) { + boolean isNextEventScheduled = false; + // Remove the head of the queue on receiving the ack. + // Note : This does not block wait to retrieve an element. + RecordsRetrievedContext recordsRetrievedContext = recordsDeliveryQueue.poll(); + // Check if the ack corresponds to the head of the delivery queue. + if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier() + .equals(recordsRetrievedAck.batchUniqueIdentifier())) { + // Update current sequence number for the successfully delivered event. + currentSequenceNumber = recordsRetrievedAck.deliveredSequenceNumber(); + // Update the triggering flow for post scheduling upstream request. + triggeringFlowFuture.complete(recordsRetrievedContext.getRecordFlow()); + // Try scheduling the next event in the queue, if available. 
+ if (recordsDeliveryQueue.peek() != null) { + subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); + isNextEventScheduled = true; } + } else { + // TODO : toString implementation for recordsRetrievedAck + log.error("{}: KCL BUG: Found mismatched payload {} in the delivery queue for the ack {} ", shardId, + recordsRetrievedContext.getRecordsRetrieved(), recordsRetrievedAck); + throw new IllegalStateException("KCL BUG: Record delivery ack mismatch"); } - return Optional.empty(); + return isNextEventScheduled; } - private RecordsRetrievedAck blockOnAck(Runnable onNext) throws Exception { - return tryDeliver(onNext) - .orElseThrow(() -> new RuntimeException("Attempted to deliver an event while previous event delivery is still pending")) - .get(1000, TimeUnit.MILLISECONDS); + private boolean bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) { + boolean isCurrentEventScheduled = false; + final RecordsRetrievedContext recordsRetrievedContext = new RecordsRetrievedContext(recordsRetrieved, triggeringFlow); + try { + // Try enqueueing the RecordsRetrieved batch to the queue, which would throw exception on failure. + // Note: This does not block wait to enqueue. + recordsDeliveryQueue.add(recordsRetrievedContext); + // If the current batch is the only element in the queue, then try scheduling the event delivery. 
+ if (recordsDeliveryQueue.size() == 1) { + subscriber.onNext(recordsRetrieved); + isCurrentEventScheduled = true; + } + } catch (IllegalStateException e) { + log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", + shardId, recordsDeliveryQueue.remainingCapacity()); + throw e; + } catch (Throwable t) { + recordsDeliveryQueue.remove(recordsRetrievedContext); + throw t; + } + return isCurrentEventScheduled; + } + + @Data + private static class RecordsRetrievedContext { + private final RecordsRetrieved recordsRetrieved; + private final RecordFlow recordFlow; } + +// private Optional> tryDeliver(Runnable onNext) { +// if (recordsDeliveryQueue.remainingCapacity() > 0) { +// final CompletableFuture future = new CompletableFuture<>(); +// if(recordsDeliveryQueue.offer(future)) { +// onNext.run(); +// return Optional.of(future); +// } +// } +// return Optional.empty(); +// } +// +// private RecordsRetrievedAck blockOnAck(Runnable onNext) throws Exception { +// return tryDeliver(onNext) +// .orElseThrow(() -> new RuntimeException("Attempted to deliver an event while previous event delivery is still pending")) +// .get(1000, TimeUnit.MILLISECONDS); +// } + private boolean hasValidSubscriber() { return subscriber != null; } @@ -334,25 +399,31 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, recordBatchEvent.continuationSequenceNumber()); + boolean isCurrentEventScheduled = false; + try { - // - // Only advance the currentSequenceNumber if we successfully dispatch the last received input - // - currentSequenceNumber = blockOnAck(() -> subscriber.onNext(recordsRetrieved)).deliveredSequenceNumber(); + isCurrentEventScheduled = bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { - log.warn("{}: Unable to call onNext for subscriber. 
Failing publisher.", shardId); + log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId); errorOccurred(triggeringFlow, t); } - if (availableQueueSpace <= 0) { - log.debug( - "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); - } else { - availableQueueSpace--; - if (availableQueueSpace > 0) { - triggeringFlow.request(1); - } + if(isCurrentEventScheduled) { + updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); + } + + } + } + + private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) { + if (availableQueueSpace <= 0) { + log.debug( + "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", + shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + } else { + availableQueueSpace--; + if (availableQueueSpace > 0) { + triggeringFlow.request(1); } } } @@ -526,6 +597,7 @@ static class FanoutRecordsRetrieved implements RecordsRetrieved { private final ProcessRecordsInput processRecordsInput; private final String continuationSequenceNumber; + private final UUID batchUniqueIdentifier = UUID.randomUUID(); @Override public ProcessRecordsInput processRecordsInput() { @@ -536,6 +608,11 @@ public ProcessRecordsInput processRecordsInput() { public String batchSequenceNumber() { return continuationSequenceNumber; } + + @Override + public UUID batchUniqueIdentifier() { + return batchUniqueIdentifier; + } } @RequiredArgsConstructor @@ -776,5 +853,4 @@ public void onComplete() { } } - } From ff51d645a70d667eba5a721630105d76934f7389 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 18 Jul 2019 16:22:49 -0700 Subject: [PATCH 04/13] Refactoring src and test --- .../ShardConsumerNotifyingSubscriber.java | 42 +++++++++++ 
.../lifecycle/ShardConsumerSubscriber.java | 33 +-------- .../fanout/FanOutRecordsPublisher.java | 46 ++++-------- .../fanout/FanOutRecordsPublisherTest.java | 73 ++++++++----------- 4 files changed, 86 insertions(+), 108 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java new file mode 100644 index 000000000..e22fa3d27 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java @@ -0,0 +1,42 @@ +package software.amazon.kinesis.lifecycle; + +import lombok.AllArgsConstructor; +import org.reactivestreams.Subscriber; +import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RecordsRetrievedAck; + +import java.util.UUID; + +@AllArgsConstructor +public class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber { + + private final Subscriber delegate; + + private final RecordsPublisher recordsPublisher; + + @Override + public Subscriber getDelegateSubscriber() { + return delegate; + } + + @Override + public RecordsPublisher getWaitingRecordsPublisher() { + return recordsPublisher; + } + + @Override + public RecordsRetrievedAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved) { + return new RecordsRetrievedAck() { + @Override + public String deliveredSequenceNumber() { + return recordsRetrieved.batchSequenceNumber(); + } + + @Override + public UUID batchUniqueIdentifier() { + return recordsRetrieved.batchUniqueIdentifier(); + } + }; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 9de9e7980..12061bedf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -19,7 +19,6 @@ import io.reactivex.Scheduler; import io.reactivex.schedulers.Schedulers; import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -27,12 +26,10 @@ import org.reactivestreams.Subscription; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; -import software.amazon.kinesis.retrieval.RecordsRetrievedAck; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import java.time.Duration; import java.time.Instant; -import java.util.UUID; import java.util.concurrent.ExecutorService; @Slf4j @@ -80,7 +77,7 @@ void startSubscriptions() { recordsPublisher.restartFrom(lastAccepted); } Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) - .subscribe(new ShardConsumerNotifyingSubscriber(this)); + .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher)); } } @@ -219,32 +216,4 @@ public void cancel() { } } - @AllArgsConstructor - private class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber { - - private final Subscriber delegate; - - @Override - public Subscriber getDelegateSubscriber() { - return delegate; - } - - @Override - public RecordsPublisher getWaitingRecordsPublisher() { - return recordsPublisher; - } - - @Override - public RecordsRetrievedAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved) { - return new RecordsRetrievedAck() { - @Override public String deliveredSequenceNumber() { - return recordsRetrieved.batchSequenceNumber(); - } - - 
@Override public UUID batchUniqueIdentifier() { - return recordsRetrieved.batchUniqueIdentifier(); - } - }; - } - } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 28b61f848..d734c301b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -127,15 +127,15 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { @Override public void notify(RecordsRetrievedAck recordsRetrievedAck) { - boolean isNextEventScheduled = false; - CompletableFuture triggeringFlowFuture = new CompletableFuture<>(); - try { - isNextEventScheduled = evictAckedEventAndScheduleNextEvent(recordsRetrievedAck, triggeringFlowFuture); - } catch (Throwable t) { - // TODO : Need to maintain the flow information - errorOccurred(triggeringFlowFuture.getNow(null), t); - } - if (isNextEventScheduled) { + synchronized (lockObject) { + boolean isNextEventScheduled = false; + CompletableFuture triggeringFlowFuture = new CompletableFuture<>(); + try { + isNextEventScheduled = evictAckedEventAndScheduleNextEvent(recordsRetrievedAck, triggeringFlowFuture); + } catch (Throwable t) { + errorOccurred(triggeringFlowFuture.getNow(null), t); + } + // TODO : debug log on isNextEventScheduled final RecordFlow triggeringFlow = triggeringFlowFuture.getNow(null); if (triggeringFlow != null) { updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); @@ -199,24 +199,6 @@ private static class RecordsRetrievedContext { private final RecordFlow recordFlow; } - -// private Optional> tryDeliver(Runnable onNext) { -// if (recordsDeliveryQueue.remainingCapacity() > 0) { -// final CompletableFuture future = new CompletableFuture<>(); -// 
if(recordsDeliveryQueue.offer(future)) { -// onNext.run(); -// return Optional.of(future); -// } -// } -// return Optional.empty(); -// } -// -// private RecordsRetrievedAck blockOnAck(Runnable onNext) throws Exception { -// return tryDeliver(onNext) -// .orElseThrow(() -> new RuntimeException("Attempted to deliver an event while previous event delivery is still pending")) -// .get(1000, TimeUnit.MILLISECONDS); -// } - private boolean hasValidSubscriber() { return subscriber != null; } @@ -246,6 +228,10 @@ private void subscribeToShard(String sequenceNumber) { private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { synchronized (lockObject) { + + // Clean the delivery buffer + recordsDeliveryQueue.clear(); + if (!hasValidSubscriber()) { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", @@ -403,15 +389,11 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re try { isCurrentEventScheduled = bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); + // TODO: debug log on isCurrentEventScheduled } catch (Throwable t) { log.warn("{}: Unable to buffer or schedule onNext for subscriber. 
Failing publisher.", shardId); errorOccurred(triggeringFlow, t); } - - if(isCurrentEventScheduled) { - updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); - } - } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 22da2b757..5879de6c3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -63,6 +63,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsRetrieved; @@ -103,32 +104,28 @@ public void simpleTest() throws Exception { List receivedInput = new ArrayList<>(); - source.subscribe(new Subscriber() { + source.subscribe(new ShardConsumerNotifyingSubscriber(new Subscriber() { Subscription subscription; - @Override - public void onSubscribe(Subscription s) { + @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } - @Override - public void onNext(RecordsRetrieved input) { + @Override public void onNext(RecordsRetrieved input) { receivedInput.add(input.processRecordsInput()); subscription.request(1); } - @Override - public void onError(Throwable t) { + @Override public void onError(Throwable t) { log.error("Caught throwable in subscriber", t); fail("Caught throwable in subscriber"); } - @Override - public void onComplete() { + @Override 
public void onComplete() { fail("OnComplete called when not expected"); } - }); + }, source)); verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); flowCaptor.getValue().onEventStream(publisher); @@ -172,33 +169,29 @@ public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Except List receivedInput = new ArrayList<>(); - Subscriber shardConsumerSubscriber = + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( new Subscriber() { Subscription subscription; - @Override - public void onSubscribe(Subscription s) { + @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } - @Override - public void onNext(RecordsRetrieved input) { + @Override public void onNext(RecordsRetrieved input) { receivedInput.add(input.processRecordsInput()); subscription.request(1); } - @Override - public void onError(Throwable t) { + @Override public void onError(Throwable t) { log.error("Caught throwable in subscriber", t); fail("Caught throwable in subscriber"); } - @Override - public void onComplete() { + @Override public void onComplete() { fail("OnComplete called when not expected"); } - }; + }, source); Scheduler testScheduler = getScheduler(getBlockingExecutor(getSpiedExecutor())); int bufferSize = 8; @@ -252,33 +245,29 @@ public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryT List receivedInput = new ArrayList<>(); - Subscriber shardConsumerSubscriber = + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( new Subscriber() { Subscription subscription; - @Override - public void onSubscribe(Subscription s) { + @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } - @Override - public void onNext(RecordsRetrieved input) { + @Override public void onNext(RecordsRetrieved input) { receivedInput.add(input.processRecordsInput()); subscription.request(1); } - @Override - public 
void onError(Throwable t) { + @Override public void onError(Throwable t) { log.error("Caught throwable in subscriber", t); fail("Caught throwable in subscriber"); } - @Override - public void onComplete() { + @Override public void onComplete() { fail("OnComplete called when not expected"); } - }; + }, source); Scheduler testScheduler = getScheduler(getOverwhelmedExecutor(getSpiedExecutor())); int bufferSize = 8; @@ -302,8 +291,8 @@ public void onComplete() { .records(records).build()) .forEach(batchEvent -> captor.getValue().onNext(batchEvent)); - verify(subscription, times(4)).request(1); - assertThat(receivedInput.size(), equalTo(3)); + verify(subscription, times(2)).request(1); + assertThat(receivedInput.size(), equalTo(1)); receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { assertThat(clientRecordsList.size(), equalTo(matchers.size())); @@ -312,7 +301,7 @@ public void onComplete() { } }); - assertThat(source.getCurrentSequenceNumber(), equalTo("3000")); + assertThat(source.getCurrentSequenceNumber(), equalTo("1000")); } @@ -364,32 +353,28 @@ public void largeRequestTest() throws Exception { List receivedInput = new ArrayList<>(); - source.subscribe(new Subscriber() { + source.subscribe(new ShardConsumerNotifyingSubscriber(new Subscriber() { Subscription subscription; - @Override - public void onSubscribe(Subscription s) { + @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(3); } - @Override - public void onNext(RecordsRetrieved input) { + @Override public void onNext(RecordsRetrieved input) { receivedInput.add(input.processRecordsInput()); subscription.request(1); } - @Override - public void onError(Throwable t) { + @Override public void onError(Throwable t) { log.error("Caught throwable in subscriber", t); fail("Caught throwable in subscriber"); } - @Override - public void onComplete() { + @Override public void onComplete() { fail("OnComplete called when not expected"); } - }); + }, 
source)); verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); flowCaptor.getValue().onEventStream(publisher); @@ -476,7 +461,7 @@ public void testContinuesAfterSequence() { NonFailingSubscriber nonFailingSubscriber = new NonFailingSubscriber(); - source.subscribe(nonFailingSubscriber); + source.subscribe(new ShardConsumerNotifyingSubscriber(nonFailingSubscriber, source)); SubscribeToShardRequest expected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN).shardId(SHARD_ID) .startingPosition(StartingPosition.builder().sequenceNumber("0") From 27d9c5c88772a9315b039ed48c7abb657c6a360d Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 1 Aug 2019 01:05:19 -0700 Subject: [PATCH 05/13] Added unit test cases. Addressed review comments. Handled edge cases --- .../kinesis/common/DiagnosticUtils.java | 45 ++ .../lifecycle/NotifyingSubscriber.java | 21 +- .../ShardConsumerNotifyingSubscriber.java | 33 +- .../retrieval/BatchUniqueIdentifier.java | 24 + .../kinesis/retrieval/RecordsDeliveryAck.java | 26 + .../kinesis/retrieval/RecordsPublisher.java | 3 +- .../kinesis/retrieval/RecordsRetrieved.java | 11 +- .../retrieval/RecordsRetrievedAck.java | 19 - .../fanout/FanOutRecordsPublisher.java | 156 +++-- .../polling/PrefetchRecordsPublisher.java | 16 + .../ShardConsumerSubscriberTest.java | 6 + .../kinesis/lifecycle/ShardConsumerTest.java | 6 + .../fanout/FanOutRecordsPublisherTest.java | 564 ++++++++++++++++-- .../polling/PrefetchRecordsPublisherTest.java | 5 +- 14 files changed, 785 insertions(+), 150 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java delete mode 100644 
amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java new file mode 100644 index 000000000..1e2ee3556 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + +import org.slf4j.Logger; + +import java.time.Duration; +import java.time.Instant; + +import static software.amazon.kinesis.lifecycle.ShardConsumer.MAX_TIME_BETWEEN_REQUEST_RESPONSE; + +public class DiagnosticUtils { + + /** + * Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action. 
+ * @param shardId of the shard that is having delayed delivery + * @param enqueueTimestamp of the event submitted to the executor service + * @param log Slf4j Logger from RecordPublisher to log the events + */ + public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) { + final long durationBetweenEnqueueAndAckInMillis = Duration + .between(enqueueTimestamp, Instant.now()).toMillis(); + if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 2) { + // TODO : Use DelayedDeliveryEvent to aggregate and notify on this delay event. + log.debug("{}: Record delivery time to shard consumer is high at {} millis", shardId, + durationBetweenEnqueueAndAckInMillis); + } else if (log.isDebugEnabled()) { + log.debug("{}: Record delivery time to shard consumer is {} millis", shardId, + durationBetweenEnqueueAndAckInMillis); + } + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java index b5829ca0d..5f4199ed2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java @@ -1,10 +1,25 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package software.amazon.kinesis.lifecycle; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; -import software.amazon.kinesis.retrieval.RecordsRetrievedAck; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; /** * Subscriber that notifies its publisher on receipt of the onNext event. @@ -25,10 +40,10 @@ public interface NotifyingSubscriber extends Subscriber { /** * Construct RecordsRetrievedAck object from the incoming data and return it - * @param t type of data + * @param recordsRetrieved for which we need the ack. * @return RecordsRetrievedAck */ - RecordsRetrievedAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved); + RecordsDeliveryAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved); @Override default void onSubscribe(Subscription subscription) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java index e22fa3d27..687c4e7ca 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java @@ -1,12 +1,25 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package software.amazon.kinesis.lifecycle; import lombok.AllArgsConstructor; import org.reactivestreams.Subscriber; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; -import software.amazon.kinesis.retrieval.RecordsRetrievedAck; - -import java.util.UUID; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; @AllArgsConstructor public class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber { @@ -26,17 +39,7 @@ public RecordsPublisher getWaitingRecordsPublisher() { } @Override - public RecordsRetrievedAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved) { - return new RecordsRetrievedAck() { - @Override - public String deliveredSequenceNumber() { - return recordsRetrieved.batchSequenceNumber(); - } - - @Override - public UUID batchUniqueIdentifier() { - return recordsRetrieved.batchUniqueIdentifier(); - } - }; + public RecordsDeliveryAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved) { + return () -> recordsRetrieved.batchUniqueIdentifier(); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java new file mode 100644 index 000000000..ab7661710 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +import lombok.Data; + +@Data +public class BatchUniqueIdentifier { + private final String recordBatchIdentifier; + private final String flowIdentifier; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java new file mode 100644 index 000000000..c2dda0764 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +public interface RecordsDeliveryAck { + + /** + * Unique record batch identifier used to ensure the durability and ordering guarantees. + * @return id that uniquely determines a record batch and its source. 
+ */ + BatchUniqueIdentifier batchUniqueIdentifier(); + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index d04c12a5c..032ec04ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -18,7 +18,6 @@ import org.reactivestreams.Publisher; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -52,7 +51,7 @@ public interface RecordsPublisher extends Publisher { * Notify the publisher on receipt of a data event. * @param ack */ - default void notify(RecordsRetrievedAck ack) { + default void notify(RecordsDeliveryAck ack) { throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java index b30a7b838..c349b6184 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java @@ -27,21 +27,12 @@ public interface RecordsRetrieved { */ ProcessRecordsInput processRecordsInput(); - /** - * Returns the sequence number that can be used as a pagination token for next batch of records. 
- * - * @return sequenceNumber to checkpoint - */ - default String batchSequenceNumber() { - throw new UnsupportedOperationException("Retrieval of batch sequence number is not supported"); - } - /** * Returns the identifier that uniquely identifies this batch. * * @return UUID */ - default UUID batchUniqueIdentifier() { + default BatchUniqueIdentifier batchUniqueIdentifier() { throw new UnsupportedOperationException("Retrieval of batch unique identifier is not supported"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java deleted file mode 100644 index fc74f27ed..000000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrievedAck.java +++ /dev/null @@ -1,19 +0,0 @@ -package software.amazon.kinesis.retrieval; - -import java.util.UUID; - -public interface RecordsRetrievedAck { - - /** - * Sequence Number of the record batch that was delivered to the Subscriber/Observer. - * @return deliveredSequenceNumber - */ - String deliveredSequenceNumber(); - - /** - * Unique record batch identifier used to validate the ordering guarantees. 
- * @return UUID - */ - UUID batchUniqueIdentifier(); - -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index d734c301b..ac31bce20 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -15,27 +15,16 @@ package software.amazon.kinesis.retrieval.fanout; -import java.time.Instant; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - import com.google.common.annotations.VisibleForTesting; -import lombok.Getter; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import lombok.Data; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; @@ -48,14 +37,27 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; import software.amazon.kinesis.retrieval.IteratorBuilder; import 
software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; -import software.amazon.kinesis.retrieval.RecordsRetrievedAck; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; + @RequiredArgsConstructor @Slf4j @KinesisClientInternalApi @@ -72,7 +74,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final Object lockObject = new Object(); private final AtomicInteger subscribeToShardId = new AtomicInteger(0); - + @Setter @VisibleForTesting private RecordFlow flow; @Getter @VisibleForTesting private String currentSequenceNumber; @@ -110,6 +112,8 @@ public void shutdown() { @Override public void restartFrom(RecordsRetrieved recordsRetrieved) { synchronized (lockObject) { + // Clear the delivery buffer so that current subscription don't yield duplicate records. 
+ clearRecordsDeliveryQueue(); if (flow != null) { // // The flow should not be running at this time @@ -121,21 +125,19 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { throw new IllegalArgumentException( "Provided ProcessRecordsInput not created from the FanOutRecordsPublisher"); } - currentSequenceNumber = recordsRetrieved.batchSequenceNumber(); + currentSequenceNumber = ((FanoutRecordsRetrieved) recordsRetrieved).continuationSequenceNumber(); } } @Override - public void notify(RecordsRetrievedAck recordsRetrievedAck) { + public void notify(RecordsDeliveryAck recordsDeliveryAck) { synchronized (lockObject) { - boolean isNextEventScheduled = false; CompletableFuture triggeringFlowFuture = new CompletableFuture<>(); try { - isNextEventScheduled = evictAckedEventAndScheduleNextEvent(recordsRetrievedAck, triggeringFlowFuture); + evictAckedEventAndScheduleNextEvent(recordsDeliveryAck, triggeringFlowFuture); } catch (Throwable t) { errorOccurred(triggeringFlowFuture.getNow(null), t); } - // TODO : debug log on isNextEventScheduled final RecordFlow triggeringFlow = triggeringFlowFuture.getNow(null); if (triggeringFlow != null) { updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); @@ -143,36 +145,52 @@ public void notify(RecordsRetrievedAck recordsRetrievedAck) { } } - private boolean evictAckedEventAndScheduleNextEvent(RecordsRetrievedAck recordsRetrievedAck, + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. + @VisibleForTesting + void evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck, CompletableFuture triggeringFlowFuture) { - boolean isNextEventScheduled = false; - // Remove the head of the queue on receiving the ack. + // Peek the head of the queue on receiving the ack. // Note : This does not block wait to retrieve an element. 
- RecordsRetrievedContext recordsRetrievedContext = recordsDeliveryQueue.poll(); + final RecordsRetrievedContext recordsRetrievedContext = recordsDeliveryQueue.peek(); // Check if the ack corresponds to the head of the delivery queue. if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier() - .equals(recordsRetrievedAck.batchUniqueIdentifier())) { + .equals(recordsDeliveryAck.batchUniqueIdentifier())) { + // It is now safe to remove the element + recordsDeliveryQueue.poll(); + // Take action based on the time spent by the event in queue. + takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log); // Update current sequence number for the successfully delivered event. - currentSequenceNumber = recordsRetrievedAck.deliveredSequenceNumber(); + currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber(); // Update the triggering flow for post scheduling upstream request. triggeringFlowFuture.complete(recordsRetrievedContext.getRecordFlow()); // Try scheduling the next event in the queue, if available. if (recordsDeliveryQueue.peek() != null) { subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); - isNextEventScheduled = true; } } else { - // TODO : toString implementation for recordsRetrievedAck - log.error("{}: KCL BUG: Found mismatched payload {} in the delivery queue for the ack {} ", shardId, - recordsRetrievedContext.getRecordsRetrieved(), recordsRetrievedAck); - throw new IllegalStateException("KCL BUG: Record delivery ack mismatch"); + // Check if the mismatched event belongs to active flow. If publisher receives an ack for a + // missing event in active flow, then it means the event was already acked or cleared + // from the queue due to a potential bug. 
+ if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier() + .equals(flow.getSubscribeToShardId())) { + log.error( + "{}: KCL BUG: Publisher found mismatched ack for subscription {} ", + shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); + throw new IllegalStateException("KCL BUG: Record delivery ack mismatch"); + } + // Otherwise publisher received a stale ack. + else { + log.info("{}: Publisher received duplicate ack or an ack for stale subscription {}. Ignoring.", shardId, + recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); + } } - return isNextEventScheduled; } - private boolean bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) { - boolean isCurrentEventScheduled = false; - final RecordsRetrievedContext recordsRetrievedContext = new RecordsRetrievedContext(recordsRetrieved, triggeringFlow); + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. + @VisibleForTesting + void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) { + final RecordsRetrievedContext recordsRetrievedContext = + new RecordsRetrievedContext(recordsRetrieved, triggeringFlow, Instant.now()); try { // Try enqueueing the RecordsRetrieved batch to the queue, which would throw exception on failure. // Note: This does not block wait to enqueue. @@ -180,7 +198,6 @@ private boolean bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved records // If the current batch is the only element in the queue, then try scheduling the event delivery. 
if (recordsDeliveryQueue.size() == 1) { subscriber.onNext(recordsRetrieved); - isCurrentEventScheduled = true; } } catch (IllegalStateException e) { log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", @@ -190,19 +207,28 @@ private boolean bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved records recordsDeliveryQueue.remove(recordsRetrievedContext); throw t; } - return isCurrentEventScheduled; + } + + @VisibleForTesting + void setSubscriberForTesting(Subscriber s) { + this.subscriber = s; } @Data private static class RecordsRetrievedContext { private final RecordsRetrieved recordsRetrieved; private final RecordFlow recordFlow; + private final Instant enqueueTimestamp; } private boolean hasValidSubscriber() { return subscriber != null; } + private boolean hasValidFlow() { + return flow != null; + } + private void subscribeToShard(String sequenceNumber) { synchronized (lockObject) { SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() @@ -229,15 +255,22 @@ private void subscribeToShard(String sequenceNumber) { private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { synchronized (lockObject) { - // Clean the delivery buffer - recordsDeliveryQueue.clear(); - if (!hasValidSubscriber()) { - log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", - shardId, flow.connectionStartedAt, flow.subscribeToShardId); + if(hasValidFlow()) { + log.warn( + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", + shardId, flow.connectionStartedAt, flow.subscribeToShardId); + } else { + log.warn( + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null", + shardId); + } return; } + + // Clear the delivery buffer so that next subscription don't yield duplicate records. 
+ clearRecordsDeliveryQueue(); + Throwable propagationThrowable = t; ThrowableCategory category = throwableCategory(propagationThrowable); @@ -267,7 +300,7 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { availableQueueSpace = 0; try { - handleFlowError(propagationThrowable); + handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } @@ -286,6 +319,10 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } } + private void clearRecordsDeliveryQueue() { + recordsDeliveryQueue.clear(); + } + protected void logAcquireTimeoutMessage(Throwable t) { log.error("An acquire timeout occurred which usually indicates that the KinesisAsyncClient supplied has a " + "low maximum streams limit. " + @@ -293,13 +330,16 @@ protected void logAcquireTimeoutMessage(Throwable t) { "or refer to the class to setup the client manually."); } - private void handleFlowError(Throwable t) { + private void handleFlowError(Throwable t, RecordFlow triggeringFlow) { if (t.getCause() instanceof ResourceNotFoundException) { log.debug( "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", shardId); + // The ack received for this onNext event will be ignored by the publisher as the global flow object should + // be either null or renewed when the ack's flow identifier is evaluated. FanoutRecordsRetrieved response = new FanoutRecordsRetrieved( - ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null); + ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null, + triggeringFlow != null ? 
triggeringFlow.getSubscribeToShardId() : shardId + "-no-flow-found"); subscriber.onNext(response); subscriber.onComplete(); } else { @@ -383,13 +423,10 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re .millisBehindLatest(recordBatchEvent.millisBehindLatest()) .isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build(); FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, - recordBatchEvent.continuationSequenceNumber()); - - boolean isCurrentEventScheduled = false; + recordBatchEvent.continuationSequenceNumber(), triggeringFlow.subscribeToShardId); try { - isCurrentEventScheduled = bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); - // TODO: debug log on isCurrentEventScheduled + bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId); errorOccurred(triggeringFlow, t); @@ -579,7 +616,8 @@ static class FanoutRecordsRetrieved implements RecordsRetrieved { private final ProcessRecordsInput processRecordsInput; private final String continuationSequenceNumber; - private final UUID batchUniqueIdentifier = UUID.randomUUID(); + private final String flowIdentifier; + private final String batchUniqueIdentifier = UUID.randomUUID().toString(); @Override public ProcessRecordsInput processRecordsInput() { @@ -587,13 +625,8 @@ public ProcessRecordsInput processRecordsInput() { } @Override - public String batchSequenceNumber() { - return continuationSequenceNumber; - } - - @Override - public UUID batchUniqueIdentifier() { - return batchUniqueIdentifier; + public BatchUniqueIdentifier batchUniqueIdentifier() { + return new BatchUniqueIdentifier(batchUniqueIdentifier, flowIdentifier); } } @@ -603,6 +636,7 @@ static class RecordFlow implements SubscribeToShardResponseHandler { private final FanOutRecordsPublisher parent; private 
final Instant connectionStartedAt; + @Getter @VisibleForTesting private final String subscribeToShardId; private RecordSubscription subscription; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 83991be93..f5aaf0518 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,11 +50,14 @@ import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; + /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the * next set of records and stores it in the cache. 
The size of the cache is limited by setting @@ -93,6 +97,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock(); private boolean wasReset = false; + private final Semaphore eventDeliveryLock = new Semaphore(1); + private Instant eventDeliveryLockAcquireTime = Instant.EPOCH; + /** * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a * LinkedBlockingQueue. @@ -216,6 +223,13 @@ public void cancel() { }); } + @Override + public void notify(RecordsDeliveryAck ack) { + eventDeliveryLock.release(); + // Take action based on the time spent by the event in queue. + takeDelayedDeliveryActionIfRequired(shardId, eventDeliveryLockAcquireTime, log); + } + private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException { wasReset = false; while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { @@ -236,6 +250,8 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t private synchronized void drainQueueForRequests() { while (requestedResponses.get() > 0 && !getRecordsResultQueue.isEmpty()) { + eventDeliveryLock.acquireUninterruptibly(); + eventDeliveryLockAcquireTime = Instant.now(); subscriber.onNext(getNextResult()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index bfe508e68..e6889c8a6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -62,6 +62,7 @@ import software.amazon.kinesis.leases.ShardInfo; import 
software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; @@ -426,6 +427,11 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { } + @Override + public void notify(RecordsDeliveryAck ack) { + + } + @Override public void shutdown() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 320512e69..533a200af 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -74,6 +74,7 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput; import software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -198,6 +199,11 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, } + @Override + public void notify(RecordsDeliveryAck ack) { + + } + @Override public void shutdown() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 5879de6c3..b0152cab0 100644 --- 
a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -1,38 +1,14 @@ package software.amazon.kinesis.retrieval.fanout; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.handler.timeout.ReadTimeoutException; import io.reactivex.Flowable; import io.reactivex.Scheduler; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.SafeSubscriber; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -40,16 +16,10 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.Mockito; import 
org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; - -import io.netty.handler.timeout.ReadTimeoutException; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; @@ -65,11 +35,49 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + @RunWith(MockitoJUnitRunner.class) @Slf4j public class FanOutRecordsPublisherTest { @@ -193,7 +201,7 @@ public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Except } }, source); - Scheduler testScheduler = getScheduler(getBlockingExecutor(getSpiedExecutor())); + Scheduler testScheduler = getScheduler(getBlockingExecutor(getSpiedExecutor(getTestExecutor()))); int bufferSize = 8; Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) @@ -269,7 +277,7 @@ public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryT } }, source); - Scheduler testScheduler = getScheduler(getOverwhelmedExecutor(getSpiedExecutor())); + Scheduler testScheduler = getScheduler(getOverwhelmedBlockingExecutor(getSpiedExecutor(getTestExecutor()))); int bufferSize = 8; Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) @@ -305,13 +313,288 @@ public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryT } + @Test + public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 
3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + int totalServicePublisherEvents = 1000; + int initialDemand = 0; + BackpressureAdheringServicePublisher servicePublisher = + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + if(receivedInput.size() == totalServicePublisherEvents) { + servicePublisherTaskCompletionLatch.countDown(); + } + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = 
getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertThat(receivedInput.size(), equalTo(totalServicePublisherEvents)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + "")); + + } + + @Test + public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstWithinLimit() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + int 
totalServicePublisherEvents = 1000; + int initialDemand = 9; + BackpressureAdheringServicePublisher servicePublisher = + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + if(receivedInput.size() == totalServicePublisherEvents) { + servicePublisherTaskCompletionLatch.countDown(); + } + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + 
captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertThat(receivedInput.size(), equalTo(totalServicePublisherEvents)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + "")); + + } + + @Test + public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstOverLimit() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(1); + int totalServicePublisherEvents = 1000; + int initialDemand = 10; + BackpressureAdheringServicePublisher servicePublisher = + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + 
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + AtomicBoolean onErrorSet = new AtomicBoolean(false); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + onErrorSet.set(true); + servicePublisherTaskCompletionLatch.countDown(); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertTrue("onError should have triggered", onErrorSet.get()); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + 
assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + } + private Scheduler getScheduler(ExecutorService executorService) { return Schedulers.from(executorService); } - private ExecutorService getSpiedExecutor() { - ExecutorService executorService = Executors.newFixedThreadPool(8, + private ExecutorService getTestExecutor() { + return Executors.newFixedThreadPool(8, new ThreadFactoryBuilder().setNameFormat("test-fanout-record-publisher-%04d").setDaemon(true).build()); + } + + private ExecutorService getSpiedExecutor(ExecutorService executorService) { return spy(executorService); } @@ -320,7 +603,15 @@ private ExecutorService getBlockingExecutor(ExecutorService executorService) { return executorService; } - private ExecutorService getOverwhelmedExecutor(ExecutorService executorService) { + private ExecutorService getInitiallyBlockingExecutor(ExecutorService executorService) { + doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doCallRealMethod() + .when(executorService).execute(any()); + return executorService; + } + + private ExecutorService getOverwhelmedBlockingExecutor(ExecutorService executorService) { doAnswer(invocation -> directlyExecuteRunnable(invocation)) .doAnswer(invocation -> directlyExecuteRunnable(invocation)) .doAnswer(invocation -> directlyExecuteRunnable(invocation)) @@ -518,6 +809,175 @@ public void testContinuesAfterSequence() { } + @Test + public void testIfBufferingRecordsWithinCapacityPublishesOneEvent() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + RecordsRetrieved recordsRetrieved = ProcessRecordsInput.builder()::build; + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), 
"shard-001-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow)); + assertEquals(1, totalRecordsRetrieved[0]); + } + + @Test + public void testIfBufferingRecordsOverCapacityPublishesOneEventAndThrows() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + RecordsRetrieved recordsRetrieved = ProcessRecordsInput.builder()::build; + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + try { + IntStream.rangeClosed(1, 11).forEach( + i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow)); + fail("Should throw Queue full exception"); + } catch (IllegalStateException e) { + assertEquals("Queue full", e.getMessage()); + } + assertEquals(1, totalRecordsRetrieved[0]); + } + + @Test + public void testIfPublisherAlwaysPublishesWhenQueueIsEmpty() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new 
FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // This makes sure the queue is immediately made empty, so that the next event enqueued will + // be the only element in the queue. + fanOutRecordsPublisher + .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier(), new CompletableFuture<>()); + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 137).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()), + recordFlow)); + assertEquals(137, totalRecordsRetrieved[0]); + } + + @Test + public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlow() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // This makes sure the queue is immediately made empty, so that the next event enqueued will + // be the only element in the queue. 
+ fanOutRecordsPublisher + .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier(), new CompletableFuture<>()); + // Send stale event periodically + if(totalRecordsRetrieved[0] % 10 == 0) { + fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( + () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"), + new CompletableFuture<>()); + } + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 100).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()), + recordFlow)); + assertEquals(100, totalRecordsRetrieved[0]); + } + + @Test + public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliveryQueueIsNotEmpty() + throws InterruptedException { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + BlockingQueue ackQueue = new LinkedBlockingQueue<>(); + fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // Enqueue the ack for bursty delivery + ackQueue.add(recordsRetrieved.batchUniqueIdentifier()); + // Send stale event periodically + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", 
recordFlow.getSubscribeToShardId()), + recordFlow)); + BatchUniqueIdentifier batchUniqueIdentifierQueued; + int count = 0; + // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records + // delivered as expected. + while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) { + final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; + fanOutRecordsPublisher + .evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal, new CompletableFuture<>()); + fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( + () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"), + new CompletableFuture<>()); + } + assertEquals(10, totalRecordsRetrieved[0]); + } + + @Test(expected = IllegalStateException.class) + public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws InterruptedException { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + fanOutRecordsPublisher.setFlow(recordFlow); + final int[] totalRecordsRetrieved = { 0 }; + BlockingQueue ackQueue = new LinkedBlockingQueue<>(); + fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // Enqueue the ack for bursty delivery + ackQueue.add(recordsRetrieved.batchUniqueIdentifier()); + // Send stale event periodically + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + 
"", recordFlow.getSubscribeToShardId()), + recordFlow)); + BatchUniqueIdentifier batchUniqueIdentifierQueued; + int count = 0; + // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records + // delivered as expected. + while(count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) { + final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; + fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( + () -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier()), + new CompletableFuture<>()); + } + } + @Test public void acquireTimeoutTriggersLogMethodForActiveFlow() { AtomicBoolean acquireTimeoutLogged = new AtomicBoolean(false); @@ -668,6 +1128,32 @@ public void onComplete() { } } + @RequiredArgsConstructor + private static class BackpressureAdheringServicePublisher implements Runnable { + + private final Consumer action; + private final Integer numOfTimes; + private final CountDownLatch taskCompletionLatch; + private final Semaphore demandNotifier; + + BackpressureAdheringServicePublisher(Consumer action, Integer numOfTimes, + CountDownLatch taskCompletionLatch, Integer initialDemand) { + this(action, numOfTimes, taskCompletionLatch, new Semaphore(initialDemand)); + } + + public void request(int n) { + demandNotifier.release(n); + } + + public void run() { + for (int i = 1; i <= numOfTimes; ) { + demandNotifier.acquireUninterruptibly(); + action.accept(i++); + } + taskCompletionLatch.countDown(); + } + } + private Record makeRecord(String sequenceNumber) { return makeRecord(Integer.parseInt(sequenceNumber)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 57d550d14..e569921ac 100644 --- 
a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -71,6 +71,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -299,7 +300,7 @@ public void testNoDeadlockOnFullQueue() { Object lock = new Object(); - Subscriber subscriber = new Subscriber() { + Subscriber delegateSubscriber = new Subscriber() { Subscription sub; @Override @@ -334,6 +335,8 @@ public void onComplete() { } }; + Subscriber subscriber = new ShardConsumerNotifyingSubscriber(delegateSubscriber, getRecordsCache); + synchronized (lock) { log.info("Awaiting notification"); Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation()) From 5dd1adfb83fe80a7eed27b5f184e60a058d9209d Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 1 Aug 2019 01:27:40 -0700 Subject: [PATCH 06/13] Minor code changes. 
Note that the previous commit has blocking impl of PrefetchPublisher --- .../amazon/kinesis/common/DiagnosticUtils.java | 11 +++++++---- .../amazon/kinesis/retrieval/RecordsPublisher.java | 2 +- .../amazon/kinesis/retrieval/RecordsRetrieved.java | 2 +- .../retrieval/fanout/FanOutRecordsPublisher.java | 8 ++++++-- .../retrieval/fanout/FanOutRecordsPublisherTest.java | 2 +- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java index 1e2ee3556..ebdebb935 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java @@ -33,10 +33,13 @@ public class DiagnosticUtils { public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) { final long durationBetweenEnqueueAndAckInMillis = Duration .between(enqueueTimestamp, Instant.now()).toMillis(); - if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 2) { - // TODO : Use DelayedDeliveryEvent to aggregate and notify on this delay event. - log.debug("{}: Record delivery time to shard consumer is high at {} millis", shardId, - durationBetweenEnqueueAndAckInMillis); + if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) { + // The above condition logs the warn msg if the delivery time exceeds 11 seconds. + log.warn( + "{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs" + + " to see the state of the executor service. Also check if the RecordProcessor's processing " + + "time is high. 
", + shardId, durationBetweenEnqueueAndAckInMillis); } else if (log.isDebugEnabled()) { log.debug("{}: Record delivery time to shard consumer is {} millis", shardId, durationBetweenEnqueueAndAckInMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index 032ec04ec..cd1e04f1a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -49,7 +49,7 @@ public interface RecordsPublisher extends Publisher { /** * Notify the publisher on receipt of a data event. - * @param ack + * @param ack acknowledgement received from the subscriber. */ default void notify(RecordsDeliveryAck ack) { throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java index c349b6184..77ba17487 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java @@ -30,7 +30,7 @@ public interface RecordsRetrieved { /** * Returns the identifier that uniquely identifies this batch. * - * @return UUID + * @return batchUniqueIdentifier that uniquely identifies the records batch and its source. 
*/ default BatchUniqueIdentifier batchUniqueIdentifier() { throw new UnsupportedOperationException("Retrieval of batch unique identifier is not supported"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index ac31bce20..f3ba23c4e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -74,7 +74,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final Object lockObject = new Object(); private final AtomicInteger subscribeToShardId = new AtomicInteger(0); - @Setter @VisibleForTesting private RecordFlow flow; @Getter @VisibleForTesting private String currentSequenceNumber; @@ -214,8 +213,13 @@ void setSubscriberForTesting(Subscriber s) { this.subscriber = s; } + @VisibleForTesting + void setFlowForTesting(RecordFlow flow) { + this.flow = flow; + } + @Data - private static class RecordsRetrievedContext { + private static final class RecordsRetrievedContext { private final RecordsRetrieved recordsRetrieved; private final RecordFlow recordFlow; private final Instant enqueueTimestamp; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index b0152cab0..7b4ec228d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -949,7 +949,7 @@ public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws Inter FanOutRecordsPublisher 
fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); FanOutRecordsPublisher.RecordFlow recordFlow = new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); - fanOutRecordsPublisher.setFlow(recordFlow); + fanOutRecordsPublisher.setFlowForTesting(recordFlow); final int[] totalRecordsRetrieved = { 0 }; BlockingQueue ackQueue = new LinkedBlockingQueue<>(); fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { From 6b2020698661434f28603b04dcc1ca650ca808b1 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 1 Aug 2019 18:21:04 -0700 Subject: [PATCH 07/13] Refactored the cleanup logic --- .../kinesis/retrieval/fanout/FanOutRecordsPublisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index f3ba23c4e..6e776be8e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -111,8 +111,6 @@ public void shutdown() { @Override public void restartFrom(RecordsRetrieved recordsRetrieved) { synchronized (lockObject) { - // Clear the delivery buffer so that current subscription don't yield duplicate records. - clearRecordsDeliveryQueue(); if (flow != null) { // // The flow should not be running at this time @@ -235,6 +233,8 @@ private boolean hasValidFlow() { private void subscribeToShard(String sequenceNumber) { synchronized (lockObject) { + // Clear the queue so that any stale entries from previous subscription are discarded. 
+ clearRecordsDeliveryQueue(); SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() .shardId(shardId).consumerARN(consumerArn); SubscribeToShardRequest request; From a2e0104d9a2ce46e2c27796dc9792f3e29459f82 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 8 Aug 2019 12:26:44 -0700 Subject: [PATCH 08/13] Fix for Cloudwatch exception handling and other review comment fixes --- .../lifecycle/NotifyingSubscriber.java | 10 +++--- .../ShardConsumerNotifyingSubscriber.java | 4 +-- .../metrics/CloudWatchMetricsPublisher.java | 20 ++++++----- .../fanout/FanOutRecordsPublisher.java | 34 +++++++------------ .../fanout/FanOutRecordsPublisherTest.java | 30 +++++++--------- 5 files changed, 44 insertions(+), 54 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java index 5f4199ed2..f3599c714 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java @@ -36,14 +36,14 @@ public interface NotifyingSubscriber extends Subscriber { * Return the publisher to be notified * @return RecordsPublisher to be notified. */ - RecordsPublisher getWaitingRecordsPublisher(); + RecordsPublisher getRecordsPublisher(); /** - * Construct RecordsRetrievedAck object from the incoming data and return it + * Construct RecordsDeliveryAck object from the incoming data and return it * @param recordsRetrieved for which we need the ack. 
- * @return RecordsRetrievedAck + * @return getRecordsDeliveryAck */ - RecordsDeliveryAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved); + RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved); @Override default void onSubscribe(Subscription subscription) { @@ -52,7 +52,7 @@ default void onSubscribe(Subscription subscription) { @Override default void onNext(RecordsRetrieved recordsRetrieved) { - getWaitingRecordsPublisher().notify(getRecordsRetrievedAck(recordsRetrieved)); + getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved)); getDelegateSubscriber().onNext(recordsRetrieved); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java index 687c4e7ca..672c3d897 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java @@ -34,12 +34,12 @@ public Subscriber getDelegateSubscriber() { } @Override - public RecordsPublisher getWaitingRecordsPublisher() { + public RecordsPublisher getRecordsPublisher() { return recordsPublisher; } @Override - public RecordsDeliveryAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved) { + public RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved) { return () -> recordsRetrieved.batchUniqueIdentifier(); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java index f931cb322..03dd9c597 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java +++ 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java @@ -14,15 +14,16 @@ */ package software.amazon.kinesis.metrics; -import java.util.ArrayList; -import java.util.List; - import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; -import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit;d + /** * Publisher that contains the logic to publish metrics. */ @@ -30,6 +31,7 @@ public class CloudWatchMetricsPublisher { // CloudWatch API has a limit of 20 MetricDatums per request private static final int BATCH_SIZE = 20; + private static final int PUT_TIMEOUT_MILLIS = 5000; private final String namespace; private final CloudWatchAsyncClient cloudWatchClient; @@ -60,11 +62,11 @@ public void publishMetrics(List> dataToP request = request.metricData(metricData); try { - cloudWatchClient.putMetricData(request.build()); - - log.debug("Successfully published {} datums.", endIndex - startIndex); - } catch (CloudWatchException e) { - log.warn("Could not publish {} datums to CloudWatch", endIndex - startIndex, e); + // This needs to be blocking. Making it asynchronous leads to increased throttling. + cloudWatchClient.putMetricData(request.build()).get(PUT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.warn("Could not publish {} datums to CloudWatch", endIndex - startIndex, + e instanceof ExecutionException ? 
e.getCause() : e); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 6e776be8e..3f1fad87d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -129,13 +129,12 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { @Override public void notify(RecordsDeliveryAck recordsDeliveryAck) { synchronized (lockObject) { - CompletableFuture triggeringFlowFuture = new CompletableFuture<>(); + RecordFlow triggeringFlow = null; try { - evictAckedEventAndScheduleNextEvent(recordsDeliveryAck, triggeringFlowFuture); + triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck); } catch (Throwable t) { - errorOccurred(triggeringFlowFuture.getNow(null), t); + errorOccurred(triggeringFlow, t); } - final RecordFlow triggeringFlow = triggeringFlowFuture.getNow(null); if (triggeringFlow != null) { updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); } @@ -144,11 +143,13 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) { // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. @VisibleForTesting - void evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck, - CompletableFuture triggeringFlowFuture) { + RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) { // Peek the head of the queue on receiving the ack. // Note : This does not block wait to retrieve an element. 
final RecordsRetrievedContext recordsRetrievedContext = recordsDeliveryQueue.peek(); + // RecordFlow of the current event that needs to be returned + RecordFlow flowToBeReturned = null; + // Check if the ack corresponds to the head of the delivery queue. if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier() .equals(recordsDeliveryAck.batchUniqueIdentifier())) { @@ -159,7 +160,7 @@ void evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck, // Update current sequence number for the successfully delivered event. currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber(); // Update the triggering flow for post scheduling upstream request. - triggeringFlowFuture.complete(recordsRetrievedContext.getRecordFlow()); + flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue, if available. if (recordsDeliveryQueue.peek() != null) { subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); @@ -171,16 +172,17 @@ void evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck, if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier() .equals(flow.getSubscribeToShardId())) { log.error( - "{}: KCL BUG: Publisher found mismatched ack for subscription {} ", + "{}: Received unexpected ack for the active subscription {}. Throwing. ", shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); - throw new IllegalStateException("KCL BUG: Record delivery ack mismatch"); + throw new IllegalStateException("Unexpected ack for the active subscription"); } // Otherwise publisher received a stale ack. else { - log.info("{}: Publisher received duplicate ack or an ack for stale subscription {}. Ignoring.", shardId, + log.info("{}: Publisher received an ack for stale subscription {}. 
Ignoring.", shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); } } + return flowToBeReturned; } // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. @@ -201,21 +203,11 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, shardId, recordsDeliveryQueue.remainingCapacity()); throw e; } catch (Throwable t) { - recordsDeliveryQueue.remove(recordsRetrievedContext); + log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); throw t; } } - @VisibleForTesting - void setSubscriberForTesting(Subscriber s) { - this.subscriber = s; - } - - @VisibleForTesting - void setFlowForTesting(RecordFlow flow) { - this.flow = flow; - } - @Data private static final class RecordsRetrievedContext { private final RecordsRetrieved recordsRetrieved; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 7b4ec228d..1da65f2df 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -816,7 +816,7 @@ public void testIfBufferingRecordsWithinCapacityPublishesOneEvent() { FanOutRecordsPublisher.RecordFlow recordFlow = new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001-001"); final int[] totalRecordsRetrieved = { 0 }; - fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + fanOutRecordsPublisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) {} @Override public void onNext(RecordsRetrieved recordsRetrieved) { totalRecordsRetrieved[0]++; @@ -835,7 +835,7 @@ public void testIfBufferingRecordsOverCapacityPublishesOneEventAndThrows() 
{ FanOutRecordsPublisher.RecordFlow recordFlow = new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); final int[] totalRecordsRetrieved = { 0 }; - fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + fanOutRecordsPublisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) {} @Override public void onNext(RecordsRetrieved recordsRetrieved) { totalRecordsRetrieved[0]++; @@ -859,14 +859,14 @@ public void testIfPublisherAlwaysPublishesWhenQueueIsEmpty() { FanOutRecordsPublisher.RecordFlow recordFlow = new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); final int[] totalRecordsRetrieved = { 0 }; - fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + fanOutRecordsPublisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) {} @Override public void onNext(RecordsRetrieved recordsRetrieved) { totalRecordsRetrieved[0]++; // This makes sure the queue is immediately made empty, so that the next event enqueued will // be the only element in the queue. 
fanOutRecordsPublisher - .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier(), new CompletableFuture<>()); + .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier()); } @Override public void onError(Throwable throwable) {} @Override public void onComplete() {} @@ -883,19 +883,18 @@ public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlow() { FanOutRecordsPublisher.RecordFlow recordFlow = new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); final int[] totalRecordsRetrieved = { 0 }; - fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + fanOutRecordsPublisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) {} @Override public void onNext(RecordsRetrieved recordsRetrieved) { totalRecordsRetrieved[0]++; // This makes sure the queue is immediately made empty, so that the next event enqueued will // be the only element in the queue. 
fanOutRecordsPublisher - .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier(), new CompletableFuture<>()); + .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier()); // Send stale event periodically if(totalRecordsRetrieved[0] % 10 == 0) { fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( - () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"), - new CompletableFuture<>()); + () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow")); } } @Override public void onError(Throwable throwable) {} @@ -915,7 +914,7 @@ public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliver new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); final int[] totalRecordsRetrieved = { 0 }; BlockingQueue ackQueue = new LinkedBlockingQueue<>(); - fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + fanOutRecordsPublisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) {} @Override public void onNext(RecordsRetrieved recordsRetrieved) { totalRecordsRetrieved[0]++; @@ -936,10 +935,9 @@ public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliver while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) { final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; fanOutRecordsPublisher - .evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal, new CompletableFuture<>()); + .evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal); fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( - () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"), - new CompletableFuture<>()); + () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow")); } assertEquals(10, totalRecordsRetrieved[0]); } @@ -948,11 +946,10 @@ public void 
testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliver public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws InterruptedException { FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); FanOutRecordsPublisher.RecordFlow recordFlow = - new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); - fanOutRecordsPublisher.setFlowForTesting(recordFlow); + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "Shard-001-1"); final int[] totalRecordsRetrieved = { 0 }; BlockingQueue ackQueue = new LinkedBlockingQueue<>(); - fanOutRecordsPublisher.setSubscriberForTesting(new Subscriber() { + fanOutRecordsPublisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) {} @Override public void onNext(RecordsRetrieved recordsRetrieved) { totalRecordsRetrieved[0]++; @@ -973,8 +970,7 @@ public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws Inter while(count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) { final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( - () -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier()), - new CompletableFuture<>()); + () -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier())); } } From ada1089007ab6e3eced30aadab368a47c8b305ad Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 8 Aug 2019 12:29:15 -0700 Subject: [PATCH 09/13] Typo fix --- .../amazon/kinesis/metrics/CloudWatchMetricsPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java index 03dd9c597..80712fdb1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit;d +import java.util.concurrent.TimeUnit; /** * Publisher that contains the logic to publish metrics. From 9ae07284215d9fe4d55fea367ffa79566411f2da Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 8 Aug 2019 13:37:29 -0700 Subject: [PATCH 10/13] Removing cloudwatch fix. Will be released in a separate commit. --- .../metrics/CloudWatchMetricsPublisher.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java index 80712fdb1..f931cb322 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricsPublisher.java @@ -14,16 +14,15 @@ */ package software.amazon.kinesis.metrics; +import java.util.ArrayList; +import java.util.List; + import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - 
/** * Publisher that contains the logic to publish metrics. */ @@ -31,7 +30,6 @@ public class CloudWatchMetricsPublisher { // CloudWatch API has a limit of 20 MetricDatums per request private static final int BATCH_SIZE = 20; - private static final int PUT_TIMEOUT_MILLIS = 5000; private final String namespace; private final CloudWatchAsyncClient cloudWatchClient; @@ -62,11 +60,11 @@ public void publishMetrics(List> dataToP request = request.metricData(metricData); try { - // This needs to be blocking. Making it asynchronous leads to increased throttling. - cloudWatchClient.putMetricData(request.build()).get(PUT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (Exception e) { - log.warn("Could not publish {} datums to CloudWatch", endIndex - startIndex, - e instanceof ExecutionException ? e.getCause() : e); + cloudWatchClient.putMetricData(request.build()); + + log.debug("Successfully published {} datums.", endIndex - startIndex); + } catch (CloudWatchException e) { + log.warn("Could not publish {} datums to CloudWatch", endIndex - startIndex, e); } } } From 4627b09e34e311f8223e038b2bcfab9c95946fce Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 14 Aug 2019 16:47:02 -0700 Subject: [PATCH 11/13] Changing RejectedTaskEvent log message for the release --- .../software/amazon/kinesis/coordinator/RejectedTaskEvent.java | 2 +- .../amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java index d78b64b6c..7dc8dfaf5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -26,7 +26,7 @@ @KinesisClientInternalApi class RejectedTaskEvent implements 
DiagnosticEvent { private static final String MESSAGE = "Review your thread configuration to prevent task rejections. " + - "Until next release, KCL will not be resilient to task rejections. "; + "Task rejections will slow down your application and some shards may stop processing. "; private ExecutorStateEvent executorStateEvent; private Throwable throwable; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 3f1fad87d..53c3d715b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -20,7 +20,6 @@ import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Subscriber; @@ -51,7 +50,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; From e0d388f069cad59138475bf73b8e2784ae04c4f3 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 14 Aug 2019 23:28:58 -0700 Subject: [PATCH 12/13] Added javadoc to RecordsDeliveryAck and optimized imports --- .../software/amazon/kinesis/retrieval/RecordsDeliveryAck.java | 3 +++ .../software/amazon/kinesis/retrieval/RecordsRetrieved.java | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java index 
c2dda0764..487e16370 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java @@ -15,6 +15,9 @@ package software.amazon.kinesis.retrieval; +/** + * Interface to supply all the meta information for record delivery ack. + */ public interface RecordsDeliveryAck { /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java index 77ba17487..f6f5bb7f4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java @@ -16,8 +16,6 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; -import java.util.UUID; - public interface RecordsRetrieved { /** From aaa3f8cf89f2cbe876f06d9b8de3d8c389a920c2 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 16 Aug 2019 12:01:31 -0700 Subject: [PATCH 13/13] Adding Kinesis Internal API tag for new concrete implementations --- .../java/software/amazon/kinesis/common/DiagnosticUtils.java | 2 ++ .../kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java | 2 ++ .../amazon/kinesis/retrieval/BatchUniqueIdentifier.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java index ebdebb935..db0393e15 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java @@ -16,12 +16,14 @@ package software.amazon.kinesis.common; import org.slf4j.Logger; +import 
software.amazon.kinesis.annotations.KinesisClientInternalApi; import java.time.Duration; import java.time.Instant; import static software.amazon.kinesis.lifecycle.ShardConsumer.MAX_TIME_BETWEEN_REQUEST_RESPONSE; +@KinesisClientInternalApi public class DiagnosticUtils { /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java index 672c3d897..3ef9fc1d7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java @@ -17,10 +17,12 @@ import lombok.AllArgsConstructor; import org.reactivestreams.Subscriber; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RecordsDeliveryAck; +@KinesisClientInternalApi @AllArgsConstructor public class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java index ab7661710..0a0e2aeb6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java @@ -16,7 +16,9 @@ package software.amazon.kinesis.retrieval; import lombok.Data; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; +@KinesisClientInternalApi @Data public class BatchUniqueIdentifier { private final String recordBatchIdentifier;