Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trampoline info to auditDB #1767

Merged
merged 6 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
.withTag(PaymentTags.Relay, PaymentTags.RelayType(e))
.record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
e match {
case TrampolinePaymentRelayed(_, incoming, outgoing, _) =>
case TrampolinePaymentRelayed(_, incoming, outgoing, _, _, _) =>
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(incoming.length)
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length)
incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived))
Expand Down
37 changes: 28 additions & 9 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector

import java.sql.Statement
import java.util.UUID
import javax.sql.DataSource
import scala.collection.immutable.Queue
Expand All @@ -38,18 +40,26 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 4
val CURRENT_VERSION = 5

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)
case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, recipientNodeId: Option[PublicKey], recipientAmount: Option[MilliSatoshi], timestamp: Long)

inTransaction { pg =>
using(pg.createStatement()) { statement =>
def migration45(statement: Statement): Int = {
statement.executeUpdate("ALTER TABLE relayed ADD recipientNodeId TEXT")
statement.executeUpdate("ALTER TABLE relayed ADD recipientAmount BIGINT")
}

getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 4 =>
logger.warn(s"migrating db $DB_NAME, found version=4 current=$CURRENT_VERSION")
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL, recipientNodeId TEXT, recipientAmount BIGINT)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp BIGINT NOT NULL)")
Expand Down Expand Up @@ -123,19 +133,26 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, ts) =>
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", None, None, ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", None, None, ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, recipientNodeId, recipientAmount, ts) =>
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts))
}
for (p <- payments) {
using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, p.amount.toLong)
statement.setString(3, p.channelId.toHex)
statement.setString(4, p.direction)
statement.setString(5, p.relayType)
statement.setLong(6, e.timestamp)
statement.setString(7, p.recipientNodeId.map(_.value.toHex).orNull)
p.recipientAmount match {
case None =>
statement.setNull(8, java.sql.Types.BIGINT)
case Some(recipientAmount) =>
statement.setLong(8, recipientAmount.toLong)
}
statement.executeUpdate()
}
}
Expand Down Expand Up @@ -243,6 +260,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
MilliSatoshi(rs.getLong("amount_msat")),
rs.getString("direction"),
rs.getString("relay_type"),
Option(rs.getString("recipientNodeId")).map(s => PublicKey(ByteVector.fromValidHex(s))),
Option(rs.getLong("recipientAmount")).filterNot(_ => rs.wasNull()).map(_ msat),
rs.getLong("timestamp"))
relayedByHash = relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part))
}
Expand All @@ -253,10 +272,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
parts.headOption match {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case Some(RelayedPart(_, _, _, "channel", None, None, timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, timestamp) :: Nil
case Some(RelayedPart(_, _, _, "trampoline",Some(recipientNodeId), Some(recipientAmount), timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, recipientNodeId, recipientAmount, timestamp) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector

import java.sql.{Connection, Statement}
import java.util.UUID
Expand All @@ -38,9 +39,9 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 4
val CURRENT_VERSION = 5

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)
case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, recipientNodeId: Option[PublicKey], recipientAmount: Option[MilliSatoshi], timestamp: Long)

using(sqlite.createStatement(), inTransaction = true) { statement =>

Expand Down Expand Up @@ -75,26 +76,38 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)")
}

def migration45(statement: Statement): Int = {
statement.executeUpdate("ALTER TABLE relayed ADD recipientNodeId BLOB")
statement.executeUpdate("ALTER TABLE relayed ADD recipientAmount INTEGER")
}

getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 1 => // previous version let's migrate
logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
migration34(statement)
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 2 =>
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
migration23(statement)
migration34(statement)
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 3 =>
logger.warn(s"migrating db $DB_NAME, found version=3 current=$CURRENT_VERSION")
migration34(statement)
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 4 =>
logger.warn(s"migrating db $DB_NAME, found version=4 current=$CURRENT_VERSION")
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL, recipientNodeId BLOB, recipientAmount INTEGER)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
Expand Down Expand Up @@ -159,19 +172,27 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, ts) =>
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", None, None, ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", None, None, ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, recipientNodeId, recipientAmount, ts) =>
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts)) ++
outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", Some(recipientNodeId), Some(recipientAmount), ts))
}
for (p <- payments) {
using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, e.paymentHash.toArray)
statement.setLong(2, p.amount.toLong)
statement.setBytes(3, p.channelId.toArray)
statement.setString(4, p.direction)
statement.setString(5, p.relayType)
statement.setLong(6, e.timestamp)
statement.setBytes(7, p.recipientNodeId.map(_.value.toArray).orNull)
p.recipientAmount match {
case None =>
statement.setNull(8, java.sql.Types.INTEGER)
case Some(recipientAmount) =>
statement.setLong(8, recipientAmount.toLong)
}
statement.executeUpdate()
}
}
Expand Down Expand Up @@ -269,6 +290,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
MilliSatoshi(rs.getLong("amount_msat")),
rs.getString("direction"),
rs.getString("relay_type"),
Option(rs.getBytes("recipientNodeId")).map(bytes => PublicKey(ByteVector(bytes))),
Option(rs.getLong("recipientAmount")).filterNot(_ => rs.wasNull()).map(_ msat),
rs.getLong("timestamp"))
relayedByHash = relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part))
}
Expand All @@ -279,10 +302,11 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
parts.headOption match {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case Some(RelayedPart(_, _, _, "channel", None, None, timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, timestamp) :: Nil
case Some(RelayedPart(_, _, _, "trampoline", Some(recipientNodeId), Some(recipientAmount), timestamp)) =>
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, recipientNodeId, recipientAmount, timestamp) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ sealed trait PaymentRelayed extends PaymentEvent {

case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed

case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed {
case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, recipientNodeId: PublicKey, recipientAmount: MilliSatoshi, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed {
override val amountIn: MilliSatoshi = incoming.map(_.amount).sum
override val amountOut: MilliSatoshi = outgoing.map(_.amount).sum
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ class NodeRelay private(nodeParams: NodeParams,
}
val incoming = upstream.adds.map(add => PaymentRelayed.Part(add.amountMsat, add.channelId))
val outgoing = paymentSent.parts.map(part => PaymentRelayed.Part(part.amountWithFees, part.toChannelId))
context.system.eventStream ! EventStream.Publish(TrampolinePaymentRelayed(paymentHash, incoming, outgoing))
context.system.eventStream ! EventStream.Publish(TrampolinePaymentRelayed(paymentHash, incoming, outgoing, paymentSent.recipientNodeId, paymentSent.recipientAmount))
}

}
Loading