Skip to content

Commit

Permalink
Extract tx publishing from ZmqWatcher
Browse files Browse the repository at this point in the history
Introduce a TxPublisher actor to publish channel txs.
Move logic from watcher to this new actor, with minimal functional changes.
  • Loading branch information
t-bast committed Mar 31, 2021
1 parent 936f36b commit 91ae610
Show file tree
Hide file tree
Showing 24 changed files with 1,344 additions and 1,210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ object NodeParams extends Logging {
require(featuresErr.isEmpty, featuresErr.map(_.message))
require(features.hasFeature(Features.VariableLengthOnion), s"${Features.VariableLengthOnion.rfcName} must be enabled")
require(!features.hasFeature(Features.InitialRoutingSync), s"${Features.InitialRoutingSync.rfcName} is not supported anymore, use ${Features.ChannelRangeQueries.rfcName} instead")
require(watcherType == BITCOIND || !features.hasFeature(Features.AnchorOutputs), s"${Features.AnchorOutputs.rfcName} is not supported with electrum, use bitcoind instead")
}

val pluginMessageParams = pluginParams.collect { case p: CustomFeaturePlugin => p }
Expand Down
22 changes: 16 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair

import akka.Done
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.actor.{ActorContext, ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.pattern.after
import akka.util.Timeout
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
Expand All @@ -33,7 +33,7 @@ import fr.acinq.eclair.blockchain.electrum._
import fr.acinq.eclair.blockchain.electrum.db.sqlite.SqliteWalletDb
import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
import fr.acinq.eclair.blockchain.{EclairWallet, _}
import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.channel.{Channel, Register, TxPublisher}
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
Expand Down Expand Up @@ -266,15 +266,25 @@ class Setup(datadir: File,
})
_ <- feeratesRetrieved.future

watcher = bitcoin match {
(watcher, txPublisherFactory) = bitcoin match {
case Bitcoind(bitcoinClient) =>
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume))
val extendedBitcoinClient = new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))
val watcher = system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, extendedBitcoinClient), "watcher", SupervisorStrategy.Resume))
val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, extendedBitcoinClient)
(watcher, txPublisherFactory)
case Electrum(electrumClient) =>
zmqBlockConnected.success(Done)
zmqTxConnected.success(Done)
system.actorOf(SimpleSupervisor.props(Props(new ElectrumWatcher(blockCount, electrumClient)), "watcher", SupervisorStrategy.Resume))
val watcher = system.actorOf(SimpleSupervisor.props(Props(new ElectrumWatcher(blockCount, electrumClient)), "watcher", SupervisorStrategy.Resume))
val txPublisherFactory: Channel.TxPublisherFactory = new Channel.TxPublisherFactory {
// @formatter:off
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
override def spawnTxPublisher(context: ActorContext): akka.actor.typed.ActorRef[TxPublisher.Command] = watcher
// @formatter:on
}
(watcher, txPublisherFactory)
}

router = system.actorOf(SimpleSupervisor.props(Router.props(nodeParams, watcher, Some(routerInitialized)), "router", SupervisorStrategy.Resume))
Expand Down Expand Up @@ -315,7 +325,7 @@ class Setup(datadir: File,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future

channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, wallet, txPublisherFactory)
peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory)

switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package fr.acinq.eclair.blockchain

import akka.actor.ActorRef
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi, Script, ScriptWitness, Transaction}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.bitcoin.{ByteVector32, Script, ScriptWitness, Transaction}
import fr.acinq.eclair.channel.BitcoinEvent
import fr.acinq.eclair.transactions.Transactions.TransactionSigningKit
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import scodec.bits.ByteVector

Expand Down Expand Up @@ -138,17 +136,6 @@ final case class WatchEventSpentBasic(event: BitcoinEvent) extends WatchEvent
// TODO: not implemented yet.
final case class WatchEventLost(event: BitcoinEvent) extends WatchEvent

sealed trait PublishStrategy
object PublishStrategy {
case object JustPublish extends PublishStrategy
case class SetFeerate(currentFeerate: FeeratePerKw, targetFeerate: FeeratePerKw, dustLimit: Satoshi, signingKit: TransactionSigningKit) extends PublishStrategy {
override def toString = s"SetFeerate(target=$targetFeerate)"
}
}

/** Publish the provided tx as soon as possible depending on lock time, csv and publishing strategy. */
final case class PublishAsap(tx: Transaction, strategy: PublishStrategy)

sealed trait UtxoStatus
object UtxoStatus {
case object Unspent extends UtxoStatus
Expand Down
Loading

0 comments on commit 91ae610

Please sign in to comment.