Skip to content

Commit

Permalink
feat: corecing, tests, dependency rearrange
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Aug 5, 2021
1 parent f9e2c50 commit 3d1892f
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.blockchaintp.daml.participant;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.daml.ledger.participant.state.v1.Offset;
import com.daml.ledger.participant.state.v1.SubmissionResult;
import com.daml.ledger.resources.ResourceContext;
import com.daml.lf.engine.Engine;
import com.daml.telemetry.TelemetryContext;

import akka.NotUsed;
Expand Down Expand Up @@ -57,6 +58,7 @@ public final class Participant<I extends Identifier, A extends LedgerAddress> im
/**
* Convenience method for creating a builder.
*
* @param theEngine
* @param theParticipantId
* @param theLedgerId
* @param theContext
Expand All @@ -65,8 +67,9 @@ public final class Participant<I extends Identifier, A extends LedgerAddress> im
* @return A partially configured participant builder.
*/
public static <I2 extends Identifier, A2 extends LedgerAddress> ParticipantBuilder<I2, A2> builder(
final String theParticipantId, final String theLedgerId, final ResourceContext theContext) {
return new ParticipantBuilder<I2, A2>(theParticipantId, theLedgerId, theContext);
final Engine theEngine, final String theParticipantId, final String theLedgerId,
final ResourceContext theContext) {
return new ParticipantBuilder<I2, A2>(theEngine, theParticipantId, theLedgerId, theContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord;
import com.daml.ledger.participant.state.v1.Offset;
import com.daml.ledger.resources.ResourceContext;
import com.daml.lf.engine.Engine;

/**
*
Expand All @@ -33,19 +34,23 @@
public final class ParticipantBuilder<I extends Identifier, A extends LedgerAddress> {
private final String participantId;
private final String ledgerId;
private ResourceContext context;
private final Engine engine;
private final ResourceContext context;
private TransactionLogReader<Offset, DamlKvutils.DamlLogEntryId, LedgerRecord> txLog;
private LedgerSubmitter<I, A> submitter;
private final CommitPayloadBuilder commitPayloadBuilder;

/**
* Construct a participant builder for the given identifiers.
*
* @param theEngine
* @param theLedgerId
* @param theParticipantId
* @param theContext
*/
public ParticipantBuilder(final String theLedgerId, final String theParticipantId, final ResourceContext theContext) {
public ParticipantBuilder(final Engine theEngine, final String theLedgerId, final String theParticipantId,
final ResourceContext theContext) {
engine = theEngine;
participantId = theParticipantId;
ledgerId = theLedgerId;
context = theContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ public class CoercingStore<K1, K2, V1, V2> implements Store<K1, V1> {
private final Function<K1, K2> keyCoercionTo;
private final Function<V1, V2> valueCoercionTo;

/**
* Convenience method to wrap a store.
*
* @param keyCoercionFrom
* @param valueCoercionFrom
* @param keyCoercionTo
* @param valueCoercionTo
* @param inner
* @param <KK1>
* @param <KK2>
* @param <VV1>
* @param <VV2>
* @return a wrapped store
*/
public static <KK1, KK2, VV1, VV2> Store<KK1, VV1> from(final Function<KK2, KK1> keyCoercionFrom,
final Function<VV2, VV1> valueCoercionFrom, final Function<KK1, KK2> keyCoercionTo,
final Function<VV1, VV2> valueCoercionTo, final Store<KK2, VV2> inner) {
return new CoercingStore<KK1, KK2, VV1, VV2>(keyCoercionFrom, keyCoercionTo, valueCoercionFrom, valueCoercionTo,
inner);
}

/**
* Wrap an underlying store with value and kehy coercions.
*
Expand All @@ -53,9 +74,9 @@ public class CoercingStore<K1, K2, V1, V2> implements Store<K1, V1> {
* @param theKeyCoercionTo
* @param theValueCoercionTo
*/
public CoercingStore(final Store<K2, V2> theInner, final Function<K2, K1> theKeyCoercionFrom,
final Function<V2, V1> theValueCoercionFrom, final Function<K1, K2> theKeyCoercionTo,
final Function<V1, V2> theValueCoercionTo) {
public CoercingStore(final Function<K2, K1> theKeyCoercionFrom, final Function<K1, K2> theKeyCoercionTo,
final Function<V2, V1> theValueCoercionFrom, final Function<V1, V2> theValueCoercionTo,
final Store<K2, V2> theInner) {
this.inner = theInner;
this.keyCoercionFrom = theKeyCoercionFrom;
this.valueCoercionFrom = theValueCoercionFrom;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
*/
package com.blockchaintp.daml.stores.qldb;

import com.blockchaintp.daml.stores.layers.RetryingConfig;
import com.blockchaintp.daml.stores.service.Store;
import com.google.protobuf.ByteString;

import software.amazon.qldb.QldbDriver;

/**
Expand All @@ -22,6 +26,7 @@ public final class QldbStoreBuilder {

private final QldbDriver driver;
private String table;
private RetryingConfig retryingConfig;

private QldbStoreBuilder(final QldbDriver qldbDriver) {
this.driver = qldbDriver;
Expand Down Expand Up @@ -50,15 +55,35 @@ public QldbStoreBuilder tableName(final String tableName) {
return this;
}

/**
* Specify the number of retries to use for the stores built.
*
* @param maxRetries
* the maximum number of retries.
* @return the builder
*/
public QldbStoreBuilder retrying(final int maxRetries) {
this.retryingConfig = new RetryingConfig();
this.retryingConfig.setMaxRetries(maxRetries);

return this;
}

/**
* Construct a QLDBStore instance.
*
* @return the instance
*/
public QldbStore build() {
public Store<ByteString, ByteString> build() {
if (table == null) {
throw new QldbStoreBuilderException("No table name specified in builder");
}
return new QldbStore(driver, table);
var store = new QldbStore(driver, table);

if (retryingConfig != null) {
return new QldbRetryStrategy(retryingConfig, store);
}

return store;
}
}
39 changes: 37 additions & 2 deletions src/main/scala/com/blockchaintp/daml/qldb/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,32 @@ import com.amazon.ion.system.IonSystemBuilder
import com.blockchaintp.daml.address.QldbAddress
import com.blockchaintp.daml.address.QldbIdentifier
import com.blockchaintp.daml.participant.CommitPayloadBuilder
import com.blockchaintp.daml.participant.InProcLedgerSubmitter
import com.blockchaintp.daml.participant.ParticipantBuilder
import com.blockchaintp.daml.runtime.BuilderLedgerFactory
import com.blockchaintp.daml.stores.layers.CoercingStore
import com.blockchaintp.daml.stores.layers.CoercingTxLog
import com.blockchaintp.daml.stores.layers.SplitStore
import com.blockchaintp.daml.stores.layers.SplitTransactionLog
import com.blockchaintp.daml.stores.qldb.QldbStore
import com.blockchaintp.daml.stores.qldb.QldbTransactionLog
import com.blockchaintp.daml.stores.s3.S3Store
import com.daml.jwt.JwksVerifier
import com.daml.jwt.RSA256Verifier
import com.daml.ledger.api.auth.AuthService
import com.daml.ledger.api.auth.AuthServiceJWT
import com.daml.ledger.api.auth.AuthServiceWildcard
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlStateKey
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlStateValue
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting
import com.daml.ledger.participant.state.kvutils.app.Config
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.daml.ledger.participant.state.v1.Configuration
import com.daml.ledger.participant.state.v1.TimeModel
import com.daml.ledger.resources.ResourceContext
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.resources.ProgramResource
import com.google.protobuf.ByteString
import scopt.OptionParser
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
Expand All @@ -62,7 +69,14 @@ object Main {
var txBlobStore = S3Store
.forClient(clientBuilder)
.forStore(config.ledgerId)
.forTable("blobs")
.forTable("tx_log_blobs")
.retrying(3)
.build();

var stateBlobStore = S3Store
.forClient(clientBuilder)
.forStore(config.ledgerId)
.forTable("daml_state_blobs")
.retrying(3)
.build();

Expand All @@ -73,12 +87,33 @@ object Main {
val ionSystem = IonSystemBuilder.standard.build
val driver =
QldbDriver.builder.ledger(config.ledgerId).sessionClientBuilder(sessionBuilder).ionSystem(ionSystem).build()

var stateQldbStore = QldbStore
.forDriver(driver)
.retrying(3)
.tableName("daml_state")
.build();

var stateStore = {
CoercingStore.from(
(daml: ByteString) => DamlStateKey.parseFrom(daml),
(daml: ByteString) => DamlStateValue.parseFrom(daml),
(daml: DamlStateKey) => daml.toByteString(),
(daml: DamlStateValue) => daml.toByteString(),
SplitStore
.fromStores(stateQldbStore, stateBlobStore)
.verified(true)
.withS3Index(true)
.build()
)
}

val qldbTransactionLog = QldbTransactionLog
.forDriver(driver)
.tablePrefix("default")
.build();

var splitTransactionLog = SplitTransactionLog
var transactionLog = SplitTransactionLog
.from(qldbTransactionLog, txBlobStore)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ abstract class BuilderLedgerFactory[
logCtx: LoggingContext
): ResourceOwner[KeyValueLedger] = {
new ParticipantOwner(
engine,
logCtx,
config.ledgerId,
participantConfig.participantId,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.resources.Resource
import com.daml.ledger.resources.ResourceContext
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.resources

class ParticipantOwner[ExtraConfig, Id <: Identifier, Address <: LedgerAddress](
var engine: Engine,
var logCtx: LoggingContext,
var ledgerId: LedgerId,
var participantId: ParticipantId,
var config: Config[ExtraConfig],
Expand All @@ -36,7 +40,7 @@ class ParticipantOwner[ExtraConfig, Id <: Identifier, Address <: LedgerAddress](
context: ResourceContext
): resources.Resource[ResourceContext, Participant[Id, Address]] = {
Resource.successful(
build(config, new ParticipantBuilder[Id, Address](ledgerId, participantId, context)).build()
build(config, new ParticipantBuilder[Id, Address](engine, ledgerId, participantId, context)).build()
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2021 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.blockchaintp.daml.stores.layers;

import com.blockchaintp.daml.stores.StubStore;
import com.blockchaintp.daml.stores.StubTransactionLog;
import com.blockchaintp.daml.stores.exception.StoreReadException;
import com.blockchaintp.daml.stores.exception.StoreWriteException;
import com.blockchaintp.daml.stores.service.Key;
import com.blockchaintp.daml.stores.service.Value;
import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.v1.Offset;
import com.daml.lf.transaction.ContractKeyUniquenessMode;
import com.google.protobuf.ByteString;
import io.vavr.API;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class CoercingTest {
@Test
void store_coercion() throws StoreWriteException, StoreReadException {
var stub = new StubStore<ByteString, ByteString>();

var coerced = CoercingStore.from(API.unchecked((ByteString k) -> DamlKvutils.DamlStateKey.parseFrom(k)),
API.unchecked((ByteString v) -> DamlKvutils.DamlStateValue.parseFrom(v)),
(DamlKvutils.DamlStateKey k) -> k.toByteString(), (DamlKvutils.DamlStateValue v) -> v.toByteString(), stub);

var k = DamlKvutils.DamlStateKey.newBuilder().setParty("bob").build();
var v = DamlKvutils.DamlStateValue.newBuilder().build();
coerced.put(Key.of(k), Value.of(v));

Assertions.assertArrayEquals(v.toByteArray(), coerced.get(Key.of(k)).get().toNative().toByteArray());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.blockchaintp.daml.stores.resources.QldbResources;
import com.blockchaintp.daml.stores.service.Key;
import com.blockchaintp.daml.stores.service.Opaque;
import com.blockchaintp.daml.stores.service.Store;
import com.blockchaintp.daml.stores.service.Value;
import com.google.protobuf.ByteString;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -44,7 +45,7 @@

class QldbStoreIntegrationTest {
private static final int ITERATIONS = 40;
private QldbStore store;
private Store<ByteString, ByteString> store;
private IonSystem ionSystem;
private QldbResources resources;

Expand Down

0 comments on commit 3d1892f

Please sign in to comment.