Skip to content

Commit

Permalink
feat: An inproc ledger submitter with a similar design to legacy do-qldb
Browse files Browse the repository at this point in the history
Many dependencies to map still

Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Aug 4, 2021
1 parent 191c805 commit f9e2c50
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 30 deletions.
9 changes: 7 additions & 2 deletions src/main/java/com/blockchaintp/daml/address/Identifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
*/
package com.blockchaintp.daml.address;

import com.daml.ledger.validator.StateKeySerializationStrategy;
import com.daml.ledger.participant.state.kvutils.DamlKvutils;

/**
* A generic identifier.
*/
public interface Identifier extends StateKeySerializationStrategy {
public interface Identifier {
/**
*
* @return The state key;
*/
DamlKvutils.DamlStateKey toKey();
}
7 changes: 1 addition & 6 deletions src/main/java/com/blockchaintp/daml/address/QldbAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@
package com.blockchaintp.daml.address;

import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.kvutils.Raw;

/**
*
*/
public final class QldbAddress implements LedgerAddress {
@Override
public Raw.StateKey serializeStateKey(final DamlKvutils.DamlStateKey key) {
return null;
}

@Override
public DamlKvutils.DamlStateKey deserializeStateKey(final Raw.StateKey input) {
public DamlKvutils.DamlStateKey toKey() {
return null;
}
}
16 changes: 2 additions & 14 deletions src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,14 @@
package com.blockchaintp.daml.address;

import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.kvutils.Raw;
import com.daml.ledger.validator.DefaultStateKeySerializationStrategy;

/**
*
*/
public final class QldbIdentifier implements Identifier {
/**
*
*/
public QldbIdentifier() {
}

@Override
public Raw.StateKey serializeStateKey(final DamlKvutils.DamlStateKey key) {
return DefaultStateKeySerializationStrategy.serializeStateKey(key);
}

@Override
public DamlKvutils.DamlStateKey deserializeStateKey(final Raw.StateKey input) {
return DefaultStateKeySerializationStrategy.deserializeStateKey(input);
public DamlKvutils.DamlStateKey toKey() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* @param <A>
* the type of the identifier (e.g. {@link LedgerAddress} or {@link Identifier})
*/
public class CommitPayload<A extends Identifier> {
public final class CommitPayload<A extends Identifier> {
private final CommitMetadata metadata;
private DamlOperation operation;
private Set<A> reads;
Expand Down Expand Up @@ -56,4 +56,12 @@ public CommitPayload(final DamlOperation theOperation, final CommitMetadata theM
public DamlOperation getOperation() {
return operation;
}

/**
*
* @return The set of read addresses.
*/
public Set<A> getReads() {
return reads;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,36 @@
*/
package com.blockchaintp.daml.participant;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import com.blockchaintp.daml.address.Identifier;
import com.blockchaintp.daml.address.LedgerAddress;
import com.blockchaintp.daml.stores.exception.StoreReadException;
import com.blockchaintp.daml.stores.exception.StoreWriteException;
import com.blockchaintp.daml.stores.service.Key;
import com.blockchaintp.daml.stores.service.Store;
import com.blockchaintp.daml.stores.service.TransactionLogWriter;
import com.blockchaintp.daml.stores.service.Value;
import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting;
import com.daml.ledger.participant.state.v1.Configuration;
import com.daml.ledger.participant.state.v1.Offset;
import com.daml.lf.data.Time;
import com.daml.logging.LoggingContext;
import com.google.protobuf.InvalidProtocolBufferException;

import io.vavr.Tuple;
import io.vavr.Tuple2;
import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.jdk.javaapi.CollectionConverters$;
import scala.jdk.javaapi.OptionConverters$;

/**
* An in process submitter relying on an ephemeral queue.
Expand All @@ -38,33 +53,92 @@
public final class InProcLedgerSubmitter<A extends Identifier, B extends LedgerAddress>
implements LedgerSubmitter<A, B> {
private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(InProcLedgerSubmitter.class);
private final TransactionLogWriter<DamlKvutils.DamlLogEntryId, CommitPayload<A>, Offset> writer;
private ExecutionContext context;
private final KeyValueCommitting committing;
private final TransactionLogWriter<DamlKvutils.DamlLogEntryId, DamlKvutils.DamlLogEntry, Offset> writer;
private final Store<DamlKvutils.DamlStateKey, DamlKvutils.DamlStateValue> stateStore;
private final ExecutionContext context;
private final String participantId;
private final Configuration configuration;
private final LoggingContext loggingContext;
private final LinkedBlockingQueue<Tuple2<SubmissionReference, CommitPayload<A>>> queue;
private final ConcurrentHashMap<SubmissionReference, SubmissionStatus> status;

/**
*
* @param theCommitting
* @param theWriter
* @param theStateStore
* @param theContext
* @param theParticipantId
* @param theConfiguration
* @param theLoggingContext
*/
public InProcLedgerSubmitter(
final TransactionLogWriter<DamlKvutils.DamlLogEntryId, CommitPayload<A>, Offset> theWriter,
final ExecutionContext theContext) {
public InProcLedgerSubmitter(final KeyValueCommitting theCommitting,
final TransactionLogWriter<DamlKvutils.DamlLogEntryId, DamlKvutils.DamlLogEntry, Offset> theWriter,
final Store<DamlKvutils.DamlStateKey, DamlKvutils.DamlStateValue> theStateStore,
final ExecutionContext theContext, final String theParticipantId, final Configuration theConfiguration,
final LoggingContext theLoggingContext) {
committing = theCommitting;
writer = theWriter;
stateStore = theStateStore;
context = theContext;
participantId = theParticipantId;
configuration = theConfiguration;
loggingContext = theLoggingContext;
queue = new LinkedBlockingQueue<>();
status = new ConcurrentHashMap<>();
context.execute(this::work);
}

private Time.Timestamp getCurrentRecordTime() {
return Time.Timestamp$.MODULE$.now();
}

private <A, B> scala.collection.immutable.Map<A, B> mapToScalaImmutableMap(final java.util.Map<A, B> m) {
return scala.collection.immutable.Map$.MODULE$.from(CollectionConverters$.MODULE$.asScala(m));
}

private <A, B> java.util.Map<A, B> scalaMapToMap(final scala.collection.immutable.Map<A, B> m) {
return CollectionConverters$.MODULE$.asJava(m);
}

/**
*
* Do the work of submitting this to our underlying txlog and processing the input and output
* states.
*/
private void work() {
while (true) {
var next = queue.poll();

status.put(next._1, SubmissionStatus.PARTIALLY_SUBMITTED);

var inputKeys = next._2.getReads().stream().map(Identifier::toKey).map(Key::of).collect(Collectors.toList());

var sparseInputs = inputKeys.stream().collect(Collectors.toMap(k -> k.toNative(),
k -> OptionConverters$.MODULE$.toScala(Optional.<DamlKvutils.DamlStateValue>empty())));

try {
stateStore.get(inputKeys).entrySet().forEach(kv -> sparseInputs.put(kv.getKey().toNative(),
OptionConverters$.MODULE$.toScala(Optional.of(kv.getValue().toNative()))));

var entryId = writer.begin();

var rx = committing.processSubmission(entryId, getCurrentRecordTime(), configuration,
DamlKvutils.DamlSubmission.parseFrom(next._2.getOperation().getTransaction().getSubmission()),
participantId, mapToScalaImmutableMap(sparseInputs), loggingContext);

var outputMap = scalaMapToMap(rx._2);

stateStore.put(outputMap.entrySet().stream().map(kv -> Map.entry(Key.of(kv.getKey()), Value.of(kv.getValue())))
.collect(Collectors.toList()));

writer.sendEvent(entryId, rx._1);
writer.commit(entryId);

status.put(next._1, SubmissionStatus.SUBMITTED);

} catch (StoreWriteException | StoreReadException | InvalidProtocolBufferException e) {
LOG.error("Could not submit payload {} due to {}", () -> next._1, () -> e);
}
}
}

Expand Down

0 comments on commit f9e2c50

Please sign in to comment.