Skip to content

Commit

Permalink
Extract tx publishing from watchers (#1749)
Browse files Browse the repository at this point in the history
Introduce a `TxPublisher` actor to publish channel txs.
Move logic from watcher to this new actor.

Remove the `TxSigningKit` abstraction that was introduced a bit too early.
The `TxPublisher` will hold all the logic so we'll start by providing the full
commitments, and we'll extract more compact objects later.

We also now publish the commit-tx and its anchor-tx independently.
  • Loading branch information
t-bast authored Apr 12, 2021
1 parent 3da0b80 commit 48c0c4c
Show file tree
Hide file tree
Showing 27 changed files with 1,561 additions and 1,308 deletions.
10 changes: 6 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair

import akka.Done
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.actor.{ActorContext, ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.pattern.after
import akka.util.Timeout
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
Expand All @@ -28,7 +28,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, Batch
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
import fr.acinq.eclair.blockchain.fee._
import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.channel.{Channel, Register, TxPublisher}
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
Expand Down Expand Up @@ -230,10 +230,11 @@ class Setup(datadir: File,
})
_ <- feeratesRetrieved.future

extendedBitcoinClient = new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))
watcher = {
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))), "watcher", SupervisorStrategy.Resume))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, extendedBitcoinClient), "watcher", SupervisorStrategy.Resume))
}

router = system.actorOf(SimpleSupervisor.props(Router.props(nodeParams, watcher, Some(routerInitialized)), "router", SupervisorStrategy.Resume))
Expand Down Expand Up @@ -267,7 +268,8 @@ class Setup(datadir: File,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future

channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet)
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, extendedBitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet, txPublisherFactory)
peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory)

switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package fr.acinq.eclair.blockchain

import akka.actor.ActorRef
import fr.acinq.bitcoin.{ByteVector32, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.bitcoin.{ByteVector32, Transaction}
import fr.acinq.eclair.channel.BitcoinEvent
import fr.acinq.eclair.transactions.Transactions.TransactionSigningKit
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement

/**
Expand Down Expand Up @@ -110,19 +108,6 @@ final case class WatchEventSpentBasic(event: BitcoinEvent) extends WatchEvent
// TODO: not implemented yet.
final case class WatchEventLost(event: BitcoinEvent) extends WatchEvent

// @formatter:off
sealed trait PublishStrategy
object PublishStrategy {
case object JustPublish extends PublishStrategy
case class SetFeerate(currentFeerate: FeeratePerKw, targetFeerate: FeeratePerKw, dustLimit: Satoshi, signingKit: TransactionSigningKit) extends PublishStrategy {
override def toString = s"SetFeerate(target=$targetFeerate)"
}
}
// @formatter:on

/** Publish the provided tx as soon as possible depending on lock time, csv and publishing strategy. */
final case class PublishAsap(tx: Transaction, strategy: PublishStrategy)

// @formatter:off
sealed trait UtxoStatus
object UtxoStatus {
Expand Down

Large diffs are not rendered by default.

55 changes: 39 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

package fr.acinq.eclair.channel

import akka.actor.{ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.{ActorContext, ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.event.Logging.MDC
import akka.pattern.pipe
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, SatoshiLong, Script, ScriptFlags, Transaction}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx, SetChannelId, SignAndPublishTx}
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.PendingRelayDb
Expand All @@ -47,7 +51,19 @@ import scala.util.{Failure, Success, Try}
*/

object Channel {
def props(nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, origin_opt: Option[ActorRef]): Props = Props(new Channel(nodeParams, wallet, remoteNodeId, blockchain, relayer, origin_opt))

trait TxPublisherFactory {
def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): akka.actor.typed.ActorRef[TxPublisher.Command]
}

case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: ActorRef, bitcoinClient: ExtendedBitcoinClient) extends TxPublisherFactory {
override def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): akka.actor.typed.ActorRef[TxPublisher.Command] = {
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, watcher, bitcoinClient)).onFailure(akka.actor.typed.SupervisorStrategy.restart), "tx-publisher")
}
}

def props(nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, txPublisherFactory: TxPublisherFactory, origin_opt: Option[ActorRef]): Props =
Props(new Channel(nodeParams, wallet, remoteNodeId, blockchain, relayer, txPublisherFactory, origin_opt))

// see /~https://github.com/lightningnetwork/lightning-rfc/blob/master/07-routing-gossip.md#requirements
val ANNOUNCEMENTS_MINCONF = 6
Expand Down Expand Up @@ -100,7 +116,7 @@ object Channel {

}

class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {
class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, txPublisherFactory: Channel.TxPublisherFactory, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {

import Channel._

Expand All @@ -111,14 +127,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

// we assume that the peer is the channel's parent
private val peer = context.parent
//noinspection ActorMutableStateInspection
// noinspection ActorMutableStateInspection
// the last active connection we are aware of; note that the peer manages connections and asynchronously notifies
// the channel, which means that if we get disconnected, the previous active connection will die and some messages will
// be sent to dead letters, before the channel gets notified of the disconnection; knowing that this will happen, we
// choose to not make this an Option (that would be None before the first connection), and instead embrace the fact
// that the active connection may point to dead letters at all time
private var activeConnection = context.system.deadLetters

private val txPublisher = txPublisherFactory.spawnTxPublisher(context, remoteNodeId)

// this will be used to detect htlc timeouts
context.system.eventStream.subscribe(self, classOf[CurrentBlockCount])
// this will be used to make sure the current commitment fee is up-to-date
Expand Down Expand Up @@ -165,6 +183,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, fundingTxFeeratePerKw, _, localParams, remote, _, channelFlags, channelVersion), Nothing) =>
context.system.eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isFunder = true, temporaryChannelId, initialFeeratePerKw, Some(fundingTxFeeratePerKw)))
activeConnection = remote
txPublisher ! SetChannelId(remoteNodeId, temporaryChannelId)
val fundingPubKey = keyManager.fundingPublicKey(localParams.fundingKeyPath).publicKey
val channelKeyPath = keyManager.keyPath(localParams, channelVersion)
val open = OpenChannel(nodeParams.chainHash,
Expand Down Expand Up @@ -192,11 +211,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(inputFundee@INPUT_INIT_FUNDEE(_, localParams, remote, _, _), Nothing) if !localParams.isFunder =>
activeConnection = remote
txPublisher ! SetChannelId(remoteNodeId, inputFundee.temporaryChannelId)
goto(WAIT_FOR_OPEN_CHANNEL) using DATA_WAIT_FOR_OPEN_CHANNEL(inputFundee)

case Event(INPUT_RESTORED(data), _) =>
log.info("restoring channel")
context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data.commitments.localParams.isFunder, data.commitments))
txPublisher ! SetChannelId(remoteNodeId, data.channelId)
data match {
// NB: order matters!
case closing: DATA_CLOSING if Closing.nothingAtStake(closing) =>
Expand Down Expand Up @@ -413,6 +434,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
)
val channelId = toLongId(fundingTx.hash, fundingTxOutputIndex)
peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
txPublisher ! SetChannelId(remoteNodeId, channelId)
context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
// NB: we don't send a ChannelSignatureSent for the first commit
goto(WAIT_FOR_FUNDING_SIGNED) using DATA_WAIT_FOR_FUNDING_SIGNED(channelId, localParams, remoteParams, fundingTx, fundingTxFee, initialRelayFees_opt, localSpec, localCommitTx, RemoteCommit(0, remoteSpec, remoteCommitTx.tx.txid, remoteFirstPerCommitmentPoint), open.channelFlags, channelVersion, fundingCreated) sending fundingCreated
Expand Down Expand Up @@ -469,6 +491,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
remoteNextCommitInfo = Right(randomKey.publicKey), // we will receive their next per-commitment point in the next message, so we temporarily put a random byte array,
commitInput, ShaChain.init, channelId = channelId)
peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
txPublisher ! SetChannelId(remoteNodeId, channelId)
context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
// NB: we don't send a ChannelSignatureSent for the first commit
Expand Down Expand Up @@ -1341,7 +1364,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, penaltyTxs) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
penaltyTxs.foreach(claimTx => blockchain ! PublishAsap(claimTx.tx, PublishStrategy.JustPublish))
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(claimTx))
penaltyTxs.foreach(claimTx => blockchain ! WatchSpent(self, tx.txid, claimTx.input.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT, hints = Set(claimTx.tx.txid)))
rev1
}
Expand All @@ -1355,7 +1378,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// If the tx is one of our HTLC txs, we now publish a 3rd-stage claim-htlc-tx that claims its output.
val (localCommitPublished1, claimHtlcTx_opt) = Closing.claimLocalCommitHtlcTxOutput(localCommitPublished, keyManager, d.commitments, tx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
claimHtlcTx_opt.foreach(claimHtlcTx => {
blockchain ! PublishAsap(claimHtlcTx.tx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(claimHtlcTx)
blockchain ! WatchConfirmed(self, claimHtlcTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(claimHtlcTx.tx))
})
Closing.updateLocalCommitPublished(localCommitPublished1, tx)
Expand Down Expand Up @@ -1990,7 +2013,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Some(fundingTx) =>
// if we are funder, we never give up
log.info(s"republishing the funding tx...")
blockchain ! PublishAsap(fundingTx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(fundingTx, "funding-tx")
// we also check if the funding tx has been double-spent
checkDoubleSpent(fundingTx)
context.system.scheduler.scheduleOnce(1 day, blockchain, GetTxWithMeta(txid))
Expand Down Expand Up @@ -2142,7 +2165,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

private def doPublish(closingTx: ClosingTx): Unit = {
blockchain ! PublishAsap(closingTx.tx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(closingTx)
blockchain ! WatchConfirmed(self, closingTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(closingTx.tx))
}

Expand Down Expand Up @@ -2171,11 +2194,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
/**
* This helper method will publish txs only if they haven't yet reached minDepth
*/
private def publishIfNeeded(txs: Iterable[PublishAsap], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
private def publishIfNeeded(txs: Iterable[PublishTx], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
val (skip, process) = txs.partition(publishTx => Closing.inputsAlreadySpent(publishTx.tx, irrevocablySpent))
process.foreach { publishTx =>
log.info(s"publishing txid=${publishTx.tx.txid}")
blockchain ! publishTx
txPublisher ! publishTx
}
skip.foreach(publishTx => log.info(s"no need to republish txid=${publishTx.tx.txid}, it has already been confirmed"))
}
Expand Down Expand Up @@ -2209,11 +2232,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

val publishQueue = commitments.commitmentFormat match {
case Transactions.DefaultCommitmentFormat =>
val txs = List(commitTx) ++ claimMainDelayedOutputTx.map(_.tx) ++ htlcTxs.values.flatten.map(_.tx) ++ claimHtlcDelayedTxs.map(_.tx)
txs.map(tx => PublishAsap(tx, PublishStrategy.JustPublish))
List(PublishRawTx(commitTx, "commit-tx")) ++ (claimMainDelayedOutputTx ++ htlcTxs.values.flatten ++ claimHtlcDelayedTxs).map(tx => PublishRawTx(tx))
case Transactions.AnchorOutputsCommitmentFormat =>
val (publishCommitTx, htlcTxs) = Helpers.Closing.createLocalCommitAnchorPublishStrategy(keyManager, commitments, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
List(publishCommitTx) ++ claimMainDelayedOutputTx.map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish)) ++ htlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => SignAndPublishTx(tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => SignAndPublishTx(tx, commitments) }
List(PublishRawTx(commitTx, "commit-tx")) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(tx)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx))
}
publishIfNeeded(publishQueue, irrevocablySpent)

Expand Down Expand Up @@ -2276,7 +2299,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = {
import remoteCommitPublished._

val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx(tx))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down Expand Up @@ -2315,7 +2338,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = {
import revokedCommitPublished._

val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(tx))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package fr.acinq.eclair.channel
import akka.actor.{ActorRef, PossiblyHarmful}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.PublishAsap
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.TxPublisher.PublishTx
import fr.acinq.eclair.payment.OutgoingPacket.Upstream
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.CommitmentSpec
Expand Down Expand Up @@ -105,7 +105,7 @@ case object BITCOIN_FUNDING_SPENT extends BitcoinEvent
case object BITCOIN_OUTPUT_SPENT extends BitcoinEvent
case class BITCOIN_TX_CONFIRMED(tx: Transaction) extends BitcoinEvent
case class BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(shortChannelId: ShortChannelId) extends BitcoinEvent
case class BITCOIN_PARENT_TX_CONFIRMED(publishChildTx: PublishAsap) extends BitcoinEvent
case class BITCOIN_PARENT_TX_CONFIRMED(childTx: PublishTx) extends BitcoinEvent

/*
.d8888b. .d88888b. 888b d888 888b d888 d8888 888b 888 8888888b. .d8888b.
Expand Down Expand Up @@ -293,7 +293,7 @@ sealed trait CommitPublished {
* @param claimHtlcDelayedTxs 3rd-stage txs (spending the output of HTLC txs).
* @param claimAnchorTxs txs spending anchor outputs to bump the feerate of the commitment tx (if applicable).
*/
case class LocalCommitPublished(commitTx: Transaction, claimMainDelayedOutputTx: Option[ClaimLocalDelayedOutputTx], htlcTxs: Map[OutPoint, Option[HtlcTx]], claimHtlcDelayedTxs: List[ClaimLocalDelayedOutputTx], claimAnchorTxs: List[ClaimAnchorOutputTx], irrevocablySpent: Map[OutPoint, Transaction]) extends CommitPublished {
case class LocalCommitPublished(commitTx: Transaction, claimMainDelayedOutputTx: Option[ClaimLocalDelayedOutputTx], htlcTxs: Map[OutPoint, Option[HtlcTx]], claimHtlcDelayedTxs: List[HtlcDelayedTx], claimAnchorTxs: List[ClaimAnchorOutputTx], irrevocablySpent: Map[OutPoint, Transaction]) extends CommitPublished {
/**
* A local commit is considered done when:
* - all commitment tx outputs that we can spend have been spent and confirmed (even if the spending tx was not ours)
Expand Down
Loading

0 comments on commit 48c0c4c

Please sign in to comment.