Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronously clean up obsolete HTLC info from DB #2705

Merged
merged 5 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,16 @@ eclair {
migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary)
compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary)
}
// During normal channel operation, we need to store information about past HTLCs to be able to punish our peer if
// they publish a revoked commitment. Once a channel closes or a splice transaction confirms, we can clean up past
// data (which reduces the size of our DB). Since there may be millions of rows to delete and we don't want to slow
// down the node, we delete those rows in batches at regular intervals.
revoked-htlc-info-cleaner {
// Number of rows to delete per batch: a higher value will clean up the DB faster, but may have a higher impact on performance.
batch-size = 50000
// Frequency at which batches of rows are deleted: a lower value will clean up the DB faster, but may have a higher impact on performance.
interval = 15 minutes
}
}

file-backup {
Expand Down
9 changes: 7 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
blockchainWatchdogThreshold: Int,
blockchainWatchdogSources: Seq[String],
onionMessageConfig: OnionMessageConfig,
purgeInvoicesInterval: Option[FiniteDuration]) {
purgeInvoicesInterval: Option[FiniteDuration],
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -604,7 +605,11 @@ object NodeParams extends Logging {
timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS),
maxAttempts = config.getInt("onion-messages.max-attempts"),
),
purgeInvoicesInterval = purgeInvoicesInterval
purgeInvoicesInterval = purgeInvoicesInterval,
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(
batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"),
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ sealed trait LocalFundingStatus {
def signedTx_opt: Option[Transaction]
/** We store local signatures for the purpose of retransmitting if the funding/splicing flow is interrupted. */
def localSigs_opt: Option[TxSignatures]
/**
* Commitment index of the first remote commitment we signed that spends this funding transaction.
* Once the funding transaction confirms, our peer won't be able to publish revoked commitments with lower indices.
*/
def firstCommitIndex_opt: Option[Long]
}
object LocalFundingStatus {
sealed trait NotLocked extends LocalFundingStatus
Expand All @@ -431,15 +436,16 @@ object LocalFundingStatus {
*/
case class SingleFundedUnconfirmedFundingTx(signedTx_opt: Option[Transaction]) extends UnconfirmedFundingTx with NotLocked {
override val localSigs_opt: Option[TxSignatures] = None
override val firstCommitIndex_opt: Option[Long] = None
}
case class DualFundedUnconfirmedFundingTx(sharedTx: SignedSharedTransaction, createdAt: BlockHeight, fundingParams: InteractiveTxParams) extends UnconfirmedFundingTx with NotLocked {
case class DualFundedUnconfirmedFundingTx(sharedTx: SignedSharedTransaction, createdAt: BlockHeight, fundingParams: InteractiveTxParams, firstCommitIndex_opt: Option[Long]) extends UnconfirmedFundingTx with NotLocked {
override val signedTx_opt: Option[Transaction] = sharedTx.signedTx_opt
override val localSigs_opt: Option[TxSignatures] = Some(sharedTx.localSigs)
}
case class ZeroconfPublishedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures]) extends UnconfirmedFundingTx with Locked {
case class ZeroconfPublishedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures], firstCommitIndex_opt: Option[Long]) extends UnconfirmedFundingTx with Locked {
override val signedTx_opt: Option[Transaction] = Some(tx)
}
case class ConfirmedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures]) extends LocalFundingStatus with Locked {
case class ConfirmedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures], firstCommitIndex_opt: Option[Long]) extends LocalFundingStatus with Locked {
override val signedTx_opt: Option[Transaction] = Some(tx)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,11 @@ case class Commitments(params: ChannelParams,
all.find(_.fundingTxId == fundingTxId).flatMap(_.localFundingStatus.localSigs_opt)
}

/** Return the index of the first remote commitment we signed spending the given funding transaction. */
def firstCommitIndex(fundingTxId: TxId): Option[Long] = {
all.find(_.fundingTxId == fundingTxId).flatMap(_.localFundingStatus.firstCommitIndex_opt)
}

/**
* Update the local/remote funding status
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with

case Event(msg: TxSignatures, d: DATA_NORMAL) =>
d.commitments.latest.localFundingStatus match {
case dfu@LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _) if fundingTx.txId == msg.txId =>
case dfu@LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _, _) if fundingTx.txId == msg.txId =>
// we already sent our tx_signatures
InteractiveTxSigningSession.addRemoteSigs(keyManager, d.commitments.params, dfu.fundingParams, fundingTx, msg) match {
case Left(cause) =>
Expand Down Expand Up @@ -1138,7 +1138,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
}

case Event(w: WatchPublishedTriggered, d: DATA_NORMAL) =>
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
Expand Down Expand Up @@ -1857,7 +1857,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case d: DATA_NORMAL => d.spliceStatus match {
case SpliceStatus.SpliceWaitingForSigs(status) => Set(ChannelReestablishTlv.NextFundingTlv(status.fundingTx.txId))
case _ => d.commitments.latest.localFundingStatus match {
case LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _) => Set(ChannelReestablishTlv.NextFundingTlv(fundingTx.txId))
case LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _, _) => Set(ChannelReestablishTlv.NextFundingTlv(fundingTx.txId))
case _ => Set.empty
}
}
Expand Down Expand Up @@ -2241,11 +2241,11 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// slightly before us. In that case, the WatchConfirmed may trigger first, and it would be inefficient to let the
// WatchPublished override our funding status: it will make us set a new WatchConfirmed that will instantly
// trigger and rewrite the funding status again.
val alreadyConfirmed = d.commitments.active.map(_.localFundingStatus).collect { case LocalFundingStatus.ConfirmedFundingTx(tx, _) => tx }.exists(_.txid == w.tx.txid)
val alreadyConfirmed = d.commitments.active.map(_.localFundingStatus).collect { case LocalFundingStatus.ConfirmedFundingTx(tx, _, _) => tx }.exists(_.txid == w.tx.txid)
if (alreadyConfirmed) {
stay()
} else {
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
log.info(s"zero-conf funding txid=${w.tx.txid} has been published")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {

case Event(w: WatchPublishedTriggered, d: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED) =>
log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId)
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
// we still watch the funding tx for confirmation even if we can use the zero-conf channel right away
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
stay() using d.copy(deferred = Some(remoteChannelReady)) // no need to store, they will re-send if we get disconnected

case Event(w: WatchPublishedTriggered, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) =>
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, None)
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, None, None)
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import fr.acinq.eclair.channel.Helpers.getRelayFees
import fr.acinq.eclair.channel.LocalFundingStatus.{ConfirmedFundingTx, DualFundedUnconfirmedFundingTx, SingleFundedUnconfirmedFundingTx}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChannelUpdate, PeriodicRefresh, REFRESH_CHANNEL_UPDATE_INTERVAL}
import fr.acinq.eclair.db.RevokedHtlcInfoCleaner
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
Expand Down Expand Up @@ -81,8 +83,14 @@ trait CommonFundingHandlers extends CommonHandlers {
}
case _ => () // in the dual-funding case, we have already verified the funding tx
}
val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, w.tx))
// When a splice transaction confirms, it double-spends all the commitment transactions that only applied to the
// previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can
// clean-up the htlc data that we were storing for the matching penalty transactions.
fundingStatus.firstCommitIndex_opt.foreach {
commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, commitIndex))
}
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map {
case (commitments1, commitment) =>
// First of all, we watch the funding tx that is now confirmed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ object InteractiveTxSigningSession {
val localPerCommitmentPoint = nodeParams.channelKeyManager.commitmentPoint(channelKeyPath, localCommitIndex)
LocalCommit.fromCommitSig(nodeParams.channelKeyManager, channelParams, fundingTx.txId, fundingTxIndex, fundingParams.remoteFundingPubKey, commitInput, remoteCommitSig, localCommitIndex, unsignedLocalCommit.spec, localPerCommitmentPoint).map { signedLocalCommit =>
if (shouldSignFirst(fundingParams.isInitiator, channelParams, fundingTx.tx)) {
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams, Some(remoteCommit.index))
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
SendingSigs(fundingStatus, commitment, fundingTx.localSigs)
} else {
Expand All @@ -988,7 +988,7 @@ object InteractiveTxSigningSession {
Left(f)
case Right(fullySignedTx) =>
log.info("interactive-tx fully signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", fullySignedTx.tx.localInputs.length, fullySignedTx.tx.remoteInputs.length, fullySignedTx.tx.localOutputs.length, fullySignedTx.tx.remoteOutputs.length)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams, Some(remoteCommit.index))
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
Right(SendingSigs(fundingStatus, commitment, fullySignedTx.localSigs))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond}
import fr.acinq.eclair.channel.PersistentChannelData
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.{CltvExpiry, Paginated}

trait ChannelsDb {

Expand All @@ -30,8 +30,15 @@ trait ChannelsDb {

def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit

/** Mark a channel as closed, but keep it in the DB. */
def removeChannel(channelId: ByteVector32): Unit

/** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */
def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit

/** Remove up to batchSize obsolete revoked HTLC information. */
def removeHtlcInfos(batchSize: Int): Unit

def listLocalChannels(): Seq[PersistentChannelData]

def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package fr.acinq.eclair.db

import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.{Actor, DiagnosticActorLogging, Props}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
Expand All @@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams}
*/
class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging {

val auditDb: AuditDb = nodeParams.db.audit
val channelsDb: ChannelsDb = nodeParams.db.channels
private val auditDb: AuditDb = nodeParams.db.audit
private val channelsDb: ChannelsDb = nodeParams.db.channels

context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner")

context.system.eventStream.subscribe(self, classOf[PaymentSent])
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
Expand Down
10 changes: 10 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch
primary.removeChannel(channelId)
}

override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = {
runAsync(secondary.forgetHtlcInfos(channelId, beforeCommitIndex))
primary.forgetHtlcInfos(channelId, beforeCommitIndex)
}

override def removeHtlcInfos(batchSize: Int): Unit = {
runAsync(secondary.removeHtlcInfos(batchSize))
primary.removeHtlcInfos(batchSize)
}

override def listLocalChannels(): Seq[PersistentChannelData] = {
runAsync(secondary.listLocalChannels())
primary.listLocalChannels()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.scalacompat.ByteVector32

import scala.concurrent.duration.FiniteDuration

/**
* When a channel is closed or a splice transaction confirms, we can remove the information about old HTLCs that was
* stored in the DB to punish revoked commitments. We potentially have millions of rows to delete per channel, and there
* is no rush to remove them. We don't want this to negatively impact active channels, so this actor deletes that data
* in small batches, at regular intervals.
*/
object RevokedHtlcInfoCleaner {

// @formatter:off
sealed trait Command
case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command
private case object DeleteBatch extends Command
// @formatter:on

case class Config(batchSize: Int, interval: FiniteDuration)

def apply(db: ChannelsDb, config: Config): Behavior[Command] = {
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.self)
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(DeleteBatch, config.interval)
Behaviors.receiveMessage {
case ForgetHtlcInfos(channelId, beforeCommitIndex) =>
db.forgetHtlcInfos(channelId, beforeCommitIndex)
Behaviors.same
case DeleteBatch =>
db.removeHtlcInfos(config.batchSize)
Behaviors.same
}
}
}
}

}
Loading