Skip to content

Commit

Permalink
fix: Pariticpant now commits synchronously so we are green but slow
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Sep 28, 2021
1 parent 97941ed commit 0688116
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
*/
package com.blockchaintp.daml.address;

import java.util.Objects;

import com.daml.ledger.participant.state.kvutils.DamlKvutils;


/**
*
*/
Expand All @@ -33,4 +36,26 @@ public QldbIdentifier(final DamlKvutils.DamlStateKey theData) {
public DamlKvutils.DamlStateKey toKey() {
return data;
}

@Override
public boolean equals(final Object theO) {
if (this == theO) {
return true;
}
if (theO == null || getClass() != theO.getClass()) {
return false;
}
final QldbIdentifier that = (QldbIdentifier) theO;
return data.equals(that.data);
}

@Override
public int hashCode() {
return Objects.hash(data);
}

@Override
public String toString() {
return data.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
Expand All @@ -32,6 +33,7 @@ public final class CommitHighwaterMark {
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(CommitHighwaterMark.class);
private final ConcurrentSkipListSet<Long> open = new ConcurrentSkipListSet<>();
private final AtomicLong highWatermark = new AtomicLong();

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import com.blockchaintp.daml.address.Identifier;
Expand All @@ -40,10 +41,8 @@
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.compat.java8.FutureConverters;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.jdk.CollectionConverters;
import scala.math.Ordering;

/**
Expand Down Expand Up @@ -136,27 +135,24 @@ public String participantId() {
@Override
public Future<SubmissionResult> commit(final String correlationId, final Raw.Envelope envelope,
final CommitMetadata metadata, final TelemetryContext telemetryContext) {

var payloads = commitPayloadBuilder.build(envelope, metadata, correlationId).stream().map(submitter::submitPayload)
.map(f -> f.thenApply(x -> {
if (x == SubmissionStatus.OVERLOADED) {
return SubmissionResult.Overloaded$.MODULE$;
} else if (x == SubmissionStatus.REJECTED) {
return SubmissionResult.NotSupported$.MODULE$;
}
return SubmissionResult.Acknowledged$.MODULE$;
})).map(FutureConverters::toScala).collect(Collectors.toList());

return Future.foldLeft(CollectionConverters.CollectionHasAsScala(payloads).asScala().toSeq(),
SubmissionResult.Acknowledged$.MODULE$,
// Folding latch to any failed payload, unknown how this will affect retry behaviour without
// atomicity
(a, x) -> {
if (a == SubmissionResult.Acknowledged$.MODULE$ && x == SubmissionResult.Acknowledged$.MODULE$) {
synchronized (this) {

var commits = commitPayloadBuilder.build(envelope, metadata, correlationId).stream()
.map(payload -> submitter.submitPayload(payload).thenApply(x -> {
if (x == SubmissionStatus.OVERLOADED) {
return SubmissionResult.Overloaded$.MODULE$;
} else if (x == SubmissionStatus.REJECTED) {
return SubmissionResult.NotSupported$.MODULE$;
}
return SubmissionResult.Acknowledged$.MODULE$;
}
LOG.debug("Folding result {} {}", a, x);
return x;
}, context);
})).collect(Collectors.toList());

try {
var blockedRes = commits.stream().findFirst().get().get();
return Future.successful(blockedRes);
} catch (InterruptedException | ExecutionException e) {
return Future.successful(SubmissionResult.NotSupported$.MODULE$);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
*/
class StateAccess implements LedgerStateAccess<Long> {
private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(StateAccess.class);
public static final int DELAY = 50;
private final Store<Raw.StateKey, Raw.Envelope> stateStore;
private final SerialisedSequenceAllocation sequenceAllocation;
private final TransactionLogWriter<Raw.LogEntryId, Raw.Envelope, Long> writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public PostgresStoreBuilder(final String theUrl) {
* @return A configured builder.
*/
public PostgresStoreBuilder migrate() {
var flyway = Flyway.configure().locations("classpath:migrations/store").dataSource(url, "", "").load();
var flyway = Flyway.configure().locations("classpath:migrations/store", "classpath:migrations/txlog")
.dataSource(url, "", "").load();

flyway.migrate();

Expand Down
7 changes: 6 additions & 1 deletion core/src/main/resources/migrations/txlog/V1__txlog_init.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
create extension if not exists "uuid-ossp";
create table kv (
id bytea primary key not null,
data bytea not null
);

CREATE INDEX kv_idx ON kv(id);

create table tx (
id uuid primary key,
Expand Down
12 changes: 0 additions & 12 deletions docker/daml-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,21 @@ services:
sleep 120 && \
java -jar ledger-api-test-tool.jar \
--timeout-scale-factor 1 \
--exclude RaceConditionIT \
--exclude CommandDeduplicationIT \
--exclude SemanticTests:SemanticConcurrentDoubleSpend \
--concurrent-test-runs 8 \
daml-on-qldb:9000 \
|| \
java -jar ledger-api-test-tool.jar \
--timeout-scale-factor 1 \
--exclude RaceConditionIT \
--exclude CommandDeduplicationIT \
--exclude SemanticTests:SemanticConcurrentDoubleSpend \
--concurrent-test-runs 4 \
daml-on-qldb:9000 \
|| \
java -jar ledger-api-test-tool.jar \
--timeout-scale-factor 1 \
--exclude RaceConditionIT \
--exclude CommandDeduplicationIT \
--exclude SemanticTests:SemanticConcurrentDoubleSpend \
--concurrent-test-runs 2 \
daml-on-qldb:9000 \
|| \
java -jar ledger-api-test-tool.jar \
--timeout-scale-factor 1 \
--exclude RaceConditionIT \
--exclude CommandDeduplicationIT \
--exclude SemanticTests:SemanticConcurrentDoubleSpend \
--concurrent-test-runs 1 \
daml-on-qldb:9000 \
\""
Expand Down
34 changes: 28 additions & 6 deletions postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,19 @@ import com.daml.ledger.api.auth.AuthServiceJWT
import com.daml.ledger.api.auth.AuthServiceWildcard
import com.daml.ledger.participant.state.kvutils.api.CommitMetadata
import com.daml.ledger.participant.state.kvutils.app.Config
import com.daml.ledger.participant.state.kvutils.app.ParticipantConfig
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.daml.ledger.resources.ResourceContext
import com.daml.ledger.validator.DefaultStateKeySerializationStrategy
import com.daml.platform.configuration.CommandConfiguration
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.resources.ProgramResource
import scopt.OptionParser

import scala.jdk.CollectionConverters._
import java.nio.file.Paths
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
import scala.jdk.FunctionConverters.enrichAsJavaFunction
import scala.util.Try

Expand All @@ -57,17 +61,17 @@ object Main extends App {
"daml-on-qldb",
new LedgerFactory((config: Config[ExtraConfig], builder: ParticipantBuilder[QldbIdentifier, QldbAddress]) => {

val txLog = PostgresTransactionLog
.fromUrl(config.extra.txLogStore)
.migrate()
.build();

val stateStore = PostgresStore
.fromUrl(config.extra.txLogStore)
.migrate()
.retrying(3)
.build()

/// Only migrate the
val txLog = PostgresTransactionLog
.fromUrl(config.extra.txLogStore)
.migrate()
.build();

val inputAddressReader = (meta: CommitMetadata) =>
meta
.inputKeys(DefaultStateKeySerializationStrategy)
Expand Down Expand Up @@ -121,8 +125,26 @@ class LedgerFactory(
}
}

override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
LedgerConfiguration.defaultLocalLedger

override val defaultExtraConfig: ExtraConfig = ExtraConfig.default

override def commandConfig(
participantConfig: ParticipantConfig,
config: Config[ExtraConfig]
): CommandConfiguration = {
val DefaultTrackerRetentionPeriod: FiniteDuration = 5.minutes

CommandConfiguration(
inputBufferSize = 512,
maxParallelSubmissions = 1,
maxCommandsInFlight = 256,
limitMaxCommandsInFlight = true,
retentionPeriod = DefaultTrackerRetentionPeriod
)
}

private def validatePath(path: String, message: String) = {
val valid = Try(Paths.get(path).toFile.canRead).getOrElse(false)
if (valid) Right(()) else Left(message)
Expand Down

0 comments on commit 0688116

Please sign in to comment.