Skip to content

Commit

Permalink
fix: Immediately return on submission, enqueue
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Sep 28, 2021
1 parent fa40be0 commit 966e090
Showing 1 changed file with 30 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.blockchaintp.daml.address.Identifier;
import com.blockchaintp.daml.address.LedgerAddress;
Expand Down Expand Up @@ -58,7 +62,9 @@ public final class Participant<I extends Identifier, A extends LedgerAddress> im
private final String ledgerId;
private final String participantId;
private final Dispatcher<Long> dispatcher;
private final BlockingDeque<CommitPayload<I>> submissions = new LinkedBlockingDeque<>();
private final ExecutionContextExecutor context;
private final ScheduledExecutorService pollExecutor;

/**
* Convenience method for creating a builder.
Expand Down Expand Up @@ -95,6 +101,25 @@ public Participant(final TransactionLogReader<Long, Raw.LogEntryId, Raw.Envelope
participantId = theParticipantId;
dispatcher = theDispatcher;
context = theContext;
pollExecutor = Executors.newSingleThreadScheduledExecutor();
pollExecutor.scheduleAtFixedRate(this::work, 0, 1, TimeUnit.MILLISECONDS);
}

private void work() {
var next = submissions.poll();

if (next != null) {
LOG.debug("Commit correlation id {}", next.getCorrelationId());

try {
var result = submitter.submitPayload(next).get();

LOG.info("Submission result for {} {}", next.getCorrelationId(), result);

} catch (InterruptedException | ExecutionException theE) {
Thread.currentThread().interrupt();
}
}
}

@Override
Expand Down Expand Up @@ -135,24 +160,9 @@ public String participantId() {
@Override
public Future<SubmissionResult> commit(final String correlationId, final Raw.Envelope envelope,
final CommitMetadata metadata, final TelemetryContext telemetryContext) {
synchronized (this) {
LOG.debug("Commit correlation id {}", correlationId);
var commits = commitPayloadBuilder.build(envelope, metadata, correlationId).stream()
.map(payload -> submitter.submitPayload(payload).thenApply(x -> {
if (x == SubmissionStatus.OVERLOADED) {
return SubmissionResult.Overloaded$.MODULE$;
} else if (x == SubmissionStatus.REJECTED) {
return SubmissionResult.NotSupported$.MODULE$;
}
return SubmissionResult.Acknowledged$.MODULE$;
})).collect(Collectors.toList());
try {
var blockedRes = commits.stream().findFirst().get().get();
return Future.successful(blockedRes);
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupted();
return Future.successful(new SubmissionResult.InternalError(e.toString()));
}
}

commitPayloadBuilder.build(envelope, metadata, correlationId).stream().forEach(submissions::add);

return Future.successful(SubmissionResult.Acknowledged$.MODULE$);
}
}

0 comments on commit 966e090

Please sign in to comment.