Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

More database nits #1773

Merged
merged 6 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,19 @@ eclair {
port = 5432
username = ""
password = ""
readonly-user = "" // if defined, this user will be granted read-only access to all tables in the database
pool {
max-size = 10 // recommended value = number_of_cpu_cores * 2
connection-timeout = 30 seconds
idle-timeout = 10 minutes
max-life-time = 30 minutes
}
lock-type = "lease" // lease or none (do not use none in production)
lease {
interval = 5 minutes // lease-interval must be greater than lease-renew-interval
renew-interval = 1 minute
lock-timeout = 5 seconds // timeout for the lock statement on the lease table
}
lock-type = "lease" // lease or none
}
}
}
Expand Down
19 changes: 16 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ object Databases extends Logging {
def apply(hikariConfig: HikariConfig,
instanceId: UUID,
lock: PgLock = PgLock.NoLock,
jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = {
jdbcUrlFile_opt: Option[File],
readOnlyUser_opt: Option[String])(implicit system: ActorSystem): PostgresDatabases = {

jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))

Expand All @@ -122,6 +123,15 @@ object Databases extends Logging {
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock())
}

readOnlyUser_opt.foreach { readOnlyUser =>
PgUtils.inTransaction { connection =>
using(connection.createStatement()) { statement =>
logger.info(s"granting read-only access to user=$readOnlyUser")
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser")
}
}
}

databases
}

Expand Down Expand Up @@ -183,6 +193,7 @@ object Databases extends Logging {
val port = dbConfig.getInt("postgres.port")
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))
val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user"))

val hikariConfig = new HikariConfig()
hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")
Expand All @@ -200,7 +211,8 @@ object Databases extends Logging {
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler)
val lockTimeout = dbConfig.getDuration("postgres.lease.lock-timeout").toSeconds.seconds
PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockTimeout, lockExceptionHandler)
case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`")
}

Expand All @@ -210,7 +222,8 @@ object Databases extends Logging {
hikariConfig = hikariConfig,
instanceId = instanceId,
lock = lock,
jdbcUrlFile_opt = Some(jdbcUrlFile)
jdbcUrlFile_opt = Some(jdbcUrlFile),
readOnlyUser_opt = readOnlyUser_opt
)
}

Expand Down
19 changes: 12 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,17 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) {
val batchSize = 100
inTransaction { pg =>
using(pg.createStatement) { statement =>
using(pg.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement =>
shortChannelIds
.grouped(1000) // remove channels by batch of 1000
.foreach { _ =>
val ids = shortChannelIds.map(_.toLong).mkString(",")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)")
.grouped(batchSize)
.foreach { group =>
val padded = group.toArray.padTo(batchSize, ShortChannelId(0L))
for (i <- 0 until batchSize) {
statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1
}
statement.executeUpdate()
}
}
}
Expand All @@ -165,8 +169,9 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
using(pg.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
}
}
}
Expand Down
23 changes: 15 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package fr.acinq.eclair.db.pg

import fr.acinq.eclair.db.Monitoring.Metrics._
import fr.acinq.eclair.db.Monitoring.Tags
import fr.acinq.eclair.db.jdbc.JdbcUtils
import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler.LockException
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -104,12 +106,12 @@ object PgUtils extends JdbcUtils {
*
* `lockExceptionHandler` provides a lock exception handler to customize the behavior when locking errors occur.
*/
case class LeaseLock(instanceId: UUID, leaseDuration: FiniteDuration, leaseRenewInterval: FiniteDuration, lockFailureHandler: LockFailureHandler) extends PgLock {
case class LeaseLock(instanceId: UUID, leaseDuration: FiniteDuration, leaseRenewInterval: FiniteDuration, lockTimeout: FiniteDuration, lockFailureHandler: LockFailureHandler) extends PgLock {

import LeaseLock._

override def obtainExclusiveLock(implicit ds: DataSource): Unit = {
obtainDatabaseLease(instanceId, leaseDuration) match {
obtainDatabaseLease(instanceId, leaseDuration, lockTimeout) match {
case Right(_) => ()
case Left(ex) => lockFailureHandler(ex)
}
Expand Down Expand Up @@ -138,15 +140,15 @@ object PgUtils extends JdbcUtils {
/** We use a [[LeaseLock]] mechanism to get a [[LockLease]]. */
case class LockLease(expiresAt: Timestamp, instanceId: UUID, expired: Boolean)

private def obtainDatabaseLease(instanceId: UUID, leaseDuration: FiniteDuration, attempt: Int = 1)(implicit ds: DataSource): Either[LockFailure, LockLease] = synchronized {
private def obtainDatabaseLease(instanceId: UUID, leaseDuration: FiniteDuration, lockTimeout: FiniteDuration, attempt: Int = 1)(implicit ds: DataSource): Either[LockFailure, LockLease] = synchronized {
logger.debug(s"trying to acquire database lease (attempt #$attempt) instance ID=$instanceId")

// this is a recursive method, we need to make sure we don't enter an infinite loop
if (attempt > 3) return Left(LockFailure.TooManyLockAttempts)

try {
inTransaction { implicit connection =>
acquireExclusiveTableLock()
acquireExclusiveTableLock(lockTimeout)
logger.debug("database lease was successfully acquired")
checkDatabaseLease(connection, instanceId) match {
case Right(_) =>
Expand All @@ -167,7 +169,7 @@ object PgUtils extends JdbcUtils {
connection =>
logger.warn(s"table $LeaseTable does not exist, trying to create it")
initializeLeaseTable(connection)
obtainDatabaseLease(instanceId, leaseDuration, attempt + 1)
obtainDatabaseLease(instanceId, leaseDuration, lockTimeout, attempt + 1)
}
case t: Throwable => Left(LockFailure.GeneralLockException(t))
}
Expand All @@ -177,14 +179,19 @@ object PgUtils extends JdbcUtils {
using(connection.createStatement()) {
statement =>
// allow only one row in the ownership lease table
statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP WITH TIME ZONE NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
}
}

private def acquireExclusiveTableLock()(implicit connection: Connection): Unit = {
private def acquireExclusiveTableLock(lockTimeout: FiniteDuration)(implicit connection: Connection): Unit = {
using(connection.createStatement()) {
statement =>
statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE NOWAIT")
// We use a timeout here, because we might not be able to get the lock right away due to concurrent access
// by other threads. That timeout gives time for other transactions to complete, then ours can take the lock
statement.executeUpdate(s"SET lock_timeout TO '${lockTimeout.toSeconds}s'")
withMetrics("utils/lock", Tags.DbBackends.Postgres) {
statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package fr.acinq.eclair.db.sqlite

import java.sql.Connection
import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi}
import fr.acinq.eclair.ShortChannelId
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
Expand All @@ -27,6 +26,7 @@ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncement
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
import grizzled.slf4j.Logging

import java.sql.Connection
import scala.collection.immutable.SortedMap

class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
Expand Down Expand Up @@ -132,12 +132,16 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Sqlite) {
using(sqlite.createStatement) { statement =>
val batchSize = 100
using(sqlite.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement =>
shortChannelIds
.grouped(1000) // remove channels by batch of 1000
.foreach { _ =>
val ids = shortChannelIds.map(_.toLong).mkString(",")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)")
.grouped(batchSize)
.foreach { group =>
val padded = group.toArray.padTo(batchSize, ShortChannelId(0L))
for (i <- 0 until batchSize) {
statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1
}
statement.executeUpdate()
}
}
}
Expand All @@ -153,8 +157,9 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
}

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Sqlite) {
using(sqlite.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
using(sqlite.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
import fr.acinq.eclair.{ShortChannelId, TxCoordinates}

import scala.collection.mutable
import scala.compat.Platform
import scala.concurrent.duration._

object StaleChannels {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object TestDatabases {
val datasource: DataSource = pg.getPostgresDatabase
val hikariConfig = new HikariConfig
hikariConfig.setDataSource(datasource)
val lock: PgLock.LeaseLock = PgLock.LeaseLock(UUID.randomUUID(), 10 minutes, 8 minute, LockFailureHandler.logAndThrow)
val lock: PgLock.LeaseLock = PgLock.LeaseLock(UUID.randomUUID(), 10 minutes, 8 minute, 5 seconds, LockFailureHandler.logAndThrow)

val jdbcUrlFile: File = new File(sys.props("tmp.dir"), s"jdbcUrlFile_${UUID.randomUUID()}.tmp")
jdbcUrlFile.deleteOnExit()
Expand All @@ -68,7 +68,7 @@ object TestDatabases {

// @formatter:off
override val connection: PgConnection = pg.getPostgresDatabase.getConnection.asInstanceOf[PgConnection]
override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile))
override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile), readOnlyUser_opt = None)
override def getVersion(statement: Statement, db_name: String, currentVersion: Int): Int = PgUtils.getVersion(statement, db_name, currentVersion)
override def close(): Unit = pg.close()
// @formatter:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, Tor2}
import fr.acinq.eclair.{CltvExpiryDelta, Features, MilliSatoshiLong, ShortChannelId, TestDatabases, randomBytes32, randomKey}
import org.scalatest.funsuite.AnyFunSuite

import scala.collection.{SortedMap, mutable}
import scala.collection.{SortedMap, SortedSet, mutable}
import scala.util.Random

class NetworkDbSpec extends AnyFunSuite {

Expand Down Expand Up @@ -248,7 +249,7 @@ class NetworkDbSpec extends AnyFunSuite {
updates.foreach(u => db.updateChannel(u))
assert(db.listChannels().keySet === channels.map(_.shortChannelId).toSet)

val toDelete = channels.map(_.shortChannelId).drop(500).take(2500)
val toDelete = channels.map(_.shortChannelId).take(1 + Random.nextInt(2500))
db.removeChannels(toDelete)
assert(db.listChannels().keySet === (channels.map(_.shortChannelId).toSet -- toDelete))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,19 @@ object PgUtilsSpec extends Logging {
| port = $port
| username = "postgres"
| password = ""
| readonly-user = ""
| pool {
| max-size = 10 // recommended value = number_of_cpu_cores * 2
| connection-timeout = 30 seconds
| idle-timeout = 10 minutes
| max-life-time = 30 minutes
| }
| lock-type = "lease" // lease or none (do not use none in production)
| lease {
| interval = 5 seconds // lease-interval must be greater than lease-renew-interval
| renew-interval = 2 seconds
| lock-timeout = 5 seconds // timeout for the lock statement on the lease table
| }
| lock-type = "lease" // lease or none
|}
|""".stripMargin
)
Expand Down