diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 5b33da5b92..00246de5cf 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -221,17 +221,19 @@ eclair { port = 5432 username = "" password = "" + readonly-user = "" // if defined, this user will be granted read-only access to all tables in the database pool { max-size = 10 // recommended value = number_of_cpu_cores * 2 connection-timeout = 30 seconds idle-timeout = 10 minutes max-life-time = 30 minutes } + lock-type = "lease" // lease or none (do not use none in production) lease { interval = 5 minutes // lease-interval must be greater than lease-renew-interval renew-interval = 1 minute + lock-timeout = 5 seconds // timeout for the lock statement on the lease table } - lock-type = "lease" // lease or none } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index 5125651aa2..cb8179f877 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -95,13 +95,24 @@ object Databases extends Logging { def apply(hikariConfig: HikariConfig, instanceId: UUID, lock: PgLock = PgLock.NoLock, - jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = { + jdbcUrlFile_opt: Option[File], + readOnlyUser_opt: Option[String])(implicit system: ActorSystem): PostgresDatabases = { jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile)) implicit val ds: HikariDataSource = new HikariDataSource(hikariConfig) implicit val implicitLock: PgLock = lock + lock match { + case PgLock.NoLock => () + case l: PgLock.LeaseLock => + // we obtain a lock right now... + l.obtainExclusiveLock(ds) + // ...and renew the lease regularly + import system.dispatcher + system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => l.obtainExclusiveLock(ds)) + } + val databases = PostgresDatabases( network = new PgNetworkDb, audit = new PgAuditDb, @@ -112,14 +123,13 @@ object Databases extends Logging { dataSource = ds, lock = lock) - lock match { - case PgLock.NoLock => () - case l: PgLock.LeaseLock => - // we obtain a lock right now... - databases.obtainExclusiveLock() - // ...and renew the lease regularly - import system.dispatcher - system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock()) + readOnlyUser_opt.foreach { readOnlyUser => + PgUtils.inTransaction { connection => + using(connection.createStatement()) { statement => + logger.info(s"granting read-only access to user=$readOnlyUser") + statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser") + } + } } databases @@ -183,6 +193,7 @@ object Databases extends Logging { val port = dbConfig.getInt("postgres.port") val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username")) val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password")) + val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user")) val hikariConfig = new HikariConfig() hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database") @@ -200,6 +211,10 @@ object Databases extends Logging { val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`") + // We use a timeout for locks, because we might not be able to get the lock right away due to concurrent access + // by other threads. That timeout gives time for other transactions to complete, then ours can take the lock + val lockTimeout = dbConfig.getDuration("postgres.lease.lock-timeout").toSeconds.seconds + hikariConfig.setConnectionInitSql(s"SET lock_timeout TO '${lockTimeout.toSeconds}s'") PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler) case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`") } @@ -210,7 +225,8 @@ object Databases extends Logging { hikariConfig = hikariConfig, instanceId = instanceId, lock = lock, - jdbcUrlFile_opt = Some(jdbcUrlFile) + jdbcUrlFile_opt = Some(jdbcUrlFile), + readOnlyUser_opt = readOnlyUser_opt ) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala index 070c4eb88b..6689aaa424 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala @@ -139,13 +139,17 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) { + val batchSize = 100 inTransaction { pg => - using(pg.createStatement) { statement => + using(pg.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement => shortChannelIds - .grouped(1000) // remove channels by batch of 1000 - .foreach { _ => - val ids = shortChannelIds.map(_.toLong).mkString(",") - statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)") + .grouped(batchSize) + .foreach { group => + val padded = group.toArray.padTo(batchSize, ShortChannelId(0L)) + for (i <- 0 until batchSize) { + statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1 + } + statement.executeUpdate() } } } @@ -165,8 +169,9 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) { inTransaction { pg => - using(pg.createStatement) { statement => - statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}") + using(pg.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement => + statement.setLong(1, shortChannelId.toLong) + statement.executeUpdate() } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala index 3a4d18727e..9e72afa869 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala @@ -16,6 +16,8 @@ package fr.acinq.eclair.db.pg +import fr.acinq.eclair.db.Monitoring.Metrics._ +import fr.acinq.eclair.db.Monitoring.Tags import fr.acinq.eclair.db.jdbc.JdbcUtils import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler.LockException import grizzled.slf4j.Logging @@ -177,14 +179,16 @@ object PgUtils extends JdbcUtils { using(connection.createStatement()) { statement => // allow only one row in the ownership lease table - statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))") + statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP WITH TIME ZONE NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))") } } private def acquireExclusiveTableLock()(implicit connection: Connection): Unit = { using(connection.createStatement()) { statement => - statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE NOWAIT") + withMetrics("utils/lock", Tags.DbBackends.Postgres) { + statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE") + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala index a4d78ac18d..57eb13a070 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala @@ -16,7 +16,6 @@ package fr.acinq.eclair.db.sqlite -import java.sql.Connection import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi} import fr.acinq.eclair.ShortChannelId import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics @@ -27,6 +26,7 @@ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncement import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement} import grizzled.slf4j.Logging +import java.sql.Connection import scala.collection.immutable.SortedMap class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { @@ -132,12 +132,16 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Sqlite) { - using(sqlite.createStatement) { statement => + val batchSize = 100 + using(sqlite.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement => shortChannelIds - .grouped(1000) // remove channels by batch of 1000 - .foreach { _ => - val ids = shortChannelIds.map(_.toLong).mkString(",") - statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)") + .grouped(batchSize) + .foreach { group => + val padded = group.toArray.padTo(batchSize, ShortChannelId(0L)) + for (i <- 0 until batchSize) { + statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1 + } + statement.executeUpdate() } } } @@ -153,8 +157,9 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Sqlite) { - using(sqlite.createStatement) { statement => - statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}") + using(sqlite.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement => + statement.setLong(1, shortChannelId.toLong) + statement.executeUpdate() } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala index c850b16e3c..99d0cc3f15 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala @@ -24,7 +24,6 @@ import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} import fr.acinq.eclair.{ShortChannelId, TxCoordinates} import scala.collection.mutable -import scala.compat.Platform import scala.concurrent.duration._ object StaleChannels { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala index 06b3edf7df..6ffd01c0e7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala @@ -68,7 +68,7 @@ object TestDatabases { // @formatter:off override val connection: PgConnection = pg.getPostgresDatabase.getConnection.asInstanceOf[PgConnection] - override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile)) + override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile), readOnlyUser_opt = None) override def getVersion(statement: Statement, db_name: String, currentVersion: Int): Int = PgUtils.getVersion(statement, db_name, currentVersion) override def close(): Unit = pg.close() // @formatter:on diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala index 3657222d22..8d2fa2ec5c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala @@ -30,7 +30,8 @@ import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, Tor2} import fr.acinq.eclair.{CltvExpiryDelta, Features, MilliSatoshiLong, ShortChannelId, TestDatabases, randomBytes32, randomKey} import org.scalatest.funsuite.AnyFunSuite -import scala.collection.{SortedMap, mutable} +import scala.collection.{SortedMap, SortedSet, mutable} +import scala.util.Random class NetworkDbSpec extends AnyFunSuite { @@ -248,7 +249,7 @@ class NetworkDbSpec extends AnyFunSuite { updates.foreach(u => db.updateChannel(u)) assert(db.listChannels().keySet === channels.map(_.shortChannelId).toSet) - val toDelete = channels.map(_.shortChannelId).drop(500).take(2500) + val toDelete = channels.map(_.shortChannelId).take(1 + Random.nextInt(2500)) db.removeChannels(toDelete) assert(db.listChannels().keySet === (channels.map(_.shortChannelId).toSet -- toDelete)) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala index 388bf6b731..87adb5ac56 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala @@ -96,17 +96,19 @@ object PgUtilsSpec extends Logging { | port = $port | username = "postgres" | password = "" + | readonly-user = "" | pool { | max-size = 10 // recommended value = number_of_cpu_cores * 2 | connection-timeout = 30 seconds | idle-timeout = 10 minutes | max-life-time = 30 minutes | } + | lock-type = "lease" // lease or none (do not use none in production) | lease { | interval = 5 seconds // lease-interval must be greater than lease-renew-interval | renew-interval = 2 seconds + | lock-timeout = 5 seconds // timeout for the lock statement on the lease table | } - | lock-type = "lease" // lease or none |} |""".stripMargin )