Skip to content

Commit

Permalink
fix: Guarantee commit order on begin, rather than commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Sep 15, 2021
1 parent 84fbb0d commit 276d3f0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@

import io.vavr.CheckedFunction0;
import io.vavr.CheckedRunnable;
import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;

/**
* A PostgresStore is a store backed by a postgres interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final class QldbTransactionLog implements TransactionLog<UUID, ByteString
private static final Long PAGE_SIZE = 100L;
private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(QldbTransactionLog.class);
private static final String ID_FIELD = "i";
private static final String COMPLETE_FIELD = "c";
private static final String SEQ_FIELD = "s";
private static final String DOCID_FIELD = "d";
private static final String DATA_FIELD = "v";
Expand Down Expand Up @@ -106,19 +107,42 @@ public QldbTransactionLog(final String tableName, final QldbDriver theDriver, fi
@Override
public UUID begin() throws StoreWriteException {
tables.checkTables();
ensureSequence();
var next = seqSource.takeNext();
var uuid = UUID.randomUUID();
LOG.info("Begin transaction {}", () -> uuid);
try {
var uuidBytes = UuidConverter.asBytes(uuid);
var uuidBytes = UuidConverter.asBytes(uuid);

try {
driver.execute(tx -> {
var struct = ion.newEmptyStruct();
struct.add(ID_FIELD, ion.newBlob(uuidBytes));
tx.execute(String.format("insert into %s value ?", txLogTable), struct);
struct.add(COMPLETE_FIELD, ion.newBool(false));
var query = String.format("insert into %s value ?", txLogTable);
var r = tx.execute(query, struct);

if (r.isEmpty()) {
throw create("", QldbTransactionException.noMetadata(query));
}

var metaData = (IonStruct) r.iterator().next();
var docid = metaData.get("documentId");

if (docid == null || docid instanceof IonNull) {
throw create("", QldbTransactionException.invalidSchema(metaData));
}

var seq = ion.newEmptyStruct();
seq.add(SEQ_FIELD, ion.newInt(next));
seq.add(DOCID_FIELD, ion.newString(((IonString) docid).stringValue()));

tx.execute(String.format("insert into %s value ?", seqTable), seq);

});
} catch (QldbException e) {
throw new StoreWriteException(e);
}

return uuid;
}

Expand Down Expand Up @@ -176,42 +200,45 @@ private Long ensureSequence() throws QldbSessionException {
@Override
public Long commit(final UUID txId) throws StoreWriteException {
tables.checkTables();
ensureSequence();
var next = seqSource.takeNext();
LOG.info("Commit transaction {} as seq {}", () -> txId, () -> next);
LOG.info("Commit transaction {}", () -> txId);
try {

var uuidBytes = UuidConverter.asBytes(txId);

driver.execute((ExecutorNoReturn) tx -> API.unchecked(() -> {
var query = String.format("select metadata.id from _ql_committed_%s as o where o.data.%s = ?", txLogTable,
ID_FIELD);
var r = tx.execute(query, ion.newBlob(uuidBytes));
return driver.execute(tx -> {
tx.execute(String.format("update %s as o set %s = ? where o.%s = ?", txLogTable, COMPLETE_FIELD, ID_FIELD),
ion.newBool(true), ion.newBlob(uuidBytes));

if (r.isEmpty()) {
throw new StoreWriteException(QldbTransactionException.noMetadata(query));
}
var query = String.format("select s.%s from %s as s, %s as d BY d_id where d_id = s.%s and d.%s = ?", SEQ_FIELD,
seqTable, txLogTable, DOCID_FIELD, ID_FIELD);

var metaData = (IonStruct) r.iterator().next();
var docid = metaData.get("id");
LOG.debug("Query {}", query);

if (docid == null || docid instanceof IonNull) {
throw new StoreWriteException(QldbTransactionException.invalidSchema(metaData));
var rx = tx.execute(query, ion.newBlob(uuidBytes));

if (rx.isEmpty()) {
throw create("", QldbTransactionException.noMetadata(query));
}

var struct = ion.newEmptyStruct();
struct.add(SEQ_FIELD, ion.newInt(next));
struct.add(DOCID_FIELD, ion.newString(((IonString) docid).stringValue()));
var res = rx.iterator().next();

tx.execute(String.format("insert into %s value ?", seqTable), struct);
if (res instanceof IonStruct) {
var ress = (IonStruct) res;

return null;
}).get());
var seq = ress.get(SEQ_FIELD);

if (seq == null || !(seq instanceof IonInt)) {
throw create("", QldbTransactionException.invalidSchema(res));
}

return ((IonInt) seq).longValue();
}

throw create("", QldbTransactionException.invalidSchema(res));
});
} catch (QldbException e) {
throw new StoreWriteException(e);
}

return next;
}

@Override
Expand Down Expand Up @@ -247,7 +274,7 @@ public Stream<Tuple3<Long, UUID, ByteString>> from(final Long startExclusive, fi
LOG.info("From {} to {}", startExclusive, endInclusive);
tables.checkTables();

var queryPattern = "select s.%s,d.%s,d.%s from %s as d BY d_id, %s as s where d_id = s.%s and s.%s in ( %s )";
var qP = "select s.%s,d.%s,d.%s from %s as d BY d_id, %s as s where d_id = s.%s and d.%s = true and s.%s in ( %s )";

var rx = new Iterator<Tuple3<Long, UUID, ByteString>>() {
private Long position = startExclusive;
Expand All @@ -262,8 +289,8 @@ private Deque<Tuple3<Long, UUID, ByteString>> nextPage(final LongStream range) {
if (ids.isEmpty()) {
return new ArrayDeque<>();
}
var query = String.format(queryPattern, SEQ_FIELD, ID_FIELD, DATA_FIELD, txLogTable, seqTable, DOCID_FIELD,
SEQ_FIELD, ids);
var query = String.format(qP, SEQ_FIELD, ID_FIELD, DATA_FIELD, txLogTable, seqTable, DOCID_FIELD,
COMPLETE_FIELD, SEQ_FIELD, ids);
var r = tx.execute(query);

return StreamSupport.stream(r.spliterator(), false).map(x -> API.unchecked(() -> fromResult(x)).get())
Expand Down

0 comments on commit 276d3f0

Please sign in to comment.