Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract tx publishing from watchers #1749

Merged
merged 12 commits into from
Apr 12, 2021
10 changes: 6 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair

import akka.Done
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.actor.{ActorContext, ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.pattern.after
import akka.util.Timeout
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
Expand All @@ -28,7 +28,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, Batch
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
import fr.acinq.eclair.blockchain.fee._
import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.channel.{Channel, Register, TxPublisher}
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
Expand Down Expand Up @@ -230,10 +230,11 @@ class Setup(datadir: File,
})
_ <- feeratesRetrieved.future

extendedBitcoinClient = new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))
watcher = {
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))), "watcher", SupervisorStrategy.Resume))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, extendedBitcoinClient), "watcher", SupervisorStrategy.Resume))
}

router = system.actorOf(SimpleSupervisor.props(Router.props(nodeParams, watcher, Some(routerInitialized)), "router", SupervisorStrategy.Resume))
Expand Down Expand Up @@ -267,7 +268,8 @@ class Setup(datadir: File,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future

channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet)
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, extendedBitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet, txPublisherFactory)
peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory)

switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package fr.acinq.eclair.blockchain

import akka.actor.ActorRef
import fr.acinq.bitcoin.{ByteVector32, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.bitcoin.{ByteVector32, Transaction}
import fr.acinq.eclair.channel.BitcoinEvent
import fr.acinq.eclair.transactions.Transactions.TransactionSigningKit
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement

/**
Expand Down Expand Up @@ -110,19 +108,6 @@ final case class WatchEventSpentBasic(event: BitcoinEvent) extends WatchEvent
// TODO: not implemented yet.
final case class WatchEventLost(event: BitcoinEvent) extends WatchEvent

// @formatter:off
sealed trait PublishStrategy
object PublishStrategy {
case object JustPublish extends PublishStrategy
case class SetFeerate(currentFeerate: FeeratePerKw, targetFeerate: FeeratePerKw, dustLimit: Satoshi, signingKit: TransactionSigningKit) extends PublishStrategy {
override def toString = s"SetFeerate(target=$targetFeerate)"
}
}
// @formatter:on

/** Publish the provided tx as soon as possible depending on lock time, csv and publishing strategy. */
final case class PublishAsap(tx: Transaction, strategy: PublishStrategy)

// @formatter:off
sealed trait UtxoStatus
object UtxoStatus {
Expand Down

Large diffs are not rendered by default.

55 changes: 39 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

package fr.acinq.eclair.channel

import akka.actor.{ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.{ActorContext, ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.event.Logging.MDC
import akka.pattern.pipe
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, SatoshiLong, Script, ScriptFlags, Transaction}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx, SetChannelId, SignAndPublishTx}
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.PendingRelayDb
Expand All @@ -47,7 +51,19 @@ import scala.util.{Failure, Success, Try}
*/

object Channel {
def props(nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, origin_opt: Option[ActorRef]): Props = Props(new Channel(nodeParams, wallet, remoteNodeId, blockchain, relayer, origin_opt))

trait TxPublisherFactory {
def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): akka.actor.typed.ActorRef[TxPublisher.Command]
}

case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: ActorRef, bitcoinClient: ExtendedBitcoinClient) extends TxPublisherFactory {
override def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): akka.actor.typed.ActorRef[TxPublisher.Command] = {
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, watcher, bitcoinClient)).onFailure(akka.actor.typed.SupervisorStrategy.restart), "tx-publisher")
}
}

def props(nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, txPublisherFactory: TxPublisherFactory, origin_opt: Option[ActorRef]): Props =
Props(new Channel(nodeParams, wallet, remoteNodeId, blockchain, relayer, txPublisherFactory, origin_opt))

// see /~https://github.com/lightningnetwork/lightning-rfc/blob/master/07-routing-gossip.md#requirements
val ANNOUNCEMENTS_MINCONF = 6
Expand Down Expand Up @@ -100,7 +116,7 @@ object Channel {

}

class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {
class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: ActorRef, relayer: ActorRef, txPublisherFactory: Channel.TxPublisherFactory, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {

import Channel._

Expand All @@ -111,14 +127,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

// we assume that the peer is the channel's parent
private val peer = context.parent
//noinspection ActorMutableStateInspection
// noinspection ActorMutableStateInspection
// the last active connection we are aware of; note that the peer manages connections and asynchronously notifies
// the channel, which means that if we get disconnected, the previous active connection will die and some messages will
// be sent to dead letters, before the channel gets notified of the disconnection; knowing that this will happen, we
// choose to not make this an Option (that would be None before the first connection), and instead embrace the fact
// that the active connection may point to dead letters at all time
private var activeConnection = context.system.deadLetters

private val txPublisher = txPublisherFactory.spawnTxPublisher(context, remoteNodeId)

// this will be used to detect htlc timeouts
context.system.eventStream.subscribe(self, classOf[CurrentBlockCount])
// this will be used to make sure the current commitment fee is up-to-date
Expand Down Expand Up @@ -165,6 +183,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, fundingTxFeeratePerKw, _, localParams, remote, _, channelFlags, channelVersion), Nothing) =>
context.system.eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isFunder = true, temporaryChannelId, initialFeeratePerKw, Some(fundingTxFeeratePerKw)))
activeConnection = remote
txPublisher ! SetChannelId(remoteNodeId, temporaryChannelId)
val fundingPubKey = keyManager.fundingPublicKey(localParams.fundingKeyPath).publicKey
val channelKeyPath = keyManager.keyPath(localParams, channelVersion)
val open = OpenChannel(nodeParams.chainHash,
Expand Down Expand Up @@ -192,11 +211,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(inputFundee@INPUT_INIT_FUNDEE(_, localParams, remote, _, _), Nothing) if !localParams.isFunder =>
activeConnection = remote
txPublisher ! SetChannelId(remoteNodeId, inputFundee.temporaryChannelId)
goto(WAIT_FOR_OPEN_CHANNEL) using DATA_WAIT_FOR_OPEN_CHANNEL(inputFundee)

case Event(INPUT_RESTORED(data), _) =>
log.info("restoring channel")
context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data.commitments.localParams.isFunder, data.commitments))
txPublisher ! SetChannelId(remoteNodeId, data.channelId)
data match {
// NB: order matters!
case closing: DATA_CLOSING if Closing.nothingAtStake(closing) =>
Expand Down Expand Up @@ -413,6 +434,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
)
val channelId = toLongId(fundingTx.hash, fundingTxOutputIndex)
peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
txPublisher ! SetChannelId(remoteNodeId, channelId)
context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
// NB: we don't send a ChannelSignatureSent for the first commit
goto(WAIT_FOR_FUNDING_SIGNED) using DATA_WAIT_FOR_FUNDING_SIGNED(channelId, localParams, remoteParams, fundingTx, fundingTxFee, initialRelayFees_opt, localSpec, localCommitTx, RemoteCommit(0, remoteSpec, remoteCommitTx.tx.txid, remoteFirstPerCommitmentPoint), open.channelFlags, channelVersion, fundingCreated) sending fundingCreated
Expand Down Expand Up @@ -469,6 +491,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
remoteNextCommitInfo = Right(randomKey.publicKey), // we will receive their next per-commitment point in the next message, so we temporarily put a random byte array,
commitInput, ShaChain.init, channelId = channelId)
peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
txPublisher ! SetChannelId(remoteNodeId, channelId)
context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
// NB: we don't send a ChannelSignatureSent for the first commit
Expand Down Expand Up @@ -1341,7 +1364,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, penaltyTxs) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
penaltyTxs.foreach(claimTx => blockchain ! PublishAsap(claimTx.tx, PublishStrategy.JustPublish))
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(claimTx))
penaltyTxs.foreach(claimTx => blockchain ! WatchSpent(self, tx.txid, claimTx.input.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT, hints = Set(claimTx.tx.txid)))
rev1
}
Expand All @@ -1355,7 +1378,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// If the tx is one of our HTLC txs, we now publish a 3rd-stage claim-htlc-tx that claims its output.
val (localCommitPublished1, claimHtlcTx_opt) = Closing.claimLocalCommitHtlcTxOutput(localCommitPublished, keyManager, d.commitments, tx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
claimHtlcTx_opt.foreach(claimHtlcTx => {
blockchain ! PublishAsap(claimHtlcTx.tx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(claimHtlcTx)
blockchain ! WatchConfirmed(self, claimHtlcTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(claimHtlcTx.tx))
})
Closing.updateLocalCommitPublished(localCommitPublished1, tx)
Expand Down Expand Up @@ -1990,7 +2013,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Some(fundingTx) =>
// if we are funder, we never give up
log.info(s"republishing the funding tx...")
blockchain ! PublishAsap(fundingTx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(fundingTx, "funding-tx")
// we also check if the funding tx has been double-spent
checkDoubleSpent(fundingTx)
context.system.scheduler.scheduleOnce(1 day, blockchain, GetTxWithMeta(txid))
Expand Down Expand Up @@ -2142,7 +2165,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

private def doPublish(closingTx: ClosingTx): Unit = {
blockchain ! PublishAsap(closingTx.tx, PublishStrategy.JustPublish)
txPublisher ! PublishRawTx(closingTx)
blockchain ! WatchConfirmed(self, closingTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(closingTx.tx))
}

Expand Down Expand Up @@ -2171,11 +2194,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
/**
* This helper method will publish txs only if they haven't yet reached minDepth
*/
private def publishIfNeeded(txs: Iterable[PublishAsap], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
private def publishIfNeeded(txs: Iterable[PublishTx], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
val (skip, process) = txs.partition(publishTx => Closing.inputsAlreadySpent(publishTx.tx, irrevocablySpent))
process.foreach { publishTx =>
log.info(s"publishing txid=${publishTx.tx.txid}")
blockchain ! publishTx
txPublisher ! publishTx
}
skip.foreach(publishTx => log.info(s"no need to republish txid=${publishTx.tx.txid}, it has already been confirmed"))
}
Expand Down Expand Up @@ -2209,11 +2232,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

val publishQueue = commitments.commitmentFormat match {
case Transactions.DefaultCommitmentFormat =>
val txs = List(commitTx) ++ claimMainDelayedOutputTx.map(_.tx) ++ htlcTxs.values.flatten.map(_.tx) ++ claimHtlcDelayedTxs.map(_.tx)
txs.map(tx => PublishAsap(tx, PublishStrategy.JustPublish))
List(PublishRawTx(commitTx, "commit-tx")) ++ (claimMainDelayedOutputTx ++ htlcTxs.values.flatten ++ claimHtlcDelayedTxs).map(tx => PublishRawTx(tx))
case Transactions.AnchorOutputsCommitmentFormat =>
val (publishCommitTx, htlcTxs) = Helpers.Closing.createLocalCommitAnchorPublishStrategy(keyManager, commitments, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
List(publishCommitTx) ++ claimMainDelayedOutputTx.map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish)) ++ htlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => SignAndPublishTx(tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => SignAndPublishTx(tx, commitments) }
List(PublishRawTx(commitTx, "commit-tx")) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(tx)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx))
}
publishIfNeeded(publishQueue, irrevocablySpent)

Expand Down Expand Up @@ -2276,7 +2299,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = {
import remoteCommitPublished._

val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx(tx))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down Expand Up @@ -2315,7 +2338,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = {
import revokedCommitPublished._

val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishAsap(tx.tx, PublishStrategy.JustPublish))
val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(tx))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package fr.acinq.eclair.channel
import akka.actor.{ActorRef, PossiblyHarmful}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.PublishAsap
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.TxPublisher.PublishTx
import fr.acinq.eclair.payment.OutgoingPacket.Upstream
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.CommitmentSpec
Expand Down Expand Up @@ -105,7 +105,7 @@ case object BITCOIN_FUNDING_SPENT extends BitcoinEvent
case object BITCOIN_OUTPUT_SPENT extends BitcoinEvent
case class BITCOIN_TX_CONFIRMED(tx: Transaction) extends BitcoinEvent
case class BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(shortChannelId: ShortChannelId) extends BitcoinEvent
case class BITCOIN_PARENT_TX_CONFIRMED(publishChildTx: PublishAsap) extends BitcoinEvent
case class BITCOIN_PARENT_TX_CONFIRMED(childTx: PublishTx) extends BitcoinEvent

/*
.d8888b. .d88888b. 888b d888 888b d888 d8888 888b 888 8888888b. .d8888b.
Expand Down Expand Up @@ -293,7 +293,7 @@ sealed trait CommitPublished {
* @param claimHtlcDelayedTxs 3rd-stage txs (spending the output of HTLC txs).
* @param claimAnchorTxs txs spending anchor outputs to bump the feerate of the commitment tx (if applicable).
*/
case class LocalCommitPublished(commitTx: Transaction, claimMainDelayedOutputTx: Option[ClaimLocalDelayedOutputTx], htlcTxs: Map[OutPoint, Option[HtlcTx]], claimHtlcDelayedTxs: List[ClaimLocalDelayedOutputTx], claimAnchorTxs: List[ClaimAnchorOutputTx], irrevocablySpent: Map[OutPoint, Transaction]) extends CommitPublished {
case class LocalCommitPublished(commitTx: Transaction, claimMainDelayedOutputTx: Option[ClaimLocalDelayedOutputTx], htlcTxs: Map[OutPoint, Option[HtlcTx]], claimHtlcDelayedTxs: List[HtlcDelayedTx], claimAnchorTxs: List[ClaimAnchorOutputTx], irrevocablySpent: Map[OutPoint, Transaction]) extends CommitPublished {
/**
* A local commit is considered done when:
* - all commitment tx outputs that we can spend have been spent and confirmed (even if the spending tx was not ours)
Expand Down
Loading