Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve database compaction and prune-states #5142

Merged
merged 6 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2376,6 +2376,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.cold_db.do_atomically(cold_ops)?;
}

// In order to reclaim space, we need to compact the freezer DB as well.
self.cold_db.compact()?;

Ok(())
}
}
Expand Down
28 changes: 9 additions & 19 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,15 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
self.transaction_mutex.lock()
}

/// Compact all values in the states and states flag columns.
fn compact(&self) -> Result<(), Error> {
let endpoints = |column: DBColumn| {
(
BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())),
BytesKey::from_vec(get_key_for_col(
column.as_str(),
Hash256::repeat_byte(0xff).as_bytes(),
)),
)
};

for (start_key, end_key) in [
endpoints(DBColumn::BeaconStateTemporary),
endpoints(DBColumn::BeaconState),
endpoints(DBColumn::BeaconStateSummary),
] {
self.db.compact(&start_key, &end_key);
}
/// Compact the key range of a single column, freeing space used by deleted items.
///
/// The range runs from the column prefix followed by an empty key up to the column prefix
/// followed by a run of `0xff` bytes.
fn compact_column(&self, column: DBColumn) -> Result<(), Error> {
    // Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for
    // columns that may change size between sub-databases or schema versions.
    let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[]));

    // The upper bound must be filled with 0xff, not 0x00. Assuming keys are ordered
    // byte-wise, an all-zero suffix would sit at the very start of the column and the
    // compaction range would cover almost none of its entries.
    let end_key_len = std::cmp::max(column.key_size(), 32);
    let end_key = BytesKey::from_vec(get_key_for_col(
        column.as_str(),
        &vec![0xff; end_key_len],
    ));

    self.db.compact(&start_key, &end_key);
    Ok(())
}

Expand Down
18 changes: 16 additions & 2 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,22 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// this method. In future we may implement a safer mandatory locking scheme.
fn begin_rw_transaction(&self) -> MutexGuard<()>;

/// Compact the database, freeing space used by deleted items.
fn compact(&self) -> Result<(), Error>;
/// Compact a single column in the database, freeing space used by deleted items.
///
/// This method has no default body, so every implementor must supply it. The default
/// `compact` method is built on top of it by calling it once per column.
fn compact_column(&self, column: DBColumn) -> Result<(), Error>;

/// Compact the default selection of columns, i.e. those expected to reclaim the most space.
///
/// Columns are compacted one after another via `compact_column`; the first error aborts
/// the remaining compactions and is returned to the caller.
fn compact(&self) -> Result<(), Error> {
    // State and block related columns see the most churn (entries being created and
    // deleted), so they are the ones worth compacting by default.
    IntoIterator::into_iter([
        DBColumn::BeaconState,
        DBColumn::BeaconStateSummary,
        DBColumn::BeaconBlock,
    ])
    .try_for_each(|column| self.compact_column(column))
}

/// Iterate through all keys and values in a particular column.
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
self.transaction_mutex.lock()
}

fn compact(&self) -> Result<(), Error> {
fn compact_column(&self, _column: DBColumn) -> Result<(), Error> {
// No-op: the column argument is ignored and success is always reported.
// NOTE(review): presumably the in-memory store has nothing on disk to compact — confirm.
Ok(())
}
}
Expand Down
117 changes: 98 additions & 19 deletions database_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,15 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
Arg::with_name("freezer")
.long("freezer")
.help("Inspect the freezer DB rather than the hot DB")
.takes_value(false),
.takes_value(false)
.conflicts_with("blobs-db"),
)
.arg(
Arg::with_name("blobs-db")
.long("blobs-db")
.help("Inspect the blobs DB rather than the hot DB")
.takes_value(false)
.conflicts_with("freezer"),
)
.arg(
Arg::with_name("output-dir")
Expand All @@ -88,6 +96,34 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
)
}

/// Build the `compact` subcommand, which manually compacts one column of a database.
///
/// Arguments:
/// - `--column <TAG>` (required): the 3-byte ID of the column to compact.
/// - `--freezer`: operate on the freezer DB instead of the hot DB.
/// - `--blobs-db`: operate on the blobs DB instead of the hot DB.
///
/// `--freezer` and `--blobs-db` are mutually exclusive.
pub fn compact_cli_app<'a, 'b>() -> App<'a, 'b> {
    App::new("compact")
        .setting(clap::AppSettings::ColoredHelp)
        .about("Compact database manually")
        .arg(
            Arg::with_name("column")
                .long("column")
                .value_name("TAG")
                .help("3-byte column ID (see `DBColumn`)")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("freezer")
                .long("freezer")
                // This subcommand compacts; the help text must not say "Inspect".
                .help("Compact the freezer DB rather than the hot DB")
                .takes_value(false)
                .conflicts_with("blobs-db"),
        )
        .arg(
            Arg::with_name("blobs-db")
                .long("blobs-db")
                .help("Compact the blobs DB rather than the hot DB")
                .takes_value(false)
                .conflicts_with("freezer"),
        )
}

pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
App::new("prune-payloads")
.alias("prune_payloads")
Expand Down Expand Up @@ -162,6 +198,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.subcommand(migrate_cli_app())
.subcommand(version_cli_app())
.subcommand(inspect_cli_app())
.subcommand(compact_cli_app())
.subcommand(prune_payloads_app())
.subcommand(prune_blobs_app())
.subcommand(prune_states_app())
Expand Down Expand Up @@ -251,6 +288,7 @@ pub struct InspectConfig {
skip: Option<usize>,
limit: Option<usize>,
freezer: bool,
blobs_db: bool,
/// Configures where the inspect output should be stored.
output_dir: PathBuf,
}
Expand All @@ -261,6 +299,7 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
let skip = clap_utils::parse_optional(cli_args, "skip")?;
let limit = clap_utils::parse_optional(cli_args, "limit")?;
let freezer = cli_args.is_present("freezer");
let blobs_db = cli_args.is_present("blobs-db");

let output_dir: PathBuf =
clap_utils::parse_optional(cli_args, "output-dir")?.unwrap_or_else(PathBuf::new);
Expand All @@ -270,39 +309,28 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
skip,
limit,
freezer,
blobs_db,
output_dir,
})
}

pub fn inspect_db<E: EthSpec>(
inspect_config: InspectConfig,
client_config: ClientConfig,
runtime_context: &RuntimeContext<E>,
log: Logger,
) -> Result<(), String> {
let spec = runtime_context.eth2_config.spec.clone();
let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();

let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path,
&cold_path,
&blobs_path,
|_, _, _| Ok(()),
client_config.store,
spec,
log,
)
.map_err(|e| format!("{:?}", e))?;

let mut total = 0;
let mut num_keys = 0;

let sub_db = if inspect_config.freezer {
&db.cold_db
LevelDB::<E>::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))?
} else if inspect_config.blobs_db {
LevelDB::<E>::open(&blobs_path).map_err(|e| format!("Unable to open blobs DB: {e:?}"))?
} else {
&db.hot_db
LevelDB::<E>::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))?
};

let skip = inspect_config.skip.unwrap_or(0);
Expand Down Expand Up @@ -384,6 +412,50 @@ pub fn inspect_db<E: EthSpec>(
Ok(())
}

/// Options for the `compact` subcommand, built from CLI arguments by `parse_compact_config`.
pub struct CompactConfig {
/// The column to compact.
column: DBColumn,
/// Compact the freezer DB. `compact_db` checks this flag before `blobs_db`.
freezer: bool,
/// Compact the blobs DB. When neither flag is set, `compact_db` uses the hot DB.
blobs_db: bool,
}

/// Build a `CompactConfig` from the arguments of the `compact` subcommand.
///
/// Returns an error string if the required `column` argument cannot be parsed.
fn parse_compact_config(cli_args: &ArgMatches) -> Result<CompactConfig, String> {
    Ok(CompactConfig {
        // The target type of the parse is inferred from the `column` field.
        column: clap_utils::parse_required(cli_args, "column")?,
        freezer: cli_args.is_present("freezer"),
        blobs_db: cli_args.is_present("blobs-db"),
    })
}

/// Open one of the node's databases directly and compact a single column in it.
///
/// The database is chosen from `compact_config`: the freezer DB if `freezer` is set,
/// otherwise the blobs DB if `blobs_db` is set, otherwise the hot DB.
///
/// Returns an error if the database cannot be opened or the compaction fails.
pub fn compact_db<E: EthSpec>(
    compact_config: CompactConfig,
    client_config: ClientConfig,
    log: Logger,
) -> Result<(), Error> {
    let hot_path = client_config.get_db_path();
    let cold_path = client_config.get_freezer_db_path();
    let blobs_path = client_config.get_blobs_db_path();
    let target_column = compact_config.column;

    // `freezer` takes precedence over `blobs_db`, matching the order the flags are checked.
    let (db_path, db_name) = match (compact_config.freezer, compact_config.blobs_db) {
        (true, _) => (cold_path, "freezer_db"),
        (false, true) => (blobs_path, "blobs_db"),
        (false, false) => (hot_path, "hot_db"),
    };
    let sub_db = LevelDB::<E>::open(&db_path)?;

    info!(
        log,
        "Compacting database";
        "db" => db_name,
        "column" => ?target_column
    );

    sub_db.compact_column(target_column)
}

pub struct MigrateConfig {
to: SchemaVersion,
}
Expand Down Expand Up @@ -537,7 +609,10 @@ pub fn prune_states<E: EthSpec>(
// Check that the user has confirmed they want to proceed.
if !prune_config.confirm {
match db.get_anchor_info() {
Some(anchor_info) if anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => {
Some(anchor_info)
if anchor_info.state_lower_limit == 0
&& anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN =>
{
info!(log, "States have already been pruned");
return Ok(());
}
Expand Down Expand Up @@ -585,7 +660,11 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result
}
("inspect", Some(cli_args)) => {
let inspect_config = parse_inspect_config(cli_args)?;
inspect_db(inspect_config, client_config, &context, log)
inspect_db::<T>(inspect_config, client_config)
}
("compact", Some(cli_args)) => {
let compact_config = parse_compact_config(cli_args)?;
compact_db::<T>(compact_config, client_config, log).map_err(format_err)
}
("prune-payloads", Some(_)) => {
prune_payloads(client_config, &context, log).map_err(format_err)
Expand Down
Loading