Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limited threads resiliency fix durability nonblock #573

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.common;

import org.slf4j.Logger;

import java.time.Duration;
import java.time.Instant;

import static software.amazon.kinesis.lifecycle.ShardConsumer.MAX_TIME_BETWEEN_REQUEST_RESPONSE;

public class DiagnosticUtils {

/**
* Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action.
* @param shardId of the shard that is having delayed delivery
* @param enqueueTimestamp of the event submitted to the executor service
* @param log Slf4j Logger from RecordPublisher to log the events
*/
public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) {
final long durationBetweenEnqueueAndAckInMillis = Duration
.between(enqueueTimestamp, Instant.now()).toMillis();
if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) {
// The above condition logs the warn msg if the delivery time exceeds 11 seconds.
log.warn(
"{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Can we specify what to look for in ExecutorStateEvent logs and what actions to take? Something maybe like "Check the ExecutorStateEvent logs to see if many threads are running concurrently. Consider using the default configuration."

Also can we specify where to check for RecordProcessor running time and also what to do if it's too high?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm there are different executor states which might lead to this situation. This can happen even with deault executor. I feel it is better we leave it to the customer to evaluate from the state information available.

+ " to see the state of the executor service. Also check if the RecordProcessor's processing "
+ "time is high. ",
shardId, durationBetweenEnqueueAndAckInMillis);
} else if (log.isDebugEnabled()) {
log.debug("{}: Record delivery time to shard consumer is {} millis", shardId,
durationBetweenEnqueueAndAckInMillis);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.lifecycle;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;

/**
* Subscriber that notifies its publisher on receipt of the onNext event.
*/
public interface NotifyingSubscriber extends Subscriber<RecordsRetrieved> {

/**
* Return the actual subscriber to which the events needs to be delegated.
* @return Subscriber<T> to be delegated
*/
Subscriber<RecordsRetrieved> getDelegateSubscriber();

/**
* Return the publisher to be notified
* @return RecordsPublisher to be notified.
*/
RecordsPublisher getRecordsPublisher();

/**
* Construct RecordsDeliveryAck object from the incoming data and return it
* @param recordsRetrieved for which we need the ack.
* @return getRecordsDeliveryAck
*/
RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved);

@Override
default void onSubscribe(Subscription subscription) {
getDelegateSubscriber().onSubscribe(subscription);
}

@Override
default void onNext(RecordsRetrieved recordsRetrieved) {
getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved));
getDelegateSubscriber().onNext(recordsRetrieved);
}

@Override
default void onError(Throwable throwable) {
getDelegateSubscriber().onError(throwable);
}

@Override
default void onComplete() {
getDelegateSubscriber().onComplete();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.lifecycle;

import lombok.AllArgsConstructor;
import org.reactivestreams.Subscriber;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;

@AllArgsConstructor
public class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber {

private final Subscriber<RecordsRetrieved> delegate;

private final RecordsPublisher recordsPublisher;

@Override
public Subscriber<RecordsRetrieved> getDelegateSubscriber() {
return delegate;
}

@Override
public RecordsPublisher getRecordsPublisher() {
return recordsPublisher;
}

@Override
public RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved) {
return () -> recordsRetrieved.batchUniqueIdentifier();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void startSubscriptions() {
recordsPublisher.restartFrom(lastAccepted);
}
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
.subscribe(this);
.subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor cooment: Maybe add a comment here to explain that the ShardConsumerNotifyingSubscriber is the new subscriber that we introduced the Ack mechanism.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a new subscriber, but just a wrapper on top of the existing one.

}
}

Expand Down Expand Up @@ -215,4 +215,5 @@ public void cancel() {
subscription.cancel();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.retrieval;

import lombok.Data;

@Data
public class BatchUniqueIdentifier {
private final String recordBatchIdentifier;
private final String flowIdentifier;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.retrieval;

public interface RecordsDeliveryAck {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a @FunctionalInterface annotation here to make the usage in ShardConsumerNotifyingSubscriber a bit clearer?

Also a javadoc comment on this class would be helpful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might add more state information in future to this interface, which might need more than one abstract method. Added javadoc comment.


/**
* Unique record batch identifier used to ensure the durability and ordering guarantees.
* @return id that uniquely determines a record batch and its source.
*/
BatchUniqueIdentifier batchUniqueIdentifier();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.reactivestreams.Publisher;

import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/**
Expand Down Expand Up @@ -47,4 +46,12 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
* Shutdowns the publisher. Once this method returns the publisher should no longer provide any records.
*/
void shutdown();

/**
* Notify the publisher on receipt of a data event.
* @param ack acknowledgement received from the subscriber.
*/
default void notify(RecordsDeliveryAck ack) {
throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding: why is the desired default behavior to throw an exception here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding: why is the desired default behavior to throw an exception here?
I think this is because the logic for evictAckedEventAndScheduleNextEvent is implemented in FanOutRecordsPublisher, which extends this class. FanOutRecordsPublisher is the publisher that we are using and requires the Ack mechanism.

Copy link
Contributor Author

@ashwing ashwing Aug 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to inform the NotifyingSubscriber that the Publisher it is subscribing to, has not implemented notify() method. Rather allowing it to be a no-op, we throw exception back.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;

import java.util.UUID;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like you use this import in this file, maybe a remnant of earlier commit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Removed.


public interface RecordsRetrieved {

/**
Expand All @@ -24,4 +26,13 @@ public interface RecordsRetrieved {
* @return the processRecordsInput received
*/
ProcessRecordsInput processRecordsInput();

/**
* Returns the identifier that uniquely identifies this batch.
*
* @return batchUniqueIdentifier that uniquely identifies the records batch and its source.
*/
default BatchUniqueIdentifier batchUniqueIdentifier() {
throw new UnsupportedOperationException("Retrieval of batch unique identifier is not supported");
}
}
Loading