Skip to content

Commit

Permalink
ZMQ actors should subscribe to a single topic (#1793)
Browse files Browse the repository at this point in the history
We use one actor per topic, but each actor previously registered to multiple
topics so we received duplicate events and consumed twice the necessary
bandwidth.
  • Loading branch information
t-bast authored May 11, 2021
1 parent a8d4e07 commit 55b50ec
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
4 changes: 2 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ class Setup(datadir: File,

extendedBitcoinClient = new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))
watcher = {
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), ZMQActor.Topics.RawBlock, Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), ZMQActor.Topics.RawTx, Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.spawn(Behaviors.supervise(ZmqWatcher(nodeParams.chainHash, blockCount, extendedBitcoinClient)).onFailure(typed.SupervisorStrategy.resume), "watcher")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.util.Try
/**
* Created by PM on 04/04/2017.
*/
class ZMQActor(address: String, connected: Option[Promise[Done]] = None) extends Actor with ActorLogging {
class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] = None) extends Actor with ActorLogging {

import ZMQActor._

Expand All @@ -40,8 +40,7 @@ class ZMQActor(address: String, connected: Option[Promise[Done]] = None) extends
val subscriber = ctx.createSocket(SocketType.SUB)
subscriber.monitor("inproc://events", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED)
subscriber.connect(address)
subscriber.subscribe("rawblock".getBytes(ZMQ.CHARSET))
subscriber.subscribe("rawtx".getBytes(ZMQ.CHARSET))
subscriber.subscribe(topic.getBytes(ZMQ.CHARSET))

val monitor = ctx.createSocket(SocketType.PAIR)
monitor.connect("inproc://events")
Expand Down Expand Up @@ -114,4 +113,9 @@ object ZMQActor {
case object ZMQDisconnected extends ZMQEvent
// @formatter:on

object Topics {
val RawBlock: String = "rawblock"
val RawTx: String = "rawtx"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
waitForBitcoindReady()
logger.info("starting zmq actors")
val (zmqBlockConnected, zmqTxConnected) = (Promise[Done](), Promise[Done]())
zmqBlock = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqBlockPort", Some(zmqBlockConnected))))
zmqTx = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqTxPort", Some(zmqTxConnected))))
zmqBlock = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqBlockPort", ZMQActor.Topics.RawBlock, Some(zmqBlockConnected))))
zmqTx = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqTxPort", ZMQActor.Topics.RawTx, Some(zmqTxConnected))))
awaitCond(zmqBlockConnected.isCompleted && zmqTxConnected.isCompleted)
super.beforeAll()
}
Expand Down

0 comments on commit 55b50ec

Please sign in to comment.