Skip to content

Commit

Permalink
feat: Using simple duration args
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Feb 12, 2025
1 parent 8ac93b6 commit 69874e0
Show file tree
Hide file tree
Showing 20 changed files with 144 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.redis.riot.core;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -68,7 +67,7 @@ public abstract class AbstractJobCommand extends AbstractCallableCommand {
private String jobName;

@Option(names = "--repeat", description = "After the job completes keep repeating it on a fixed interval (ex 5m, 1h)", paramLabel = "<dur>")
private Duration repeatEvery;
private RiotDuration repeatEvery;

@ArgGroup(exclusive = false, heading = "Job options%n")
private StepArgs stepArgs = new StepArgs();
Expand Down Expand Up @@ -208,7 +207,7 @@ public void afterJob(JobExecution jobExecution) {

log.info("Finished job, will run again in {}", repeatEvery);
try {
Thread.sleep(repeatEvery.toMillis());
Thread.sleep(repeatEvery.getValue().toMillis());
if (lastJob == null) {
lastJob = job.build();
}
Expand Down Expand Up @@ -346,9 +345,9 @@ private <I, O> ItemWriter<? super O> writer(Step<I, O> step) {
log.info("Using no-op writer");
writer = new NoopItemWriter<>();
}
if (stepArgs.getSleep() > 0) {
log.info("Throttling writer with sleep={}ms", stepArgs.getSleep());
writer = new ThrottledItemWriter<>(writer, stepArgs.getSleep());
if (stepArgs.getSleep() != null) {
log.info("Throttling writer with sleep={}", stepArgs.getSleep());
writer = new ThrottledItemWriter<>(writer, stepArgs.getSleep().getValue().toMillis());
}
return writer;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package com.redis.riot.core;

import java.io.PrintWriter;
import java.time.Duration;
import java.util.concurrent.Callable;

import org.springframework.boot.convert.DurationStyle;
import org.springframework.util.unit.DataSize;

import picocli.CommandLine;
Expand Down Expand Up @@ -44,7 +42,7 @@ protected int executionStrategy(ParseResult parseResult) {
}

protected void registerConverters(CommandLine commandLine) {
commandLine.registerConverter(Duration.class, DurationStyle.SIMPLE::parse);
commandLine.registerConverter(RiotDuration.class, RiotDuration::parse);
commandLine.registerConverter(DataSize.class, DataSize::parse);
commandLine.registerConverter(Expression.class, Expression::parse);
commandLine.registerConverter(TemplateExpression.class, Expression::parseTemplate);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package com.redis.riot.core;

import java.time.Duration;

import lombok.ToString;
import picocli.CommandLine.Option;

@ToString
public class ProgressArgs {

public static final Duration DEFAULT_UPDATE_INTERVAL = Duration.ofSeconds(1);
public static final RiotDuration DEFAULT_UPDATE_INTERVAL = RiotDuration.ofSeconds(1);
public static final ProgressStyle DEFAULT_STYLE = ProgressStyle.ASCII;

@Option(names = "--progress", description = "Progress style: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<style>")
private ProgressStyle style = DEFAULT_STYLE;

@Option(names = "--progress-rate", description = "Progress update interval in millis (default: ${DEFAULT-VALUE}).", paramLabel = "<dur>", hidden = true)
private Duration updateInterval = DEFAULT_UPDATE_INTERVAL;
@Option(names = "--progress-rate", description = "Progress update interval (default: ${DEFAULT-VALUE}).", paramLabel = "<dur>", hidden = true)
private RiotDuration updateInterval = DEFAULT_UPDATE_INTERVAL;

public ProgressStyle getStyle() {
return style;
Expand All @@ -25,11 +23,11 @@ public void setStyle(ProgressStyle style) {
this.style = style;
}

public Duration getUpdateInterval() {
public RiotDuration getUpdateInterval() {
return updateInterval;
}

public void setUpdateInterval(Duration interval) {
public void setUpdateInterval(RiotDuration interval) {
this.updateInterval = interval;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public void beforeStep(StepExecution stepExecution) {
ProgressBarBuilder progressBarBuilder = new ProgressBarBuilder();
progressBarBuilder.setTaskName(step.getTaskName());
progressBarBuilder.setStyle(progressBarStyle());
progressBarBuilder.setUpdateIntervalMillis(Math.toIntExact(progressArgs.getUpdateInterval().toMillis()));
progressBarBuilder
.setUpdateIntervalMillis(Math.toIntExact(progressArgs.getUpdateInterval().getValue().toMillis()));
progressBarBuilder.showSpeed();
if (progressArgs.getStyle() == ProgressStyle.LOG) {
Logger logger = LoggerFactory.getLogger(getClass());
Expand Down
53 changes: 53 additions & 0 deletions core/riot-core/src/main/java/com/redis/riot/core/RiotDuration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.redis.riot.core;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.springframework.boot.convert.DurationStyle;

/**
* Wrapper around java.time.Duration with a custom toString
*/
public class RiotDuration {

private final Duration value;
private final ChronoUnit displayUnit;

public RiotDuration(long value, ChronoUnit unit) {
this(Duration.of(value, unit), unit);
}

public RiotDuration(Duration duration, ChronoUnit displayUnit) {
this.value = duration;
this.displayUnit = displayUnit;
}

public Duration getValue() {
return value;
}

@Override
public String toString() {
return DurationStyle.SIMPLE.print(value, displayUnit);
}

public static RiotDuration parse(String string) {
return new RiotDuration(DurationStyle.SIMPLE.parse(string), ChronoUnit.MILLIS);
}

public static RiotDuration of(long value, ChronoUnit unit) {
return new RiotDuration(value, unit);
}

public static RiotDuration of(Duration duration, ChronoUnit unit) {
return new RiotDuration(duration, unit);
}

public static RiotDuration ofSeconds(long seconds) {
return new RiotDuration(seconds, ChronoUnit.SECONDS);
}

public static RiotDuration ofMillis(long millis) {
return new RiotDuration(millis, ChronoUnit.MILLIS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public class StepArgs {
public static final SkipPolicy DEFAULT_SKIP_POLICY = SkipPolicy.NEVER;
public static final int DEFAULT_RETRY_LIMIT = MaxAttemptsRetryPolicy.DEFAULT_MAX_ATTEMPTS;

@Option(names = "--sleep", description = "Duration in millis to wait after each batch write (default: no sleep).", paramLabel = "<ms>")
private long sleep;
@Option(names = "--sleep", description = "Duration to wait after each batch write, e.g. 1ms or 3s (default: no sleep).", paramLabel = "<dur>")
private RiotDuration sleep;

@Option(names = "--threads", description = "Number of concurrent threads to use for batch processing (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int threads = DEFAULT_THREADS;
Expand All @@ -42,11 +42,11 @@ public class StepArgs {
@ArgGroup(exclusive = false)
private ProgressArgs progressArgs = new ProgressArgs();

public long getSleep() {
public RiotDuration getSleep() {
return sleep;
}

public void setSleep(long sleep) {
public void setSleep(RiotDuration sleep) {
this.sleep = sleep;
}

Expand Down
12 changes: 0 additions & 12 deletions core/riot-faker/riot-faker.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,3 @@ dependencies {
implementation 'org.springframework.batch:spring-batch-infrastructure'
api group: 'net.datafaker', name: 'datafaker', version: datafakerVersion
}

compileJava {
options.compilerArgs += ["-AprojectPath=${project.group}/${project.name}"]
}

if (!(project.findProperty('automatic.module.name.skip') ?: false).toBoolean()) {
jar {
manifest {
attributes('Automatic-Module-Name': project.findProperty('automatic.module.name'))
}
}
}
12 changes: 0 additions & 12 deletions core/riot-file/riot-file.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,3 @@ dependencies {
api group: 'com.redis', name: 'spring-batch-resource', version: springBatchRedisVersion
testImplementation 'org.slf4j:slf4j-simple'
}

compileJava {
options.compilerArgs += ["-AprojectPath=${project.group}/${project.name}"]
}

if (!(project.findProperty('automatic.module.name.skip') ?: false).toBoolean()) {
jar {
manifest {
attributes('Automatic-Module-Name': project.findProperty('automatic.module.name'))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.StringTokenizer;
import java.util.Vector;

Expand All @@ -11,14 +10,15 @@
import org.slf4j.simple.SimpleLogger;

import com.redis.riot.core.MainCommand;
import com.redis.riot.core.RiotDuration;
import com.redis.spring.batch.test.AbstractTargetTestBase;

import picocli.CommandLine.IExecutionStrategy;
import picocli.CommandLine.ParseResult;

public abstract class AbstractRiotTestBase extends AbstractTargetTestBase {

public static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(1);
public static final RiotDuration DEFAULT_IDLE_TIMEOUT = RiotDuration.ofSeconds(1);
public static final int DEFAULT_EVENT_QUEUE_CAPACITY = 100000;

static {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.redis.riot;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;

import org.springframework.batch.item.ItemProcessor;
Expand All @@ -9,6 +9,7 @@
import org.springframework.util.Assert;

import com.redis.riot.CompareStatusItemWriter.StatusCount;
import com.redis.riot.core.RiotDuration;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.Step;
import com.redis.riot.function.StringKeyValue;
Expand All @@ -27,7 +28,8 @@

public abstract class AbstractReplicateCommand extends AbstractRedisTargetExportCommand {

public static final Duration DEFAULT_TTL_TOLERANCE = DefaultKeyComparator.DEFAULT_TTL_TOLERANCE;
public static final RiotDuration DEFAULT_TTL_TOLERANCE = RiotDuration.of(DefaultKeyComparator.DEFAULT_TTL_TOLERANCE,
ChronoUnit.SECONDS);
public static final boolean DEFAULT_COMPARE_STREAM_MESSAGE_ID = true;

private static final String COMPARE_TASK_NAME = "Comparing";
Expand All @@ -36,8 +38,8 @@ public abstract class AbstractReplicateCommand extends AbstractRedisTargetExport
@Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification. Disables progress reporting.")
private boolean showDiffs;

@Option(names = "--ttl-tolerance", description = "Max TTL offset in millis to consider keys equal (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
private long ttlToleranceMillis = DEFAULT_TTL_TOLERANCE.toMillis();
@Option(names = "--ttl-tolerance", description = "Max TTL delta to consider keys equal (default: ${DEFAULT-VALUE}).", paramLabel = "<dur>")
private RiotDuration ttlTolerance = DEFAULT_TTL_TOLERANCE;

@ArgGroup(exclusive = false)
private EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs();
Expand Down Expand Up @@ -121,11 +123,10 @@ protected KeyComparisonItemReader<byte[], byte[]> compareReader() {

private KeyComparator<byte[]> keyComparator() {
boolean ignoreStreamId = isIgnoreStreamMessageId();
Duration ttlTolerance = Duration.ofMillis(ttlToleranceMillis);
log.info("Creating KeyComparator with ttlTolerance={} ignoreStreamMessageId={}", ttlTolerance, ignoreStreamId);
DefaultKeyComparator<byte[], byte[]> comparator = new DefaultKeyComparator<>(ByteArrayCodec.INSTANCE);
comparator.setIgnoreStreamMessageId(ignoreStreamId);
comparator.setTtlTolerance(ttlTolerance);
comparator.setTtlTolerance(ttlTolerance.getValue());
return comparator;
}

Expand Down Expand Up @@ -153,12 +154,12 @@ public void setShowDiffs(boolean showDiffs) {
this.showDiffs = showDiffs;
}

public long getTtlToleranceMillis() {
return ttlToleranceMillis;
public RiotDuration getTtlTolerance() {
return ttlTolerance;
}

public void setTtlToleranceMillis(long tolerance) {
this.ttlToleranceMillis = tolerance;
public void setTtlTolerance(RiotDuration tolerance) {
this.ttlTolerance = tolerance;
}

public KeyValueProcessorArgs getProcessorArgs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected JdbcCursorItemReader<Map<String, Object>> reader() {
reader.rowMapper(new ColumnMapRowMapper());
reader.fetchSize(readerArgs.getFetchSize());
reader.maxRows(readerArgs.getMaxRows());
reader.queryTimeout(readerArgs.getQueryTimeout());
reader.queryTimeout(Math.toIntExact(readerArgs.getQueryTimeout().getValue().toMillis()));
reader.useSharedExtendedConnection(readerArgs.isUseSharedExtendedConnection());
reader.verifyCursorPosition(readerArgs.isVerifyCursorPosition());
if (readerArgs.getMaxItemCount() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.springframework.batch.item.database.AbstractCursorItemReader;

import com.redis.riot.core.RiotDuration;

import lombok.ToString;
import picocli.CommandLine.Option;

Expand All @@ -10,7 +12,8 @@ public class DatabaseReaderArgs {

public static final int DEFAULT_FETCH_SIZE = AbstractCursorItemReader.VALUE_NOT_SET;
public static final int DEFAULT_MAX_RESULT_SET_ROWS = AbstractCursorItemReader.VALUE_NOT_SET;
public static final int DEFAULT_QUERY_TIMEOUT = AbstractCursorItemReader.VALUE_NOT_SET;
public static final RiotDuration DEFAULT_QUERY_TIMEOUT = RiotDuration
.ofMillis(AbstractCursorItemReader.VALUE_NOT_SET);

@Option(names = "--max", description = "Max number of rows to import.", paramLabel = "<count>")
private int maxItemCount;
Expand All @@ -21,8 +24,8 @@ public class DatabaseReaderArgs {
@Option(names = "--rows", description = "Max number of rows the ResultSet can contain.", paramLabel = "<count>")
private int maxRows = DEFAULT_MAX_RESULT_SET_ROWS;

@Option(names = "--query-timeout", description = "The time in milliseconds for the query to timeout.", paramLabel = "<ms>")
private int queryTimeout = DEFAULT_QUERY_TIMEOUT;
@Option(names = "--query-timeout", description = "The duration for the query to timeout.", paramLabel = "<dur>")
private RiotDuration queryTimeout = DEFAULT_QUERY_TIMEOUT;

@Option(names = "--shared-connection", description = "Use same connection for cursor and other processing.", hidden = true)
private boolean useSharedExtendedConnection;
Expand Down Expand Up @@ -59,11 +62,11 @@ public void setMaxRows(int maxResultSetRows) {
this.maxRows = maxResultSetRows;
}

public int getQueryTimeout() {
public RiotDuration getQueryTimeout() {
return queryTimeout;
}

public void setQueryTimeout(int queryTimeout) {
public void setQueryTimeout(RiotDuration queryTimeout) {
this.queryTimeout = queryTimeout;
}

Expand Down
9 changes: 5 additions & 4 deletions plugins/riot/src/main/java/com/redis/riot/RedisArgs.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.redis.riot;

import java.io.File;
import java.time.Duration;

import com.redis.riot.core.RiotDuration;

import io.lettuce.core.RedisURI;
import io.lettuce.core.protocol.ProtocolVersion;
Expand Down Expand Up @@ -34,7 +35,7 @@ public class RedisArgs implements RedisClientArgs {
private char[] password;

@Option(names = "--timeout", description = "Redis command timeout, e.g. 30s or 5m (default: ${DEFAULT-VALUE}).", paramLabel = "<dur>")
private Duration timeout = DEFAULT_TIMEOUT;
private RiotDuration timeout = DEFAULT_TIMEOUT;

@Option(names = { "-n", "--db" }, description = "Redis database number.", paramLabel = "<db>")
private int database = DEFAULT_DATABASE;
Expand Down Expand Up @@ -228,11 +229,11 @@ public void setPassword(char[] password) {
}

@Override
public Duration getTimeout() {
public RiotDuration getTimeout() {
return timeout;
}

public void setTimeout(Duration timeout) {
public void setTimeout(RiotDuration timeout) {
this.timeout = timeout;
}

Expand Down
Loading

0 comments on commit 69874e0

Please sign in to comment.