Skip to content

Commit

Permalink
tag db metrics by backend
Browse files Browse the repository at this point in the history
  • Loading branch information
pm47 committed Apr 8, 2021
1 parent 4b658ec commit 3aec5fe
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 104 deletions.
19 changes: 13 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Monitoring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,31 @@ package fr.acinq.eclair.db

import fr.acinq.eclair.KamonExt
import kamon.Kamon
import kamon.metric.Metric

object Monitoring {

object Metrics {
val FileBackupCompleted = Kamon.counter("db.file-backup.completed")
val FileBackupDuration = Kamon.timer("db.file-backup.duration")
val FileBackupCompleted: Metric.Counter = Kamon.counter("db.file-backup.completed")
val FileBackupDuration: Metric.Timer = Kamon.timer("db.file-backup.duration")

val DbOperation = Kamon.counter("db.operation.execute")
val DbOperationDuration = Kamon.timer("db.operation.duration")
private val DbOperation: Metric.Counter = Kamon.counter("db.operation.execute")
private val DbOperationDuration: Metric.Timer = Kamon.timer("db.operation.duration")

def withMetrics[T](name: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name)) {
DbOperation.withTag(Tags.DbOperation, name).increment()
def withMetrics[T](name: String, backend: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend)) {
DbOperation.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend).increment()
operation
}
}

object Tags {
val DbOperation = "operation"
val DbBackend = "backend"

object DbBackends {
val Sqlite = "sqlite"
val Postgres = "postgres"
}
}

}
13 changes: 7 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalError, NetworkFeePaid
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong}
Expand Down Expand Up @@ -66,7 +67,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle") {
override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
Expand All @@ -81,7 +82,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent") {
override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
Expand All @@ -102,7 +103,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received") {
override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
Expand All @@ -117,7 +118,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed") {
override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Postgres) {
inTransaction { pg =>
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
Expand All @@ -141,7 +142,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee") {
override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
Expand All @@ -155,7 +156,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error") {
override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
val (errorName, errorMessage) = e.error match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -63,7 +64,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel") {
override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Postgres) {
withLock { pg =>
val data = stateDataCodec.encode(state).require.toByteArray
using(pg.prepareStatement("UPDATE local_channels SET data=? WHERE channel_id=?")) { update =>
Expand Down Expand Up @@ -105,7 +106,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
}

override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
statement.setString(1, channelId.toHex)
Expand All @@ -124,7 +125,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels") {
override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels", DbBackends.Postgres) {
withLock { pg =>
using(pg.createStatement) { statement =>
val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=FALSE")
Expand All @@ -133,7 +134,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info") {
override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement =>
statement.setString(1, channelId.toHex)
Expand All @@ -145,7 +146,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos") {
override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT payment_hash, cltv_expiry FROM htlc_infos WHERE channel_id=? AND commitment_number=?")) { statement =>
statement.setString(1, channelId.toHex)
Expand Down
27 changes: 14 additions & 13 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package fr.acinq.eclair.db.pg
import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi}
import fr.acinq.eclair.ShortChannelId
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.router.Router.PublicChannel
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncementCodec, channelUpdateCodec, nodeAnnouncementCodec}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
import grizzled.slf4j.Logging
import javax.sql.DataSource

import javax.sql.DataSource
import scala.collection.immutable.SortedMap

class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
Expand All @@ -48,7 +49,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node") {
override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO nodes VALUES (?, ?) ON CONFLICT DO NOTHING")) { statement =>
statement.setString(1, n.nodeId.value.toHex)
Expand All @@ -58,7 +59,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node") {
override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("UPDATE nodes SET data=? WHERE node_id=?")) { statement =>
statement.setBytes(1, nodeAnnouncementCodec.encode(n).require.toByteArray)
Expand All @@ -68,7 +69,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node") {
override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("SELECT data FROM nodes WHERE node_id=?")) { statement =>
statement.setString(1, nodeId.value.toHex)
Expand All @@ -78,7 +79,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node") {
override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("DELETE FROM nodes WHERE node_id=?")) { statement =>
statement.setString(1, nodeId.value.toHex)
Expand All @@ -87,7 +88,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes") {
override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement()) { statement =>
val rs = statement.executeQuery("SELECT data FROM nodes")
Expand All @@ -96,7 +97,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel") {
override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channels VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
statement.setLong(1, c.shortChannelId.toLong)
Expand All @@ -108,7 +109,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel") {
override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel", DbBackends.Postgres) {
val column = if (u.isNode1) "channel_update_1" else "channel_update_2"
inTransaction { pg =>
using(pg.prepareStatement(s"UPDATE channels SET $column=? WHERE short_channel_id=?")) { statement =>
Expand All @@ -119,7 +120,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels") {
override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement()) { statement =>
val rs = statement.executeQuery("SELECT channel_announcement, txid, capacity_sat, channel_update_1, channel_update_2 FROM channels")
Expand All @@ -137,7 +138,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels") {
override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement) { statement =>
shortChannelIds
Expand All @@ -150,7 +151,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned") {
override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO pruned VALUES (?) ON CONFLICT DO NOTHING")) { statement =>
shortChannelIds.foreach(shortChannelId => {
Expand All @@ -162,15 +163,15 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned") {
override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
}
}
}

override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned") {
override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("SELECT short_channel_id from pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
Expand Down
Loading

0 comments on commit 3aec5fe

Please sign in to comment.