diff --git a/pom.xml b/pom.xml
index 13f07062..81cbf57a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,7 +164,7 @@
io.github.resilience4j
- resilience4j-circuitbreaker
+ resilience4j-all
1.7.1
@@ -172,6 +172,16 @@
resilience4j-retry
1.7.1
+
+ io.github.resilience4j
+ resilience4j-bulkhead
+ 1.7.1
+
+
+ io.github.resilience4j
+ resilience4j-circuitbreaker
+ 1.7.1
+
io.github.resilience4j
resilience4j-rxjava3
@@ -370,12 +380,20 @@
io.github.resilience4j
- resilience4j-circuitbreaker
+ resilience4j-all
io.github.resilience4j
resilience4j-retry
+
+ io.github.resilience4j
+ resilience4j-circuitbreaker
+
+
+ io.github.resilience4j
+ resilience4j-bulkhead
+
io.github.resilience4j
resilience4j-rxjava3
diff --git a/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java b/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java
index 72293f5c..eb6ff26a 100644
--- a/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java
+++ b/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java
@@ -13,11 +13,9 @@
*/
package com.blockchaintp.daml.participant;
-import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import com.blockchaintp.daml.address.Identifier;
@@ -42,8 +40,6 @@
import com.daml.platform.akkastreams.dispatcher.Dispatcher;
import com.google.protobuf.ByteString;
-import io.vavr.Tuple;
-import io.vavr.Tuple2;
import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
import scala.concurrent.Await$;
@@ -66,8 +62,6 @@ public final class InProcLedgerSubmitter dispatcher;
private final TransactionLogWriter writer;
- private final LinkedBlockingQueue>> queue;
- private final ConcurrentHashMap status;
private final ExecutionContext context;
/**
@@ -108,8 +102,6 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics,
writer = CoercingTxLog.from(Bijection.of(UuidConverter::logEntryToUuid, UuidConverter::uuidtoLogEntry),
Bijection.of(Raw.Envelope::bytes, Raw.Envelope$.MODULE$::apply), Bijection.identity(), theTxLog);
dispatcher = theDispatcher;
- queue = new LinkedBlockingQueue<>();
- status = new ConcurrentHashMap<>();
comitter = new ValidatingCommitter<>(TimeProvider.UTC$.MODULE$::getCurrentTime,
SubmissionValidator.create(
@@ -124,57 +116,29 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics,
});
context = scala.concurrent.ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor());
-
- scala.concurrent.ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()).execute(this::work);
}
- private void work() {
- var run = true;
- while (run) {
- try {
- var next = queue.take();
-
- status.put(next._1, SubmissionStatus.PARTIALLY_SUBMITTED);
-
- var res = Await$.MODULE$.result(this.comitter.commit(next._2.getCorrelationId(), next._2.getSubmission(),
- next._2.getSubmittingParticipantId(), context), Duration.Inf());
-
- if (!(res instanceof SubmissionResult.Acknowledged$)) {
- status.put(next._1, SubmissionStatus.REJECTED);
- } else {
- status.put(next._1, SubmissionStatus.SUBMITTED);
- }
- } catch (TimeoutException | InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.error("Thread interrupted", e);
+ @Override
+ public CompletableFuture submitPayload(final CommitPayload cp) {
- run = false;
+ return CompletableFuture.supplyAsync(() -> {
+ SubmissionResult res = null;
+ try {
+ res = Await$.MODULE$.result(
+ this.comitter.commit(cp.getCorrelationId(), cp.getSubmission(), cp.getSubmittingParticipantId(), context),
+ Duration.Inf());
+ } catch (InterruptedException theE) {
+ return SubmissionStatus.PARTIALLY_SUBMITTED;
+ } catch (TimeoutException theE) {
+ return SubmissionStatus.PARTIALLY_SUBMITTED;
}
- }
- }
- @Override
- public SubmissionReference submitPayload(final CommitPayload cp) {
- var ref = new SubmissionReference();
- try {
- queue.put(Tuple.of(ref, cp));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Committer thread has been interrupted!");
- }
- status.put(ref, SubmissionStatus.ENQUEUED);
-
- return ref;
- }
-
- @Override
- public Optional checkSubmission(final SubmissionReference ref) {
- return Optional.of(status.get(ref));
- }
+ if (!(res instanceof SubmissionResult.Acknowledged$)) {
+ return SubmissionStatus.REJECTED;
+ } else {
+ return SubmissionStatus.SUBMITTED;
+ }
+ });
- @Override
- public CommitPayload translatePayload(final CommitPayload cp) {
- return null;
}
-
}
diff --git a/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitterBuilder.java b/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitterBuilder.java
index 1ea06e12..56d64a67 100644
--- a/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitterBuilder.java
+++ b/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitterBuilder.java
@@ -118,9 +118,9 @@ public InProcLedgerSubmitterBuilder withDispatcher(final Dispatcher
*
* @return A configured ledger sumbitter.
*/
- public InProcLedgerSubmitter build() {
-
- return new InProcLedgerSubmitter<>(engine, metrics, txLog, stateStore, dispatcher);
+ public LedgerSubmitter build() {
+ var inproc = new InProcLedgerSubmitter(engine, metrics, txLog, stateStore, dispatcher);
+ return new LedgerSubmitterBulkhead<>(inproc);
}
}
diff --git a/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java b/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java
index fc37747a..548cb23a 100644
--- a/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java
+++ b/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java
@@ -13,7 +13,7 @@
*/
package com.blockchaintp.daml.participant;
-import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import com.blockchaintp.daml.address.Identifier;
import com.blockchaintp.daml.address.LedgerAddress;
@@ -35,23 +35,6 @@ public interface LedgerSubmitter
* the payload to submit.
* @return a reference to the submission.
*/
- SubmissionReference submitPayload(CommitPayload cp);
+ CompletableFuture submitPayload(CommitPayload cp);
- /**
- * Check the status of a submission.
- *
- * @param ref
- * the reference to check.
- * @return a status object.
- */
- Optional checkSubmission(SubmissionReference ref);
-
- /**
- * For convenience we can translate one the payload to the expected output payload.
- *
- * @param cp
- * the input payload.
- * @return the output payload.
- */
- CommitPayload translatePayload(CommitPayload cp);
}
diff --git a/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java b/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java
new file mode 100644
index 00000000..81e268cf
--- /dev/null
+++ b/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2021 Blockchain Technology Partners
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.blockchaintp.daml.participant;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import com.blockchaintp.daml.address.Identifier;
+import com.blockchaintp.daml.address.LedgerAddress;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
+import io.github.resilience4j.circuitbreaker.CircuitBreaker;
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
+import io.github.resilience4j.decorators.Decorators;
+import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
+import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
+
+/**
+ * Resilience for an underlying ledgersubmitter.
+ *
+ * @param
+ * @param
+ */
+public final class LedgerSubmitterBulkhead
+ implements LedgerSubmitter {
+ private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(InProcLedgerSubmitter.class);
+ private static final int MAX_CONCURRENT = 5;
+ private final LedgerSubmitter inner;
+ private final CircuitBreaker circuitBreaker;
+ private final Bulkhead bulkhead;
+
+ /**
+ *
+ * @param theInner
+ */
+ public LedgerSubmitterBulkhead(final LedgerSubmitter theInner) {
+ inner = theInner;
+ // Create a CircuitBreaker with default configuration
+ circuitBreaker = CircuitBreaker.of("ledger-submitter", CircuitBreakerConfig.ofDefaults().custom().build());
+
+ // Create a Bulkhead with default configuration
+ bulkhead = Bulkhead.of("ledger-submitter", BulkheadConfig.custom().maxConcurrentCalls(MAX_CONCURRENT).build());
+ }
+
+ @Override
+ public CompletableFuture submitPayload(final CommitPayload cp) {
+ return Decorators.ofCompletionStage(() -> inner.submitPayload(cp)).withCircuitBreaker(circuitBreaker)
+ .withBulkhead(bulkhead)
+ .withFallback(
+ Arrays.asList(TimeoutException.class, CallNotPermittedException.class, BulkheadFullException.class),
+ throwable -> SubmissionStatus.OVERLOADED)
+ .get().toCompletableFuture();
+ }
+}
diff --git a/src/main/java/com/blockchaintp/daml/participant/Participant.java b/src/main/java/com/blockchaintp/daml/participant/Participant.java
index 47d49a04..b12d1b72 100644
--- a/src/main/java/com/blockchaintp/daml/participant/Participant.java
+++ b/src/main/java/com/blockchaintp/daml/participant/Participant.java
@@ -41,8 +41,10 @@
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
import scala.Option;
import scala.Tuple2;
+import scala.compat.java8.FutureConverters;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
+import scala.jdk.CollectionConverters;
import scala.math.Ordering;
/**
@@ -134,17 +136,26 @@ public String participantId() {
public Future commit(final String correlationId, final Raw.Envelope envelope,
final CommitMetadata metadata, final TelemetryContext telemetryContext) {
- return Future.apply(() -> {
- try {
- var references = commitPayloadBuilder.build(envelope, metadata, correlationId).stream()
- .map(submitter::submitPayload).collect(Collectors.toList());
- LOG.trace("Received {} submission references", references.size());
- return SubmissionResult.Acknowledged$.MODULE$;
- } catch (final Exception e) {
- LOG.warn("Interrupted while submitting transaction {}", () -> e);
- Thread.currentThread().interrupt();
- return new SubmissionResult.InternalError("Interrupted while submitting transaction");
- }
- }, context);
+ var payloads = commitPayloadBuilder.build(envelope, metadata, correlationId).stream().map(submitter::submitPayload)
+ .map(f -> f.thenApply((x) -> {
+ if (x == SubmissionStatus.OVERLOADED) {
+ return SubmissionResult.Overloaded$.MODULE$;
+ } else if (x == SubmissionStatus.REJECTED) {
+ return SubmissionResult.NotSupported$.MODULE$;
+ }
+ return SubmissionResult.Acknowledged$.MODULE$;
+ })).map(FutureConverters::toScala).collect(Collectors.toList());
+
+ return Future.foldLeft(CollectionConverters.CollectionHasAsScala(payloads).asScala().toSeq(),
+ SubmissionResult.Acknowledged$.MODULE$,
+ // Folding latch to any failed payload, unknown how this will affect retry behaviour without
+ // atomicity
+ (a, x) -> {
+ if (a == SubmissionResult.Acknowledged$.MODULE$ && x == SubmissionResult.Acknowledged$.MODULE$) {
+ return SubmissionResult.Acknowledged$.MODULE$;
+ }
+
+ return x;
+ }, context);
}
}
diff --git a/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java b/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java
index 60162ee0..f1675535 100644
--- a/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java
+++ b/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java
@@ -17,6 +17,10 @@
* The submission status of a particular submission.
*/
public enum SubmissionStatus {
+ /**
+ * The participant cannot currently process this submission.
+ */
+ OVERLOADED,
/**
* The participant has not yet submitted a proposal.
*/