diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Monitoring.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Monitoring.scala index cb7dfcf9bb..10c1d50a2f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Monitoring.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Monitoring.scala @@ -18,24 +18,31 @@ package fr.acinq.eclair.db import fr.acinq.eclair.KamonExt import kamon.Kamon +import kamon.metric.Metric object Monitoring { object Metrics { - val FileBackupCompleted = Kamon.counter("db.file-backup.completed") - val FileBackupDuration = Kamon.timer("db.file-backup.duration") + val FileBackupCompleted: Metric.Counter = Kamon.counter("db.file-backup.completed") + val FileBackupDuration: Metric.Timer = Kamon.timer("db.file-backup.duration") - val DbOperation = Kamon.counter("db.operation.execute") - val DbOperationDuration = Kamon.timer("db.operation.duration") + private val DbOperation: Metric.Counter = Kamon.counter("db.operation.execute") + private val DbOperationDuration: Metric.Timer = Kamon.timer("db.operation.duration") - def withMetrics[T](name: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name)) { - DbOperation.withTag(Tags.DbOperation, name).increment() + def withMetrics[T](name: String, backend: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend)) { + DbOperation.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend).increment() operation } } object Tags { val DbOperation = "operation" + val DbBackend = "backend" + + object DbBackends { + val Sqlite = "sqlite" + val Postgres = "postgres" + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 11d5bce9c7..70947b9ec1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalError, NetworkFeePaid import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats} import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} @@ -66,7 +67,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } - override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle") { + override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement => statement.setString(1, e.channelId.toHex) @@ -81,7 +82,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } - override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent") { + override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement => e.parts.foreach(p => { @@ -102,7 +103,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } - override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received") { + override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement => e.parts.foreach(p => { @@ -117,7 +118,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } - override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed") { + override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Postgres) { inTransaction { pg => val payments = e match { case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) => @@ -141,7 +142,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } - override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee") { + override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setString(1, e.channelId.toHex) @@ -155,7 +156,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } - override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error") { + override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement => val (errorName, errorMessage) = e.error match { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index aa28ea16aa..03be13cd41 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.HasCommitments import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec import grizzled.slf4j.Logging @@ -63,7 +64,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } - override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel") { + override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Postgres) { withLock { pg => val data = stateDataCodec.encode(state).require.toByteArray using(pg.prepareStatement("UPDATE local_channels SET data=? WHERE channel_id=?")) { update => @@ -105,7 +106,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _)) } - override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") { + override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement => statement.setString(1, channelId.toHex) @@ -124,7 +125,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } - override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels") { + override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=FALSE") @@ -133,7 +134,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } - override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info") { + override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement => statement.setString(1, channelId.toHex) @@ -145,7 +146,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } - override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos") { + override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT payment_hash, cltv_expiry FROM htlc_infos WHERE channel_id=? AND commitment_number=?")) { statement => statement.setString(1, channelId.toHex) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala index c2656e74ff..070c4eb88b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala @@ -19,13 +19,14 @@ package fr.acinq.eclair.db.pg import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi} import fr.acinq.eclair.ShortChannelId import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.NetworkDb import fr.acinq.eclair.router.Router.PublicChannel import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncementCodec, channelUpdateCodec, nodeAnnouncementCodec} import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement} import grizzled.slf4j.Logging -import javax.sql.DataSource +import javax.sql.DataSource import scala.collection.immutable.SortedMap class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { @@ -48,7 +49,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node") { + override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO nodes VALUES (?, ?) ON CONFLICT DO NOTHING")) { statement => statement.setString(1, n.nodeId.value.toHex) @@ -58,7 +59,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node") { + override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("UPDATE nodes SET data=? WHERE node_id=?")) { statement => statement.setBytes(1, nodeAnnouncementCodec.encode(n).require.toByteArray) @@ -68,7 +69,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node") { + override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("SELECT data FROM nodes WHERE node_id=?")) { statement => statement.setString(1, nodeId.value.toHex) @@ -78,7 +79,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node") { + override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("DELETE FROM nodes WHERE node_id=?")) { statement => statement.setString(1, nodeId.value.toHex) @@ -87,7 +88,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes") { + override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes", DbBackends.Postgres) { inTransaction { pg => using(pg.createStatement()) { statement => val rs = statement.executeQuery("SELECT data FROM nodes") @@ -96,7 +97,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel") { + override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO channels VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement => statement.setLong(1, c.shortChannelId.toLong) @@ -108,7 +109,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel") { + override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel", DbBackends.Postgres) { val column = if (u.isNode1) "channel_update_1" else "channel_update_2" inTransaction { pg => using(pg.prepareStatement(s"UPDATE channels SET $column=? WHERE short_channel_id=?")) { statement => @@ -119,7 +120,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels") { + override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels", DbBackends.Postgres) { inTransaction { pg => using(pg.createStatement()) { statement => val rs = statement.executeQuery("SELECT channel_announcement, txid, capacity_sat, channel_update_1, channel_update_2 FROM channels") @@ -137,7 +138,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels") { + override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) { inTransaction { pg => using(pg.createStatement) { statement => shortChannelIds @@ -150,7 +151,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned") { + override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO pruned VALUES (?) ON CONFLICT DO NOTHING")) { statement => shortChannelIds.foreach(shortChannelId => { @@ -162,7 +163,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned") { + override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) { inTransaction { pg => using(pg.createStatement) { statement => statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}") @@ -170,7 +171,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } } - override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned") { + override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("SELECT short_channel_id from pruned WHERE short_channel_id=?")) { statement => statement.setLong(1, shortChannelId.toLong) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala index e5f134fff6..5fbb6a0c29 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala @@ -20,6 +20,7 @@ import fr.acinq.bitcoin.ByteVector32 import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.eclair.MilliSatoshi import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.payment.{PaymentFailed, PaymentRequest, PaymentSent} @@ -68,7 +69,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def addOutgoingPayment(sent: OutgoingPayment): Unit = withMetrics("payments/add-outgoing") { + override def addOutgoingPayment(sent: OutgoingPayment): Unit = withMetrics("payments/add-outgoing", DbBackends.Postgres) { require(sent.status == OutgoingPaymentStatus.Pending, s"outgoing payment isn't pending (${sent.status.getClass.getSimpleName})") withLock { pg => using(pg.prepareStatement("INSERT INTO sent_payments (id, parent_id, external_id, payment_hash, payment_type, amount_msat, recipient_amount_msat, recipient_node_id, created_at, payment_request) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement => @@ -87,7 +88,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def updateOutgoingPayment(paymentResult: PaymentSent): Unit = withMetrics("payments/update-outgoing-sent") { + override def updateOutgoingPayment(paymentResult: PaymentSent): Unit = withMetrics("payments/update-outgoing-sent", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("UPDATE sent_payments SET (completed_at, payment_preimage, fees_msat, payment_route) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement => paymentResult.parts.foreach(p => { @@ -103,7 +104,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed") { + override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("UPDATE sent_payments SET (completed_at, failures) = (?, ?) WHERE id = ? AND completed_at IS NULL")) { statement => statement.setLong(1, paymentResult.timestamp) @@ -162,7 +163,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def getOutgoingPayment(id: UUID): Option[OutgoingPayment] = withMetrics("payments/get-outgoing") { + override def getOutgoingPayment(id: UUID): Option[OutgoingPayment] = withMetrics("payments/get-outgoing", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM sent_payments WHERE id = ?")) { statement => statement.setString(1, id.toString) @@ -176,7 +177,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listOutgoingPayments(parentId: UUID): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-parent-id") { + override def listOutgoingPayments(parentId: UUID): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-parent-id", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM sent_payments WHERE parent_id = ? ORDER BY created_at")) { statement => statement.setString(1, parentId.toString) @@ -190,7 +191,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listOutgoingPayments(paymentHash: ByteVector32): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-payment-hash") { + override def listOutgoingPayments(paymentHash: ByteVector32): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-payment-hash", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM sent_payments WHERE payment_hash = ? ORDER BY created_at")) { statement => statement.setString(1, paymentHash.toHex) @@ -204,7 +205,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-timestamp") { + override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-timestamp", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM sent_payments WHERE created_at >= ? AND created_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) @@ -219,7 +220,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def addIncomingPayment(pr: PaymentRequest, preimage: ByteVector32, paymentType: String): Unit = withMetrics("payments/add-incoming") { + override def addIncomingPayment(pr: PaymentRequest, preimage: ByteVector32, paymentType: String): Unit = withMetrics("payments/add-incoming", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("INSERT INTO received_payments (payment_hash, payment_preimage, payment_type, payment_request, created_at, expire_at) VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setString(1, pr.paymentHash.toHex) @@ -233,7 +234,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: Long): Unit = withMetrics("payments/receive-incoming") { + override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: Long): Unit = withMetrics("payments/receive-incoming", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("UPDATE received_payments SET (received_msat, received_at) = (? + COALESCE(received_msat, 0), ?) WHERE payment_hash = ?")) { update => update.setLong(1, amount.toLong) @@ -265,7 +266,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = withMetrics("payments/get-incoming") { + override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = withMetrics("payments/get-incoming", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM received_payments WHERE payment_hash = ?")) { statement => statement.setString(1, paymentHash.toHex) @@ -279,7 +280,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming") { + override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM received_payments WHERE created_at > ? AND created_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) @@ -294,7 +295,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-received") { + override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-received", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM received_payments WHERE received_msat > 0 AND created_at > ? AND created_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) @@ -309,7 +310,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-pending") { + override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-pending", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM received_payments WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at > ? ORDER BY created_at")) { statement => statement.setLong(1, from) @@ -325,7 +326,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-expired") { + override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-expired", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT * FROM received_payments WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) @@ -341,7 +342,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit } } - override def listPaymentsOverview(limit: Int): Seq[PlainPayment] = withMetrics("payments/list-overview") { + override def listPaymentsOverview(limit: Int): Seq[PlainPayment] = withMetrics("payments/list-overview", DbBackends.Postgres) { // This query is an UNION of the ``sent_payments`` and ``received_payments`` table // - missing fields set to NULL when needed. // - only retrieve incoming payments that did receive funds. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala index 9bae575e67..15dac02d95 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala @@ -19,6 +19,7 @@ package fr.acinq.eclair.db.pg import fr.acinq.bitcoin.Crypto import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.PeersDb import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.wire.protocol._ @@ -42,7 +43,7 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb { } } - override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update") { + override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) { withLock { pg => val data = CommonCodecs.nodeaddress.encode(nodeaddress).require.toByteArray using(pg.prepareStatement("UPDATE peers SET data=? WHERE node_id=?")) { update => @@ -59,7 +60,7 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb { } } - override def removePeer(nodeId: Crypto.PublicKey): Unit = withMetrics("peers/remove") { + override def removePeer(nodeId: Crypto.PublicKey): Unit = withMetrics("peers/remove", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("DELETE FROM peers WHERE node_id=?")) { statement => statement.setString(1, nodeId.value.toHex) @@ -68,7 +69,7 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb { } } - override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get") { + override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT data FROM peers WHERE node_id=?")) { statement => statement.setString(1, nodeId.value.toHex) @@ -78,7 +79,7 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb { } } - override def listPeers(): Map[PublicKey, NodeAddress] = withMetrics("peers/list") { + override def listPeers(): Map[PublicKey, NodeAddress] = withMetrics("peers/list", DbBackends.Postgres) { withLock { pg => using(pg.createStatement()) { statement => val rs = statement.executeQuery("SELECT node_id, data FROM peers") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingRelayDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingRelayDb.scala index 66754af103..e14cf45e7a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingRelayDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingRelayDb.scala @@ -20,11 +20,12 @@ package fr.acinq.eclair.db.pg import fr.acinq.bitcoin.ByteVector32 import fr.acinq.eclair.channel.{Command, HtlcSettlementCommand} import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.PendingRelayDb import fr.acinq.eclair.db.pg.PgUtils._ import fr.acinq.eclair.wire.internal.CommandCodecs.cmdCodec -import javax.sql.DataSource +import javax.sql.DataSource import scala.collection.immutable.Queue class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRelayDb { @@ -44,7 +45,7 @@ class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRel } } - override def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add") { + override def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("INSERT INTO pending_relay VALUES (?, ?, ?) ON CONFLICT DO NOTHING")) { statement => statement.setString(1, channelId.toHex) @@ -55,7 +56,7 @@ class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRel } } - override def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove") { + override def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=? AND htlc_id=?")) { statement => statement.setString(1, channelId.toHex) @@ -65,7 +66,7 @@ class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRel } } - override def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel") { + override def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT htlc_id, data FROM pending_relay WHERE channel_id=?")) { statement => statement.setString(1, channelId.toHex) @@ -75,7 +76,7 @@ class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRel } } - override def listPendingRelay(): Set[(ByteVector32, Long)] = withMetrics("pending-relay/list") { + override def listPendingRelay(): Set[(ByteVector32, Long)] = withMetrics("pending-relay/list", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("SELECT channel_id, htlc_id FROM pending_relay")) { statement => val rs = statement.executeQuery() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index 72464e8934..292340ad3d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalError, NetworkFeePaid import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats} import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} @@ -109,7 +110,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { } } - override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle") { + override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, e.channelId.toArray) statement.setBytes(2, e.remoteNodeId.value.toArray) @@ -122,7 +123,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { } } - override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent") { + override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement => e.parts.foreach(p => { statement.setLong(1, p.amount.toLong) @@ -141,7 +142,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { } } - override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received") { + override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement => e.parts.foreach(p => { statement.setLong(1, p.amount.toLong) @@ -154,7 +155,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { } } - override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed") { + override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Sqlite) { val payments = e match { case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) => // non-trampoline relayed payments have one input and one output @@ -176,7 +177,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { } } - override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee") { + override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, e.channelId.toArray) statement.setBytes(2, e.remoteNodeId.value.toArray) @@ -188,7 +189,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { } } - override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error") { + override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement => val (errorName, errorMessage) = e.error match { case LocalError(t) => (t.getClass.getSimpleName, t.getMessage) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index 8b87a735d1..6d5c06ed37 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.HasCommitments import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.payment.{ChannelPaymentRelayed, PaymentEvent, PaymentReceived, PaymentRelayed, PaymentSent} import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec import grizzled.slf4j.Logging @@ -77,7 +78,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging { } - override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel") { + override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Sqlite) { val data = stateDataCodec.encode(state).require.toByteArray using(sqlite.prepareStatement("UPDATE local_channels SET data=? WHERE channel_id=?")) { update => update.setBytes(1, data) @@ -115,7 +116,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging { timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _)) } - override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") { + override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Sqlite) { using(sqlite.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement => statement.setBytes(1, channelId.toArray) statement.executeUpdate() @@ -132,14 +133,14 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging { } } - override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels") { + override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") codecSequence(rs, stateDataCodec) } } - override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info") { + override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement => statement.setBytes(1, channelId.toArray) statement.setLong(2, commitmentNumber) @@ -149,7 +150,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging { } } - override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos") { + override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT payment_hash, cltv_expiry FROM htlc_infos WHERE channel_id=? AND commitment_number=?")) { statement => statement.setBytes(1, channelId.toArray) statement.setLong(2, commitmentNumber) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala index 1ad58e080f..a4d78ac18d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala @@ -17,10 +17,10 @@ package fr.acinq.eclair.db.sqlite import java.sql.Connection - import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi} import fr.acinq.eclair.ShortChannelId import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.NetworkDb import fr.acinq.eclair.router.Router.PublicChannel import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncementCodec, channelUpdateCodec, nodeAnnouncementCodec} @@ -58,7 +58,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { statement.executeUpdate("CREATE TABLE IF NOT EXISTS pruned (short_channel_id INTEGER NOT NULL PRIMARY KEY)") } - override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node") { + override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT OR IGNORE INTO nodes VALUES (?, ?)")) { statement => statement.setBytes(1, n.nodeId.value.toArray) statement.setBytes(2, nodeAnnouncementCodec.encode(n).require.toByteArray) @@ -66,7 +66,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node") { + override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node", DbBackends.Sqlite) { using(sqlite.prepareStatement("UPDATE nodes SET data=? WHERE node_id=?")) { statement => statement.setBytes(1, nodeAnnouncementCodec.encode(n).require.toByteArray) statement.setBytes(2, n.nodeId.value.toArray) @@ -74,7 +74,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node") { + override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT data FROM nodes WHERE node_id=?")) { statement => statement.setBytes(1, nodeId.value.toArray) val rs = statement.executeQuery() @@ -82,21 +82,21 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node") { + override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node", DbBackends.Sqlite) { using(sqlite.prepareStatement("DELETE FROM nodes WHERE node_id=?")) { statement => statement.setBytes(1, nodeId.value.toArray) statement.executeUpdate() } } - override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes") { + override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes", DbBackends.Sqlite) { using(sqlite.createStatement()) { statement => val rs = statement.executeQuery("SELECT data FROM nodes") codecSequence(rs, nodeAnnouncementCodec) } } - override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel") { + override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT OR IGNORE INTO channels VALUES (?, ?, ?, ?, NULL, NULL)")) { statement => statement.setLong(1, c.shortChannelId.toLong) statement.setString(2, txid.toHex) @@ -106,7 +106,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel") { + override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel", DbBackends.Sqlite) { val column = if (u.isNode1) "channel_update_1" else "channel_update_2" using(sqlite.prepareStatement(s"UPDATE channels SET $column=? WHERE short_channel_id=?")) { statement => statement.setBytes(1, channelUpdateCodec.encode(u).require.toByteArray) @@ -115,7 +115,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels") { + override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels", DbBackends.Sqlite) { using(sqlite.createStatement()) { statement => val rs = statement.executeQuery("SELECT channel_announcement, txid, capacity_sat, channel_update_1, channel_update_2 FROM channels") var m = SortedMap.empty[ShortChannelId, PublicChannel] @@ -131,7 +131,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels") { + override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => shortChannelIds .grouped(1000) // remove channels by batch of 1000 @@ -142,7 +142,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned") { + override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT OR IGNORE INTO pruned VALUES (?)"), inTransaction = true) { statement => shortChannelIds.foreach(shortChannelId => { statement.setLong(1, shortChannelId.toLong) @@ -152,13 +152,13 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } } - override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned") { + override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}") } } - override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned") { + override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT short_channel_id from pruned WHERE short_channel_id=?")) { statement => statement.setLong(1, shortChannelId.toLong) val rs = statement.executeQuery() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePaymentsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePaymentsDb.scala index 9f4ca1d022..f7287a44a7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePaymentsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePaymentsDb.scala @@ -18,11 +18,11 @@ package fr.acinq.eclair.db.sqlite import java.sql.{Connection, ResultSet, Statement} import java.util.UUID - import fr.acinq.bitcoin.ByteVector32 import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.MilliSatoshi import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.db.sqlite.SqliteUtils._ import fr.acinq.eclair.payment.{PaymentFailed, PaymentRequest, PaymentSent} @@ -127,7 +127,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } - override def addOutgoingPayment(sent: OutgoingPayment): Unit = withMetrics("payments/add-outgoing") { + override def addOutgoingPayment(sent: OutgoingPayment): Unit = withMetrics("payments/add-outgoing", DbBackends.Sqlite) { require(sent.status == OutgoingPaymentStatus.Pending, s"outgoing payment isn't pending (${sent.status.getClass.getSimpleName})") using(sqlite.prepareStatement("INSERT INTO sent_payments (id, parent_id, external_id, payment_hash, payment_type, amount_msat, recipient_amount_msat, recipient_node_id, created_at, payment_request) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement => statement.setString(1, sent.id.toString) @@ -144,7 +144,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def updateOutgoingPayment(paymentResult: PaymentSent): Unit = withMetrics("payments/update-outgoing-sent") { + override def updateOutgoingPayment(paymentResult: PaymentSent): Unit = withMetrics("payments/update-outgoing-sent", DbBackends.Sqlite) { using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, payment_preimage, fees_msat, payment_route) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement => paymentResult.parts.foreach(p => { statement.setLong(1, p.timestamp) @@ -158,7 +158,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed") { + override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed", DbBackends.Sqlite) { using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, failures) = (?, ?) WHERE id = ? AND completed_at IS NULL")) { statement => statement.setLong(1, paymentResult.timestamp) statement.setBytes(2, paymentFailuresCodec.encode(paymentResult.failures.map(f => FailureSummary(f)).toList).require.toByteArray) @@ -215,7 +215,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def getOutgoingPayment(id: UUID): Option[OutgoingPayment] = withMetrics("payments/get-outgoing") { + override def getOutgoingPayment(id: UUID): Option[OutgoingPayment] = withMetrics("payments/get-outgoing", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM sent_payments WHERE id = ?")) { statement => statement.setString(1, id.toString) val rs = statement.executeQuery() @@ -227,7 +227,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listOutgoingPayments(parentId: UUID): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-parent-id") { + override def listOutgoingPayments(parentId: UUID): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-parent-id", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM sent_payments WHERE parent_id = ? ORDER BY created_at")) { statement => statement.setString(1, parentId.toString) val rs = statement.executeQuery() @@ -239,7 +239,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listOutgoingPayments(paymentHash: ByteVector32): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-payment-hash") { + override def listOutgoingPayments(paymentHash: ByteVector32): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-payment-hash", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM sent_payments WHERE payment_hash = ? ORDER BY created_at")) { statement => statement.setBytes(1, paymentHash.toArray) val rs = statement.executeQuery() @@ -251,7 +251,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-timestamp") { + override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-timestamp", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM sent_payments WHERE created_at >= ? AND created_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) statement.setLong(2, to) @@ -264,7 +264,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def addIncomingPayment(pr: PaymentRequest, preimage: ByteVector32, paymentType: String): Unit = withMetrics("payments/add-incoming") { + override def addIncomingPayment(pr: PaymentRequest, preimage: ByteVector32, paymentType: String): Unit = withMetrics("payments/add-incoming", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO received_payments (payment_hash, payment_preimage, payment_type, payment_request, created_at, expire_at) VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, pr.paymentHash.toArray) statement.setBytes(2, preimage.toArray) @@ -276,7 +276,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: Long): Unit = withMetrics("payments/receive-incoming") { + override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: Long): Unit = withMetrics("payments/receive-incoming", DbBackends.Sqlite) { using(sqlite.prepareStatement("UPDATE received_payments SET (received_msat, received_at) = (? + COALESCE(received_msat, 0), ?) WHERE payment_hash = ?")) { update => update.setLong(1, amount.toLong) update.setLong(2, receivedAt) @@ -306,7 +306,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = withMetrics("payments/get-incoming") { + override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = withMetrics("payments/get-incoming", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM received_payments WHERE payment_hash = ?")) { statement => statement.setBytes(1, paymentHash.toArray) val rs = statement.executeQuery() @@ -318,7 +318,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming") { + override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM received_payments WHERE created_at > ? AND created_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) statement.setLong(2, to) @@ -331,7 +331,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-received") { + override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-received", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM received_payments WHERE received_msat > 0 AND created_at > ? AND created_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) statement.setLong(2, to) @@ -344,7 +344,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-pending") { + override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-pending", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM received_payments WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at > ? ORDER BY created_at")) { statement => statement.setLong(1, from) statement.setLong(2, to) @@ -358,7 +358,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-expired") { + override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-expired", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT * FROM received_payments WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at < ? ORDER BY created_at")) { statement => statement.setLong(1, from) statement.setLong(2, to) @@ -372,7 +372,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging { } } - override def listPaymentsOverview(limit: Int): Seq[PlainPayment] = withMetrics("payments/list-overview") { + override def listPaymentsOverview(limit: Int): Seq[PlainPayment] = withMetrics("payments/list-overview", DbBackends.Sqlite) { // This query is an UNION of the ``sent_payments`` and ``received_payments`` table // - missing fields set to NULL when needed. // - only retrieve incoming payments that did receive funds. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala index 30d97e3543..b42a371b33 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala @@ -17,10 +17,10 @@ package fr.acinq.eclair.db.sqlite import java.sql.Connection - import fr.acinq.bitcoin.Crypto import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.PeersDb import fr.acinq.eclair.db.sqlite.SqliteUtils.{codecSequence, getVersion, using} import fr.acinq.eclair.wire.protocol._ @@ -38,7 +38,7 @@ class SqlitePeersDb(sqlite: Connection) extends PeersDb { statement.executeUpdate("CREATE TABLE IF NOT EXISTS peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") } - override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update") { + override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) { val data = CommonCodecs.nodeaddress.encode(nodeaddress).require.toByteArray using(sqlite.prepareStatement("UPDATE peers SET data=? WHERE node_id=?")) { update => update.setBytes(1, data) @@ -53,14 +53,14 @@ class SqlitePeersDb(sqlite: Connection) extends PeersDb { } } - override def removePeer(nodeId: Crypto.PublicKey): Unit = withMetrics("peers/remove") { + override def removePeer(nodeId: Crypto.PublicKey): Unit = withMetrics("peers/remove", DbBackends.Sqlite) { using(sqlite.prepareStatement("DELETE FROM peers WHERE node_id=?")) { statement => statement.setBytes(1, nodeId.value.toArray) statement.executeUpdate() } } - override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get") { + override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT data FROM peers WHERE node_id=?")) { statement => statement.setBytes(1, nodeId.value.toArray) val rs = statement.executeQuery() @@ -68,7 +68,7 @@ class SqlitePeersDb(sqlite: Connection) extends PeersDb { } } - override def listPeers(): Map[PublicKey, NodeAddress] = withMetrics("peers/list") { + override def listPeers(): Map[PublicKey, NodeAddress] = withMetrics("peers/list", DbBackends.Sqlite) { using(sqlite.createStatement()) { statement => val rs = statement.executeQuery("SELECT node_id, data FROM peers") var m: Map[PublicKey, NodeAddress] = Map() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingRelayDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingRelayDb.scala index b91dcd96da..f5c0e46793 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingRelayDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingRelayDb.scala @@ -17,10 +17,10 @@ package fr.acinq.eclair.db.sqlite import java.sql.Connection - import fr.acinq.bitcoin.ByteVector32 import fr.acinq.eclair.channel.{Command, HtlcSettlementCommand} import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics +import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.PendingRelayDb import fr.acinq.eclair.wire.internal.CommandCodecs.cmdCodec @@ -40,7 +40,7 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb { statement.executeUpdate("CREATE TABLE IF NOT EXISTS pending_relay (channel_id BLOB NOT NULL, htlc_id INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY(channel_id, htlc_id))") } - override def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add") { + override def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT OR IGNORE INTO pending_relay VALUES (?, ?, ?)")) { statement => statement.setBytes(1, channelId.toArray) statement.setLong(2, cmd.id) @@ -49,7 +49,7 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb { } } - override def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove") { + override def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove", DbBackends.Sqlite) { using(sqlite.prepareStatement("DELETE FROM pending_relay WHERE channel_id=? AND htlc_id=?")) { statement => statement.setBytes(1, channelId.toArray) statement.setLong(2, htlcId) @@ -57,7 +57,7 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb { } } - override def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel") { + override def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT data FROM pending_relay WHERE channel_id=?")) { statement => statement.setBytes(1, channelId.toArray) val rs = statement.executeQuery() @@ -65,7 +65,7 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb { } } - override def listPendingRelay(): Set[(ByteVector32, Long)] = withMetrics("pending-relay/list") { + override def listPendingRelay(): Set[(ByteVector32, Long)] = withMetrics("pending-relay/list", DbBackends.Sqlite) { using(sqlite.prepareStatement("SELECT channel_id, htlc_id FROM pending_relay")) { statement => val rs = statement.executeQuery() var q: Queue[(ByteVector32, Long)] = Queue()