Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposed convenience method of `ExtendedSequenceNumber#isSentinelCheck… #1053

Merged
merged 1 commit into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ private boolean checkAndSubmitNextTask() {

private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
if (exception != null || taskResult.getException() != null) {
log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException());
log.error("Caught exception running {} task: {}", currentTask.taskType(),
exception != null ? exception : taskResult.getException());
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// submitShardSyncTask is invoked, before completion stage exits (future completes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing it's child shards.
* and begun processing its child shards.
*
* <p>NOTE: This class is deprecated and will be removed in a future release.</p>
*/
@Deprecated
public class ShardSyncer {
private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
private static final boolean garbageCollectLeases = true;

/**
* <p>NOTE: This method is deprecated and will be removed in a future release.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public ProcessTask(@NonNull ShardInfo shardInfo,
*/
@Override
public TaskResult call() {
/**
/*
* NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension,
* therefore all data added to appScope, although from different shard consumer, will be sent to the same metric,
* which is the app-level MillsBehindLatest metric.
Expand Down Expand Up @@ -180,8 +180,6 @@ public TaskResult call() {
}
}



private List<KinesisClientRecord> deaggregateAnyKplRecords(List<KinesisClientRecord> records) {
if (shard == null) {
return aggregatorUtil.deaggregate(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package software.amazon.kinesis.retrieval.kpl;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

//import com.amazonaws.services.kinesis.clientlibrary.lib.worker.String;
import lombok.EqualsAndHashCode;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;

/**
Expand All @@ -28,10 +32,8 @@
* user record therefore has an integer sub-sequence number, in addition to the
* regular sequence number of the Kinesis record. The sub-sequence number is
* used to checkpoint within an aggregated record.
*
* @author daphnliu
*
*/
@EqualsAndHashCode
public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber> {
private final String sequenceNumber;
private final long subSequenceNumber;
Expand Down Expand Up @@ -65,6 +67,15 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
public static final ExtendedSequenceNumber AT_TIMESTAMP =
new ExtendedSequenceNumber(SentinelCheckpoint.AT_TIMESTAMP.toString());

/**
* Cache of {@link SentinelCheckpoint} values that avoids expensive
* try-catch and Exception handling.
*
* @see #isSentinelCheckpoint()
*/
private static final Set<String> SENTINEL_VALUES = Collections.unmodifiableSet(
Arrays.stream(SentinelCheckpoint.values()).map(SentinelCheckpoint::name).collect(Collectors.toSet()));

/**
* Construct an ExtendedSequenceNumber. The sub-sequence number defaults to
* 0.
Expand All @@ -87,7 +98,7 @@ public ExtendedSequenceNumber(String sequenceNumber) {
*/
public ExtendedSequenceNumber(String sequenceNumber, Long subSequenceNumber) {
this.sequenceNumber = sequenceNumber;
this.subSequenceNumber = subSequenceNumber == null ? 0 : subSequenceNumber.longValue();
this.subSequenceNumber = subSequenceNumber == null ? 0L : subSequenceNumber;
}

/**
Expand All @@ -104,7 +115,7 @@ public ExtendedSequenceNumber(String sequenceNumber, Long subSequenceNumber) {
public int compareTo(ExtendedSequenceNumber extendedSequenceNumber) {
String secondSequenceNumber = extendedSequenceNumber.sequenceNumber();

if (!isDigitsOrSentinelValue(sequenceNumber) || !isDigitsOrSentinelValue(secondSequenceNumber)) {
if (!isDigitsOrSentinelValue(this) || !isDigitsOrSentinelValue(extendedSequenceNumber)) {
throw new IllegalArgumentException("Expected a sequence number or a sentinel checkpoint value but "
+ "received: first=" + sequenceNumber + " and second=" + secondSequenceNumber);
}
Expand Down Expand Up @@ -141,57 +152,24 @@ public long subSequenceNumber() {
return subSequenceNumber;
}


public boolean isShardEnd() {
return sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString());
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append('{');
if (sequenceNumber() != null) {
sb.append("SequenceNumber: " + sequenceNumber() + ",");
sb.append("SequenceNumber: ").append(sequenceNumber()).append(',');
}
if (subSequenceNumber >= 0) {
sb.append("SubsequenceNumber: " + subSequenceNumber());
sb.append("SubsequenceNumber: ").append(subSequenceNumber());
}
sb.append("}");
sb.append('}');
return sb.toString();
}

@Override
public int hashCode() {
final int prime = 31;
final int shift = 32;
int hashCode = 1;
hashCode = prime * hashCode + ((sequenceNumber == null) ? 0 : sequenceNumber.hashCode());
hashCode = prime * hashCode + ((subSequenceNumber < 0)
? 0
: (int) (subSequenceNumber ^ (subSequenceNumber >>> shift)));
return hashCode;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}

if (!(obj instanceof ExtendedSequenceNumber)) {
return false;
}
ExtendedSequenceNumber other = (ExtendedSequenceNumber) obj;

if (!sequenceNumber.equals(other.sequenceNumber())) {
return false;
}
return subSequenceNumber == other.subSequenceNumber();
}

/**
* Sequence numbers are converted, sentinels are given a value of -1. Note this method is only used after special
* logic associated with SHARD_END and the case of comparing two sentinel values has already passed, so we map
Expand All @@ -217,30 +195,23 @@ private static BigInteger bigIntegerValue(String sequenceNumber) {
}

/**
* Checks if the string is all digits or one of the SentinelCheckpoint values.
* Checks if a sequence number is all digits or a {@link SentinelCheckpoint}.
*
* @param string
* @param esn {@code ExtendedSequenceNumber} to validate its sequence number
* @return true if and only if the string is all digits or one of the SentinelCheckpoint values
*/
private static boolean isDigitsOrSentinelValue(String string) {
return isDigits(string) || isSentinelValue(string);
private static boolean isDigitsOrSentinelValue(final ExtendedSequenceNumber esn) {
return isDigits(esn.sequenceNumber()) || esn.isSentinelCheckpoint();
}

/**
* Checks if the string is a SentinelCheckpoint value.
*
* @param string
* @return true if and only if the string can be converted to a SentinelCheckpoint
* Returns true if-and-only-if the sequence number is a {@link SentinelCheckpoint}.
* Subsequence numbers are ignored when making this determination.
*/
private static boolean isSentinelValue(String string) {
try {
SentinelCheckpoint.valueOf(string);
return true;
} catch (Exception e) {
return false;
}
public boolean isSentinelCheckpoint() {
return SENTINEL_VALUES.contains(sequenceNumber);
}

/**
* Checks if the string is composed of only digits.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval.kpl;

import static org.junit.Assert.assertTrue;

import org.junit.Test;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;

public class ExtendedSequenceNumberTest {

@Test
public void testSentinelCheckpoints() {
for (final SentinelCheckpoint sentinel : SentinelCheckpoint.values()) {
final ExtendedSequenceNumber esn = new ExtendedSequenceNumber(sentinel.name());
assertTrue(sentinel.name(), esn.isSentinelCheckpoint());

// For backwards-compatibility, sentinels should ignore subsequences
final ExtendedSequenceNumber esnWithSubsequence = new ExtendedSequenceNumber(sentinel.name(), 42L);
assertTrue(sentinel.name(), esnWithSubsequence.isSentinelCheckpoint());
}
}

}