Skip to content

Commit

Permalink
Optimize watching for spending txs (#1699)
Browse files Browse the repository at this point in the history
Since we almost always know which transactions will spend the utxos that we are watching, we can optimize the watcher to look for those instead of starting from scratch.
  • Loading branch information
pm47 authored Feb 24, 2021
1 parent fa759f1 commit c1bf9bd
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ object WatchConfirmed {
* @param outputIndex index of the outpoint to watch.
* @param publicKeyScript electrum requires us to specify a public key script; the script of the outpoint must be provided.
* @param event channel event related to the outpoint.
* @param hints txids of potential spending transactions; most of the time we know the txs, and it allows for optimizations.
* This argument can safely be ignored by watcher implementations.
*/
final case class WatchSpent(replyTo: ActorRef, txId: ByteVector32, outputIndex: Int, publicKeyScript: ByteVector, event: BitcoinEvent) extends Watch
final case class WatchSpent(replyTo: ActorRef, txId: ByteVector32, outputIndex: Int, publicKeyScript: ByteVector, event: BitcoinEvent, hints: Set[ByteVector32]) extends Watch
object WatchSpent {
// if we have the entire transaction, we can get the publicKeyScript from the relevant output
def apply(replyTo: ActorRef, tx: Transaction, outputIndex: Int, event: BitcoinEvent): WatchSpent = WatchSpent(replyTo, tx.txid, outputIndex, tx.txOut(outputIndex).publicKeyScript, event)
def apply(replyTo: ActorRef, tx: Transaction, outputIndex: Int, event: BitcoinEvent, hints: Set[ByteVector32]): WatchSpent = WatchSpent(replyTo, tx.txid, outputIndex, tx.txOut(outputIndex).publicKeyScript, event, hints)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,27 +140,40 @@ class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: Extend
}
Keep

case WatchSpent(_, txid, outputIndex, _, _) =>
case WatchSpent(_, txid, outputIndex, _, _, hints) =>
// first let's see if the parent tx was published or not
client.getTxConfirmations(txid).collect {
case Some(_) =>
// parent tx was published, we need to make sure this particular output has not been spent
client.isTransactionOutputSpendable(txid, outputIndex, includeMempool = true).collect {
case false =>
log.info(s"$txid:$outputIndex has already been spent, looking for the spending tx in the mempool")
client.getMempool().map { mempoolTxs =>
mempoolTxs.filter(tx => tx.txIn.exists(i => i.outPoint.txid == txid && i.outPoint.index == outputIndex)) match {
case Nil =>
log.warning(s"$txid:$outputIndex has already been spent, spending tx not in the mempool, looking in the blockchain...")
client.lookForSpendingTx(None, txid, outputIndex).map { tx =>
log.warning(s"found the spending tx of $txid:$outputIndex in the blockchain: txid=${tx.txid}")
self ! NewTransaction(tx)
// the output has been spent, let's find the spending tx
// if we know some potential spending txs, we try to fetch them directly
Future.sequence(hints.map(txid => client.getTransaction(txid).map(Some(_)).recover { case _ => None }))
.map(_
.flatten // filter out errors
.find(tx => tx.txIn.exists(i => i.outPoint.txid == txid && i.outPoint.index == outputIndex)) match {
case Some(spendingTx) =>
// there can be only one spending tx for an utxo
log.info(s"$txid:$outputIndex has already been spent by a tx provided in hints: txid=${spendingTx.txid}")
self ! NewTransaction(spendingTx)
case None =>
// no luck, we have to do it the hard way...
log.info(s"$txid:$outputIndex has already been spent, looking for the spending tx in the mempool")
client.getMempool().map { mempoolTxs =>
mempoolTxs.filter(tx => tx.txIn.exists(i => i.outPoint.txid == txid && i.outPoint.index == outputIndex)) match {
case Nil =>
log.warning(s"$txid:$outputIndex has already been spent, spending tx not in the mempool, looking in the blockchain...")
client.lookForSpendingTx(None, txid, outputIndex).map { tx =>
log.warning(s"found the spending tx of $txid:$outputIndex in the blockchain: txid=${tx.txid}")
self ! NewTransaction(tx)
}
case txs =>
log.info(s"found ${txs.size} txs spending $txid:$outputIndex in the mempool: txids=${txs.map(_.txid).mkString(",")}")
txs.foreach(tx => self ! NewTransaction(tx))
}
}
case txs =>
log.info(s"found ${txs.size} txs spending $txid:$outputIndex in the mempool: txids=${txs.map(_.txid).mkString(",")}")
txs.foreach(tx => self ! NewTransaction(tx))
}
}
})
}
}
Keep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi

case watch: Watch if watches.contains(watch) => ()

case watch@WatchSpent(_, txid, outputIndex, publicKeyScript, _) =>
case watch@WatchSpent(_, txid, outputIndex, publicKeyScript, _, _) =>
val scriptHash = computeScriptHash(publicKeyScript)
log.info(s"added watch-spent on output=$txid:$outputIndex scriptHash=$scriptHash")
client ! ElectrumClient.ScriptHashSubscription(scriptHash, self)
Expand Down Expand Up @@ -121,7 +121,7 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
case ElectrumClient.GetTransactionResponse(tx, Some(item: ElectrumClient.TransactionHistoryItem)) =>
// this is for WatchSpent/WatchSpentBasic
val watchSpentTriggered = tx.txIn.map(_.outPoint).flatMap(outPoint => watches.collect {
case WatchSpent(channel, txid, pos, _, event) if txid == outPoint.txid && pos == outPoint.index.toInt =>
case WatchSpent(channel, txid, pos, _, event, _) if txid == outPoint.txid && pos == outPoint.index.toInt =>
log.info(s"output $txid:$pos spent by transaction ${tx.txid}")
channel ! WatchEventSpent(event, tx)
// NB: WatchSpent are permanent because we need to detect multiple spending of the funding tx
Expand Down
32 changes: 16 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
doPublish(c.revokedCommitPublished)
case None =>
// in all other cases we need to be ready for any type of closing
// TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchSpent(self, data.commitments.commitInput.outPoint.txid, data.commitments.commitInput.outPoint.index.toInt, data.commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT)
blockchain ! WatchLost(self, data.commitments.commitInput.outPoint.txid, nodeParams.minDepthBlocks, BITCOIN_FUNDING_LOST)
watchFundingTx(data.commitments, closing.spendingTxes.map(_.txid).toSet)
closing.mutualClosePublished.foreach(doPublish)
closing.localCommitPublished.foreach(doPublish)
closing.remoteCommitPublished.foreach(doPublish)
Expand All @@ -246,9 +244,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

case normal: DATA_NORMAL =>
// TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchSpent(self, data.commitments.commitInput.outPoint.txid, data.commitments.commitInput.outPoint.index.toInt, data.commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT)
blockchain ! WatchLost(self, data.commitments.commitInput.outPoint.txid, nodeParams.minDepthBlocks, BITCOIN_FUNDING_LOST)
watchFundingTx(data.commitments)
context.system.eventStream.publish(ShortChannelIdAssigned(self, normal.channelId, normal.channelUpdate.shortChannelId, None))

// we rebuild a new channel_update with values from the configuration because they may have changed while eclair was down
Expand Down Expand Up @@ -280,9 +276,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1)

case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
// TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchSpent(self, data.commitments.commitInput.outPoint.txid, data.commitments.commitInput.outPoint.index.toInt, data.commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT)
blockchain ! WatchLost(self, data.commitments.commitInput.outPoint.txid, nodeParams.minDepthBlocks, BITCOIN_FUNDING_LOST)
watchFundingTx(funding.commitments)
// we make sure that the funding tx has been published
blockchain ! GetTxWithMeta(funding.commitments.commitInput.outPoint.txid)
if (funding.waitingSinceBlock > 1500000) {
Expand All @@ -293,9 +287,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

case _ =>
// TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchSpent(self, data.commitments.commitInput.outPoint.txid, data.commitments.commitInput.outPoint.index.toInt, data.commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT)
blockchain ! WatchLost(self, data.commitments.commitInput.outPoint.txid, nodeParams.minDepthBlocks, BITCOIN_FUNDING_LOST)
watchFundingTx(data.commitments)
goto(OFFLINE) using data
}

Expand Down Expand Up @@ -482,7 +474,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// NB: we don't send a ChannelSignatureSent for the first commit
log.info(s"waiting for them to publish the funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
val fundingMinDepth = Helpers.minDepthForFunding(nodeParams, fundingAmount)
blockchain ! WatchSpent(self, commitInput.outPoint.txid, commitInput.outPoint.index.toInt, commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT) // TODO: should we wait for an acknowledgment from the watcher?
watchFundingTx(commitments)
blockchain ! WatchConfirmed(self, commitInput.outPoint.txid, commitInput.txOut.publicKeyScript, fundingMinDepth, BITCOIN_FUNDING_DEPTHOK)
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, None, initialRelayFees_opt, nodeParams.currentBlockHeight, None, Right(fundingSigned)) storing() sending fundingSigned
}
Expand Down Expand Up @@ -521,7 +513,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val now = System.currentTimeMillis.milliseconds.toSeconds
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
blockchain ! WatchSpent(self, commitInput.outPoint.txid, commitInput.outPoint.index.toInt, commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT) // TODO: should we wait for an acknowledgment from the watcher?
watchFundingTx(commitments)
blockchain ! WatchConfirmed(self, commitInput.outPoint.txid, commitInput.txOut.publicKeyScript, nodeParams.minDepthBlocks, BITCOIN_FUNDING_DEPTHOK)
log.info(s"committing txid=${fundingTx.txid}")

Expand Down Expand Up @@ -1339,7 +1331,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, tx_opt) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
tx_opt.foreach(claimTx => blockchain ! PublishAsap(claimTx))
tx_opt.foreach(claimTx => blockchain ! WatchSpent(self, tx, claimTx.txIn.filter(_.outPoint.txid == tx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
tx_opt.foreach(claimTx => blockchain ! WatchSpent(self, tx, claimTx.txIn.filter(_.outPoint.txid == tx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT, hints = Set(claimTx.txid)))
rev1
}
stay using d.copy(revokedCommitPublished = revokedCommitPublished1) storing()
Expand Down Expand Up @@ -1944,6 +1936,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
stay
}

def watchFundingTx(commitments: Commitments, additionalKnownSpendingTxs: Set[ByteVector32] = Set.empty): Unit = {
// TODO: should we wait for an acknowledgment from the watcher?
val knownSpendingTxs = Set(commitments.localCommit.publishableTxs.commitTx.tx.txid, commitments.remoteCommit.txid) ++ commitments.remoteNextCommitInfo.left.toSeq.map(_.nextRemoteCommit.txid).toSet ++ additionalKnownSpendingTxs
blockchain ! WatchSpent(self, commitments.commitInput.outPoint.txid, commitments.commitInput.outPoint.index.toInt, commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT, knownSpendingTxs)
// TODO: implement this? (not needed if we use a reasonable min_depth)
//blockchain ! WatchLost(self, commitments.commitInput.outPoint.txid, nodeParams.minDepthBlocks, BITCOIN_FUNDING_LOST)
}

/**
* When we are funder, we use this function to detect when our funding tx has been double-spent (by another transaction
* that we made for some reason). If the funding tx has been double spent we can forget about the channel.
Expand Down Expand Up @@ -2162,7 +2162,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
*/
def watchSpentIfNeeded(parentTx: Transaction, txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.filter(_.outPoint.txid == parentTx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.filter(_.outPoint.txid == parentTx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT, hints = Set(tx.txid)))
skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
val outputIndex = 42
val utxo = OutPoint(txid.reverse, outputIndex)

val w1 = WatchSpent(null, txid, outputIndex, randomBytes32, BITCOIN_FUNDING_SPENT)
val w2 = WatchSpent(null, txid, outputIndex, randomBytes32, BITCOIN_FUNDING_SPENT)
val w1 = WatchSpent(null, txid, outputIndex, randomBytes32, BITCOIN_FUNDING_SPENT, hints = Set.empty)
val w2 = WatchSpent(null, txid, outputIndex, randomBytes32, BITCOIN_FUNDING_SPENT, hints = Set.empty)
val w3 = WatchSpentBasic(null, txid, outputIndex, randomBytes32, BITCOIN_FUNDING_SPENT)
val w4 = WatchSpentBasic(null, randomBytes32, 5, randomBytes32, BITCOIN_FUNDING_SPENT)
val w5 = WatchConfirmed(null, txid, randomBytes32, 3, BITCOIN_FUNDING_SPENT)
Expand Down Expand Up @@ -136,7 +136,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind

val listener = TestProbe()
probe.send(watcher, WatchSpentBasic(listener.ref, tx, outputIndex, BITCOIN_FUNDING_SPENT))
probe.send(watcher, WatchSpent(listener.ref, tx, outputIndex, BITCOIN_FUNDING_SPENT))
probe.send(watcher, WatchSpent(listener.ref, tx, outputIndex, BITCOIN_FUNDING_SPENT, hints = Set.empty))
listener.expectNoMsg(1 second)
probe.send(bitcoincli, BitcoinReq("sendrawtransaction", tx1.toString()))
probe.expectMsgType[JValue]
Expand All @@ -156,11 +156,20 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
listener.expectNoMsg(1 second)
generateBlocks(1)
probe.send(watcher, WatchSpentBasic(listener.ref, tx1, 0, BITCOIN_FUNDING_SPENT))
probe.send(watcher, WatchSpent(listener.ref, tx1, 0, BITCOIN_FUNDING_SPENT))
probe.send(watcher, WatchSpent(listener.ref, tx1, 0, BITCOIN_FUNDING_SPENT, hints = Set.empty))
listener.expectMsgAllOf(
WatchEventSpentBasic(BITCOIN_FUNDING_SPENT),
WatchEventSpent(BITCOIN_FUNDING_SPENT, tx2)
)

// We use hints and see if we can find tx2
probe.send(watcher, WatchSpent(listener.ref, tx1, 0, BITCOIN_FUNDING_SPENT, hints = Set(tx2.txid)))
listener.expectMsg(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx2))

// We should still find tx2 if the provided hint is wrong
probe.send(watcher, WatchSpent(listener.ref, tx1, 0, BITCOIN_FUNDING_SPENT, hints = Set(randomBytes32)))
listener.expectMsg(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx2))

system.stop(watcher)
}

Expand All @@ -183,7 +192,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
val tx2 = createSpendP2WPKH(tx1, priv, priv.publicKey, 10000 sat, 1, 0)

// setup watches before we publish transactions
probe.send(watcher, WatchSpent(probe.ref, tx1, outputIndex, BITCOIN_FUNDING_SPENT))
probe.send(watcher, WatchSpent(probe.ref, tx1, outputIndex, BITCOIN_FUNDING_SPENT, hints = Set.empty))
probe.send(watcher, WatchConfirmed(probe.ref, tx1, 3, BITCOIN_FUNDING_SPENT))
client.publishTransaction(tx1).pipeTo(probe.ref)
probe.expectMsg(tx1.txid)
Expand Down Expand Up @@ -242,7 +251,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
// tx3 has both relative and absolute delays
val tx3 = createSpendP2WPKH(tx2, priv, priv.publicKey, 10000 sat, sequence = 1, lockTime = blockCount.get + 5)
probe.send(watcher, WatchConfirmed(probe.ref, tx2, 1, BITCOIN_FUNDING_DEPTHOK))
probe.send(watcher, WatchSpent(probe.ref, tx2, 0, BITCOIN_FUNDING_SPENT))
probe.send(watcher, WatchSpent(probe.ref, tx2, 0, BITCOIN_FUNDING_SPENT, hints = Set.empty))
probe.send(watcher, PublishAsap(tx3))
generateBlocks(1)
assert(probe.expectMsgType[WatchEventConfirmed].tx === tx2)
Expand Down
Loading

0 comments on commit c1bf9bd

Please sign in to comment.