Skip to content

Commit

Permalink
feat: Allow optional seq for end-of-sequence marker
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Jul 26, 2021
1 parent 44728e8 commit 3b89293
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -242,9 +243,10 @@ private Tuple2<Long, Map.Entry<UUID, ByteString>> fromResult(final IonValue resu
}

@Override
public Observable<Map.Entry<UUID, ByteString>> from(final Long offset) {
public Observable<Map.Entry<UUID, ByteString>> from(final Optional<Long> offset) {
tables.checkTables();
var readSeq = new QldbTxSeq(offset);

final var readSeq = getQldbTxSeq(offset);
var queryPattern = "select s.%s,d.%s,d.%s from %s as d BY d_id, %s as s where d_id = s.%s and s.%s in ( %s )";
return Observable.interval(pollInterval, TimeUnit.MILLISECONDS).map(x -> readSeq.peekRange(pageSize))
.flatMap(seq -> driver.execute(tx -> {
Expand All @@ -263,4 +265,14 @@ public Observable<Map.Entry<UUID, ByteString>> from(final Long offset) {
}));
}

private QldbTxSeq getQldbTxSeq(final Optional<Long> offset) {
QldbTxSeq readSeq;
if (offset.isEmpty()) {
readSeq = new QldbTxSeq(this.seqSource.peekNext());
} else {
readSeq = new QldbTxSeq(offset.get());
}
return readSeq;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
* A long sequence, initialisable from a QLDB tx log sequence table or an explicit point.
*/
public final class QldbTxSeq implements SeqSource<Long> {
/**
* Marker for the start of a sequence.
*/
private long current;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.blockchaintp.daml.stores.service;

import java.util.Map;
import java.util.Optional;

import io.reactivex.rxjava3.core.Observable;

Expand All @@ -31,7 +32,8 @@ public interface TransactionLogReader<I, K, V> {
* Stream committed log entries starting at offset.
*
* @param offset
* - use None to start at the last commit
* @return A stream of comitted log entires.
*/
Observable<Map.Entry<K, V>> from(I offset);
Observable<Map.Entry<K, V>> from(Optional<I> offset);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import software.amazon.qldb.QldbDriver;

import java.util.ArrayList;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -79,7 +80,7 @@ final void committed_transactions_are_read_in_commit_order() throws StoreWriteEx
txLog.commit(id);
}

var stream = txLog.from(0L);
var stream = txLog.from(Optional.of(0L));

Assertions.assertIterableEquals(ids,
stream.take(30).map(x -> x.getKey()).collect(Collectors.toList()).blockingGet());
Expand Down

0 comments on commit 3b89293

Please sign in to comment.