diff --git a/src/main/java/com/blockchaintp/daml/stores/qldb/QldbStore.java b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbStore.java index e04158cc..d26c2bd4 100644 --- a/src/main/java/com/blockchaintp/daml/stores/qldb/QldbStore.java +++ b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbStore.java @@ -13,6 +13,7 @@ */ package com.blockchaintp.daml.stores.qldb; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -37,10 +38,10 @@ import io.vavr.collection.Stream; import kr.pe.kwonnam.slf4jlambda.LambdaLogger; import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory; +import software.amazon.awssdk.services.qldbsession.model.QldbSessionException; import software.amazon.qldb.Executor; import software.amazon.qldb.QldbDriver; import software.amazon.qldb.Result; -import software.amazon.qldb.exceptions.QldbDriverException; /** * A K/V store using Amazon QLDB as a backend. @@ -54,6 +55,7 @@ public final class QldbStore implements Store { private final QldbDriver driver; private final String table; private final IonSystem ion; + private final RequiresTables tables; /** * Constructor for QldbStore. @@ -67,6 +69,7 @@ public QldbStore(final QldbDriver qldbDriver, final String tableName) { this.driver = qldbDriver; this.table = tableName; this.ion = IonSystemBuilder.standard().build(); + this.tables = new RequiresTables(Arrays.asList(Tuple.of(table, ID_FIELD)), qldbDriver); } /** @@ -83,6 +86,8 @@ public static QldbStoreBuilder forDriver(final QldbDriver driver) { @Override @SuppressWarnings("java:S1905") public Optional> get(final Key key) throws StoreReadException { + this.tables.checkTables(); + LOG.info("get id={} in table={}", () -> key.toNative().toStringUtf8(), () -> table); try { @@ -96,7 +101,7 @@ public Optional> get(final Key key) throws StoreRe return Optional.of(Value.of(ByteString.copyFrom(hash.getBytes()))); } return Optional.empty(); - } catch (QldbDriverException e) { + } catch (QldbSessionException e) { throw new StoreReadException(e); } } @@ -128,6 +133,8 @@ private IonBlob getHashFromRecord(final IonValue struct) throws StoreReadExcepti @Override @SuppressWarnings("java:S1905") public Map, Value> get(final List> listOfKeys) throws StoreReadException { + this.tables.checkTables(); + LOG.info("get ids=({}) in table={}", () -> listOfKeys.stream().map(k -> k.toNative().toStringUtf8()), () -> table); final var query = String.format("select o.* from %s as o where o.%s in ( %s )", table, ID_FIELD, @@ -141,7 +148,7 @@ public Map, Value> get(final List> l return Stream.ofAll(r).toJavaMap( k -> Tuple.of(Key.of(ByteString.copyFrom(API.unchecked(() -> getIdFromRecord(k)).get().getBytes())), Value.of(ByteString.copyFrom(API.unchecked(() -> getHashFromRecord(k)).get().getBytes())))); - } catch (QldbDriverException e) { + } catch (QldbSessionException e) { throw new StoreReadException(e); } } @@ -168,6 +175,7 @@ private IonStruct makeRecord(final Key key, final Value */ @Override public void put(final Key key, final Value value) throws StoreWriteException { + this.tables.checkTables(); LOG.info("upsert id={} in table={}", () -> key.toNative().toStringUtf8(), () -> table); @@ -199,6 +207,8 @@ public void put(final Key key, final Value value) throws */ @Override public void put(final List, Value>> listOfPairs) throws StoreWriteException { + this.tables.checkTables(); + LOG.debug("upsert ids={} in table={}", () -> listOfPairs.stream().map(Map.Entry::getKey).collect(Collectors.toList()), () -> table); diff --git a/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTransactionException.java b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTransactionException.java new file mode 100644 index 00000000..576d7f0e --- /dev/null +++ b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTransactionException.java @@ -0,0 +1,63 @@ +/* + * Copyright 2021 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.stores.qldb; + +import com.amazon.ion.IonValue; +import com.blockchaintp.daml.stores.exception.StoreException; + +/** + * + */ +public class QldbTransactionException extends StoreException { + /** + * A transaction log exception with an originating cause. + * + * @param cause + * the cause + */ + protected QldbTransactionException(final Throwable cause) { + super(cause); + } + + /** + * An exception with a message. + * + * @param message + * Pertinent message text. + */ + protected QldbTransactionException(final String message) { + super(message); + } + + /** + * We have retrieved a qldb record with an unexpected schema. + * + * @param value + * @return The constructed exception + */ + public static QldbTransactionException invalidSchema(final IonValue value) { + return new QldbTransactionException(String.format("Invalid result from qldb %s", value.toPrettyString())); + } + + /** + * We have not managed to fetch metadata for a document. + * + * @param query + * The query that failed to fetch metadata + * @return The constructed exception + */ + public static QldbTransactionException noMetadata(final String query) { + return new QldbTransactionException(String.format("Metadata query '%s' returned no results", query)); + } +} diff --git a/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTransactionLog.java b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTransactionLog.java new file mode 100644 index 00000000..370c1c80 --- /dev/null +++ b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTransactionLog.java @@ -0,0 +1,176 @@ +/* + * Copyright 2021 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.stores.qldb; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.UUID; + +import com.amazon.ion.IonInt; +import com.amazon.ion.IonString; +import com.amazon.ion.IonStruct; +import com.amazon.ion.IonSystem; +import com.blockchaintp.daml.stores.exception.StoreWriteException; +import com.blockchaintp.daml.stores.service.TransactionLog; +import com.google.protobuf.ByteString; + +import io.vavr.Tuple; +import software.amazon.awssdk.services.qldb.model.QldbException; +import software.amazon.awssdk.services.qldbsession.model.QldbSessionException; +import software.amazon.qldb.ExecutorNoReturn; +import software.amazon.qldb.QldbDriver; + +/** + * Implements a transaction log using 2 QLDB tables. + * + * daml_tx_log: i: UUID as a blob, indexed v: transaction value + * + * daml_tx_seq: s: Long sequence field, indexed d: docid of a daml_tx_log entry + */ +public final class QldbTransactionLog implements TransactionLog { + private static final String ID_FIELD = "i"; + private static final String SEQ_FIELD = "s"; + private static final String DOCID_FIELD = "d"; + private static final String DATA_FIELD = "v"; + private static final int UUID_LENGTH_IN_BYTES = 16; + private final RequiresTables tables; + private final String txLogTable; + private final String seqTable; + private final QldbDriver driver; + private final IonSystem ion; + private QldbTxSeq seqSource; + + /** + * Construct a new QLDB transaction log for an id, sequence and opaque blob value. + * + * @param tableName + * A common prefix for the transaction log tables names + * @param theDriver + * @param ionSystem + */ + public QldbTransactionLog(final String tableName, final QldbDriver theDriver, final IonSystem ionSystem) { + this.driver = theDriver; + this.ion = ionSystem; + this.txLogTable = String.format("%s_tx_log", tableName); + this.seqTable = String.format("%s_seq", tableName); + this.tables = new RequiresTables(Arrays.asList(Tuple.of(txLogTable, ID_FIELD), Tuple.of(seqTable, SEQ_FIELD)), + theDriver); + } + + private static UUID asUuid(final byte[] bytes) { + var bb = ByteBuffer.wrap(bytes); + var firstLong = bb.getLong(); + var secondLong = bb.getLong(); + return new UUID(firstLong, secondLong); + } + + private static byte[] asBytes(final UUID uuid) { + var bb = ByteBuffer.wrap(new byte[UUID_LENGTH_IN_BYTES]); + bb.putLong(uuid.getMostSignificantBits()); + bb.putLong(uuid.getLeastSignificantBits()); + return bb.array(); + } + + @Override + public UUID begin() throws StoreWriteException { + var uuid = UUID.randomUUID(); + try { + var uuidBytes = asBytes(uuid); + + driver.execute(tx -> { + var struct = ion.newEmptyStruct(); + struct.add(ID_FIELD, ion.newBlob(uuidBytes)); + tx.execute(String.format("insert into %s value ?", txLogTable), struct); + }); + } catch (QldbException e) { + throw new StoreWriteException(e); + } + return uuid; + } + + @Override + public void sendEvent(final UUID id, final ByteString data) throws StoreWriteException { + try { + var uuidBytes = asBytes(id); + + driver.execute(tx -> { + var struct = ion.newEmptyStruct(); + struct.add(ID_FIELD, ion.newBlob(uuidBytes)); + tx.execute(String.format("update %s as o set %s = ? where o.%s = ?", txLogTable, DATA_FIELD, ID_FIELD), + ion.newBlob(data.toByteArray()), ion.newBlob(uuidBytes)); + }); + } catch (QldbException e) { + throw new StoreWriteException(e); + } + } + + private void ensureSequence() throws QldbSessionException { + if (seqSource != null) { + return; + } + + driver.execute(tx -> { + var res = tx.execute(String.format("select max(%s) from %s", SEQ_FIELD, seqTable)); + if (res.isEmpty()) { + this.seqSource = new QldbTxSeq(0L); + } else { + var s = (IonStruct) res.iterator().next(); + var i = (IonInt) s.get("_1"); + if (i == null) { + throw QldbSessionException.create("", QldbTransactionException.invalidSchema(s)); + } + + this.seqSource = new QldbTxSeq(i.longValue()); + } + }); + } + + @Override + public Long commit(final UUID txId) throws StoreWriteException { + try { + ensureSequence(); + + var uuidBytes = asBytes(txId); + + driver.execute((ExecutorNoReturn) tx -> { + var query = String.format("select metadata.id from _ql_committed_%s as o where o.%s = ?", txLogTable, ID_FIELD); + var r = tx.execute(query, ion.newBlob(uuidBytes)); + + if (r.isEmpty()) { + throw QldbException.create("", QldbTransactionException.noMetadata(query)); + } + + var metaData = (IonStruct) r.iterator().next(); + var docid = (IonString) metaData.get("id"); + + if (docid == null) { + throw QldbException.create("", QldbTransactionException.invalidSchema(metaData)); + } + + var struct = ion.newEmptyStruct(); + struct.add(SEQ_FIELD, ion.newInt(seqSource.peekNext())); + struct.add(DOCID_FIELD, docid); + + tx.execute(String.format("insert into %s value ?", txLogTable, DATA_FIELD, ID_FIELD), struct); + }); + } catch (QldbException e) { + throw new StoreWriteException(e); + } + return seqSource.takeNext(); + } + + @Override + public void abort(final UUID txId) { + } +} diff --git a/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTxSeq.java b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTxSeq.java new file mode 100644 index 00000000..c82f6e2f --- /dev/null +++ b/src/main/java/com/blockchaintp/daml/stores/qldb/QldbTxSeq.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.stores.qldb; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import com.blockchaintp.daml.stores.service.SeqSource; + +/** + * A long sequence, initialisable from a QLDB tx log sequence table or an explicit point. + */ +public final class QldbTxSeq implements SeqSource { + private long current; + + /** + * Initialise the sequence at a particular point. + * + * @param start + */ + public QldbTxSeq(final Long start) { + this.current = start; + } + + @Override + public Long peekNext() { + return current; + } + + @Override + public Long takeNext() { + var then = current; + current = current + 1; + return then; + } + + @Override + public List peekRange(final long size) { + return LongStream.range(current, current + size).boxed().collect(Collectors.toList()); + } + + @Override + public List takeRange(final long size) { + var seq = peekRange(size); + current += size; + return seq; + } +} diff --git a/src/main/java/com/blockchaintp/daml/stores/qldb/RequiresTables.java b/src/main/java/com/blockchaintp/daml/stores/qldb/RequiresTables.java new file mode 100644 index 00000000..e3c30bb3 --- /dev/null +++ b/src/main/java/com/blockchaintp/daml/stores/qldb/RequiresTables.java @@ -0,0 +1,74 @@ +/* + * Copyright 2021 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.stores.qldb; + +import java.util.List; +import java.util.stream.StreamSupport; + +import io.vavr.Tuple2; +import kr.pe.kwonnam.slf4jlambda.LambdaLogger; +import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory; +import software.amazon.awssdk.services.qldb.model.QldbException; +import software.amazon.qldb.QldbDriver; + +/** + * Checks for QLDB and creates if needed, short circuiting after tables have been created. + */ +public final class RequiresTables { + private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(RequiresTables.class); + private final boolean allCreated; + private final List> tables; + private final QldbDriver driver; + + /** + * + * @param tablesToCreate + * Tuples of (tablename,indexedfield). + * @param theDriver + * A QLDB driver. + */ + protected RequiresTables(final List> tablesToCreate, final QldbDriver theDriver) { + this.allCreated = false; + this.tables = tablesToCreate; + this.driver = theDriver; + } + + private boolean tableExists(final String table) { + return StreamSupport.stream(driver.getTableNames().spliterator(), false).anyMatch(s -> s.equals(table)); + } + + /** + * + */ + public void checkTables() throws QldbException { + if (allCreated) { + return; + } + + tables.forEach(t -> { + if (tableExists(t._1)) { + LOG.debug("Table {} exists, skip create", () -> t._1); + return; + } + + LOG.info("Creating table {}", () -> t._1); + + driver.execute(tx -> { + tx.execute(String.format("create table %s", t._1)); + tx.execute(String.format("create index on %s(%s)", t._1, t._2)); + }); + }); + } + +} diff --git a/src/main/java/com/blockchaintp/daml/stores/resources/QldbResources.java b/src/main/java/com/blockchaintp/daml/stores/resources/QldbResources.java index bc447af7..81d9a5d5 100644 --- a/src/main/java/com/blockchaintp/daml/stores/resources/QldbResources.java +++ b/src/main/java/com/blockchaintp/daml/stores/resources/QldbResources.java @@ -14,7 +14,6 @@ package com.blockchaintp.daml.stores.resources; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.StreamSupport; import kr.pe.kwonnam.slf4jlambda.LambdaLogger; import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory; @@ -36,7 +35,6 @@ public class QldbResources implements RequiresAWSResources { private final QldbClient infrastructureClient; private final QldbDriver driver; private final String ledger; - private final String table; /** * Constructor. @@ -47,15 +45,11 @@ public class QldbResources implements RequiresAWSResources { * the qldb driver * @param ledgerName * the ledger name - * @param tableName - * the table name */ - public QldbResources(final QldbClient qldbClient, final QldbDriver qldbDriver, final String ledgerName, - final String tableName) { + public QldbResources(final QldbClient qldbClient, final QldbDriver qldbDriver, final String ledgerName) { this.infrastructureClient = qldbClient; this.driver = qldbDriver; this.ledger = ledgerName; - this.table = tableName; } private boolean ledgerState(final LedgerState state) { @@ -72,10 +66,6 @@ private boolean ledgerState(final LedgerState state) { } } - private boolean tableExists() { - return StreamSupport.stream(driver.getTableNames().spliterator(), false).anyMatch(s -> s.equals(table)); - } - @Override public final void ensureResources() { LOG.debug("Check ledger state"); @@ -95,23 +85,12 @@ public final void ensureResources() { } } - if (tableExists()) { - LOG.debug("Table {} exists, skip create", () -> table); - return; - } - - LOG.info("Creating table {}", () -> table); - - driver.execute(tx -> { - tx.execute(String.format("create table %s", table)); - tx.execute(String.format("create index on %s(id)", table)); - }); } @Override public final void destroyResources() { if (ledgerState(LedgerState.DELETED)) { - LOG.debug("Ledger {} does not exist, skip delete", () -> table); + LOG.debug("Ledger {} does not exist, skip delete", () -> ledger); return; } LOG.info("Delete ledger {}", () -> ledger); diff --git a/src/main/java/com/blockchaintp/daml/stores/service/SeqSource.java b/src/main/java/com/blockchaintp/daml/stores/service/SeqSource.java new file mode 100644 index 00000000..53bedc16 --- /dev/null +++ b/src/main/java/com/blockchaintp/daml/stores/service/SeqSource.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.stores.service; + +import java.util.List; + +/** + * A source of ordered values of I. + * + * @param + */ +public interface SeqSource { + /** + * + * @return the next I, without consuming it + */ + I peekNext(); + + /** + * + * @return the next I, consuming it + */ + I takeNext(); + + /** + * A non comsumed sequence of I with the specified size. + * + * @param size + * @return An ordered sequence + */ + List peekRange(long size); + + /** + * A comsumed sequence of I with the specified size. + * + * @param size + * @return An ordered sequence + */ + List takeRange(long size); +} diff --git a/src/main/java/com/blockchaintp/daml/stores/service/TransactionLogWriter.java b/src/main/java/com/blockchaintp/daml/stores/service/TransactionLogWriter.java index ca77eafc..79d7269f 100644 --- a/src/main/java/com/blockchaintp/daml/stores/service/TransactionLogWriter.java +++ b/src/main/java/com/blockchaintp/daml/stores/service/TransactionLogWriter.java @@ -13,6 +13,8 @@ */ package com.blockchaintp.daml.stores.service; +import com.blockchaintp.daml.stores.exception.StoreWriteException; + /** * A TransactionLogWriter is a StoreWriter that also supports the sending of events. * @@ -31,7 +33,7 @@ public interface TransactionLogWriter { * * @return The transaction log id of the uncommitted transaction */ - K begin(); + K begin() throws StoreWriteException; /** * Update the log entry for the id. @@ -39,7 +41,7 @@ public interface TransactionLogWriter { * @param id * @param data */ - void sendEvent(K id, V data); + void sendEvent(K id, V data) throws StoreWriteException; /** * Commit the transaction with this identifier. @@ -47,12 +49,12 @@ public interface TransactionLogWriter { * @param txId * @return the position in the log of the transaction. */ - I commit(K txId); + I commit(K txId) throws StoreWriteException; /** * Abort the transaction with this identifier. * * @param txId */ - void abort(K txId); + void abort(K txId) throws StoreWriteException; } diff --git a/src/test/java/com/blockchaintp/daml/stores/qldb/QldbStoreIntegrationTest.java b/src/test/java/com/blockchaintp/daml/stores/qldb/QldbStoreIntegrationTest.java index 5399ecad..9abf4cc4 100644 --- a/src/test/java/com/blockchaintp/daml/stores/qldb/QldbStoreIntegrationTest.java +++ b/src/test/java/com/blockchaintp/daml/stores/qldb/QldbStoreIntegrationTest.java @@ -62,7 +62,7 @@ final void establishStore() throws StoreWriteException { this.resources = new QldbResources( QldbClient.builder().credentialsProvider(DefaultCredentialsProvider.create()).region(Region.EU_WEST_2).build(), - driver, ledger, "qldbstoreintegrationtest"); + driver, ledger); final var storeBuilder = QldbStore.forDriver(driver).tableName("qldbstoreintegrationtest");