Commit 952f94c

Send query_short_channel_ids sequentially (#672)
and add an event to follow the synchronization progress.
pm47 authored Aug 15, 2018
1 parent 24db529 commit 952f94c
Showing 2 changed files with 36 additions and 8 deletions.
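In short: instead of requesting every missing id at once, the router now asks for a fixed-size slice and keeps the remainder in a per-peer Sync record; the next slice is only sent after the peer answers with reply_short_channel_ids_end. A standalone sketch of that slicing, with made-up Long values standing in for ShortChannelId:

import scala.collection.immutable.SortedSet

object SliceExample extends App {
  val SHORTID_WINDOW = 100 // same window size the commit adds to Router

  // stand-in for the ids a peer has and we don't (the real code uses SortedSet[ShortChannelId])
  val missing: SortedSet[Long] = SortedSet((1L to 250L): _*)

  // only the first slice is queried right away; the rest is remembered as Sync(rest, missing.size)
  val (slice, rest) = missing.splitAt(SHORTID_WINDOW)
  println(s"query now: ${slice.size} ids, remember for later: ${rest.size} ids")
}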
@@ -37,3 +37,5 @@ case class ChannelDiscovered(ann: ChannelAnnouncement, capacity: Satoshi) extends NetworkEvent
 case class ChannelLost(shortChannelId: ShortChannelId) extends NetworkEvent
 
 case class ChannelUpdateReceived(ann: ChannelUpdate) extends NetworkEvent
+
+case class SyncProgress(progress: Double) extends NetworkEvent
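Because SyncProgress extends NetworkEvent, it is published on the actor system's event stream (see the publish calls in Router.scala below), so any actor can subscribe to it. A minimal listener sketch, assuming Akka classic actors; the import path for SyncProgress is an assumption:

import akka.actor.{Actor, ActorLogging, Props}
import fr.acinq.eclair.router.SyncProgress // assumed package, alongside the other NetworkEvent classes

class SyncProgressListener extends Actor with ActorLogging {
  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[SyncProgress])

  override def receive: Receive = {
    case SyncProgress(progress) =>
      log.info(f"routing table sync: ${progress * 100}%.1f%% done")
  }
}

// usage, given the node's ActorSystem:
// system.actorOf(Props[SyncProgressListener], "sync-progress-listener")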
42 changes: 34 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
@@ -60,6 +60,8 @@ case class RoutingState(channels: Iterable[ChannelAnnouncement], updates: Iterab
 case class Stash(updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])
 case class Rebroadcast(channels: Map[ChannelAnnouncement, Set[ActorRef]], updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])
 
+case class Sync(missing: SortedSet[ShortChannelId], count: Int)
+
 case class DescEdge(desc: ChannelDesc, u: ChannelUpdate) extends DefaultWeightedEdge
 
 case class Data(nodes: Map[PublicKey, NodeAnnouncement],
@@ -71,7 +73,8 @@ case class Data(nodes: Map[PublicKey, NodeAnnouncement],
                 privateChannels: Map[ShortChannelId, PublicKey], // short_channel_id -> node_id
                 privateUpdates: Map[ChannelDesc, ChannelUpdate],
                 excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
-                graph: DirectedWeightedPseudograph[PublicKey, DescEdge]
+                graph: DirectedWeightedPseudograph[PublicKey, DescEdge],
+                sync: Map[PublicKey, Sync]
                )
 
 sealed trait State
@@ -98,6 +101,8 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
   setTimer(TickBroadcast.toString, TickBroadcast, nodeParams.routerBroadcastInterval, repeat = true)
   setTimer(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour, repeat = true)
 
+  val SHORTID_WINDOW = 100
+
   val db = nodeParams.networkDb
 
   {
@@ -139,7 +144,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
     self ! nodeAnn
 
     log.info(s"initialization completed, ready to process messages")
-    startWith(NORMAL, Data(initNodes, initChannels, initChannelUpdates, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, privateUpdates = Map.empty, excludedChannels = Set.empty, graph))
+    startWith(NORMAL, Data(initNodes, initChannels, initChannelUpdates, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, privateUpdates = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty))
   }
 
   when(NORMAL) {
@@ -467,15 +472,19 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
       replies.foreach(reply => sender ! reply)
       stay
 
-    case Event(PeerRoutingMessage(_, routingMessage@ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, _, data)), d) =>
+    case Event(PeerRoutingMessage(remoteNodeId, routingMessage@ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, _, data)), d) =>
       sender ! TransportHandler.ReadAck(routingMessage)
       val (format, theirShortChannelIds, useGzip) = ChannelRangeQueries.decodeShortChannelIds(data)
       val ourShortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _, d.channels, d.updates))
       val missing: SortedSet[ShortChannelId] = theirShortChannelIds -- ourShortChannelIds
       log.info("received reply_channel_range, we're missing {} channel announcements/updates, format={} useGzip={}", missing.size, format, useGzip)
-      val blocks = ChannelRangeQueries.encodeShortChannelIds(firstBlockNum, numberOfBlocks, missing, format, useGzip)
-      blocks.foreach(block => sender ! QueryShortChannelIds(chainHash, block.shortChannelIds))
-      stay
+      val d1 = if (missing.nonEmpty) {
+        val (slice, rest) = missing.splitAt(SHORTID_WINDOW)
+        sender ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, format, useGzip))
+        d.copy(sync = d.sync + (remoteNodeId -> Sync(rest, missing.size)))
+      } else d
+      context.system.eventStream.publish(syncProgress(d1))
+      stay using d1
 
     case Event(PeerRoutingMessage(_, routingMessage@QueryShortChannelIds(chainHash, data)), d) =>
       sender ! TransportHandler.ReadAck(routingMessage)
@@ -495,9 +504,19 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
 
     case Event(PeerRoutingMessage(remoteNodeId, routingMessage@ReplyShortChannelIdsEnd(chainHash, complete)), d) =>
       sender ! TransportHandler.ReadAck(routingMessage)
-      // we don't do anything with this yet
       log.info("received reply_short_channel_ids_end={}", routingMessage)
-      stay
+      // have we more channels to ask this peer?
+      val d1 = d.sync.get(remoteNodeId) match {
+        case Some(sync) if sync.missing.nonEmpty =>
+          log.info(s"asking {} for the next slice of short_channel_ids", remoteNodeId)
+          val (slice, rest) = sync.missing.splitAt(SHORTID_WINDOW)
+          sender ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, ChannelRangeQueries.UNCOMPRESSED_FORMAT, useGzip = false))
+          d.copy(sync = d.sync + (remoteNodeId -> sync.copy(missing = rest)))
+        case _ =>
+          d
+      }
+      context.system.eventStream.publish(syncProgress(d1))
+      stay using d1
   }
 
   initialize()
@@ -715,6 +734,13 @@ object Router {
     (validChannels, validNodes, validUpdates)
   }
 
+  def syncProgress(d: Data): SyncProgress =
+    if (d.sync.isEmpty) {
+      SyncProgress(1)
+    } else {
+      SyncProgress(1 - d.sync.values.map(_.missing.size).sum * 1.0 / d.sync.values.map(_.count).sum)
+    }
+
   /**
     * This method is used after a payment failed, and we want to exclude some nodes that we know are failing
     */
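To make the syncProgress formula concrete: the reported fraction is 1 minus the ratio of still-missing ids to the totals counted when each peer's sync started, so it moves towards 1 as slices are requested. A tiny worked example with hypothetical numbers:

// one peer reported 250 missing ids and the first window of 100 was just requested,
// so its entry is Sync(missing = 150 remaining ids, count = 250)
val count = 250
val stillMissing = 150
val progress = 1 - stillMissing * 1.0 / count
// progress == 0.4; syncProgress returns SyncProgress(1) once the sync map is empty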
