Skip to content

Commit

Permalink
Merge pull request #804 from kevioke/init-pos-timestamp
Browse files Browse the repository at this point in the history
Allow InitialPositionInStreamExtended to be specified in properties file
  • Loading branch information
AvinashChowdary authored Jun 1, 2021
2 parents 968b47a + afee84d commit 4e9e86e
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -27,6 +28,7 @@
import java.util.function.Function;

import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.apache.commons.beanutils.Converter;
import org.apache.commons.beanutils.converters.ArrayConverter;
Expand Down Expand Up @@ -197,6 +199,14 @@ public MultiLangDaemonConfiguration(BeanUtilsBean utilsBean, ConvertUtilsBean co
this.utilsBean = utilsBean;
this.convertUtilsBean = convertUtilsBean;

convertUtilsBean.register(new Converter() {
@Override
public <T> T convert(Class<T> type, Object value) {
Date date = new Date(Long.parseLong(value.toString()) * 1000L);
return type.cast(InitialPositionInStreamExtended.newInitialPositionAtTimestamp(date));
}
}, InitialPositionInStreamExtended.class);

convertUtilsBean.register(new Converter() {
@Override
public <T> T convert(Class<T> type, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
import java.io.InputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.commons.lang3.StringUtils;
import org.junit.Ignore;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -45,11 +45,8 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

Expand Down Expand Up @@ -93,6 +90,47 @@ public void testWithLongVariables() {
assertEquals(config.getShardSyncIntervalMillis(), 500);
}

@Test
public void testWithInitialPositionInStreamExtended() {
long epochTimeInSeconds = 1617406032;
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app",
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"initialPositionInStreamExtended = " + epochTimeInSeconds}, '\n'));

assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(epochTimeInSeconds * 1000L));
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP);
}

@Test
public void testInvalidInitialPositionInStream() {
// AT_TIMESTAMP cannot be used as initialPositionInStream. If a user wants to specify AT_TIMESTAMP,
// they must specify the time with initialPositionInStreamExtended.
try {
getConfiguration(StringUtils.join(new String[] { "applicationName = app",
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"initialPositionInStream = AT_TIMESTAMP"}, '\n'));
fail("Should have thrown when initialPositionInStream is set to AT_TIMESTAMP");
} catch (Exception e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
assertTrue(rootCause instanceof IllegalArgumentException);
}
}

@Test
public void testInvalidInitialPositionInStreamExtended() {
// initialPositionInStreamExtended takes a long value indicating seconds since epoch. If a non-long
// value is provided, the constructor should throw an IllegalArgumentException exception.
try {
getConfiguration(StringUtils.join(new String[] { "applicationName = app",
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
"initialPositionInStreamExtended = null"}, '\n'));
fail("Should have thrown when initialPositionInStreamExtended is set to null");
} catch (Exception e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
assertTrue(rootCause instanceof IllegalArgumentException);
}
}

@Test
public void testWithUnsupportedClientConfigurationVariables() {
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
Expand Down Expand Up @@ -159,14 +197,23 @@ public void testWithSetVariables() {
}

@Test
public void testWithInitialPositionInStreamVariables() {
public void testWithInitialPositionInStreamTrimHorizon() {
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
"initialPositionInStream = TriM_Horizon" }, '\n'));

assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
}

@Test
public void testWithInitialPositionInStreamLatest() {
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
"initialPositionInStream = LateSt" }, '\n'));

assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST);
}

@Test
public void testSkippingNonKCLVariables() {
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
Expand Down

0 comments on commit 4e9e86e

Please sign in to comment.