From e1fcdf435b40b3b9d90e1520f684a5950a17b601 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 9 Apr 2021 14:19:12 +0200 Subject: [PATCH 1/6] use prepared statements for pruning --- .../fr/acinq/eclair/db/pg/PgNetworkDb.scala | 19 ++++++++++------- .../eclair/db/sqlite/SqliteNetworkDb.scala | 21 ++++++++++++------- .../acinq/eclair/router/StaleChannels.scala | 1 - .../fr/acinq/eclair/db/NetworkDbSpec.scala | 5 +++-- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala index 070c4eb88b..6689aaa424 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala @@ -139,13 +139,17 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { } override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) { + val batchSize = 100 inTransaction { pg => - using(pg.createStatement) { statement => + using(pg.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement => shortChannelIds - .grouped(1000) // remove channels by batch of 1000 - .foreach { _ => - val ids = shortChannelIds.map(_.toLong).mkString(",") - statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)") + .grouped(batchSize) + .foreach { group => + val padded = group.toArray.padTo(batchSize, ShortChannelId(0L)) + for (i <- 0 until batchSize) { + statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1 + } + statement.executeUpdate() } } } @@ -165,8 +169,9 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging { override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) { inTransaction { pg => - using(pg.createStatement) { statement => - statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}") + using(pg.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement => + statement.setLong(1, shortChannelId.toLong) + statement.executeUpdate() } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala index a4d78ac18d..57eb13a070 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteNetworkDb.scala @@ -16,7 +16,6 @@ package fr.acinq.eclair.db.sqlite -import java.sql.Connection import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi} import fr.acinq.eclair.ShortChannelId import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics @@ -27,6 +26,7 @@ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncement import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement} import grizzled.slf4j.Logging +import java.sql.Connection import scala.collection.immutable.SortedMap class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { @@ -132,12 +132,16 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Sqlite) { - 
using(sqlite.createStatement) { statement => + val batchSize = 100 + using(sqlite.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement => shortChannelIds - .grouped(1000) // remove channels by batch of 1000 - .foreach { _ => - val ids = shortChannelIds.map(_.toLong).mkString(",") - statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)") + .grouped(batchSize) + .foreach { group => + val padded = group.toArray.padTo(batchSize, ShortChannelId(0L)) + for (i <- 0 until batchSize) { + statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1 + } + statement.executeUpdate() } } } @@ -153,8 +157,9 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging { } override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Sqlite) { - using(sqlite.createStatement) { statement => - statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}") + using(sqlite.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement => + statement.setLong(1, shortChannelId.toLong) + statement.executeUpdate() } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala index c850b16e3c..99d0cc3f15 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/StaleChannels.scala @@ -24,7 +24,6 @@ import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} import fr.acinq.eclair.{ShortChannelId, TxCoordinates} import scala.collection.mutable -import scala.compat.Platform import scala.concurrent.duration._ object StaleChannels { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala index 3657222d22..8d2fa2ec5c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/NetworkDbSpec.scala @@ -30,7 +30,8 @@ import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, Tor2} import fr.acinq.eclair.{CltvExpiryDelta, Features, MilliSatoshiLong, ShortChannelId, TestDatabases, randomBytes32, randomKey} import org.scalatest.funsuite.AnyFunSuite -import scala.collection.{SortedMap, mutable} +import scala.collection.{SortedMap, SortedSet, mutable} +import scala.util.Random class NetworkDbSpec extends AnyFunSuite { @@ -248,7 +249,7 @@ class NetworkDbSpec extends AnyFunSuite { updates.foreach(u => db.updateChannel(u)) assert(db.listChannels().keySet === channels.map(_.shortChannelId).toSet) - val toDelete = channels.map(_.shortChannelId).drop(500).take(2500) + val toDelete = channels.map(_.shortChannelId).take(1 + Random.nextInt(2500)) db.removeChannels(toDelete) assert(db.listChannels().keySet === (channels.map(_.shortChannelId).toSet -- toDelete)) } From 6d901a33d67728d5653de19e0b9d601e87e6e4c0 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 16 Apr 2021 18:07:32 +0200 Subject: [PATCH 2/6] optional read-only user It is good practice to create a dedicated read-only user to browse the database safely. But since the app itself creates its tables, the postgres user is the owner and a manual `GRANT` is required everytime a new table is added. 
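For context, that manual setup typically looks something like the following psql session (the `readonly` role name and password are illustrative, and the last statement has to be repeated after every migration that adds a table):

    CREATE ROLE readonly WITH LOGIN PASSWORD 'changeme';
    GRANT CONNECT ON DATABASE eclair TO readonly;
    GRANT USAGE ON SCHEMA public TO readonly;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly;
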
This PR makes it possible to specify an arbitrary username, that will be granted read-only access to all tables in the `public` schema. NB: The assumption here is that eclair is the only app using the eclair database (in the `CREATE DATABASE eclair` sense), which I believe is reasonable. --- eclair-core/src/main/resources/reference.conf | 1 + .../scala/fr/acinq/eclair/db/Databases.scala | 16 ++++++++++++++-- .../scala/fr/acinq/eclair/TestDatabases.scala | 2 +- .../scala/fr/acinq/eclair/db/PgUtilsSpec.scala | 1 + 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 5b33da5b92..5027676900 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -221,6 +221,7 @@ eclair { port = 5432 username = "" password = "" + readonly-user = "" // if defined, this user will be granted read-only access to all tables in the database pool { max-size = 10 // recommended value = number_of_cpu_cores * 2 connection-timeout = 30 seconds diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index 5125651aa2..99ab04169f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -95,7 +95,8 @@ object Databases extends Logging { def apply(hikariConfig: HikariConfig, instanceId: UUID, lock: PgLock = PgLock.NoLock, - jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = { + jdbcUrlFile_opt: Option[File], + readOnlyUser_opt: Option[String])(implicit system: ActorSystem): PostgresDatabases = { jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile)) @@ -122,6 +123,15 @@ object Databases extends Logging { system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock()) } + readOnlyUser_opt.foreach { readOnlyUser => + PgUtils.inTransaction { connection => + using(connection.createStatement()) { statement => + logger.info(s"granting read-only access to user=$readOnlyUser") + statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser") + } + } + } + databases } @@ -183,6 +193,7 @@ object Databases extends Logging { val port = dbConfig.getInt("postgres.port") val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username")) val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password")) + val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user")) val hikariConfig = new HikariConfig() hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database") @@ -210,7 +221,8 @@ object Databases extends Logging { hikariConfig = hikariConfig, instanceId = instanceId, lock = lock, - jdbcUrlFile_opt = Some(jdbcUrlFile) + jdbcUrlFile_opt = Some(jdbcUrlFile), + readOnlyUser_opt = readOnlyUser_opt ) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala index 06b3edf7df..6ffd01c0e7 100644 --- 
a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala
@@ -68,7 +68,7 @@ object TestDatabases {
   // @formatter:off
   override val connection: PgConnection = pg.getPostgresDatabase.getConnection.asInstanceOf[PgConnection]
-  override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile))
+  override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile), readOnlyUser_opt = None)
   override def getVersion(statement: Statement, db_name: String, currentVersion: Int): Int = PgUtils.getVersion(statement, db_name, currentVersion)
   override def close(): Unit = pg.close()
   // @formatter:on
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala
index 388bf6b731..9306274bb1 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala
@@ -96,6 +96,7 @@ object PgUtilsSpec extends Logging {
   | port = $port
   | username = "postgres"
   | password = ""
+  | readonly-user = ""
   | pool {
   | max-size = 10 // recommended value = number_of_cpu_cores * 2
   | connection-timeout = 30 seconds

From 641555f2a10edb96088972328e6c21a79e69ca3f Mon Sep 17 00:00:00 2001
From: pm47
Date: Fri, 16 Apr 2021 18:25:35 +0200
Subject: [PATCH 3/6] set timezone on lease table

This only affects newly created tables; there is no migration. Users that are already using postgres will keep the previous column type, and nothing changes for them.
---
 eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala
index 3a4d18727e..3171a337f0 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala
@@ -177,7 +177,7 @@ object PgUtils extends JdbcUtils {
   using(connection.createStatement()) { statement =>
     // allow only one row in the ownership lease table
-    statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
+    statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP WITH TIME ZONE NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
   }
 }

From e8872f0e3b6350c0c1d60a3718a3e3c0f365ab6d Mon Sep 17 00:00:00 2001
From: pm47
Date: Tue, 13 Apr 2021 10:59:21 +0200
Subject: [PATCH 4/6] put back lock timeout on lease table

We use a timeout because, due to concurrency, we may not be able to obtain the lock immediately. The timeout has been set to its original value of 5s and made configurable.
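In SQL terms, acquiring the lease now amounts to something like this (a sketch; the table name comes from `$LeaseTable` and the duration from the new `lease.lock-timeout` setting):

    SET lock_timeout TO '5s';
    BEGIN;
    LOCK TABLE lease IN ACCESS EXCLUSIVE MODE;
    -- read and update the lease row, then COMMIT;
    -- if another transaction holds the lock for more than 5s, Postgres aborts the
    -- LOCK statement with "canceling statement due to lock timeout" and the
    -- configured lock failure handler takes over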
--- eclair-core/src/main/resources/reference.conf | 3 ++- .../scala/fr/acinq/eclair/db/Databases.scala | 3 ++- .../scala/fr/acinq/eclair/db/pg/PgUtils.scala | 21 ++++++++++++------- .../scala/fr/acinq/eclair/TestDatabases.scala | 2 +- .../fr/acinq/eclair/db/PgUtilsSpec.scala | 3 ++- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 5027676900..00246de5cf 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -228,11 +228,12 @@ eclair { idle-timeout = 10 minutes max-life-time = 30 minutes } + lock-type = "lease" // lease or none (do not use none in production) lease { interval = 5 minutes // lease-interval must be greater than lease-renew-interval renew-interval = 1 minute + lock-timeout = 5 seconds // timeout for the lock statement on the lease table } - lock-type = "lease" // lease or none } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index 99ab04169f..c1a26e136f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -211,7 +211,8 @@ object Databases extends Logging { val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`") - PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler) + val lockTimeout = dbConfig.getDuration("postgres.lease.lock-timeout").toSeconds.seconds + PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockTimeout, lockExceptionHandler) case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`") } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala index 3171a337f0..d8e512e3ed 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala @@ -16,6 +16,8 @@ package fr.acinq.eclair.db.pg +import fr.acinq.eclair.db.Monitoring.Metrics._ +import fr.acinq.eclair.db.Monitoring.Tags import fr.acinq.eclair.db.jdbc.JdbcUtils import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler.LockException import grizzled.slf4j.Logging @@ -104,12 +106,12 @@ object PgUtils extends JdbcUtils { * * `lockExceptionHandler` provides a lock exception handler to customize the behavior when locking errors occur. 
*/ - case class LeaseLock(instanceId: UUID, leaseDuration: FiniteDuration, leaseRenewInterval: FiniteDuration, lockFailureHandler: LockFailureHandler) extends PgLock { + case class LeaseLock(instanceId: UUID, leaseDuration: FiniteDuration, leaseRenewInterval: FiniteDuration, lockTimeout: FiniteDuration, lockFailureHandler: LockFailureHandler) extends PgLock { import LeaseLock._ override def obtainExclusiveLock(implicit ds: DataSource): Unit = { - obtainDatabaseLease(instanceId, leaseDuration) match { + obtainDatabaseLease(instanceId, leaseDuration, lockTimeout) match { case Right(_) => () case Left(ex) => lockFailureHandler(ex) } @@ -138,7 +140,7 @@ object PgUtils extends JdbcUtils { /** We use a [[LeaseLock]] mechanism to get a [[LockLease]]. */ case class LockLease(expiresAt: Timestamp, instanceId: UUID, expired: Boolean) - private def obtainDatabaseLease(instanceId: UUID, leaseDuration: FiniteDuration, attempt: Int = 1)(implicit ds: DataSource): Either[LockFailure, LockLease] = synchronized { + private def obtainDatabaseLease(instanceId: UUID, leaseDuration: FiniteDuration, lockTimeout: FiniteDuration, attempt: Int = 1)(implicit ds: DataSource): Either[LockFailure, LockLease] = synchronized { logger.debug(s"trying to acquire database lease (attempt #$attempt) instance ID=$instanceId") // this is a recursive method, we need to make sure we don't enter an infinite loop @@ -146,7 +148,7 @@ object PgUtils extends JdbcUtils { try { inTransaction { implicit connection => - acquireExclusiveTableLock() + acquireExclusiveTableLock(lockTimeout) logger.debug("database lease was successfully acquired") checkDatabaseLease(connection, instanceId) match { case Right(_) => @@ -167,7 +169,7 @@ object PgUtils extends JdbcUtils { connection => logger.warn(s"table $LeaseTable does not exist, trying to create it") initializeLeaseTable(connection) - obtainDatabaseLease(instanceId, leaseDuration, attempt + 1) + obtainDatabaseLease(instanceId, leaseDuration, lockTimeout, attempt + 1) } case t: Throwable => Left(LockFailure.GeneralLockException(t)) } @@ -181,10 +183,15 @@ object PgUtils extends JdbcUtils { } } - private def acquireExclusiveTableLock()(implicit connection: Connection): Unit = { + private def acquireExclusiveTableLock(lockTimeout: FiniteDuration)(implicit connection: Connection): Unit = { using(connection.createStatement()) { statement => - statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE NOWAIT") + // We use a timeout here, because we might not be able to get the lock right away due to concurrent access + // by other threads. 
That timeout gives time for other transactions to complete, then ours can take the lock + statement.executeUpdate(s"SET lock_timeout TO '${lockTimeout.toSeconds}s'") + withMetrics("utils/lock", Tags.DbBackends.Postgres) { + statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE") + } } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala index 6ffd01c0e7..541f47cfcf 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala @@ -59,7 +59,7 @@ object TestDatabases { val datasource: DataSource = pg.getPostgresDatabase val hikariConfig = new HikariConfig hikariConfig.setDataSource(datasource) - val lock: PgLock.LeaseLock = PgLock.LeaseLock(UUID.randomUUID(), 10 minutes, 8 minute, LockFailureHandler.logAndThrow) + val lock: PgLock.LeaseLock = PgLock.LeaseLock(UUID.randomUUID(), 10 minutes, 8 minute, 5 seconds, LockFailureHandler.logAndThrow) val jdbcUrlFile: File = new File(sys.props("tmp.dir"), s"jdbcUrlFile_${UUID.randomUUID()}.tmp") jdbcUrlFile.deleteOnExit() diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala index 9306274bb1..87adb5ac56 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PgUtilsSpec.scala @@ -103,11 +103,12 @@ object PgUtilsSpec extends Logging { | idle-timeout = 10 minutes | max-life-time = 30 minutes | } + | lock-type = "lease" // lease or none (do not use none in production) | lease { | interval = 5 seconds // lease-interval must be greater than lease-renew-interval | renew-interval = 2 seconds + | lock-timeout = 5 seconds // timeout for the lock statement on the lease table | } - | lock-type = "lease" // lease or none |} |""".stripMargin ) From 38dba58312949da7a775b5c6e82c599201dbf421 Mon Sep 17 00:00:00 2001 From: pm47 Date: Mon, 19 Apr 2021 11:46:05 +0200 Subject: [PATCH 5/6] obtain lock before initializing tables --- .../scala/fr/acinq/eclair/db/Databases.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index c1a26e136f..e691375e2a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -103,6 +103,16 @@ object Databases extends Logging { implicit val ds: HikariDataSource = new HikariDataSource(hikariConfig) implicit val implicitLock: PgLock = lock + lock match { + case PgLock.NoLock => () + case l: PgLock.LeaseLock => + // we obtain a lock right now... + l.obtainExclusiveLock(ds) + // ...and renew the lease regularly + import system.dispatcher + system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => l.obtainExclusiveLock(ds)) + } + val databases = PostgresDatabases( network = new PgNetworkDb, audit = new PgAuditDb, @@ -113,16 +123,6 @@ object Databases extends Logging { dataSource = ds, lock = lock) - lock match { - case PgLock.NoLock => () - case l: PgLock.LeaseLock => - // we obtain a lock right now... 
- databases.obtainExclusiveLock() - // ...and renew the lease regularly - import system.dispatcher - system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock()) - } - readOnlyUser_opt.foreach { readOnlyUser => PgUtils.inTransaction { connection => using(connection.createStatement()) { statement => From 7a5bf88365fefb9c22e9642fc8cd73220c3d8b6a Mon Sep 17 00:00:00 2001 From: pm47 Date: Tue, 20 Apr 2021 11:40:37 +0200 Subject: [PATCH 6/6] set lock timeout at connection creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From Hikari doc: > 🔤connectionInitSql This property sets a SQL statement that will be executed after every new connection creation before adding it to the pool. If this SQL is not valid or throws an exception, it will be treated as a connection failure and the standard retry logic will be followed. Default: none --- .../main/scala/fr/acinq/eclair/db/Databases.scala | 5 ++++- .../scala/fr/acinq/eclair/db/pg/PgUtils.scala | 15 ++++++--------- .../scala/fr/acinq/eclair/TestDatabases.scala | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index e691375e2a..cb8179f877 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -211,8 +211,11 @@ object Databases extends Logging { val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`") + // We use a timeout for locks, because we might not be able to get the lock right away due to concurrent access + // by other threads. That timeout gives time for other transactions to complete, then ours can take the lock val lockTimeout = dbConfig.getDuration("postgres.lease.lock-timeout").toSeconds.seconds - PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockTimeout, lockExceptionHandler) + hikariConfig.setConnectionInitSql(s"SET lock_timeout TO '${lockTimeout.toSeconds}s'") + PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler) case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`") } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala index d8e512e3ed..9e72afa869 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala @@ -106,12 +106,12 @@ object PgUtils extends JdbcUtils { * * `lockExceptionHandler` provides a lock exception handler to customize the behavior when locking errors occur. 
*/ - case class LeaseLock(instanceId: UUID, leaseDuration: FiniteDuration, leaseRenewInterval: FiniteDuration, lockTimeout: FiniteDuration, lockFailureHandler: LockFailureHandler) extends PgLock { + case class LeaseLock(instanceId: UUID, leaseDuration: FiniteDuration, leaseRenewInterval: FiniteDuration, lockFailureHandler: LockFailureHandler) extends PgLock { import LeaseLock._ override def obtainExclusiveLock(implicit ds: DataSource): Unit = { - obtainDatabaseLease(instanceId, leaseDuration, lockTimeout) match { + obtainDatabaseLease(instanceId, leaseDuration) match { case Right(_) => () case Left(ex) => lockFailureHandler(ex) } @@ -140,7 +140,7 @@ object PgUtils extends JdbcUtils { /** We use a [[LeaseLock]] mechanism to get a [[LockLease]]. */ case class LockLease(expiresAt: Timestamp, instanceId: UUID, expired: Boolean) - private def obtainDatabaseLease(instanceId: UUID, leaseDuration: FiniteDuration, lockTimeout: FiniteDuration, attempt: Int = 1)(implicit ds: DataSource): Either[LockFailure, LockLease] = synchronized { + private def obtainDatabaseLease(instanceId: UUID, leaseDuration: FiniteDuration, attempt: Int = 1)(implicit ds: DataSource): Either[LockFailure, LockLease] = synchronized { logger.debug(s"trying to acquire database lease (attempt #$attempt) instance ID=$instanceId") // this is a recursive method, we need to make sure we don't enter an infinite loop @@ -148,7 +148,7 @@ object PgUtils extends JdbcUtils { try { inTransaction { implicit connection => - acquireExclusiveTableLock(lockTimeout) + acquireExclusiveTableLock() logger.debug("database lease was successfully acquired") checkDatabaseLease(connection, instanceId) match { case Right(_) => @@ -169,7 +169,7 @@ object PgUtils extends JdbcUtils { connection => logger.warn(s"table $LeaseTable does not exist, trying to create it") initializeLeaseTable(connection) - obtainDatabaseLease(instanceId, leaseDuration, lockTimeout, attempt + 1) + obtainDatabaseLease(instanceId, leaseDuration, attempt + 1) } case t: Throwable => Left(LockFailure.GeneralLockException(t)) } @@ -183,12 +183,9 @@ object PgUtils extends JdbcUtils { } } - private def acquireExclusiveTableLock(lockTimeout: FiniteDuration)(implicit connection: Connection): Unit = { + private def acquireExclusiveTableLock()(implicit connection: Connection): Unit = { using(connection.createStatement()) { statement => - // We use a timeout here, because we might not be able to get the lock right away due to concurrent access - // by other threads. 
That timeout gives time for other transactions to complete, then ours can take the lock - statement.executeUpdate(s"SET lock_timeout TO '${lockTimeout.toSeconds}s'") withMetrics("utils/lock", Tags.DbBackends.Postgres) { statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE") } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala index 541f47cfcf..6ffd01c0e7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala @@ -59,7 +59,7 @@ object TestDatabases { val datasource: DataSource = pg.getPostgresDatabase val hikariConfig = new HikariConfig hikariConfig.setDataSource(datasource) - val lock: PgLock.LeaseLock = PgLock.LeaseLock(UUID.randomUUID(), 10 minutes, 8 minute, 5 seconds, LockFailureHandler.logAndThrow) + val lock: PgLock.LeaseLock = PgLock.LeaseLock(UUID.randomUUID(), 10 minutes, 8 minute, LockFailureHandler.logAndThrow) val jdbcUrlFile: File = new File(sys.props("tmp.dir"), s"jdbcUrlFile_${UUID.randomUUID()}.tmp") jdbcUrlFile.deleteOnExit()
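
A note on the last patch: since `connectionInitSql` runs once for every new connection before it joins the pool, the lock timeout is now a property of the connection itself rather than of the lease-locking statement. An illustrative way to check it from any connection handed out by the pool:

    SHOW lock_timeout;
    -- expected: 5s (or whatever db.postgres.lease.lock-timeout is set to)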