Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract tx publishing from watchers #1749

Merged
merged 12 commits into from
Apr 12, 2021
10 changes: 6 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair

import akka.Done
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.actor.{ActorContext, ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.pattern.after
import akka.util.Timeout
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
Expand All @@ -28,7 +28,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, Batch
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
import fr.acinq.eclair.blockchain.fee._
import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.channel.{Channel, Register, TxPublisher}
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
Expand Down Expand Up @@ -230,10 +230,11 @@ class Setup(datadir: File,
})
_ <- feeratesRetrieved.future

extendedBitcoinClient = new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))
watcher = {
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))), "watcher", SupervisorStrategy.Resume))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, extendedBitcoinClient), "watcher", SupervisorStrategy.Resume))
}

router = system.actorOf(SimpleSupervisor.props(Router.props(nodeParams, watcher, Some(routerInitialized)), "router", SupervisorStrategy.Resume))
Expand Down Expand Up @@ -267,7 +268,8 @@ class Setup(datadir: File,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future

channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet)
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, extendedBitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet, txPublisherFactory)
peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory)

switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package fr.acinq.eclair.blockchain

import akka.actor.ActorRef
import fr.acinq.bitcoin.{ByteVector32, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.bitcoin.{ByteVector32, Transaction}
import fr.acinq.eclair.channel.BitcoinEvent
import fr.acinq.eclair.transactions.Transactions.TransactionSigningKit
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement

/**
Expand Down Expand Up @@ -110,19 +108,6 @@ final case class WatchEventSpentBasic(event: BitcoinEvent) extends WatchEvent
// TODO: not implemented yet.
final case class WatchEventLost(event: BitcoinEvent) extends WatchEvent

// @formatter:off
sealed trait PublishStrategy
object PublishStrategy {
case object JustPublish extends PublishStrategy
case class SetFeerate(currentFeerate: FeeratePerKw, targetFeerate: FeeratePerKw, dustLimit: Satoshi, signingKit: TransactionSigningKit) extends PublishStrategy {
override def toString = s"SetFeerate(target=$targetFeerate)"
}
}
// @formatter:on

/** Publish the provided tx as soon as possible depending on lock time, csv and publishing strategy. */
final case class PublishAsap(tx: Transaction, strategy: PublishStrategy)

// @formatter:off
sealed trait UtxoStatus
object UtxoStatus {
Expand Down

Large diffs are not rendered by default.

49 changes: 34 additions & 15 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

package fr.acinq.eclair.channel

import akka.actor.{ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, actorRefAdapter}
import akka.actor.{ActorContext, ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.event.Logging.MDC
import akka.pattern.pipe
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, SatoshiLong, Script, ScriptFlags, Transaction}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx, SignAndPublishTx}
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.PendingRelayDb
Expand All @@ -47,7 +51,19 @@ import scala.util.{Failure, Success, Try}
*/

object Channel {
def props(nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, origin_opt: Option[ActorRef]): Props = Props(new Channel(nodeParams, wallet, remoteNodeId, blockchain, relayer, origin_opt))

trait TxPublisherFactory {
def spawnTxPublisher(context: ActorContext): akka.actor.typed.ActorRef[TxPublisher.Command]
}

case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: ActorRef, bitcoinClient: ExtendedBitcoinClient) extends TxPublisherFactory {
override def spawnTxPublisher(context: ActorContext): akka.actor.typed.ActorRef[TxPublisher.Command] = {
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, watcher, bitcoinClient)).onFailure(akka.actor.typed.SupervisorStrategy.restart), "tx-publisher")
}
}

def props(nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, txPublisherFactory: TxPublisherFactory, origin_opt: Option[ActorRef]): Props =
Props(new Channel(nodeParams, wallet, remoteNodeId, blockchain, relayer, txPublisherFactory, origin_opt))

// see /~https://github.com/lightningnetwork/lightning-rfc/blob/master/07-routing-gossip.md#requirements
val ANNOUNCEMENTS_MINCONF = 6
Expand Down Expand Up @@ -100,7 +116,7 @@ object Channel {

}

class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {
class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, txPublisherFactory: Channel.TxPublisherFactory, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {

import Channel._

Expand All @@ -111,14 +127,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

// we assume that the peer is the channel's parent
private val peer = context.parent
//noinspection ActorMutableStateInspection
// noinspection ActorMutableStateInspection
// the last active connection we are aware of; note that the peer manages connections and asynchronously notifies
// the channel, which means that if we get disconnected, the previous active connection will die and some messages will
// be sent to dead letters, before the channel gets notified of the disconnection; knowing that this will happen, we
// choose to not make this an Option (that would be None before the first connection), and instead embrace the fact
// that the active connection may point to dead letters at all time
private var activeConnection = context.system.deadLetters

private val txPublisher = txPublisherFactory.spawnTxPublisher(context)

// this will be used to detect htlc timeouts
context.system.eventStream.subscribe(self, classOf[CurrentBlockCount])
// this will be used to make sure the current commitment fee is up-to-date
Expand Down Expand Up @@ -1341,7 +1359,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, penaltyTxs) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
penaltyTxs.foreach(claimTx => blockchain ! PublishAsap(claimTx.tx, PublishStrategy.JustPublish))
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(self, claimTx.tx))
penaltyTxs.foreach(claimTx => blockchain ! WatchSpent(self, tx.txid, claimTx.input.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT, hints = Set(claimTx.tx.txid)))
rev1
}
Expand All @@ -1355,7 +1373,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// If the tx is one of our HTLC txs, we now publish a 3rd-stage claim-htlc-tx that claims its output.
val (localCommitPublished1, claimHtlcTx_opt) = Closing.claimLocalCommitHtlcTxOutput(localCommitPublished, keyManager, d.commitments, tx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
claimHtlcTx_opt.foreach(claimHtlcTx => {
blockchain ! PublishAsap(claimHtlcTx.tx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(self, claimHtlcTx.tx)
blockchain ! WatchConfirmed(self, claimHtlcTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(claimHtlcTx.tx))
})
Closing.updateLocalCommitPublished(localCommitPublished1, tx)
Expand Down Expand Up @@ -1990,7 +2008,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Some(fundingTx) =>
// if we are funder, we never give up
log.info(s"republishing the funding tx...")
blockchain ! PublishAsap(fundingTx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(self, fundingTx)
// we also check if the funding tx has been double-spent
checkDoubleSpent(fundingTx)
context.system.scheduler.scheduleOnce(1 day, blockchain, GetTxWithMeta(txid))
Expand Down Expand Up @@ -2142,7 +2160,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

private def doPublish(closingTx: ClosingTx): Unit = {
blockchain ! PublishAsap(closingTx.tx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(self, closingTx.tx)
blockchain ! WatchConfirmed(self, closingTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(closingTx.tx))
}

Expand Down Expand Up @@ -2171,11 +2189,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
/**
* This helper method will publish txs only if they haven't yet reached minDepth
*/
private def publishIfNeeded(txs: Iterable[PublishAsap], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
private def publishIfNeeded(txs: Iterable[PublishTx], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
val (skip, process) = txs.partition(publishTx => Closing.inputsAlreadySpent(publishTx.tx, irrevocablySpent))
process.foreach { publishTx =>
log.info(s"publishing txid=${publishTx.tx.txid}")
blockchain ! publishTx
txPublisher ! publishTx
}
skip.foreach(publishTx => log.info(s"no need to republish txid=${publishTx.tx.txid}, it has already been confirmed"))
}
Expand Down Expand Up @@ -2210,10 +2228,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val publishQueue = commitments.commitmentFormat match {
case Transactions.DefaultCommitmentFormat =>
val txs = List(commitTx) ++ claimMainDelayedOutputTx.map(_.tx) ++ htlcTxs.values.flatten.map(_.tx) ++ claimHtlcDelayedTxs.map(_.tx)
txs.map(tx => PublishAsap(tx, PublishStrategy.JustPublish))
txs.map(tx => PublishRawTx(self, tx))
case Transactions.AnchorOutputsCommitmentFormat =>
val (publishCommitTx, htlcTxs) = Helpers.Closing.createLocalCommitAnchorPublishStrategy(keyManager, commitments, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
List(publishCommitTx) ++ claimMainDelayedOutputTx.map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish)) ++ htlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => SignAndPublishTx(self, tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => SignAndPublishTx(self, tx, commitments) }
List(PublishRawTx(self, commitTx)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(self, tx.tx)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(self, tx.tx))
}
publishIfNeeded(publishQueue, irrevocablySpent)

Expand Down Expand Up @@ -2276,7 +2295,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = {
import remoteCommitPublished._

val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx(self, tx.tx))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down Expand Up @@ -2315,7 +2334,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = {
import revokedCommitPublished._

val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(self, tx.tx))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package fr.acinq.eclair.channel
import akka.actor.{ActorRef, PossiblyHarmful}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.PublishAsap
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.TxPublisher.PublishTx
import fr.acinq.eclair.payment.OutgoingPacket.Upstream
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.CommitmentSpec
Expand Down Expand Up @@ -105,7 +105,7 @@ case object BITCOIN_FUNDING_SPENT extends BitcoinEvent
case object BITCOIN_OUTPUT_SPENT extends BitcoinEvent
case class BITCOIN_TX_CONFIRMED(tx: Transaction) extends BitcoinEvent
case class BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(shortChannelId: ShortChannelId) extends BitcoinEvent
case class BITCOIN_PARENT_TX_CONFIRMED(publishChildTx: PublishAsap) extends BitcoinEvent
case class BITCOIN_PARENT_TX_CONFIRMED(childTx: PublishTx) extends BitcoinEvent

/*
.d8888b. .d88888b. 888b d888 888b d888 d8888 888b 888 8888888b. .d8888b.
Expand Down
39 changes: 1 addition & 38 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey, sha256}
import fr.acinq.bitcoin.Script._
import fr.acinq.bitcoin._
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.blockchain.fee.{FeeEstimator, FeeTargets, FeeratePerKw}
import fr.acinq.eclair.blockchain.{EclairWallet, PublishAsap, PublishStrategy}
import fr.acinq.eclair.channel.Channel.REFRESH_CHANNEL_UPDATE_INTERVAL
import fr.acinq.eclair.crypto.Generators
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
Expand Down Expand Up @@ -586,43 +586,6 @@ object Helpers {
}
}

/**
* Create tx publishing strategy (target feerate) for our local commit tx and its HTLC txs. Only used for anchor outputs.
*/
def createLocalCommitAnchorPublishStrategy(keyManager: ChannelKeyManager, commitments: Commitments, feeEstimator: FeeEstimator, feeTargets: FeeTargets): (PublishAsap, List[PublishAsap]) = {
val commitTx = commitments.localCommit.publishableTxs.commitTx.tx
val currentFeerate = commitments.localCommit.spec.feeratePerKw
val targetFeerate = feeEstimator.getFeeratePerKw(feeTargets.commitmentBlockTarget)
val localFundingPubKey = keyManager.fundingPublicKey(commitments.localParams.fundingKeyPath)
val channelKeyPath = keyManager.keyPath(commitments.localParams, commitments.channelVersion)
val localPerCommitmentPoint = keyManager.commitmentPoint(channelKeyPath, commitments.localCommit.index)
val localHtlcBasepoint = keyManager.htlcPoint(channelKeyPath)

// If we have an anchor output available, we will use it to CPFP the commit tx.
val publishCommitTx = Transactions.makeClaimLocalAnchorOutputTx(commitTx, localFundingPubKey.publicKey).map(claimAnchorOutputTx => {
TransactionSigningKit.ClaimAnchorOutputSigningKit(keyManager, claimAnchorOutputTx, localFundingPubKey)
}) match {
case Left(_) => PublishAsap(commitTx, PublishStrategy.JustPublish)
case Right(signingKit) => PublishAsap(commitTx, PublishStrategy.SetFeerate(currentFeerate, targetFeerate, commitments.localParams.dustLimit, signingKit))
}

// HTLC txs will use RBF to add wallet inputs to reach the targeted feerate.
val preimages = commitments.localChanges.all.collect { case u: UpdateFulfillHtlc => u.paymentPreimage }.map(r => Crypto.sha256(r) -> r).toMap
val htlcTxs = commitments.localCommit.publishableTxs.htlcTxsAndSigs.collect {
case HtlcTxAndSigs(htlcSuccess: Transactions.HtlcSuccessTx, localSig, remoteSig) if preimages.contains(htlcSuccess.paymentHash) =>
val preimage = preimages(htlcSuccess.paymentHash)
val signedTx = Transactions.addSigs(htlcSuccess, localSig, remoteSig, preimage, commitments.commitmentFormat)
val signingKit = TransactionSigningKit.HtlcSuccessSigningKit(keyManager, commitments.commitmentFormat, signedTx, localHtlcBasepoint, localPerCommitmentPoint, remoteSig, preimage)
PublishAsap(signedTx.tx, PublishStrategy.SetFeerate(currentFeerate, targetFeerate, commitments.localParams.dustLimit, signingKit))
case HtlcTxAndSigs(htlcTimeout: Transactions.HtlcTimeoutTx, localSig, remoteSig) =>
val signedTx = Transactions.addSigs(htlcTimeout, localSig, remoteSig, commitments.commitmentFormat)
val signingKit = TransactionSigningKit.HtlcTimeoutSigningKit(keyManager, commitments.commitmentFormat, signedTx, localHtlcBasepoint, localPerCommitmentPoint, remoteSig)
PublishAsap(signedTx.tx, PublishStrategy.SetFeerate(currentFeerate, targetFeerate, commitments.localParams.dustLimit, signingKit))
}

(publishCommitTx, htlcTxs)
}

/**
* Claim all the HTLCs that we've received from their current commit tx, if the channel used option_static_remotekey
* we don't need to claim our main output because it directly pays to one of our wallet's p2wpkh addresses.
Expand Down
Loading