Skip to content

Commit

Permalink
force migration to new codecs
Browse files Browse the repository at this point in the history
  • Loading branch information
pm47 committed Jul 12, 2021
1 parent b63527a commit 2c2078e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
import lock._

val DB_NAME = "channels"
val CURRENT_VERSION = 6
val CURRENT_VERSION = 7

inTransaction { pg =>
using(pg.createStatement()) { statement =>
Expand Down Expand Up @@ -79,6 +79,23 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate("ALTER TABLE htlc_infos SET SCHEMA local")
}

def migration67(): Unit = {
migrateTable(pg, pg,
"local.channels",
s"UPDATE local.channels SET data=?, json=?::JSONB WHERE channel_id=?",
(rs, statement) => {
// This forces a re-serialization of the channel data with latest codecs, because as of codecs v3 we don't
// store local commitment signatures anymore, and we want to clean up existing data
val state = stateDataCodec.decode(BitVector(rs.getBytes("data"))).require.value
val data = stateDataCodec.encode(state).require.toByteArray
val json = serialization.writePretty(state)
statement.setBytes(1, data)
statement.setString(2, json)
statement.setString(3, state.channelId.toHex)
}
)(logger)
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
Expand All @@ -95,18 +112,25 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
migration34(statement)
migration45(statement)
migration56(statement)
migration67()
case Some(v@3) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration34(statement)
migration45(statement)
migration56(statement)
migration67()
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
migration67()
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
migration67()
case Some(v@6) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration67()
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging
import scodec.bits.BitVector

import java.sql.{Connection, Statement}

Expand All @@ -34,7 +35,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
import SqliteUtils._

val DB_NAME = "channels"
val CURRENT_VERSION = 3
val CURRENT_VERSION = 4

/**
* The SQLite documentation states that "It is not possible to enable or disable foreign key constraints in the middle
Expand All @@ -59,6 +60,21 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp INTEGER")
}

def migration34(): Unit = {
migrateTable(sqlite, sqlite,
"local_channels",
s"UPDATE local_channels SET data=? WHERE channel_id=?",
(rs, statement) => {
// This forces a re-serialization of the channel data with latest codecs, because as of codecs v3 we don't
// store local commitment signatures anymore, and we want to clean up existing data
val state = stateDataCodec.decode(BitVector(rs.getBytes("data"))).require.value
val data = stateDataCodec.encode(state).require.toByteArray
statement.setBytes(1, data)
statement.setString(2, state.channelId.toHex)
}
)(logger)
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)")
Expand All @@ -68,9 +84,14 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
migration34()
case Some(v@2) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
migration34()
case Some(v@3) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration34()
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class ChannelsDbSpec extends AnyFunSuite {
}
}

test("migrate channel database v1 -> v3 (sqlite)") {
test("migrate channel database v1 -> v4 (sqlite)") {
forAllDbs {
case _: TestPgDatabases => // no migration
case dbs: TestSqliteDatabases =>
Expand Down Expand Up @@ -187,7 +187,7 @@ class ChannelsDbSpec extends AnyFunSuite {
// check that db migration works
val db = new SqliteChannelsDb(sqlite)
using(sqlite.createStatement()) { statement =>
assert(getVersion(statement, "channels").contains(3))
assert(getVersion(statement, "channels").contains(4))
}
assert(db.listLocalChannels().size === testCases.size)
for (testCase <- testCases) {
Expand All @@ -199,7 +199,7 @@ class ChannelsDbSpec extends AnyFunSuite {
}
}

test("migrate channel database v2 -> v3/v6") {
test("migrate channel database v2 -> v4/v7") {
def postCheck(channelsDb: ChannelsDb): Unit = {
assert(channelsDb.listLocalChannels().size === testCases.filterNot(_.isClosed).size)
for (testCase <- testCases.filterNot(_.isClosed)) {
Expand Down Expand Up @@ -242,7 +242,7 @@ class ChannelsDbSpec extends AnyFunSuite {
}
},
dbName = "channels",
targetVersion = 6,
targetVersion = 7,
postCheck = _ => postCheck(dbs.channels)
)
case dbs: TestSqliteDatabases =>
Expand Down Expand Up @@ -277,7 +277,7 @@ class ChannelsDbSpec extends AnyFunSuite {
}
},
dbName = "channels",
targetVersion = 3,
targetVersion = 4,
postCheck = _ => postCheck(dbs.channels)
)
}
Expand Down Expand Up @@ -312,7 +312,7 @@ class ChannelsDbSpec extends AnyFunSuite {
}
},
dbName = "channels",
targetVersion = 6,
targetVersion = 7,
postCheck = connection => {
assert(dbs.channels.listLocalChannels().size === testCases.filterNot(_.isClosed).size)
testCases.foreach { testCase =>
Expand Down

0 comments on commit 2c2078e

Please sign in to comment.