Skip to content

Commit

Permalink
feat: TxLog write, improve resource creation
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Jul 23, 2021
1 parent 4651ea8 commit 9c02a87
Show file tree
Hide file tree
Showing 9 changed files with 446 additions and 31 deletions.
16 changes: 13 additions & 3 deletions src/main/java/com/blockchaintp/daml/stores/qldb/QldbStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.blockchaintp.daml.stores.qldb;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -37,10 +38,10 @@
import io.vavr.collection.Stream;
import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
import software.amazon.awssdk.services.qldbsession.model.QldbSessionException;
import software.amazon.qldb.Executor;
import software.amazon.qldb.QldbDriver;
import software.amazon.qldb.Result;
import software.amazon.qldb.exceptions.QldbDriverException;

/**
* A K/V store using Amazon QLDB as a backend.
Expand All @@ -54,6 +55,7 @@ public final class QldbStore implements Store<ByteString, ByteString> {
private final QldbDriver driver;
private final String table;
private final IonSystem ion;
private final RequiresTables tables;

/**
* Constructor for QldbStore.
Expand All @@ -67,6 +69,7 @@ public QldbStore(final QldbDriver qldbDriver, final String tableName) {
this.driver = qldbDriver;
this.table = tableName;
this.ion = IonSystemBuilder.standard().build();
this.tables = new RequiresTables(Arrays.asList(Tuple.of(table, ID_FIELD)), qldbDriver);
}

/**
Expand All @@ -83,6 +86,8 @@ public static QldbStoreBuilder forDriver(final QldbDriver driver) {
@Override
@SuppressWarnings("java:S1905")
public Optional<Value<ByteString>> get(final Key<ByteString> key) throws StoreReadException {
this.tables.checkTables();

LOG.info("get id={} in table={}", () -> key.toNative().toStringUtf8(), () -> table);

try {
Expand All @@ -96,7 +101,7 @@ public Optional<Value<ByteString>> get(final Key<ByteString> key) throws StoreRe
return Optional.of(Value.of(ByteString.copyFrom(hash.getBytes())));
}
return Optional.empty();
} catch (QldbDriverException e) {
} catch (QldbSessionException e) {
throw new StoreReadException(e);
}
}
Expand Down Expand Up @@ -128,6 +133,8 @@ private IonBlob getHashFromRecord(final IonValue struct) throws StoreReadExcepti
@Override
@SuppressWarnings("java:S1905")
public Map<Key<ByteString>, Value<ByteString>> get(final List<Key<ByteString>> listOfKeys) throws StoreReadException {
this.tables.checkTables();

LOG.info("get ids=({}) in table={}", () -> listOfKeys.stream().map(k -> k.toNative().toStringUtf8()), () -> table);

final var query = String.format("select o.* from %s as o where o.%s in ( %s )", table, ID_FIELD,
Expand All @@ -141,7 +148,7 @@ public Map<Key<ByteString>, Value<ByteString>> get(final List<Key<ByteString>> l
return Stream.ofAll(r).toJavaMap(
k -> Tuple.of(Key.of(ByteString.copyFrom(API.unchecked(() -> getIdFromRecord(k)).get().getBytes())),
Value.of(ByteString.copyFrom(API.unchecked(() -> getHashFromRecord(k)).get().getBytes()))));
} catch (QldbDriverException e) {
} catch (QldbSessionException e) {
throw new StoreReadException(e);
}
}
Expand All @@ -168,6 +175,7 @@ private IonStruct makeRecord(final Key<ByteString> key, final Value<ByteString>
*/
@Override
public void put(final Key<ByteString> key, final Value<ByteString> value) throws StoreWriteException {
this.tables.checkTables();

LOG.info("upsert id={} in table={}", () -> key.toNative().toStringUtf8(), () -> table);

Expand Down Expand Up @@ -199,6 +207,8 @@ public void put(final Key<ByteString> key, final Value<ByteString> value) throws
*/
@Override
public void put(final List<Map.Entry<Key<ByteString>, Value<ByteString>>> listOfPairs) throws StoreWriteException {
this.tables.checkTables();

LOG.debug("upsert ids={} in table={}",
() -> listOfPairs.stream().map(Map.Entry::getKey).collect(Collectors.toList()), () -> table);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2021 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.blockchaintp.daml.stores.qldb;

import com.amazon.ion.IonValue;
import com.blockchaintp.daml.stores.exception.StoreException;

/**
*
*/
public class QldbTransactionException extends StoreException {
/**
* A transaction log exception with an originating cause.
*
* @param cause
* the cause
*/
protected QldbTransactionException(final Throwable cause) {
super(cause);
}

/**
* An exception with a message.
*
* @param message
* Pertinent message text.
*/
protected QldbTransactionException(final String message) {
super(message);
}

/**
* We have retrieved a qldb record with an unexpected schema.
*
* @param value
* @return The constructed exception
*/
public static QldbTransactionException invalidSchema(final IonValue value) {
return new QldbTransactionException(String.format("Invalid result from qldb %s", value.toPrettyString()));
}

/**
* We have not managed to fetch metadata for a document.
*
* @param query
* The query that failed to fetch metadata
* @return The constructed exception
*/
public static QldbTransactionException noMetadata(final String query) {
return new QldbTransactionException(String.format("Metadata query '%s' returned no results", query));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright 2021 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.blockchaintp.daml.stores.qldb;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

import com.amazon.ion.IonInt;
import com.amazon.ion.IonString;
import com.amazon.ion.IonStruct;
import com.amazon.ion.IonSystem;
import com.blockchaintp.daml.stores.exception.StoreWriteException;
import com.blockchaintp.daml.stores.service.TransactionLog;
import com.google.protobuf.ByteString;

import io.vavr.Tuple;
import software.amazon.awssdk.services.qldb.model.QldbException;
import software.amazon.awssdk.services.qldbsession.model.QldbSessionException;
import software.amazon.qldb.ExecutorNoReturn;
import software.amazon.qldb.QldbDriver;

/**
* Implements a transaction log using 2 QLDB tables.
*
* daml_tx_log: i: UUID as a blob, indexed v: transaction value
*
* daml_tx_seq: s: Long sequence field, indexed d: docid of a daml_tx_log entry
*/
public final class QldbTransactionLog implements TransactionLog<UUID, ByteString, Long> {
private static final String ID_FIELD = "i";
private static final String SEQ_FIELD = "s";
private static final String DOCID_FIELD = "d";
private static final String DATA_FIELD = "v";
private static final int UUID_LENGTH_IN_BYTES = 16;
private final RequiresTables tables;
private final String txLogTable;
private final String seqTable;
private final QldbDriver driver;
private final IonSystem ion;
private QldbTxSeq seqSource;

/**
* Construct a new QLDB transaction log for an id, sequence and opaque blob value.
*
* @param tableName
* A common prefix for the transaction log tables names
* @param theDriver
* @param ionSystem
*/
public QldbTransactionLog(final String tableName, final QldbDriver theDriver, final IonSystem ionSystem) {
this.driver = theDriver;
this.ion = ionSystem;
this.txLogTable = String.format("%s_tx_log", tableName);
this.seqTable = String.format("%s_seq", tableName);
this.tables = new RequiresTables(Arrays.asList(Tuple.of(txLogTable, ID_FIELD), Tuple.of(seqTable, SEQ_FIELD)),
theDriver);
}

private static UUID asUuid(final byte[] bytes) {
var bb = ByteBuffer.wrap(bytes);
var firstLong = bb.getLong();
var secondLong = bb.getLong();
return new UUID(firstLong, secondLong);
}

private static byte[] asBytes(final UUID uuid) {
var bb = ByteBuffer.wrap(new byte[UUID_LENGTH_IN_BYTES]);
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
return bb.array();
}

@Override
public UUID begin() throws StoreWriteException {
var uuid = UUID.randomUUID();
try {
var uuidBytes = asBytes(uuid);

driver.execute(tx -> {
var struct = ion.newEmptyStruct();
struct.add(ID_FIELD, ion.newBlob(uuidBytes));
tx.execute(String.format("insert into %s value ?", txLogTable), struct);
});
} catch (QldbException e) {
throw new StoreWriteException(e);
}
return uuid;
}

@Override
public void sendEvent(final UUID id, final ByteString data) throws StoreWriteException {
try {
var uuidBytes = asBytes(id);

driver.execute(tx -> {
var struct = ion.newEmptyStruct();
struct.add(ID_FIELD, ion.newBlob(uuidBytes));
tx.execute(String.format("update %s as o set %s = ? where o.%s = ?", txLogTable, DATA_FIELD, ID_FIELD),
ion.newBlob(data.toByteArray()), ion.newBlob(uuidBytes));
});
} catch (QldbException e) {
throw new StoreWriteException(e);
}
}

private void ensureSequence() throws QldbSessionException {
if (seqSource != null) {
return;
}

driver.execute(tx -> {
var res = tx.execute(String.format("select max(%s) from %s", SEQ_FIELD, seqTable));
if (res.isEmpty()) {
this.seqSource = new QldbTxSeq(0L);
} else {
var s = (IonStruct) res.iterator().next();
var i = (IonInt) s.get("_1");
if (i == null) {
throw QldbSessionException.create("", QldbTransactionException.invalidSchema(s));
}

this.seqSource = new QldbTxSeq(i.longValue());
}
});
}

@Override
public Long commit(final UUID txId) throws StoreWriteException {
try {
ensureSequence();

var uuidBytes = asBytes(txId);

driver.execute((ExecutorNoReturn) tx -> {
var query = String.format("select metadata.id from _ql_committed_%s as o where o.%s = ?", txLogTable, ID_FIELD);
var r = tx.execute(query, ion.newBlob(uuidBytes));

if (r.isEmpty()) {
throw QldbException.create("", QldbTransactionException.noMetadata(query));
}

var metaData = (IonStruct) r.iterator().next();
var docid = (IonString) metaData.get("id");

if (docid == null) {
throw QldbException.create("", QldbTransactionException.invalidSchema(metaData));
}

var struct = ion.newEmptyStruct();
struct.add(SEQ_FIELD, ion.newInt(seqSource.peekNext()));
struct.add(DOCID_FIELD, docid);

tx.execute(String.format("insert into %s value ?", txLogTable, DATA_FIELD, ID_FIELD), struct);
});
} catch (QldbException e) {
throw new StoreWriteException(e);
}
return seqSource.takeNext();
}

@Override
public void abort(final UUID txId) {
}
}
60 changes: 60 additions & 0 deletions src/main/java/com/blockchaintp/daml/stores/qldb/QldbTxSeq.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.blockchaintp.daml.stores.qldb;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import com.blockchaintp.daml.stores.service.SeqSource;

/**
* A long sequence, initialisable from a QLDB tx log sequence table or an explicit point.
*/
public final class QldbTxSeq implements SeqSource<Long> {
private long current;

/**
* Initialise the sequence at a particular point.
*
* @param start
*/
public QldbTxSeq(final Long start) {
this.current = start;
}

@Override
public Long peekNext() {
return current;
}

@Override
public Long takeNext() {
var then = current;
current = current + 1;
return then;
}

@Override
public List<Long> peekRange(final long size) {
return LongStream.range(current, current + size).boxed().collect(Collectors.toList());
}

@Override
public List<Long> takeRange(final long size) {
var seq = peekRange(size);
current += size;
return seq;
}
}
Loading

0 comments on commit 9c02a87

Please sign in to comment.