Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send query_short_channel_ids sequentially #672

Merged
merged 1 commit into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ case class ChannelDiscovered(ann: ChannelAnnouncement, capacity: Satoshi) extend
case class ChannelLost(shortChannelId: ShortChannelId) extends NetworkEvent

case class ChannelUpdateReceived(ann: ChannelUpdate) extends NetworkEvent

case class SyncProgress(progress: Double) extends NetworkEvent
42 changes: 34 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ case class RoutingState(channels: Iterable[ChannelAnnouncement], updates: Iterab
case class Stash(updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])
case class Rebroadcast(channels: Map[ChannelAnnouncement, Set[ActorRef]], updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])

case class Sync(missing: SortedSet[ShortChannelId], count: Int)

case class DescEdge(desc: ChannelDesc, u: ChannelUpdate) extends DefaultWeightedEdge

case class Data(nodes: Map[PublicKey, NodeAnnouncement],
Expand All @@ -71,7 +73,8 @@ case class Data(nodes: Map[PublicKey, NodeAnnouncement],
privateChannels: Map[ShortChannelId, PublicKey], // short_channel_id -> node_id
privateUpdates: Map[ChannelDesc, ChannelUpdate],
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedWeightedPseudograph[PublicKey, DescEdge]
graph: DirectedWeightedPseudograph[PublicKey, DescEdge],
sync: Map[PublicKey, Sync]
)

sealed trait State
Expand All @@ -98,6 +101,8 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
setTimer(TickBroadcast.toString, TickBroadcast, nodeParams.routerBroadcastInterval, repeat = true)
setTimer(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour, repeat = true)

val SHORTID_WINDOW = 100

val db = nodeParams.networkDb

{
Expand Down Expand Up @@ -139,7 +144,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
self ! nodeAnn

log.info(s"initialization completed, ready to process messages")
startWith(NORMAL, Data(initNodes, initChannels, initChannelUpdates, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, privateUpdates = Map.empty, excludedChannels = Set.empty, graph))
startWith(NORMAL, Data(initNodes, initChannels, initChannelUpdates, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, privateUpdates = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty))
}

when(NORMAL) {
Expand Down Expand Up @@ -467,15 +472,19 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
replies.foreach(reply => sender ! reply)
stay

case Event(PeerRoutingMessage(_, routingMessage@ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, _, data)), d) =>
case Event(PeerRoutingMessage(remoteNodeId, routingMessage@ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, _, data)), d) =>
sender ! TransportHandler.ReadAck(routingMessage)
val (format, theirShortChannelIds, useGzip) = ChannelRangeQueries.decodeShortChannelIds(data)
val ourShortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _, d.channels, d.updates))
val missing: SortedSet[ShortChannelId] = theirShortChannelIds -- ourShortChannelIds
log.info("received reply_channel_range, we're missing {} channel announcements/updates, format={} useGzip={}", missing.size, format, useGzip)
val blocks = ChannelRangeQueries.encodeShortChannelIds(firstBlockNum, numberOfBlocks, missing, format, useGzip)
blocks.foreach(block => sender ! QueryShortChannelIds(chainHash, block.shortChannelIds))
stay
val d1 = if (missing.nonEmpty) {
val (slice, rest) = missing.splitAt(SHORTID_WINDOW)
sender ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, format, useGzip))
d.copy(sync = d.sync + (remoteNodeId -> Sync(rest, missing.size)))
} else d
context.system.eventStream.publish(syncProgress(d1))
stay using d1

case Event(PeerRoutingMessage(_, routingMessage@QueryShortChannelIds(chainHash, data)), d) =>
sender ! TransportHandler.ReadAck(routingMessage)
Expand All @@ -495,9 +504,19 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct

case Event(PeerRoutingMessage(remoteNodeId, routingMessage@ReplyShortChannelIdsEnd(chainHash, complete)), d) =>
sender ! TransportHandler.ReadAck(routingMessage)
// we don't do anything with this yet
log.info("received reply_short_channel_ids_end={}", routingMessage)
stay
// have we more channels to ask this peer?
val d1 = d.sync.get(remoteNodeId) match {
case Some(sync) if sync.missing.nonEmpty =>
log.info(s"asking {} for the next slice of short_channel_ids", remoteNodeId)
val (slice, rest) = sync.missing.splitAt(SHORTID_WINDOW)
sender ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, ChannelRangeQueries.UNCOMPRESSED_FORMAT, useGzip = false))
d.copy(sync = d.sync + (remoteNodeId -> sync.copy(missing = rest)))
case _ =>
d
}
context.system.eventStream.publish(syncProgress(d1))
stay using d1
}

initialize()
Expand Down Expand Up @@ -715,6 +734,13 @@ object Router {
(validChannels, validNodes, validUpdates)
}

def syncProgress(d: Data): SyncProgress =
if (d.sync.isEmpty) {
SyncProgress(1)
} else {
SyncProgress(1 - d.sync.values.map(_.missing.size).sum * 1.0 / d.sync.values.map(_.count).sum)
}

/**
* This method is used after a payment failed, and we want to exclude some nodes that we know are failing
*/
Expand Down