From 003ead8362be819cb63e40c1a875666558ae0540 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 14 Apr 2021 09:45:25 +0200 Subject: [PATCH 1/6] Add trampoline info to auditDB Add two columns for the recipient and amount sent to the recipient in case of trampoline relaying. When using trampoline, the recipient may not be the next node on the path. --- .../fr/acinq/eclair/db/DbEventHandler.scala | 2 +- .../fr/acinq/eclair/db/pg/PgAuditDb.scala | 37 ++++++++++++---- .../eclair/db/sqlite/SqliteAuditDb.scala | 42 +++++++++++++++---- .../acinq/eclair/payment/PaymentEvents.scala | 2 +- .../eclair/payment/relay/NodeRelay.scala | 2 +- .../fr/acinq/eclair/db/AuditDbSpec.scala | 32 +++++++------- 6 files changed, 80 insertions(+), 37 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index b67381725f..38a8f6eaab 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -69,7 +69,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging { .withTag(PaymentTags.Relay, PaymentTags.RelayType(e)) .record((e.amountIn - e.amountOut).truncateToSatoshi.toLong) e match { - case TrampolinePaymentRelayed(_, incoming, outgoing, _) => + case TrampolinePaymentRelayed(_, incoming, outgoing, _, _, _) => PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(incoming.length) PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length) incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 70947b9ec1..94c0a50d83 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -27,7 +27,9 @@ import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} import grizzled.slf4j.Logging +import scodec.bits.ByteVector +import java.sql.Statement import java.util.UUID import javax.sql.DataSource import scala.collection.immutable.Queue @@ -38,18 +40,26 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { import ExtendedResultSet._ val DB_NAME = "audit" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 - case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long) + case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, recipientNodeId: Option[PublicKey], recipientAmount: Option[MilliSatoshi], timestamp: Long) inTransaction { pg => using(pg.createStatement()) { statement => + def migration45(statement: Statement): Int = { + statement.executeUpdate("ALTER TABLE relayed ADD recipientNodeId TEXT") + statement.executeUpdate("ALTER TABLE relayed ADD recipientAmount BIGINT") + } getVersion(statement, DB_NAME, CURRENT_VERSION) match { + case 4 => + logger.warn(s"migrating db $DB_NAME, found version=4 current=$CURRENT_VERSION") + migration45(statement) + setVersion(statement, DB_NAME, CURRENT_VERSION) case CURRENT_VERSION => statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL, recipientNodeId TEXT, recipientAmount BIGINT)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp BIGINT NOT NULL)") @@ -123,19 +133,26 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val payments = e match { case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) => // non-trampoline relayed payments have one input and one output - Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts)) - case TrampolinePaymentRelayed(_, incoming, outgoing, ts) => + Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", None, None, ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", None, None, ts)) + case TrampolinePaymentRelayed(_, incoming, outgoing, recipientNodeId, recipientAmount, ts) => // trampoline relayed payments do MPP aggregation and may have M inputs and N outputs - incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts)) + incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) } for (p <- payments) { - using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement => statement.setString(1, e.paymentHash.toHex) statement.setLong(2, p.amount.toLong) statement.setString(3, p.channelId.toHex) statement.setString(4, p.direction) statement.setString(5, p.relayType) statement.setLong(6, e.timestamp) + statement.setString(7, p.recipientNodeId.map(_.value.toHex).orNull) + p.recipientAmount match { + case None => + statement.setNull(8, java.sql.Types.BIGINT) + case Some(recipientAmount) => + statement.setLong(8, recipientAmount.toLong) + } statement.executeUpdate() } } @@ -243,6 +260,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { MilliSatoshi(rs.getLong("amount_msat")), rs.getString("direction"), rs.getString("relay_type"), + Option(rs.getString("recipientNodeId")).map(s => PublicKey(ByteVector.fromValidHex(s))), + Option(rs.getLong("recipientAmount")).filterNot(_ => rs.wasNull()).map(_ msat), rs.getLong("timestamp")) relayedByHash = relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part)) } @@ -253,10 +272,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) parts.headOption match { - case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map { + case Some(RelayedPart(_, _, _, "channel", None, None, timestamp)) => incoming.zip(outgoing).map { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } - case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, timestamp) :: Nil + case Some(RelayedPart(_, _, _, "trampoline",Some(recipientNodeId), Some(recipientAmount), timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, recipientNodeId, recipientAmount, timestamp) :: Nil case _ => Nil } }.toSeq.sortBy(_.timestamp) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index 292340ad3d..69c4829656 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -27,6 +27,7 @@ import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} import grizzled.slf4j.Logging +import scodec.bits.ByteVector import java.sql.{Connection, Statement} import java.util.UUID @@ -38,9 +39,9 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { import ExtendedResultSet._ val DB_NAME = "audit" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 - case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long) + case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, recipientNodeId: Option[PublicKey], recipientAmount: Option[MilliSatoshi], timestamp: Long) using(sqlite.createStatement(), inTransaction = true) { statement => @@ -75,26 +76,38 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)") } + def migration45(statement: Statement): Int = { + statement.executeUpdate("ALTER TABLE relayed ADD recipientNodeId BLOB") + statement.executeUpdate("ALTER TABLE relayed ADD recipientAmount INTEGER") + } + getVersion(statement, DB_NAME, CURRENT_VERSION) match { case 1 => // previous version let's migrate logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION") migration12(statement) migration23(statement) migration34(statement) + migration45(statement) setVersion(statement, DB_NAME, CURRENT_VERSION) case 2 => logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION") migration23(statement) migration34(statement) + migration45(statement) setVersion(statement, DB_NAME, CURRENT_VERSION) case 3 => logger.warn(s"migrating db $DB_NAME, found version=3 current=$CURRENT_VERSION") migration34(statement) + migration45(statement) + setVersion(statement, DB_NAME, CURRENT_VERSION) + case 4 => + logger.warn(s"migrating db $DB_NAME, found version=4 current=$CURRENT_VERSION") + migration45(statement) setVersion(statement, DB_NAME, CURRENT_VERSION) case CURRENT_VERSION => statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL, recipientNodeId BLOB, recipientAmount INTEGER)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)") @@ -159,19 +172,27 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { val payments = e match { case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) => // non-trampoline relayed payments have one input and one output - Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts)) - case TrampolinePaymentRelayed(_, incoming, outgoing, ts) => + Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", None, None, ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", None, None, ts)) + case TrampolinePaymentRelayed(_, incoming, outgoing, recipientNodeId, recipientAmount, ts) => // trampoline relayed payments do MPP aggregation and may have M inputs and N outputs - incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts)) + incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) ++ + outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) } for (p <- payments) { - using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, e.paymentHash.toArray) statement.setLong(2, p.amount.toLong) statement.setBytes(3, p.channelId.toArray) statement.setString(4, p.direction) statement.setString(5, p.relayType) statement.setLong(6, e.timestamp) + statement.setBytes(7, p.recipientNodeId.map(_.value.toArray).orNull) + p.recipientAmount match { + case None => + statement.setNull(8, java.sql.Types.INTEGER) + case Some(recipientAmount) => + statement.setLong(8, recipientAmount.toLong) + } statement.executeUpdate() } } @@ -269,6 +290,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { MilliSatoshi(rs.getLong("amount_msat")), rs.getString("direction"), rs.getString("relay_type"), + Option(rs.getBytes("recipientNodeId")).map(bytes => PublicKey(ByteVector(bytes))), + Option(rs.getLong("recipientAmount")).filterNot(_ => rs.wasNull()).map(_ msat), rs.getLong("timestamp")) relayedByHash = relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part)) } @@ -279,10 +302,11 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) parts.headOption match { - case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map { + case Some(RelayedPart(_, _, _, "channel", None, None, timestamp)) => incoming.zip(outgoing).map { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } - case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, timestamp) :: Nil + case Some(RelayedPart(_, _, _, "trampoline", Some(recipientNodeId), Some(recipientAmount), timestamp)) => + TrampolinePaymentRelayed(paymentHash, incoming, outgoing, recipientNodeId, recipientAmount, timestamp) :: Nil case _ => Nil } }.toSeq.sortBy(_.timestamp) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala index c62a53d5c3..ffad0f6b8d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala @@ -85,7 +85,7 @@ sealed trait PaymentRelayed extends PaymentEvent { case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed -case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed { +case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, recipientNodeId: PublicKey, recipientAmount: MilliSatoshi, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed { override val amountIn: MilliSatoshi = incoming.map(_.amount).sum override val amountOut: MilliSatoshi = outgoing.map(_.amount).sum } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala index 572395c479..5ae6e3a538 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala @@ -349,7 +349,7 @@ class NodeRelay private(nodeParams: NodeParams, } val incoming = upstream.adds.map(add => PaymentRelayed.Part(add.amountMsat, add.channelId)) val outgoing = paymentSent.parts.map(part => PaymentRelayed.Part(part.amountWithFees, part.toChannelId)) - context.system.eventStream ! EventStream.Publish(TrampolinePaymentRelayed(paymentHash, incoming, outgoing)) + context.system.eventStream ! EventStream.Publish(TrampolinePaymentRelayed(paymentHash, incoming, outgoing, paymentSent.recipientNodeId, paymentSent.recipientAmount)) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index 22134e0051..b450ca0396 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -71,7 +71,7 @@ class AuditDbSpec extends AnyFunSuite { val e7 = ChannelEvent(randomBytes32, randomKey.publicKey, 456123000 sat, isFunder = true, isPrivate = false, ChannelEvent.EventType.Closed(MutualClose(null))) val e8 = ChannelErrorOccurred(null, randomBytes32, randomKey.publicKey, null, LocalError(new RuntimeException("oops")), isFatal = true) val e9 = ChannelErrorOccurred(null, randomBytes32, randomKey.publicKey, null, RemoteError(Error(randomBytes32, "remote oops")), isFatal = true) - val e10 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(20000 msat, randomBytes32), PaymentRelayed.Part(22000 msat, randomBytes32)), Seq(PaymentRelayed.Part(10000 msat, randomBytes32), PaymentRelayed.Part(12000 msat, randomBytes32), PaymentRelayed.Part(15000 msat, randomBytes32))) + val e10 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(20000 msat, randomBytes32), PaymentRelayed.Part(22000 msat, randomBytes32)), Seq(PaymentRelayed.Part(10000 msat, randomBytes32), PaymentRelayed.Part(12000 msat, randomBytes32), PaymentRelayed.Part(15000 msat, randomBytes32)), randomKey.publicKey, 30000 msat) val multiPartPaymentHash = randomBytes32 val now = System.currentTimeMillis val e11 = ChannelPaymentRelayed(13000 msat, 11000 msat, multiPartPaymentHash, randomBytes32, randomBytes32, now) @@ -119,8 +119,8 @@ class AuditDbSpec extends AnyFunSuite { db.add(ChannelPaymentRelayed(43000 msat, 42000 msat, randomBytes32, c5, c1)) db.add(ChannelPaymentRelayed(42000 msat, 40000 msat, randomBytes32, c5, c2)) db.add(ChannelPaymentRelayed(45000 msat, 40000 msat, randomBytes32, c5, c6)) - db.add(TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(25000 msat, c6)), Seq(PaymentRelayed.Part(20000 msat, c4)))) - db.add(TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(46000 msat, c6)), Seq(PaymentRelayed.Part(16000 msat, c2), PaymentRelayed.Part(10000 msat, c4), PaymentRelayed.Part(14000 msat, c4)))) + db.add(TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(25000 msat, c6)), Seq(PaymentRelayed.Part(20000 msat, c4)), randomKey.publicKey, 15000 msat)) + db.add(TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(46000 msat, c6)), Seq(PaymentRelayed.Part(16000 msat, c2), PaymentRelayed.Part(10000 msat, c4), PaymentRelayed.Part(14000 msat, c4)), randomKey.publicKey, 37000 msat)) db.add(NetworkFeePaid(null, n2, c2, Transaction(0, Seq.empty, Seq.empty, 0), 200 sat, "funding")) db.add(NetworkFeePaid(null, n2, c2, Transaction(0, Seq.empty, Seq.empty, 0), 300 sat, "mutual")) @@ -165,7 +165,7 @@ class AuditDbSpec extends AnyFunSuite { val outgoingCount = 1 + Random.nextInt(4) val incoming = Seq(PaymentRelayed.Part(10000 msat, randomBytes32)) val outgoing = (1 to outgoingCount).map(_ => PaymentRelayed.Part(Random.nextInt(2000).msat, channelIds(Random.nextInt(channelCount)))) - db.add(TrampolinePaymentRelayed(randomBytes32, incoming, outgoing)) + db.add(TrampolinePaymentRelayed(randomBytes32, incoming, outgoing, randomKey.publicKey, 5000 msat)) } else { val toChannelId = channelIds(Random.nextInt(channelCount)) db.add(ChannelPaymentRelayed(10000 msat, Random.nextInt(10000).msat, randomBytes32, randomBytes32, toChannelId)) @@ -179,7 +179,7 @@ class AuditDbSpec extends AnyFunSuite { } } - test("handle migration version 1 -> 4") { + test("handle migration version 1 -> 5") { forAllDbs { case _: TestPgDatabases => // no migration case dbs: TestSqliteDatabases => @@ -229,7 +229,7 @@ class AuditDbSpec extends AnyFunSuite { val migratedDb = new SqliteAuditDb(connection) using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 4) // version changed from 1 -> 4 + assert(getVersion(statement, "audit", 5) == 5) // version changed from 1 -> 5 } // existing rows in the 'sent' table will use id=00000000-0000-0000-0000-000000000000 as default @@ -238,7 +238,7 @@ class AuditDbSpec extends AnyFunSuite { val postMigrationDb = new SqliteAuditDb(connection) using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 4) // version 4 + assert(getVersion(statement, "audit", 5) == 5) // version 5 } postMigrationDb.add(ps1) @@ -251,7 +251,7 @@ class AuditDbSpec extends AnyFunSuite { } } - test("handle migration version 2 -> 4") { + test("handle migration version 2 -> 5") { forAllDbs { case _: TestPgDatabases => // no migration case dbs: TestSqliteDatabases => @@ -277,7 +277,7 @@ class AuditDbSpec extends AnyFunSuite { } using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 2) // version 2 is deployed now + assert(getVersion(statement, "audit", 5) == 2) // version 2 is deployed now } val e1 = ChannelErrorOccurred(null, randomBytes32, randomKey.publicKey, null, LocalError(new RuntimeException("oops")), isFatal = true) @@ -286,7 +286,7 @@ class AuditDbSpec extends AnyFunSuite { val migratedDb = new SqliteAuditDb(connection) using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 4) // version changed from 2 -> 4 + assert(getVersion(statement, "audit", 5) == 5) // version changed from 2 -> 5 } migratedDb.add(e1) @@ -294,14 +294,14 @@ class AuditDbSpec extends AnyFunSuite { val postMigrationDb = new SqliteAuditDb(connection) using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 4) // version 4 + assert(getVersion(statement, "audit", 5) == 5) // version 5 } postMigrationDb.add(e2) } } - test("handle migration version 3 -> 4") { + test("handle migration version 3 -> 5") { forAllDbs { case _: TestPgDatabases => // no migration case dbs: TestSqliteDatabases => @@ -329,7 +329,7 @@ class AuditDbSpec extends AnyFunSuite { } using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 3) // version 3 is deployed now + assert(getVersion(statement, "audit", 5) == 3) // version 3 is deployed now } val pp1 = PaymentSent.PartialPayment(UUID.randomUUID(), 500 msat, 10 msat, randomBytes32, None, 100) @@ -366,7 +366,7 @@ class AuditDbSpec extends AnyFunSuite { val migratedDb = new SqliteAuditDb(connection) using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 4) // version changed from 3 -> 4 + assert(getVersion(statement, "audit", 5) == 5) // version changed from 3 -> 5 } assert(migratedDb.listSent(50, 150).toSet === Set( @@ -378,14 +378,14 @@ class AuditDbSpec extends AnyFunSuite { val postMigrationDb = new SqliteAuditDb(connection) using(connection.createStatement()) { statement => - assert(getVersion(statement, "audit", 4) == 4) // version 4 + assert(getVersion(statement, "audit", 5) == 5) // version 5 } val ps2 = PaymentSent(UUID.randomUUID(), randomBytes32, randomBytes32, 1100 msat, randomKey.publicKey, Seq( PaymentSent.PartialPayment(UUID.randomUUID(), 500 msat, 10 msat, randomBytes32, None, 160), PaymentSent.PartialPayment(UUID.randomUUID(), 600 msat, 5 msat, randomBytes32, None, 165) )) - val relayed3 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(450 msat, randomBytes32), PaymentRelayed.Part(500 msat, randomBytes32)), Seq(PaymentRelayed.Part(800 msat, randomBytes32)), 150) + val relayed3 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(450 msat, randomBytes32), PaymentRelayed.Part(500 msat, randomBytes32)), Seq(PaymentRelayed.Part(800 msat, randomBytes32)), randomKey.publicKey, 700 msat, 150) postMigrationDb.add(ps2) assert(postMigrationDb.listSent(155, 200) === Seq(ps2)) From 63d0eb1d0bfe36b4029cb67765a129ea221c57db Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 14 Apr 2021 14:03:49 +0200 Subject: [PATCH 2/6] Put trampoline information in separate table --- .../fr/acinq/eclair/db/pg/PgAuditDb.scala | 56 +++++++++++------ .../eclair/db/sqlite/SqliteAuditDb.scala | 62 ++++++++++++------- .../fr/acinq/eclair/api/ApiServiceSpec.scala | 4 +- 3 files changed, 77 insertions(+), 45 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 94c0a50d83..76a7bf6bb8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} import grizzled.slf4j.Logging -import scodec.bits.ByteVector +import scodec.bits._ import java.sql.Statement import java.util.UUID @@ -42,13 +42,14 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val DB_NAME = "audit" val CURRENT_VERSION = 5 - case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, recipientNodeId: Option[PublicKey], recipientAmount: Option[MilliSatoshi], timestamp: Long) + case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long) inTransaction { pg => using(pg.createStatement()) { statement => def migration45(statement: Statement): Int = { - statement.executeUpdate("ALTER TABLE relayed ADD recipientNodeId TEXT") - statement.executeUpdate("ALTER TABLE relayed ADD recipientAmount BIGINT") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)") } getVersion(statement, DB_NAME, CURRENT_VERSION) match { @@ -59,7 +60,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { case CURRENT_VERSION => statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL, recipientNodeId TEXT, recipientAmount BIGINT)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp BIGINT NOT NULL)") @@ -68,6 +70,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)") @@ -133,26 +137,26 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val payments = e match { case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) => // non-trampoline relayed payments have one input and one output - Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", None, None, ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", None, None, ts)) - case TrampolinePaymentRelayed(_, incoming, outgoing, recipientNodeId, recipientAmount, ts) => + Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts)) + case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) => + using(pg.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement => + statement.setString(1, e.paymentHash.toHex) + statement.setLong(2, nextTrampolineAmount.toLong) + statement.setString(3, nextTrampolineNodeId.value.toHex) + statement.setLong(4, e.timestamp) + statement.executeUpdate() + } // trampoline relayed payments do MPP aggregation and may have M inputs and N outputs - incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) + incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts)) } for (p <- payments) { - using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement => + using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setString(1, e.paymentHash.toHex) statement.setLong(2, p.amount.toLong) statement.setString(3, p.channelId.toHex) statement.setString(4, p.direction) statement.setString(5, p.relayType) statement.setLong(6, e.timestamp) - statement.setString(7, p.recipientNodeId.map(_.value.toHex).orNull) - p.recipientAmount match { - case None => - statement.setNull(8, java.sql.Types.BIGINT) - case Some(recipientAmount) => - statement.setLong(8, recipientAmount.toLong) - } statement.executeUpdate() } } @@ -248,6 +252,18 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] = inTransaction { pg => + var trampolineByHash = Map.empty[ByteVector32, (MilliSatoshi, PublicKey)] + using(pg.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp >= ? AND timestamp < ?")) { statement => + statement.setLong(1, from) + statement.setLong(2, to) + val rs = statement.executeQuery() + while (rs.next()) { + val paymentHash = rs.getByteVector32FromHex("payment_hash") + val amount = MilliSatoshi(rs.getLong("amount_msat")) + val nodeId = PublicKey(rs.getByteVectorFromHex("next_node_id")) + trampolineByHash += (paymentHash -> (amount, nodeId)) + } + } using(pg.prepareStatement("SELECT * FROM relayed WHERE timestamp >= ? AND timestamp < ? ORDER BY timestamp")) { statement => statement.setLong(1, from) statement.setLong(2, to) @@ -260,8 +276,6 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { MilliSatoshi(rs.getLong("amount_msat")), rs.getString("direction"), rs.getString("relay_type"), - Option(rs.getString("recipientNodeId")).map(s => PublicKey(ByteVector.fromValidHex(s))), - Option(rs.getLong("recipientAmount")).filterNot(_ => rs.wasNull()).map(_ msat), rs.getLong("timestamp")) relayedByHash = relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part)) } @@ -272,10 +286,12 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) parts.headOption match { - case Some(RelayedPart(_, _, _, "channel", None, None, timestamp)) => incoming.zip(outgoing).map { + case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } - case Some(RelayedPart(_, _, _, "trampoline",Some(recipientNodeId), Some(recipientAmount), timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, recipientNodeId, recipientAmount, timestamp) :: Nil + case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => + val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"000000000000000000000000000000000000000000000000000000000000000000"))) + TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil case _ => Nil } }.toSeq.sortBy(_.timestamp) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index 69c4829656..cbb805e414 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} import grizzled.slf4j.Logging -import scodec.bits.ByteVector +import scodec.bits._ import java.sql.{Connection, Statement} import java.util.UUID @@ -41,7 +41,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { val DB_NAME = "audit" val CURRENT_VERSION = 5 - case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, recipientNodeId: Option[PublicKey], recipientAmount: Option[MilliSatoshi], timestamp: Long) + case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long) using(sqlite.createStatement(), inTransaction = true) { statement => @@ -77,8 +77,9 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { } def migration45(statement: Statement): Int = { - statement.executeUpdate("ALTER TABLE relayed ADD recipientNodeId BLOB") - statement.executeUpdate("ALTER TABLE relayed ADD recipientAmount INTEGER") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, next_node_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)") } getVersion(statement, DB_NAME, CURRENT_VERSION) match { @@ -107,7 +108,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { case CURRENT_VERSION => statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL, recipientNodeId BLOB, recipientAmount INTEGER)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, next_node_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)") @@ -116,6 +118,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)") statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)") @@ -172,27 +176,27 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { val payments = e match { case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) => // non-trampoline relayed payments have one input and one output - Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", None, None, ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", None, None, ts)) - case TrampolinePaymentRelayed(_, incoming, outgoing, recipientNodeId, recipientAmount, ts) => + Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts)) + case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) => + using(sqlite.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement => + statement.setBytes(1, e.paymentHash.toArray) + statement.setLong(2, nextTrampolineAmount.toLong) + statement.setBytes(3, nextTrampolineNodeId.value.toArray) + statement.setLong(4, e.timestamp) + statement.executeUpdate() + } // trampoline relayed payments do MPP aggregation and may have M inputs and N outputs - incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) ++ - outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) + incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ + outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts)) } for (p <- payments) { - using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement => + using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, e.paymentHash.toArray) statement.setLong(2, p.amount.toLong) statement.setBytes(3, p.channelId.toArray) statement.setString(4, p.direction) statement.setString(5, p.relayType) statement.setLong(6, e.timestamp) - statement.setBytes(7, p.recipientNodeId.map(_.value.toArray).orNull) - p.recipientAmount match { - case None => - statement.setNull(8, java.sql.Types.INTEGER) - case Some(recipientAmount) => - statement.setLong(8, recipientAmount.toLong) - } statement.executeUpdate() } } @@ -277,7 +281,19 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { receivedByHash.values.toSeq.sortBy(_.timestamp) } - override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] = + override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] = { + var trampolineByHash = Map.empty[ByteVector32, (MilliSatoshi, PublicKey)] + using(sqlite.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp >= ? AND timestamp < ?")) { statement => + statement.setLong(1, from) + statement.setLong(2, to) + val rs = statement.executeQuery() + while (rs.next()) { + val paymentHash = rs.getByteVector32("payment_hash") + val amount = MilliSatoshi(rs.getLong("amount_msat")) + val nodeId = PublicKey(rs.getByteVector("next_node_id")) + trampolineByHash += (paymentHash -> (amount, nodeId)) + } + } using(sqlite.prepareStatement("SELECT * FROM relayed WHERE timestamp >= ? AND timestamp < ?")) { statement => statement.setLong(1, from) statement.setLong(2, to) @@ -290,8 +306,6 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { MilliSatoshi(rs.getLong("amount_msat")), rs.getString("direction"), rs.getString("relay_type"), - Option(rs.getBytes("recipientNodeId")).map(bytes => PublicKey(ByteVector(bytes))), - Option(rs.getLong("recipientAmount")).filterNot(_ => rs.wasNull()).map(_ msat), rs.getLong("timestamp")) relayedByHash = relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part)) } @@ -302,15 +316,17 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount) parts.headOption match { - case Some(RelayedPart(_, _, _, "channel", None, None, timestamp)) => incoming.zip(outgoing).map { + case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } - case Some(RelayedPart(_, _, _, "trampoline", Some(recipientNodeId), Some(recipientAmount), timestamp)) => - TrampolinePaymentRelayed(paymentHash, incoming, outgoing, recipientNodeId, recipientAmount, timestamp) :: Nil + case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => + val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"000000000000000000000000000000000000000000000000000000000000000000"))) + TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil case _ => Nil } }.toSeq.sortBy(_.timestamp) } + } override def listNetworkFees(from: Long, to: Long): Seq[NetworkFee] = using(sqlite.prepareStatement("SELECT * FROM network_fees WHERE timestamp >= ? AND timestamp < ? ORDER BY timestamp")) { statement => diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala index 84b548df32..b8394c6ac8 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala @@ -631,8 +631,8 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM system.eventStream.publish(prel) wsClient.expectMessage(expectedSerializedPrel) - val ptrel = TrampolinePaymentRelayed(ByteVector32.Zeroes, Seq(PaymentRelayed.Part(21 msat, ByteVector32.Zeroes)), Seq(PaymentRelayed.Part(8 msat, ByteVector32.Zeroes), PaymentRelayed.Part(10 msat, ByteVector32.One)), 1553784963659L) - val expectedSerializedPtrel = """{"type":"trampoline-payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"amount":21,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"}],"outgoing":[{"amount":8,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"},{"amount":10,"channelId":"0100000000000000000000000000000000000000000000000000000000000000"}],"timestamp":1553784963659}""" + val ptrel = TrampolinePaymentRelayed(ByteVector32.Zeroes, Seq(PaymentRelayed.Part(21 msat, ByteVector32.Zeroes)), Seq(PaymentRelayed.Part(8 msat, ByteVector32.Zeroes), PaymentRelayed.Part(10 msat, ByteVector32.One)), bobNodeId, 17 msat, 1553784963659L) + val expectedSerializedPtrel = """{"type":"trampoline-payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"amount":21,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"}],"outgoing":[{"amount":8,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"},{"amount":10,"channelId":"0100000000000000000000000000000000000000000000000000000000000000"}],"recipientNodeId":"039dc0e0b1d25905e44fdf6f8e89755a5e219685840d0bc1d28d3308f9628a3585","recipientAmount":17,"timestamp":1553784963659}""" assert(serialization.write(ptrel) === expectedSerializedPtrel) system.eventStream.publish(ptrel) wsClient.expectMessage(expectedSerializedPtrel) From 8e71ec2d857548d75a23eb3596babc32fbce67c6 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 14 Apr 2021 14:12:52 +0200 Subject: [PATCH 3/6] Rename nextTrampoline fields --- .../src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala | 2 +- .../src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala index ffad0f6b8d..4b710256d8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentEvents.scala @@ -85,7 +85,7 @@ sealed trait PaymentRelayed extends PaymentEvent { case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed -case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, recipientNodeId: PublicKey, recipientAmount: MilliSatoshi, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed { +case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed { override val amountIn: MilliSatoshi = incoming.map(_.amount).sum override val amountOut: MilliSatoshi = outgoing.map(_.amount).sum } diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala index b8394c6ac8..df73b6ff10 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala @@ -632,7 +632,7 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM wsClient.expectMessage(expectedSerializedPrel) val ptrel = TrampolinePaymentRelayed(ByteVector32.Zeroes, Seq(PaymentRelayed.Part(21 msat, ByteVector32.Zeroes)), Seq(PaymentRelayed.Part(8 msat, ByteVector32.Zeroes), PaymentRelayed.Part(10 msat, ByteVector32.One)), bobNodeId, 17 msat, 1553784963659L) - val expectedSerializedPtrel = """{"type":"trampoline-payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"amount":21,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"}],"outgoing":[{"amount":8,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"},{"amount":10,"channelId":"0100000000000000000000000000000000000000000000000000000000000000"}],"recipientNodeId":"039dc0e0b1d25905e44fdf6f8e89755a5e219685840d0bc1d28d3308f9628a3585","recipientAmount":17,"timestamp":1553784963659}""" + val expectedSerializedPtrel = """{"type":"trampoline-payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"amount":21,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"}],"outgoing":[{"amount":8,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"},{"amount":10,"channelId":"0100000000000000000000000000000000000000000000000000000000000000"}],"nextTrampolineNodeId":"039dc0e0b1d25905e44fdf6f8e89755a5e219685840d0bc1d28d3308f9628a3585","nextTrampolineAmount":17,"timestamp":1553784963659}""" assert(serialization.write(ptrel) === expectedSerializedPtrel) system.eventStream.publish(ptrel) wsClient.expectMessage(expectedSerializedPtrel) From 91e21f04478120e65801dd06b96888d4a2da6b6b Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 14 Apr 2021 14:58:51 +0200 Subject: [PATCH 4/6] Add tests for audit DB migration 4 -> 5 --- .../fr/acinq/eclair/db/pg/PgAuditDb.scala | 2 +- .../eclair/db/sqlite/SqliteAuditDb.scala | 2 +- .../fr/acinq/eclair/db/AuditDbSpec.scala | 196 +++++++++++++++++- 3 files changed, 197 insertions(+), 3 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 76a7bf6bb8..71bc891484 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -290,7 +290,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => - val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"000000000000000000000000000000000000000000000000000000000000000000"))) + val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"))) TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil case _ => Nil } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index cbb805e414..84edd9eacb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -320,7 +320,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => - val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"000000000000000000000000000000000000000000000000000000000000000000"))) + val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"))) TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil case _ => Nil } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index b450ca0396..3ec547d5ab 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -16,7 +16,7 @@ package fr.acinq.eclair.db -import fr.acinq.bitcoin.Crypto.PrivateKey +import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} import fr.acinq.bitcoin.{ByteVector32, SatoshiLong, Transaction} import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases} import fr.acinq.eclair._ @@ -26,13 +26,16 @@ import fr.acinq.eclair.db.AuditDb.Stats import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.jdbc.JdbcUtils.using import fr.acinq.eclair.db.pg.PgAuditDb +import fr.acinq.eclair.db.pg.PgUtils.inTransaction import fr.acinq.eclair.db.sqlite.SqliteAuditDb import fr.acinq.eclair.payment._ import fr.acinq.eclair.wire.protocol.Error import org.scalatest.Tag import org.scalatest.funsuite.AnyFunSuite +import scodec.bits._ import java.util.UUID +import javax.sql.DataSource import scala.concurrent.duration._ import scala.util.Random @@ -394,6 +397,197 @@ class AuditDbSpec extends AnyFunSuite { } } + test("handle migration version 4 -> 5") { + forAllDbs { + case dbs: TestPgDatabases => + import fr.acinq.eclair.db.pg.PgUtils.getVersion + implicit val datasource: DataSource = dbs.datasource + + // simulate existing previous version db + inTransaction { pg => + using(pg.createStatement()) { statement => + getVersion(statement, "audit", 4) + statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp BIGINT NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp BIGINT NOT NULL)") + + statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)") + } + } + + inTransaction { pg => + using(pg.createStatement()) { statement => + assert(getVersion(statement, "audit", 5) == 4) // version 4 is deployed now + } + } + + val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32, randomBytes32, randomBytes32, 105) + val relayed2 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(300 msat, randomBytes32), PaymentRelayed.Part(350 msat, randomBytes32)), Seq(PaymentRelayed.Part(600 msat, randomBytes32)), PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"), 0 msat, 110) + + inTransaction { pg => + using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setString(1, relayed1.paymentHash.toHex) + statement.setLong(2, relayed1.amountIn.toLong) + statement.setString(3, relayed1.fromChannelId.toHex) + statement.setString(4, "IN") + statement.setString(5, "channel") + statement.setLong(6, relayed1.timestamp) + statement.executeUpdate() + } + using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setString(1, relayed1.paymentHash.toHex) + statement.setLong(2, relayed1.amountOut.toLong) + statement.setString(3, relayed1.toChannelId.toHex) + statement.setString(4, "OUT") + statement.setString(5, "channel") + statement.setLong(6, relayed1.timestamp) + statement.executeUpdate() + } + for (incoming <- relayed2.incoming) { + using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setString(1, relayed2.paymentHash.toHex) + statement.setLong(2, incoming.amount.toLong) + statement.setString(3, incoming.channelId.toHex) + statement.setString(4, "IN") + statement.setString(5, "trampoline") + statement.setLong(6, relayed2.timestamp) + statement.executeUpdate() + } + } + for (outgoing <- relayed2.outgoing) { + using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setString(1, relayed2.paymentHash.toHex) + statement.setLong(2, outgoing.amount.toLong) + statement.setString(3, outgoing.channelId.toHex) + statement.setString(4, "OUT") + statement.setString(5, "trampoline") + statement.setLong(6, relayed2.timestamp) + statement.executeUpdate() + } + } + } + + val migratedDb = new PgAuditDb()(datasource) + inTransaction { pg => + using(pg.createStatement()) { statement => + assert(getVersion(statement, "audit", 5) == 5) // version changed from 4 -> 5 + } + } + + assert(migratedDb.listRelayed(100, 120) === Seq(relayed1, relayed2)) + + val postMigrationDb = new PgAuditDb()(datasource) + + inTransaction { pg => + using(pg.createStatement()) { statement => + assert(getVersion(statement, "audit", 5) == 5) // version 5 + } + } + + val relayed3 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(450 msat, randomBytes32), PaymentRelayed.Part(500 msat, randomBytes32)), Seq(PaymentRelayed.Part(800 msat, randomBytes32)), randomKey.publicKey, 700 msat, 150) + + postMigrationDb.add(relayed3) + assert(postMigrationDb.listRelayed(100, 160) === Seq(relayed1, relayed2, relayed3)) + case dbs: TestSqliteDatabases => + + val connection = dbs.connection + + // simulate existing previous version db + using(connection.createStatement()) { statement => + getVersion(statement, "audit", 4) + statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)") + + statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)") + } + + using(connection.createStatement()) { statement => + assert(getVersion(statement, "audit", 5) == 4) // version 4 is deployed now + } + + val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32, randomBytes32, randomBytes32, 105) + val relayed2 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(300 msat, randomBytes32), PaymentRelayed.Part(350 msat, randomBytes32)), Seq(PaymentRelayed.Part(600 msat, randomBytes32)), PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"), 0 msat, 110) + + using(connection.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setBytes(1, relayed1.paymentHash.toArray) + statement.setLong(2, relayed1.amountIn.toLong) + statement.setBytes(3, relayed1.fromChannelId.toArray) + statement.setString(4, "IN") + statement.setString(5, "channel") + statement.setLong(6, relayed1.timestamp) + statement.executeUpdate() + } + using(connection.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setBytes(1, relayed1.paymentHash.toArray) + statement.setLong(2, relayed1.amountOut.toLong) + statement.setBytes(3, relayed1.toChannelId.toArray) + statement.setString(4, "OUT") + statement.setString(5, "channel") + statement.setLong(6, relayed1.timestamp) + statement.executeUpdate() + } + for (incoming <- relayed2.incoming) { + using(connection.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setBytes(1, relayed2.paymentHash.toArray) + statement.setLong(2, incoming.amount.toLong) + statement.setBytes(3, incoming.channelId.toArray) + statement.setString(4, "IN") + statement.setString(5, "trampoline") + statement.setLong(6, relayed2.timestamp) + statement.executeUpdate() + } + } + for (outgoing <- relayed2.outgoing) { + using(connection.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => + statement.setBytes(1, relayed2.paymentHash.toArray) + statement.setLong(2, outgoing.amount.toLong) + statement.setBytes(3, outgoing.channelId.toArray) + statement.setString(4, "OUT") + statement.setString(5, "trampoline") + statement.setLong(6, relayed2.timestamp) + statement.executeUpdate() + } + } + + val migratedDb = new SqliteAuditDb(connection) + using(connection.createStatement()) { statement => + assert(getVersion(statement, "audit", 5) == 5) // version changed from 3 -> 5 + } + + assert(migratedDb.listRelayed(100, 120) === Seq(relayed1, relayed2)) + + val postMigrationDb = new SqliteAuditDb(connection) + + using(connection.createStatement()) { statement => + assert(getVersion(statement, "audit", 5) == 5) // version 5 + } + + val relayed3 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(450 msat, randomBytes32), PaymentRelayed.Part(500 msat, randomBytes32)), Seq(PaymentRelayed.Part(800 msat, randomBytes32)), randomKey.publicKey, 700 msat, 150) + + postMigrationDb.add(relayed3) + assert(postMigrationDb.listRelayed(100, 160) === Seq(relayed1, relayed2, relayed3)) + } + } + test("ignore invalid values in the DB") { forAllDbs { dbs => val db = dbs.audit From aa9c8a2dd4dbc3d0802e7a9e9301aa712eb022ec Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 14 Apr 2021 15:04:04 +0200 Subject: [PATCH 5/6] Fight IntelliJ's formatter --- eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index 3ec547d5ab..cf6a021a9f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -498,7 +498,7 @@ class AuditDbSpec extends AnyFunSuite { postMigrationDb.add(relayed3) assert(postMigrationDb.listRelayed(100, 160) === Seq(relayed1, relayed2, relayed3)) case dbs: TestSqliteDatabases => - + import fr.acinq.eclair.db.sqlite.SqliteUtils.getVersion val connection = dbs.connection // simulate existing previous version db From f3bdc8d90f30b628832b44f155c6b26a66910388 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 14 Apr 2021 18:09:10 +0200 Subject: [PATCH 6/6] Use PlaceHolderPubKey --- .../src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala | 4 ++-- .../scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala | 4 ++-- .../src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 71bc891484..094920facb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -25,9 +25,9 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ +import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} import grizzled.slf4j.Logging -import scodec.bits._ import java.sql.Statement import java.util.UUID @@ -290,7 +290,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => - val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"))) + val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey)) TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil case _ => Nil } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index 84edd9eacb..b117f97e9e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -25,9 +25,9 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ +import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong} import grizzled.slf4j.Logging -import scodec.bits._ import java.sql.{Connection, Statement} import java.util.UUID @@ -320,7 +320,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging { case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp) } case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => - val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"))) + val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey)) TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil case _ => Nil } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index cf6a021a9f..9977e274cb 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -16,7 +16,7 @@ package fr.acinq.eclair.db -import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} +import fr.acinq.bitcoin.Crypto.PrivateKey import fr.acinq.bitcoin.{ByteVector32, SatoshiLong, Transaction} import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases} import fr.acinq.eclair._ @@ -29,10 +29,10 @@ import fr.acinq.eclair.db.pg.PgAuditDb import fr.acinq.eclair.db.pg.PgUtils.inTransaction import fr.acinq.eclair.db.sqlite.SqliteAuditDb import fr.acinq.eclair.payment._ +import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey import fr.acinq.eclair.wire.protocol.Error import org.scalatest.Tag import org.scalatest.funsuite.AnyFunSuite -import scodec.bits._ import java.util.UUID import javax.sql.DataSource @@ -431,7 +431,7 @@ class AuditDbSpec extends AnyFunSuite { } val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32, randomBytes32, randomBytes32, 105) - val relayed2 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(300 msat, randomBytes32), PaymentRelayed.Part(350 msat, randomBytes32)), Seq(PaymentRelayed.Part(600 msat, randomBytes32)), PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"), 0 msat, 110) + val relayed2 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(300 msat, randomBytes32), PaymentRelayed.Part(350 msat, randomBytes32)), Seq(PaymentRelayed.Part(600 msat, randomBytes32)), PlaceHolderPubKey, 0 msat, 110) inTransaction { pg => using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => @@ -525,7 +525,7 @@ class AuditDbSpec extends AnyFunSuite { } val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32, randomBytes32, randomBytes32, 105) - val relayed2 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(300 msat, randomBytes32), PaymentRelayed.Part(350 msat, randomBytes32)), Seq(PaymentRelayed.Part(600 msat, randomBytes32)), PublicKey(hex"020000000000000000000000000000000000000000000000000000000000000000"), 0 msat, 110) + val relayed2 = TrampolinePaymentRelayed(randomBytes32, Seq(PaymentRelayed.Part(300 msat, randomBytes32), PaymentRelayed.Part(350 msat, randomBytes32)), Seq(PaymentRelayed.Part(600 msat, randomBytes32)), PlaceHolderPubKey, 0 msat, 110) using(connection.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, relayed1.paymentHash.toArray)