diff --git a/src/index.rs b/src/index.rs index 1e1cf70405..f366f3c899 100644 --- a/src/index.rs +++ b/src/index.rs @@ -4,6 +4,7 @@ use { BlockHashValue, Entry, InscriptionEntry, InscriptionEntryValue, InscriptionIdValue, OutPointValue, SatPointValue, SatRange, }, + reorg::*, updater::Updater, }, super::*, @@ -19,11 +20,11 @@ use { }, std::collections::HashMap, std::io::{BufWriter, Read, Write}, - std::sync::atomic::{self, AtomicBool}, }; mod entry; mod fetcher; +mod reorg; mod rtx; mod updater; @@ -55,18 +56,6 @@ define_table! { SAT_TO_SATPOINT, u64, &SatPointValue } define_table! { STATISTIC_TO_COUNT, u64, u64 } define_table! { WRITE_TRANSACTION_STARTING_BLOCK_COUNT_TO_TIMESTAMP, u64, u128 } -pub(crate) struct Index { - client: Client, - database: Database, - path: PathBuf, - first_inscription_height: u64, - genesis_block_coinbase_transaction: Transaction, - genesis_block_coinbase_txid: Txid, - height_limit: Option, - options: Options, - reorged: AtomicBool, -} - #[derive(Debug, PartialEq)] pub(crate) enum List { Spent, @@ -143,6 +132,18 @@ impl BitcoinCoreRpcResultExt for Result { } } +pub(crate) struct Index { + client: Client, + database: Database, + path: PathBuf, + first_inscription_height: u64, + genesis_block_coinbase_transaction: Transaction, + genesis_block_coinbase_txid: Txid, + height_limit: Option, + options: Options, + unrecoverably_reorged: AtomicBool, +} + impl Index { pub(crate) fn open(options: &Options) -> Result { let client = options.bitcoin_rpc_client()?; @@ -221,11 +222,7 @@ impl Index { let mut tx = database.begin_write()?; - if cfg!(test) { - tx.set_durability(redb::Durability::None); - } else { - tx.set_durability(redb::Durability::Immediate); - }; + tx.set_durability(redb::Durability::Immediate); tx.open_table(HEIGHT_TO_BLOCK_HASH)?; tx.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?; @@ -263,8 +260,8 @@ impl Index { first_inscription_height: options.first_inscription_height(), genesis_block_coinbase_transaction, height_limit: options.height_limit, - reorged: AtomicBool::new(false), options: options.clone(), + unrecoverably_reorged: AtomicBool::new(false), }) } @@ -396,7 +393,31 @@ impl Index { } pub(crate) fn update(&self) -> Result { - Updater::update(self) + let mut updater = Updater::new(self)?; + + loop { + match updater.update_index() { + Ok(ok) => return Ok(ok), + Err(err) => { + log::info!("{}", err.to_string()); + + match err.downcast_ref() { + Some(&ReorgError::Recoverable((height, depth))) => { + Reorg::handle_reorg(self, height, depth)?; + + updater = Updater::new(self)?; + } + Some(&ReorgError::Unrecoverable) => { + self + .unrecoverably_reorged + .store(true, atomic::Ordering::Relaxed); + return Err(anyhow!(ReorgError::Unrecoverable)); + } + _ => return Err(err), + }; + } + } + } } pub(crate) fn export(&self, filename: &String, include_addresses: bool) -> Result { @@ -464,12 +485,12 @@ impl Index { Ok(()) } - pub(crate) fn is_json_api_enabled(&self) -> bool { - self.options.enable_json_api + pub(crate) fn is_unrecoverably_reorged(&self) -> bool { + self.unrecoverably_reorged.load(atomic::Ordering::Relaxed) } - pub(crate) fn is_reorged(&self) -> bool { - self.reorged.load(atomic::Ordering::Relaxed) + pub(crate) fn is_json_api_enabled(&self) -> bool { + self.options.enable_json_api } fn begin_read(&self) -> Result { @@ -477,13 +498,7 @@ impl Index { } fn begin_write(&self) -> Result { - if cfg!(test) { - let mut tx = self.database.begin_write()?; - tx.set_durability(redb::Durability::None); - Ok(tx) - } else { - Ok(self.database.begin_write()?) - } + Ok(self.database.begin_write()?) } fn increment_statistic(wtx: &WriteTransaction, statistic: Statistic, n: u64) -> Result { @@ -1004,6 +1019,65 @@ impl Index { } } + #[cfg(test)] + fn assert_non_existence_of_inscription(&self, inscription_id: InscriptionId) { + let rtx = self.database.begin_read().unwrap(); + + let inscription_id_to_satpoint = rtx.open_table(INSCRIPTION_ID_TO_SATPOINT).unwrap(); + assert!(inscription_id_to_satpoint + .get(&inscription_id.store()) + .unwrap() + .is_none()); + + let inscription_id_to_entry = rtx.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY).unwrap(); + assert!(inscription_id_to_entry + .get(&inscription_id.store()) + .unwrap() + .is_none()); + + for range in rtx + .open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID) + .unwrap() + .iter() + .into_iter() + { + for entry in range.into_iter() { + let (_number, id) = entry.unwrap(); + assert!(InscriptionId::load(*id.value()) != inscription_id); + } + } + + for range in rtx + .open_multimap_table(SATPOINT_TO_INSCRIPTION_ID) + .unwrap() + .iter() + .into_iter() + { + for entry in range.into_iter() { + let (_satpoint, ids) = entry.unwrap(); + assert!(!ids + .into_iter() + .any(|id| InscriptionId::load(*id.unwrap().value()) == inscription_id)) + } + } + + if self.has_sat_index().unwrap() { + for range in rtx + .open_multimap_table(SAT_TO_INSCRIPTION_ID) + .unwrap() + .iter() + .into_iter() + { + for entry in range.into_iter() { + let (_sat, ids) = entry.unwrap(); + assert!(!ids + .into_iter() + .any(|id| InscriptionId::load(*id.unwrap().value()) == inscription_id)) + } + } + } + } + fn inscriptions_on_output_unordered<'a: 'tx, 'tx>( satpoint_to_id: &'a impl ReadableMultimapTable<&'static SatPointValue, &'static InscriptionIdValue>, outpoint: OutPoint, @@ -3205,4 +3279,158 @@ mod tests { ) } } + + #[test] + fn recover_from_reorg() { + for context in Context::configurations() { + context.mine_blocks(1); + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(1, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + let first_id = InscriptionId { txid, index: 0 }; + let first_location = SatPoint { + outpoint: OutPoint { txid, vout: 0 }, + offset: 0, + }; + + context.mine_blocks(6); + + context + .index + .assert_inscription_location(first_id, first_location, Some(50 * COIN_VALUE)); + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(2, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + let second_id = InscriptionId { txid, index: 0 }; + let second_location = SatPoint { + outpoint: OutPoint { txid, vout: 0 }, + offset: 0, + }; + + context.mine_blocks(1); + + context + .index + .assert_inscription_location(second_id, second_location, Some(100 * COIN_VALUE)); + + context.rpc_server.invalidate_tip(); + context.mine_blocks(2); + + context + .index + .assert_inscription_location(first_id, first_location, Some(50 * COIN_VALUE)); + + context.index.assert_non_existence_of_inscription(second_id); + } + } + + #[test] + fn recover_from_3_block_deep_and_consecutive_reorg() { + for context in Context::configurations() { + context.mine_blocks(1); + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(1, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + let first_id = InscriptionId { txid, index: 0 }; + let first_location = SatPoint { + outpoint: OutPoint { txid, vout: 0 }, + offset: 0, + }; + + context.mine_blocks(10); + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(2, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + let second_id = InscriptionId { txid, index: 0 }; + let second_location = SatPoint { + outpoint: OutPoint { txid, vout: 0 }, + offset: 0, + }; + + context.mine_blocks(1); + + context + .index + .assert_inscription_location(second_id, second_location, Some(100 * COIN_VALUE)); + + context.rpc_server.invalidate_tip(); + context.rpc_server.invalidate_tip(); + context.rpc_server.invalidate_tip(); + + context.mine_blocks(4); + + context.index.assert_non_existence_of_inscription(second_id); + + context.rpc_server.invalidate_tip(); + + context.mine_blocks(2); + + context + .index + .assert_inscription_location(first_id, first_location, Some(50 * COIN_VALUE)); + } + } + + #[test] + fn recover_from_very_unlikely_7_block_deep_reorg() { + for context in Context::configurations() { + context.mine_blocks(1); + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(1, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + + context.mine_blocks(11); + + let first_id = InscriptionId { txid, index: 0 }; + let first_location = SatPoint { + outpoint: OutPoint { txid, vout: 0 }, + offset: 0, + }; + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(2, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + + let second_id = InscriptionId { txid, index: 0 }; + let second_location = SatPoint { + outpoint: OutPoint { txid, vout: 0 }, + offset: 0, + }; + + context.mine_blocks(7); + + context + .index + .assert_inscription_location(second_id, second_location, Some(100 * COIN_VALUE)); + + for _ in 0..7 { + context.rpc_server.invalidate_tip(); + } + + context.mine_blocks(9); + + context.index.assert_non_existence_of_inscription(second_id); + + context + .index + .assert_inscription_location(first_id, first_location, Some(50 * COIN_VALUE)); + } + } } diff --git a/src/index/reorg.rs b/src/index/reorg.rs new file mode 100644 index 0000000000..8de5da973c --- /dev/null +++ b/src/index/reorg.rs @@ -0,0 +1,108 @@ +use {super::*, updater::BlockData}; + +#[derive(Debug, PartialEq)] +pub(crate) enum ReorgError { + Recoverable((u64, u64)), + Unrecoverable, +} + +impl fmt::Display for ReorgError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ReorgError::Recoverable((height, depth)) => { + write!(f, "{depth} block deep reorg detected at height {height}") + } + ReorgError::Unrecoverable => write!(f, "unrecoverable reorg detected"), + } + } +} + +impl std::error::Error for ReorgError {} + +const MAX_SAVEPOINTS: usize = 2; +const SAVEPOINT_INTERVAL: u64 = 10; +const CHAIN_TIP_DISTANCE: u64 = 21; + +pub(crate) struct Reorg {} + +impl Reorg { + pub(crate) fn detect_reorg(block: &BlockData, height: u64, index: &Index) -> Result { + let bitcoind_prev_blockhash = block.header.prev_blockhash; + + match index.block_hash(height.checked_sub(1))? { + Some(index_prev_blockhash) if index_prev_blockhash == bitcoind_prev_blockhash => Ok(()), + Some(index_prev_blockhash) if index_prev_blockhash != bitcoind_prev_blockhash => { + let max_recoverable_reorg_depth = + (MAX_SAVEPOINTS as u64 - 1) * SAVEPOINT_INTERVAL + height % SAVEPOINT_INTERVAL; + + for depth in 1..max_recoverable_reorg_depth { + let index_block_hash = index.block_hash(height.checked_sub(depth))?; + let bitcoind_block_hash = index + .client + .get_block_hash(height.saturating_sub(depth)) + .into_option()?; + + if index_block_hash == bitcoind_block_hash { + return Err(anyhow!(ReorgError::Recoverable((depth, height)))); + } + } + + Err(anyhow!(ReorgError::Unrecoverable)) + } + _ => Ok(()), + } + } + + pub(crate) fn handle_reorg(index: &Index, height: u64, depth: u64) -> Result { + log::info!("rolling back database after reorg of depth {depth} at height {height}"); + + let mut wtx = index.begin_write()?; + + let oldest_savepoint = + wtx.get_persistent_savepoint(wtx.list_persistent_savepoints()?.min().unwrap())?; + + wtx.restore_savepoint(&oldest_savepoint)?; + + Index::increment_statistic(&wtx, Statistic::Commits, 1)?; + wtx.commit()?; + + log::info!( + "successfully rolled back database to height {}", + index.block_count()? + ); + + Ok(()) + } + + pub(crate) fn update_savepoints(index: &Index, height: u64) -> Result { + if (height < SAVEPOINT_INTERVAL || height % SAVEPOINT_INTERVAL == 0) + && index + .client + .get_blockchain_info()? + .headers + .saturating_sub(height) + <= CHAIN_TIP_DISTANCE + { + let wtx = index.begin_write()?; + + let savepoints = wtx.list_persistent_savepoints()?.collect::>(); + + if savepoints.len() >= MAX_SAVEPOINTS { + wtx.delete_persistent_savepoint(savepoints.into_iter().min().unwrap())?; + } + + Index::increment_statistic(&wtx, Statistic::Commits, 1)?; + wtx.commit()?; + + let wtx = index.begin_write()?; + + log::debug!("creating savepoint at height {}", height); + wtx.persistent_savepoint()?; + + Index::increment_statistic(&wtx, Statistic::Commits, 1)?; + wtx.commit()?; + } + + Ok(()) + } +} diff --git a/src/index/updater.rs b/src/index/updater.rs index 7b78fb435e..0828f156d3 100644 --- a/src/index/updater.rs +++ b/src/index/updater.rs @@ -8,9 +8,9 @@ use { mod inscription_updater; -struct BlockData { - header: Header, - txdata: Vec<(Transaction, Txid)>, +pub(crate) struct BlockData { + pub(crate) header: Header, + pub(crate) txdata: Vec<(Transaction, Txid)>, } impl From for BlockData { @@ -29,9 +29,10 @@ impl From for BlockData { } } -pub(crate) struct Updater { +pub(crate) struct Updater<'index> { range_cache: HashMap>, height: u64, + index: &'index Index, index_sats: bool, sat_ranges_since_flush: u64, outputs_cached: u64, @@ -39,48 +40,34 @@ pub(crate) struct Updater { outputs_traversed: u64, } -impl Updater { - pub(crate) fn update(index: &Index) -> Result { - let wtx = index.begin_write()?; +impl<'index> Updater<'_> { + pub(crate) fn new(index: &'index Index) -> Result> { + Ok(Updater { + range_cache: HashMap::new(), + height: index.block_count()?, + index, + index_sats: index.has_sat_index()?, + sat_ranges_since_flush: 0, + outputs_cached: 0, + outputs_inserted_since_flush: 0, + outputs_traversed: 0, + }) + } - let height = wtx - .open_table(HEIGHT_TO_BLOCK_HASH)? - .range(0..)? - .next_back() - .and_then(|result| result.ok()) - .map(|(height, _hash)| height.value() + 1) - .unwrap_or(0); + pub(crate) fn update_index(&mut self) -> Result { + let mut wtx = self.index.begin_write()?; + let starting_height = self.index.client.get_block_count()? + 1; wtx .open_table(WRITE_TRANSACTION_STARTING_BLOCK_COUNT_TO_TIMESTAMP)? .insert( - &height, + &self.height, &SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map(|duration| duration.as_millis()) .unwrap_or(0), )?; - let mut updater = Self { - range_cache: HashMap::new(), - height, - index_sats: index.has_sat_index()?, - sat_ranges_since_flush: 0, - outputs_cached: 0, - outputs_inserted_since_flush: 0, - outputs_traversed: 0, - }; - - updater.update_index(index, wtx) - } - - fn update_index<'index>( - &mut self, - index: &'index Index, - mut wtx: WriteTransaction<'index>, - ) -> Result { - let starting_height = index.client.get_block_count()? + 1; - let mut progress_bar = if cfg!(test) || log_enabled!(log::Level::Info) || starting_height <= self.height @@ -96,15 +83,15 @@ impl Updater { Some(progress_bar) }; - let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?; + let rx = Self::fetch_blocks_from(self.index, self.height, self.index_sats)?; - let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?; + let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(self.index)?; let mut uncommitted = 0; let mut value_cache = HashMap::new(); while let Ok(block) = rx.recv() { self.index_block( - index, + self.index, &mut outpoint_sender, &mut value_receiver, &mut wtx, @@ -116,7 +103,7 @@ impl Updater { progress_bar.inc(1); if progress_bar.position() > progress_bar.length().unwrap() { - if let Ok(count) = index.client.get_block_count() { + if let Ok(count) = self.index.client.get_block_count() { progress_bar.set_length(count + 1); } else { log::warn!("Failed to fetch latest block height"); @@ -130,7 +117,7 @@ impl Updater { self.commit(wtx, value_cache)?; value_cache = HashMap::new(); uncommitted = 0; - wtx = index.begin_write()?; + wtx = self.index.begin_write()?; let height = wtx .open_table(HEIGHT_TO_BLOCK_HASH)? .range(0..)? @@ -330,6 +317,19 @@ impl Updater { block: BlockData, value_cache: &mut HashMap, ) -> Result<()> { + Reorg::detect_reorg(&block, self.height, self.index)?; + + let start = Instant::now(); + let mut sat_ranges_written = 0; + let mut outputs_in_block = 0; + + log::info!( + "Block {} at {} with {} transactions…", + self.height, + timestamp(block.header.time), + block.txdata.len() + ); + // If value_receiver still has values something went wrong with the last block // Could be an assert, shouldn't recover from this and commit the last block let Err(TryRecvError::Empty) = value_receiver.try_recv() else { @@ -375,29 +375,6 @@ impl Updater { } let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?; - - let start = Instant::now(); - let mut sat_ranges_written = 0; - let mut outputs_in_block = 0; - - let time = timestamp(block.header.time); - - log::info!( - "Block {} at {} with {} transactions…", - self.height, - time, - block.txdata.len() - ); - - if let Some(prev_height) = self.height.checked_sub(1) { - let prev_hash = height_to_block_hash.get(&prev_height)?.unwrap(); - - if prev_hash.value() != &block.header.prev_blockhash.as_raw_hash().to_byte_array() { - index.reorged.store(true, atomic::Ordering::Relaxed); - return Err(anyhow!("reorg detected at or before {prev_height}")); - } - } - let mut inscription_id_to_inscription_entry = wtx.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?; let mut inscription_id_to_satpoint = wtx.open_table(INSCRIPTION_ID_TO_SATPOINT)?; @@ -655,8 +632,10 @@ impl Updater { Index::increment_statistic(&wtx, Statistic::SatRanges, self.sat_ranges_since_flush)?; self.sat_ranges_since_flush = 0; Index::increment_statistic(&wtx, Statistic::Commits, 1)?; - wtx.commit()?; + + Reorg::update_savepoints(self.index, self.height)?; + Ok(()) } } diff --git a/src/subcommand/server.rs b/src/subcommand/server.rs index d33290c4e3..8b1b1476b8 100644 --- a/src/subcommand/server.rs +++ b/src/subcommand/server.rs @@ -569,10 +569,10 @@ impl Server { } async fn status(Extension(index): Extension>) -> (StatusCode, &'static str) { - if index.is_reorged() { + if index.is_unrecoverably_reorged() { ( StatusCode::OK, - "reorg detected, please rebuild the database.", + "unrecoverable reorg detected, please rebuild the database.", ) } else { ( @@ -1944,17 +1944,20 @@ mod tests { } #[test] - fn detect_reorg() { + fn detect_unrecoverable_reorg() { let test_server = TestServer::new(); - test_server.mine_blocks(1); + test_server.mine_blocks(21); test_server.assert_response("/status", StatusCode::OK, "OK"); - test_server.bitcoin_rpc_server.invalidate_tip(); - test_server.bitcoin_rpc_server.mine_blocks(2); + for _ in 0..15 { + test_server.bitcoin_rpc_server.invalidate_tip(); + } + + test_server.bitcoin_rpc_server.mine_blocks(21); - test_server.assert_response_regex("/status", StatusCode::OK, "reorg detected.*"); + test_server.assert_response_regex("/status", StatusCode::OK, "unrecoverable reorg detected.*"); } #[test] @@ -2031,7 +2034,8 @@ mod tests { fn commits_are_tracked() { let server = TestServer::new(); - assert_eq!(server.index.statistic(crate::index::Statistic::Commits), 1); + thread::sleep(Duration::from_millis(25)); + assert_eq!(server.index.statistic(crate::index::Statistic::Commits), 3); let info = server.index.info().unwrap(); assert_eq!(info.transactions.len(), 1); @@ -2039,7 +2043,7 @@ mod tests { server.index.update().unwrap(); - assert_eq!(server.index.statistic(crate::index::Statistic::Commits), 1); + assert_eq!(server.index.statistic(crate::index::Statistic::Commits), 3); let info = server.index.info().unwrap(); assert_eq!(info.transactions.len(), 1); @@ -2050,7 +2054,7 @@ mod tests { thread::sleep(Duration::from_millis(10)); server.index.update().unwrap(); - assert_eq!(server.index.statistic(crate::index::Statistic::Commits), 2); + assert_eq!(server.index.statistic(crate::index::Statistic::Commits), 6); let info = server.index.info().unwrap(); assert_eq!(info.transactions.len(), 2);