Skip to content

Commit

Permalink
bypass blockchain validation for local channels
Browse files Browse the repository at this point in the history
  • Loading branch information
pm47 committed May 18, 2022
1 parent 05274a1 commit e7f7ef0
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val
}
emitEvent_opt.foreach {
case EmitLocalChannelUpdate(reason, d, sendToPeer) =>
log.info(s"emitting channel update: reason={} enabled={} sendToPeer={} realScid=${d.realShortChannelId_opt} channel_update={}", reason, d.channelUpdate.channelFlags.isEnabled, sendToPeer, d.channelUpdate)
log.info(s"emitting channel update: reason=$reason enabled=${d.channelUpdate.channelFlags.isEnabled} sendToPeer=${sendToPeer} realScid=${d.realShortChannelId_opt} channel_update={} channel_announcement={}", d.channelUpdate, d.channelAnnouncement.map(_ => "yes").getOrElse("no"))
context.system.eventStream.publish(LocalChannelUpdate(self, d.channelId, d.realShortChannelId_opt, d.localAlias, d.commitments.remoteParams.nodeId, d.channelAnnouncement, d.channelUpdate, d.commitments))
if (sendToPeer) {
send(d.channelUpdate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
stay() using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, Set(RemoteGossip(peerConnection, remoteNodeId)))))

case Event(lcu: LocalChannelUpdate, d: Data) => // from local channel
stay() using Validation.handleLocalChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, watcher, lcu)
stay() using Validation.handleLocalChannelUpdate(d, nodeParams, lcu)

case Event(lcd: LocalChannelDown, d: Data) =>
stay() using Validation.handleLocalChannelDown(d, nodeParams.nodeId, lcd)
Expand Down
98 changes: 58 additions & 40 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{UtxoStatus, ValidateRequest, ValidateResult, WatchExternalChannelSpent}
import fr.acinq.eclair.channel.{AvailableBalanceChanged, LocalChannelDown, LocalChannelUpdate, ShortChannelIdAssigned}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.router.Graph.GraphStructure.GraphEdge
Expand Down Expand Up @@ -115,21 +115,13 @@ object Validation {
val capacity = tx.txOut(outputIndex).amount
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
db.addChannel(c, tx.txid, capacity)
// in case we just validated our first local channel, we announce the local node
if (!d0.nodes.contains(nodeParams.nodeId) && isRelatedTo(c, nodeParams.nodeId)) {
log.info("first local channel validated, announcing local node")
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures())
ctx.self ! nodeAnn
}
// maybe this previously was a local unannounced channel
val channelId = toLongId(tx.txid.reverse, outputIndex)
val privateChannel_opt = d0.privateChannels.get(channelId)
Some(PublicChannel(c,
tx.txid,
capacity,
update_1_opt = privateChannel_opt.flatMap(_.update_1_opt),
update_2_opt = privateChannel_opt.flatMap(_.update_2_opt),
meta_opt = privateChannel_opt.map(_.meta)))
update_1_opt = None,
update_2_opt = None,
meta_opt = None))
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
Expand All @@ -153,17 +145,9 @@ object Validation {
val awaiting1 = d0.awaiting - c

publicChannel_opt match {
case Some(pc) =>
val d1 = d0.copy(
channels = d0.channels + (c.shortChannelId -> pc),
privateChannels = d0.privateChannels - pc.channelId, // we remove fake announcements that we may have made before
rebroadcast = d0.rebroadcast.copy(
channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers
updates = d0.rebroadcast.updates ++ (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet).map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin])).toMap // those updates are only defined if this was a previously an unannounced local channel, we broadcast them
), // we also add the newly validated channels to the rebroadcast queue
stash = stash1,
awaiting = awaiting1)
// we only reprocess updates and nodes if validation succeeded
case Some(publicChannel) =>
val d1 = addPublicChannel(d0, nodeParams, publicChannel).copy(stash = stash1, awaiting = awaiting1)
// we process channel updates and node announcements if validation succeeded
val d2 = reprocessUpdates.foldLeft(d1) {
case (d, (u, origins)) => Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, origins)), wasStashed = true)
}
Expand All @@ -172,13 +156,37 @@ object Validation {
}
d3
case None =>
// if validation failed we can fast-discard related announcements
reprocessUpdates.foreach { case (u, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoRelatedChannel(u)) } }
reprocessNodes.foreach { case (n, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoKnownChannel(n)) } }
d0.copy(stash = stash1, awaiting = awaiting1)
}
}
}

private def addPublicChannel(d: Data, nodeParams: NodeParams, pc: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
log.debug("adding public channel channelId={} realScid={}", pc.channelId, pc.shortChannelId)
// in case this was our first local channel, we make a node announcement
if (!d.nodes.contains(nodeParams.nodeId)) {
log.info("first local channel validated, announcing local node")
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures())
ctx.self ! nodeAnn
}
d.copy(
channels = d.channels + (pc.shortChannelId -> pc),
// we remove fake announcements that we may have made before
privateChannels = d.privateChannels - pc.channelId,
// we also add the newly validated channels to the rebroadcast queue
rebroadcast = d.rebroadcast.copy(
// we rebroadcast the channel to our peers
channels = d.rebroadcast.channels + (pc.ann -> d.awaiting.getOrElse(pc.ann, if (pc.nodeId1 == nodeParams.nodeId || pc.nodeId2 == nodeParams.nodeId) Seq(LocalGossip) else Nil).toSet),
// those updates are only defined if the channel is was previously an unannounced local channel, we broadcast them
updates = d.rebroadcast.updates ++ (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet).map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin])).toMap // those updates are only defined if this was a previously an unannounced local channel, we broadcast them
)
)
}

def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val lostChannel = d.channels(shortChannelId).ann
Expand Down Expand Up @@ -440,30 +448,40 @@ object Validation {
}
}

def handleLocalChannelUpdate(d: Data, db: NetworkDb, routerConf: RouterConf, watcher: typed.ActorRef[ZmqWatcher.Command], lcu: LocalChannelUpdate)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
def handleLocalChannelUpdate(d: Data, nodeParams: NodeParams, lcu: LocalChannelUpdate)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
import nodeParams.db.{network => db}
d.resolve(lcu.channelId, lcu.realShortChannelId_opt) match {
case Some(_: PublicChannel) =>
// channel is already known, we can process the channel_update
handleChannelUpdate(d, db, routerConf, Left(lcu))
case _ =>
// known private channel, or unknown channel
// this a known public channel, we can process the channel_update
handleChannelUpdate(d, db, nodeParams.routerConf, Left(lcu))
case Some(privateChannel: PrivateChannel) =>
lcu.channelAnnouncement_opt match {
case Some(c) if d.awaiting.contains(c) =>
// this is a public channel currently being verified, we can process the channel_update right away (it will be stashed)
handleChannelUpdate(d, db, routerConf, Left(lcu))
case Some(c) =>
// channel wasn't announced but here is the announcement, we will process it *before* the channel_update
watcher ! ValidateRequest(ctx.self, c)
val d1 = d.copy(awaiting = d.awaiting + (c -> Seq(LocalGossip)))
case Some(ann) =>
// channel is graduating from private to public
// since this is a local channel, we can trust the announcement, no need to go through the full
// verification process and make calls to bitcoin core
val commitments = lcu.commitments.asInstanceOf[Commitments] // TODO: ugly! a public channel has to have a real commitment
val publicChannel = PublicChannel(
ann = ann,
fundingTxid = commitments.commitInput.outPoint.txid,
capacity = commitments.capacity,
update_1_opt = privateChannel.update_1_opt,
update_2_opt = privateChannel.update_2_opt,
meta_opt = Some(privateChannel.meta)
)
val d1 = addPublicChannel(d, nodeParams, publicChannel)
// maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks)
db.removeFromPruned(c.shortChannelId)
handleChannelUpdate(d1, db, routerConf, Left(lcu))
db.removeFromPruned(ann.shortChannelId)
handleChannelUpdate(d1, db, nodeParams.routerConf, Left(lcu))
case None =>
// should never happen, we log a warning and handle the update, it will be rejected since there is no related channel
log.warning("unrecognized local chanel update for channelId={} localAlias={}", lcu.channelId, lcu.localAlias)
handleChannelUpdate(d, db, routerConf, Left(lcu))
// this is a known unannounced channel, we can process the channel_update
handleChannelUpdate(d, db, nodeParams.routerConf, Left(lcu))
}
case None =>
// should never happen, we log a warning and handle the update, it will be rejected since there is no related channel
log.warning("unrecognized local chanel update for channelId={} localAlias={}", lcu.channelId, lcu.localAlias)
handleChannelUpdate(d, db, nodeParams.routerConf, Left(lcu))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu
// both channel_updates are known
pc.update_1_opt.isDefined && pc.update_2_opt.isDefined
}
val privateChannel = router.stateData.privateChannels.values.head

// funding tx reaches 6 blocks, announcements are exchanged
channels.alice ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null)
Expand All @@ -97,10 +96,6 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu
channels.bob2alice.expectMsgType[AnnouncementSignatures]
channels.bob2alice.forward(channels.alice)

// router gets notified and attempts to validate the local channel
val vr = channels.alice2blockchain.expectMsgType[ZmqWatcher.ValidateRequest]
vr.replyTo ! ZmqWatcher.ValidateResult(vr.ann, Right((fundingTx, ZmqWatcher.UtxoStatus.Unspent)))

awaitCond {
router.stateData.privateChannels.isEmpty && router.stateData.channels.size == 1
}
Expand All @@ -114,7 +109,7 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu
// manual rebroadcast
router ! Router.TickBroadcast
rebroadcastListener.expectMsg(Router.Rebroadcast(
channels = Map(vr.ann -> Set[GossipOrigin](LocalGossip)),
channels = Map(channels.alice.stateData.asInstanceOf[DATA_NORMAL].channelAnnouncement.get -> Set[GossipOrigin](LocalGossip)),
updates = Map(aliceChannelUpdate -> Set[GossipOrigin](LocalGossip), bobChannelUpdate -> Set.empty[GossipOrigin]), // broadcast the channel_updates (they were previously unannounced)
nodes = Map(router.underlyingActor.stateData.nodes.values.head -> Set[GossipOrigin](LocalGossip)), // new node_announcement
))
Expand Down

0 comments on commit e7f7ef0

Please sign in to comment.