Skip to content

Commit

Permalink
feat: Resilience4j bulkhead and async ledger submitter
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Sep 15, 2021
1 parent 4ace7aa commit 6d5bf55
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 91 deletions.
22 changes: 20 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,24 @@
<!-- Resilience -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<artifactId>resilience4j-all</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-rxjava3</artifactId>
Expand Down Expand Up @@ -370,12 +380,20 @@
<!-- Resilience -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<artifactId>resilience4j-all</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-rxjava3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
*/
package com.blockchaintp.daml.participant;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;

import com.blockchaintp.daml.address.Identifier;
Expand All @@ -42,8 +40,6 @@
import com.daml.platform.akkastreams.dispatcher.Dispatcher;
import com.google.protobuf.ByteString;

import io.vavr.Tuple;
import io.vavr.Tuple2;
import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
import scala.concurrent.Await$;
Expand All @@ -66,8 +62,6 @@ public final class InProcLedgerSubmitter<A extends Identifier, B extends LedgerA
private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(InProcLedgerSubmitter.class);
private final Dispatcher<Long> dispatcher;
private final TransactionLogWriter<Raw.LogEntryId, Raw.Envelope, Long> writer;
private final LinkedBlockingQueue<Tuple2<SubmissionReference, CommitPayload<A>>> queue;
private final ConcurrentHashMap<SubmissionReference, SubmissionStatus> status;
private final ExecutionContext context;

/**
Expand Down Expand Up @@ -108,8 +102,6 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics,
writer = CoercingTxLog.from(Bijection.of(UuidConverter::logEntryToUuid, UuidConverter::uuidtoLogEntry),
Bijection.of(Raw.Envelope::bytes, Raw.Envelope$.MODULE$::apply), Bijection.identity(), theTxLog);
dispatcher = theDispatcher;
queue = new LinkedBlockingQueue<>();
status = new ConcurrentHashMap<>();

comitter = new ValidatingCommitter<>(TimeProvider.UTC$.MODULE$::getCurrentTime,
SubmissionValidator.create(
Expand All @@ -124,57 +116,29 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics,
});

context = scala.concurrent.ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor());

scala.concurrent.ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()).execute(this::work);
}

private void work() {
var run = true;
while (run) {
try {
var next = queue.take();

status.put(next._1, SubmissionStatus.PARTIALLY_SUBMITTED);

var res = Await$.MODULE$.result(this.comitter.commit(next._2.getCorrelationId(), next._2.getSubmission(),
next._2.getSubmittingParticipantId(), context), Duration.Inf());

if (!(res instanceof SubmissionResult.Acknowledged$)) {
status.put(next._1, SubmissionStatus.REJECTED);
} else {
status.put(next._1, SubmissionStatus.SUBMITTED);
}
} catch (TimeoutException | InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread interrupted", e);
@Override
public CompletableFuture<SubmissionStatus> submitPayload(final CommitPayload<A> cp) {

run = false;
return CompletableFuture.supplyAsync(() -> {
SubmissionResult res = null;
try {
res = Await$.MODULE$.result(
this.comitter.commit(cp.getCorrelationId(), cp.getSubmission(), cp.getSubmittingParticipantId(), context),
Duration.Inf());
} catch (InterruptedException theE) {
return SubmissionStatus.PARTIALLY_SUBMITTED;
} catch (TimeoutException theE) {
return SubmissionStatus.PARTIALLY_SUBMITTED;
}
}
}

@Override
public SubmissionReference submitPayload(final CommitPayload<A> cp) {
var ref = new SubmissionReference();
try {
queue.put(Tuple.of(ref, cp));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Committer thread has been interrupted!");
}
status.put(ref, SubmissionStatus.ENQUEUED);

return ref;
}

@Override
public Optional<SubmissionStatus> checkSubmission(final SubmissionReference ref) {
return Optional.of(status.get(ref));
}
if (!(res instanceof SubmissionResult.Acknowledged$)) {
return SubmissionStatus.REJECTED;
} else {
return SubmissionStatus.SUBMITTED;
}
});

@Override
public CommitPayload<B> translatePayload(final CommitPayload<A> cp) {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public InProcLedgerSubmitterBuilder<I, A> withDispatcher(final Dispatcher<Long>
*
* @return A configured ledger sumbitter.
*/
public InProcLedgerSubmitter<I, A> build() {

return new InProcLedgerSubmitter<>(engine, metrics, txLog, stateStore, dispatcher);
public LedgerSubmitter<I, A> build() {
var inproc = new InProcLedgerSubmitter<I, A>(engine, metrics, txLog, stateStore, dispatcher);
return new LedgerSubmitterBulkhead<>(inproc);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package com.blockchaintp.daml.participant;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import com.blockchaintp.daml.address.Identifier;
import com.blockchaintp.daml.address.LedgerAddress;
Expand All @@ -35,23 +35,6 @@ public interface LedgerSubmitter<A extends Identifier, B extends LedgerAddress>
* the payload to submit.
* @return a reference to the submission.
*/
SubmissionReference submitPayload(CommitPayload<A> cp);
CompletableFuture<SubmissionStatus> submitPayload(CommitPayload<A> cp);

/**
* Check the status of a submission.
*
* @param ref
* the reference to check.
* @return a status object.
*/
Optional<SubmissionStatus> checkSubmission(SubmissionReference ref);

/**
* For convenience we can translate one the payload to the expected output payload.
*
* @param cp
* the input payload.
* @return the output payload.
*/
CommitPayload<B> translatePayload(CommitPayload<A> cp);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2021 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.blockchaintp.daml.participant;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import com.blockchaintp.daml.address.Identifier;
import com.blockchaintp.daml.address.LedgerAddress;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.decorators.Decorators;
import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;

/**
* Resilience for an underlying ledgersubmitter.
*
* @param <A>
* @param <B>
*/
public final class LedgerSubmitterBulkhead<A extends Identifier, B extends LedgerAddress>
implements LedgerSubmitter<A, B> {
private static final LambdaLogger LOG = LambdaLoggerFactory.getLogger(InProcLedgerSubmitter.class);
private static final int MAX_CONCURRENT = 5;
private final LedgerSubmitter<A, B> inner;
private final CircuitBreaker circuitBreaker;
private final Bulkhead bulkhead;

/**
*
* @param theInner
*/
public LedgerSubmitterBulkhead(final LedgerSubmitter<A, B> theInner) {
inner = theInner;
// Create a CircuitBreaker with default configuration
circuitBreaker = CircuitBreaker.of("ledger-submitter", CircuitBreakerConfig.ofDefaults().custom().build());

// Create a Bulkhead with default configuration
bulkhead = Bulkhead.of("ledger-submitter", BulkheadConfig.custom().maxConcurrentCalls(MAX_CONCURRENT).build());
}

@Override
public CompletableFuture<SubmissionStatus> submitPayload(final CommitPayload<A> cp) {
return Decorators.ofCompletionStage(() -> inner.submitPayload(cp)).withCircuitBreaker(circuitBreaker)
.withBulkhead(bulkhead)
.withFallback(
Arrays.asList(TimeoutException.class, CallNotPermittedException.class, BulkheadFullException.class),
throwable -> SubmissionStatus.OVERLOADED)
.get().toCompletableFuture();
}
}
35 changes: 23 additions & 12 deletions src/main/java/com/blockchaintp/daml/participant/Participant.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.compat.java8.FutureConverters;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.jdk.CollectionConverters;
import scala.math.Ordering;

/**
Expand Down Expand Up @@ -134,17 +136,26 @@ public String participantId() {
public Future<SubmissionResult> commit(final String correlationId, final Raw.Envelope envelope,
final CommitMetadata metadata, final TelemetryContext telemetryContext) {

return Future.apply(() -> {
try {
var references = commitPayloadBuilder.build(envelope, metadata, correlationId).stream()
.map(submitter::submitPayload).collect(Collectors.toList());
LOG.trace("Received {} submission references", references.size());
return SubmissionResult.Acknowledged$.MODULE$;
} catch (final Exception e) {
LOG.warn("Interrupted while submitting transaction {}", () -> e);
Thread.currentThread().interrupt();
return new SubmissionResult.InternalError("Interrupted while submitting transaction");
}
}, context);
var payloads = commitPayloadBuilder.build(envelope, metadata, correlationId).stream().map(submitter::submitPayload)
.map(f -> f.thenApply((x) -> {
if (x == SubmissionStatus.OVERLOADED) {
return SubmissionResult.Overloaded$.MODULE$;
} else if (x == SubmissionStatus.REJECTED) {
return SubmissionResult.NotSupported$.MODULE$;
}
return SubmissionResult.Acknowledged$.MODULE$;
})).map(FutureConverters::toScala).collect(Collectors.toList());

return Future.foldLeft(CollectionConverters.CollectionHasAsScala(payloads).asScala().toSeq(),
SubmissionResult.Acknowledged$.MODULE$,
// Folding latch to any failed payload, unknown how this will affect retry behaviour without
// atomicity
(a, x) -> {
if (a == SubmissionResult.Acknowledged$.MODULE$ && x == SubmissionResult.Acknowledged$.MODULE$) {
return SubmissionResult.Acknowledged$.MODULE$;
}

return x;
}, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
* The submission status of a particular submission.
*/
public enum SubmissionStatus {
/**
* The participant cannot currently process this submission.
*/
OVERLOADED,
/**
* The participant has not yet submitted a proposal.
*/
Expand Down

0 comments on commit 6d5bf55

Please sign in to comment.