-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Limited threads resiliency fix durability nonblock #573
Changes from 10 commits
d1f188a
9f7cec6
cd8307b
ff51d64
27d9c5c
5dd1adf
6b20206
a2e0104
ada1089
9ae0728
4627b09
e0d388f
aaa3f8c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Copyright 2019 Amazon.com, Inc. or its affiliates. | ||
* Licensed under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package software.amazon.kinesis.common; | ||
|
||
import org.slf4j.Logger; | ||
|
||
import java.time.Duration; | ||
import java.time.Instant; | ||
|
||
import static software.amazon.kinesis.lifecycle.ShardConsumer.MAX_TIME_BETWEEN_REQUEST_RESPONSE; | ||
|
||
public class DiagnosticUtils { | ||
|
||
/** | ||
* Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action. | ||
* @param shardId of the shard that is having delayed delivery | ||
* @param enqueueTimestamp of the event submitted to the executor service | ||
* @param log Slf4j Logger from RecordPublisher to log the events | ||
*/ | ||
public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) { | ||
final long durationBetweenEnqueueAndAckInMillis = Duration | ||
.between(enqueueTimestamp, Instant.now()).toMillis(); | ||
if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) { | ||
// The above condition logs the warn msg if the delivery time exceeds 11 seconds. | ||
log.warn( | ||
"{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs" | ||
+ " to see the state of the executor service. Also check if the RecordProcessor's processing " | ||
+ "time is high. ", | ||
shardId, durationBetweenEnqueueAndAckInMillis); | ||
} else if (log.isDebugEnabled()) { | ||
log.debug("{}: Record delivery time to shard consumer is {} millis", shardId, | ||
durationBetweenEnqueueAndAckInMillis); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright 2019 Amazon.com, Inc. or its affiliates. | ||
* Licensed under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package software.amazon.kinesis.lifecycle; | ||
|
||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
import software.amazon.kinesis.retrieval.RecordsPublisher; | ||
import software.amazon.kinesis.retrieval.RecordsRetrieved; | ||
import software.amazon.kinesis.retrieval.RecordsDeliveryAck; | ||
|
||
/** | ||
* Subscriber that notifies its publisher on receipt of the onNext event. | ||
*/ | ||
public interface NotifyingSubscriber extends Subscriber<RecordsRetrieved> { | ||
|
||
/** | ||
* Return the actual subscriber to which the events needs to be delegated. | ||
* @return Subscriber<T> to be delegated | ||
*/ | ||
Subscriber<RecordsRetrieved> getDelegateSubscriber(); | ||
|
||
/** | ||
* Return the publisher to be notified | ||
* @return RecordsPublisher to be notified. | ||
*/ | ||
RecordsPublisher getRecordsPublisher(); | ||
|
||
/** | ||
* Construct RecordsDeliveryAck object from the incoming data and return it | ||
* @param recordsRetrieved for which we need the ack. | ||
* @return getRecordsDeliveryAck | ||
*/ | ||
RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved); | ||
|
||
@Override | ||
default void onSubscribe(Subscription subscription) { | ||
getDelegateSubscriber().onSubscribe(subscription); | ||
} | ||
|
||
@Override | ||
default void onNext(RecordsRetrieved recordsRetrieved) { | ||
getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved)); | ||
getDelegateSubscriber().onNext(recordsRetrieved); | ||
} | ||
|
||
@Override | ||
default void onError(Throwable throwable) { | ||
getDelegateSubscriber().onError(throwable); | ||
} | ||
|
||
@Override | ||
default void onComplete() { | ||
getDelegateSubscriber().onComplete(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright 2019 Amazon.com, Inc. or its affiliates. | ||
* Licensed under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package software.amazon.kinesis.lifecycle; | ||
|
||
import lombok.AllArgsConstructor; | ||
import org.reactivestreams.Subscriber; | ||
import software.amazon.kinesis.retrieval.RecordsPublisher; | ||
import software.amazon.kinesis.retrieval.RecordsRetrieved; | ||
import software.amazon.kinesis.retrieval.RecordsDeliveryAck; | ||
|
||
@AllArgsConstructor | ||
public class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber { | ||
|
||
private final Subscriber<RecordsRetrieved> delegate; | ||
|
||
private final RecordsPublisher recordsPublisher; | ||
|
||
@Override | ||
public Subscriber<RecordsRetrieved> getDelegateSubscriber() { | ||
return delegate; | ||
} | ||
|
||
@Override | ||
public RecordsPublisher getRecordsPublisher() { | ||
return recordsPublisher; | ||
} | ||
|
||
@Override | ||
public RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved) { | ||
return () -> recordsRetrieved.batchUniqueIdentifier(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,7 +77,7 @@ void startSubscriptions() { | |
recordsPublisher.restartFrom(lastAccepted); | ||
} | ||
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) | ||
.subscribe(this); | ||
.subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor cooment: Maybe add a comment here to explain that the ShardConsumerNotifyingSubscriber is the new subscriber that we introduced the Ack mechanism. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a new subscriber, but just a wrapper on top of the existing one. |
||
} | ||
} | ||
|
||
|
@@ -215,4 +215,5 @@ public void cancel() { | |
subscription.cancel(); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright 2019 Amazon.com, Inc. or its affiliates. | ||
* Licensed under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package software.amazon.kinesis.retrieval; | ||
|
||
import lombok.Data; | ||
|
||
@Data | ||
public class BatchUniqueIdentifier { | ||
private final String recordBatchIdentifier; | ||
private final String flowIdentifier; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright 2019 Amazon.com, Inc. or its affiliates. | ||
* Licensed under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package software.amazon.kinesis.retrieval; | ||
|
||
public interface RecordsDeliveryAck { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a Also a javadoc comment on this class would be helpful There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might add more state information in future to this interface, which might need more than one abstract method. Added javadoc comment. |
||
|
||
/** | ||
* Unique record batch identifier used to ensure the durability and ordering guarantees. | ||
* @return id that uniquely determines a record batch and its source. | ||
*/ | ||
BatchUniqueIdentifier batchUniqueIdentifier(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
import org.reactivestreams.Publisher; | ||
|
||
import software.amazon.kinesis.common.InitialPositionInStreamExtended; | ||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; | ||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; | ||
|
||
/** | ||
|
@@ -47,4 +46,12 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> { | |
* Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. | ||
*/ | ||
void shutdown(); | ||
|
||
/** | ||
* Notify the publisher on receipt of a data event. | ||
* @param ack acknowledgement received from the subscriber. | ||
*/ | ||
default void notify(RecordsDeliveryAck ack) { | ||
throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For my understanding: why is the desired default behavior to throw an exception here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to inform the NotifyingSubscriber that the Publisher it is subscribing to, has not implemented notify() method. Rather allowing it to be a no-op, we throw exception back. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,8 @@ | |
|
||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; | ||
|
||
import java.util.UUID; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't look like you use this import in this file, maybe a remnant of earlier commit? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. Removed. |
||
|
||
public interface RecordsRetrieved { | ||
|
||
/** | ||
|
@@ -24,4 +26,13 @@ public interface RecordsRetrieved { | |
* @return the processRecordsInput received | ||
*/ | ||
ProcessRecordsInput processRecordsInput(); | ||
|
||
/** | ||
* Returns the identifier that uniquely identifies this batch. | ||
* | ||
* @return batchUniqueIdentifier that uniquely identifies the records batch and its source. | ||
*/ | ||
default BatchUniqueIdentifier batchUniqueIdentifier() { | ||
throw new UnsupportedOperationException("Retrieval of batch unique identifier is not supported"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Can we specify what to look for in ExecutorStateEvent logs and what actions to take? Something maybe like "Check the ExecutorStateEvent logs to see if many threads are running concurrently. Consider using the default configuration."
Also can we specify where to check for RecordProcessor running time and also what to do if it's too high?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm there are different executor states which might lead to this situation. This can happen even with deault executor. I feel it is better we leave it to the customer to evaluate from the state information available.