Skip to content

Commit

Permalink
More database nits (#1773)
Browse files Browse the repository at this point in the history
* use prepared statements for pruning

* optional read-only user

It is good practice to create a dedicated read-only user to browse the
database safely. But since the app itself creates its tables, the
postgres user is the owner and a manual `GRANT` is required everytime a
new table is added.

This PR makes it possible to specify an arbitrary username, that will be
granted read-only access to all tables in the `public` schema.

NB: The assumption here is that eclair is the only app using the
eclair database (in the `CREATE DATABASE eclair` sense), which I believe
is reasonable.

* set timezone on lease table

This only affects newly created table, there is no migration.

Users that are already using postgres will keep the previous column
type, it doesn't change anything for them.

* put back lock timeout on lease table

We use a timeout, because due to concurrency we may not be able to
obtain a lock immediately.

The timeout has been set to its original value of 5s and made
configurable.

* obtain lock before initializing tables
  • Loading branch information
pm47 authored Apr 20, 2021
1 parent 32a86a4 commit 33d52b6
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 33 deletions.
4 changes: 3 additions & 1 deletion eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,19 @@ eclair {
port = 5432
username = ""
password = ""
readonly-user = "" // if defined, this user will be granted read-only access to all tables in the database
pool {
max-size = 10 // recommended value = number_of_cpu_cores * 2
connection-timeout = 30 seconds
idle-timeout = 10 minutes
max-life-time = 30 minutes
}
lock-type = "lease" // lease or none (do not use none in production)
lease {
interval = 5 minutes // lease-interval must be greater than lease-renew-interval
renew-interval = 1 minute
lock-timeout = 5 seconds // timeout for the lock statement on the lease table
}
lock-type = "lease" // lease or none
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,24 @@ object Databases extends Logging {
def apply(hikariConfig: HikariConfig,
instanceId: UUID,
lock: PgLock = PgLock.NoLock,
jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = {
jdbcUrlFile_opt: Option[File],
readOnlyUser_opt: Option[String])(implicit system: ActorSystem): PostgresDatabases = {

jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))

implicit val ds: HikariDataSource = new HikariDataSource(hikariConfig)
implicit val implicitLock: PgLock = lock

lock match {
case PgLock.NoLock => ()
case l: PgLock.LeaseLock =>
// we obtain a lock right now...
l.obtainExclusiveLock(ds)
// ...and renew the lease regularly
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => l.obtainExclusiveLock(ds))
}

val databases = PostgresDatabases(
network = new PgNetworkDb,
audit = new PgAuditDb,
Expand All @@ -112,14 +123,13 @@ object Databases extends Logging {
dataSource = ds,
lock = lock)

lock match {
case PgLock.NoLock => ()
case l: PgLock.LeaseLock =>
// we obtain a lock right now...
databases.obtainExclusiveLock()
// ...and renew the lease regularly
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock())
readOnlyUser_opt.foreach { readOnlyUser =>
PgUtils.inTransaction { connection =>
using(connection.createStatement()) { statement =>
logger.info(s"granting read-only access to user=$readOnlyUser")
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser")
}
}
}

databases
Expand Down Expand Up @@ -183,6 +193,7 @@ object Databases extends Logging {
val port = dbConfig.getInt("postgres.port")
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))
val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user"))

val hikariConfig = new HikariConfig()
hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")
Expand All @@ -200,6 +211,10 @@ object Databases extends Logging {
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
// We use a timeout for locks, because we might not be able to get the lock right away due to concurrent access
// by other threads. That timeout gives time for other transactions to complete, then ours can take the lock
val lockTimeout = dbConfig.getDuration("postgres.lease.lock-timeout").toSeconds.seconds
hikariConfig.setConnectionInitSql(s"SET lock_timeout TO '${lockTimeout.toSeconds}s'")
PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler)
case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`")
}
Expand All @@ -210,7 +225,8 @@ object Databases extends Logging {
hikariConfig = hikariConfig,
instanceId = instanceId,
lock = lock,
jdbcUrlFile_opt = Some(jdbcUrlFile)
jdbcUrlFile_opt = Some(jdbcUrlFile),
readOnlyUser_opt = readOnlyUser_opt
)
}

Expand Down
19 changes: 12 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,17 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) {
val batchSize = 100
inTransaction { pg =>
using(pg.createStatement) { statement =>
using(pg.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement =>
shortChannelIds
.grouped(1000) // remove channels by batch of 1000
.foreach { _ =>
val ids = shortChannelIds.map(_.toLong).mkString(",")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)")
.grouped(batchSize)
.foreach { group =>
val padded = group.toArray.padTo(batchSize, ShortChannelId(0L))
for (i <- 0 until batchSize) {
statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1
}
statement.executeUpdate()
}
}
}
Expand All @@ -165,8 +169,9 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
using(pg.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package fr.acinq.eclair.db.pg

import fr.acinq.eclair.db.Monitoring.Metrics._
import fr.acinq.eclair.db.Monitoring.Tags
import fr.acinq.eclair.db.jdbc.JdbcUtils
import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler.LockException
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -177,14 +179,16 @@ object PgUtils extends JdbcUtils {
using(connection.createStatement()) {
statement =>
// allow only one row in the ownership lease table
statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP WITH TIME ZONE NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
}
}

private def acquireExclusiveTableLock()(implicit connection: Connection): Unit = {
using(connection.createStatement()) {
statement =>
statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE NOWAIT")
withMetrics("utils/lock", Tags.DbBackends.Postgres) {
statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package fr.acinq.eclair.db.sqlite

import java.sql.Connection
import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi}
import fr.acinq.eclair.ShortChannelId
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
Expand All @@ -27,6 +26,7 @@ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncement
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
import grizzled.slf4j.Logging

import java.sql.Connection
import scala.collection.immutable.SortedMap

class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
Expand Down Expand Up @@ -132,12 +132,16 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Sqlite) {
using(sqlite.createStatement) { statement =>
val batchSize = 100
using(sqlite.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement =>
shortChannelIds
.grouped(1000) // remove channels by batch of 1000
.foreach { _ =>
val ids = shortChannelIds.map(_.toLong).mkString(",")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)")
.grouped(batchSize)
.foreach { group =>
val padded = group.toArray.padTo(batchSize, ShortChannelId(0L))
for (i <- 0 until batchSize) {
statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1
}
statement.executeUpdate()
}
}
}
Expand All @@ -153,8 +157,9 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
}

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Sqlite) {
using(sqlite.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
using(sqlite.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
import fr.acinq.eclair.{ShortChannelId, TxCoordinates}

import scala.collection.mutable
import scala.compat.Platform
import scala.concurrent.duration._

object StaleChannels {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object TestDatabases {

// @formatter:off
override val connection: PgConnection = pg.getPostgresDatabase.getConnection.asInstanceOf[PgConnection]
override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile))
override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile), readOnlyUser_opt = None)
override def getVersion(statement: Statement, db_name: String, currentVersion: Int): Int = PgUtils.getVersion(statement, db_name, currentVersion)
override def close(): Unit = pg.close()
// @formatter:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, Tor2}
import fr.acinq.eclair.{CltvExpiryDelta, Features, MilliSatoshiLong, ShortChannelId, TestDatabases, randomBytes32, randomKey}
import org.scalatest.funsuite.AnyFunSuite

import scala.collection.{SortedMap, mutable}
import scala.collection.{SortedMap, SortedSet, mutable}
import scala.util.Random

class NetworkDbSpec extends AnyFunSuite {

Expand Down Expand Up @@ -248,7 +249,7 @@ class NetworkDbSpec extends AnyFunSuite {
updates.foreach(u => db.updateChannel(u))
assert(db.listChannels().keySet === channels.map(_.shortChannelId).toSet)

val toDelete = channels.map(_.shortChannelId).drop(500).take(2500)
val toDelete = channels.map(_.shortChannelId).take(1 + Random.nextInt(2500))
db.removeChannels(toDelete)
assert(db.listChannels().keySet === (channels.map(_.shortChannelId).toSet -- toDelete))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,19 @@ object PgUtilsSpec extends Logging {
| port = $port
| username = "postgres"
| password = ""
| readonly-user = ""
| pool {
| max-size = 10 // recommended value = number_of_cpu_cores * 2
| connection-timeout = 30 seconds
| idle-timeout = 10 minutes
| max-life-time = 30 minutes
| }
| lock-type = "lease" // lease or none (do not use none in production)
| lease {
| interval = 5 seconds // lease-interval must be greater than lease-renew-interval
| renew-interval = 2 seconds
| lock-timeout = 5 seconds // timeout for the lock statement on the lease table
| }
| lock-type = "lease" // lease or none
|}
|""".stripMargin
)
Expand Down

0 comments on commit 33d52b6

Please sign in to comment.