Skip to content

Commit

Permalink
Fix Index recovery path for split(put) (#609)
Browse files Browse the repository at this point in the history
  • Loading branch information
shosseinimotlagh authored Dec 24, 2024
1 parent c388f69 commit 877c041
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 133 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.6.1"
version = "6.6.2"

homepage = "/~https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
9 changes: 5 additions & 4 deletions src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class IndexTableBase {
virtual uint64_t used_size() const = 0;
virtual void destroy() = 0;
virtual void repair_node(IndexBufferPtr const& buf) = 0;
virtual void repair_root_node(IndexBufferPtr const& buf) = 0;
};

enum class index_buf_state_t : uint8_t {
Expand All @@ -97,7 +98,7 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
sisl::atomic_counter< int > m_wait_for_down_buffers{0}; // Number of children need to wait for before persisting
#ifndef NDEBUG
// Down buffers are not mandatory members, but only to keep track of any bugs and asserts
std::vector<std::weak_ptr<IndexBuffer> > m_down_buffers;
std::vector< std::weak_ptr< IndexBuffer > > m_down_buffers;
std::mutex m_down_buffers_mtx;
std::shared_ptr< IndexBuffer > m_prev_up_buffer; // Keep a copy for debugging
#endif
Expand Down Expand Up @@ -125,11 +126,11 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
std::string to_string() const;
std::string to_string_dot() const;

void add_down_buffer(const IndexBufferPtr &buf);
void add_down_buffer(const IndexBufferPtr& buf);

void remove_down_buffer(const IndexBufferPtr &buf);
void remove_down_buffer(const IndexBufferPtr& buf);
#ifndef NDEBUG
bool is_in_down_buffers(const IndexBufferPtr &buf);
bool is_in_down_buffers(const IndexBufferPtr& buf);
#endif
};

Expand Down
49 changes: 39 additions & 10 deletions src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {

void destroy() override {
auto cpg = cp_mgr().cp_guard();
Btree<K, V>::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
Btree< K, V >::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
m_sb.destroy();
}

Expand Down Expand Up @@ -114,11 +114,40 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
return ret;
}

void repair_root_node(IndexBufferPtr const& idx_buf) override {
LOGTRACEMOD(wbcache, "check if this was the previous root node {} for buf {} ", m_sb->root_node,
idx_buf->to_string());
if (m_sb->root_node == idx_buf->blkid().to_integer()) {
// This is the root node, we need to update the root node in superblk
LOGTRACEMOD(wbcache, "{} is old root so we need to update the meta node ", idx_buf->to_string());
BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */,
BtreeNode::identify_leaf_node(idx_buf->raw_buffer()));
static_cast< IndexBtreeNode* >(n)->attach_buf(idx_buf);
auto edge_id = n->next_bnode();

BT_DBG_ASSERT(!n->has_valid_edge(),
"root {} already has a valid edge {}, so we should have found the new root node",
n->to_string(), n->get_edge_value().bnode_id());
n->set_next_bnode(empty_bnodeid);
n->set_edge_value(BtreeLinkInfo{edge_id, 0});
LOGTRACEMOD(wbcache, "change root node {}: edge updated to {} and invalidate the next node! ", n->node_id(),
edge_id);
auto cpg = cp_mgr().cp_guard();
write_node_impl(n, (void*)cpg.context(cp_consumer_t::INDEX_SVC));

} else {
LOGTRACEMOD(wbcache, "This is not the root node, so we can ignore this repair call for buf {}",
idx_buf->to_string());
}
}

void repair_node(IndexBufferPtr const& idx_buf) override {
if (idx_buf->is_meta_buf()) {
// We cannot repair the meta buf on its own, we need to repair the root node which modifies the
// meta_buf. It is ok to ignore this call, because repair will be done from root before meta_buf is
// attempted to repair, which would have updated the meta_buf already.
LOGTRACEMOD(wbcache, "Ignoring repair on meta buf {} root id {} ", idx_buf->to_string(),
this->root_node_id());
return;
}
BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */,
Expand All @@ -134,13 +163,14 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Only for interior nodes we need to repair its links
if (!bn->is_leaf()) {
LOGTRACEMOD(wbcache, "repair_node cp={} buf={}", cpg->id(), idx_buf->to_string());
repair_links(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
repair_links(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}

if (idx_buf->m_up_buffer && idx_buf->m_up_buffer->is_meta_buf()) {
// Our up buffer is a meta buffer, which means that we are the new root node, we need to update the
// meta_buf with new root as well
on_root_changed(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
LOGTRACEMOD(wbcache, "root change for after repairing {}\n\n", idx_buf->to_string());
on_root_changed(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}
}

Expand Down Expand Up @@ -227,10 +257,11 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
wb_cache().free_buf(n->m_idx_buf, r_cast< CPContext* >(context));
}

btree_status_t
on_root_changed(BtreeNodePtr const &new_root, void *context) override {
btree_status_t on_root_changed(BtreeNodePtr const& new_root, void* context) override {
// todo: if(m_sb->root_node == new_root->node_id() && m_sb->root_link_version == new_root->link_version()){
// return btree_status_t::success;}
LOGTRACEMOD(wbcache, "root changed for index old_root={} new_root={}", m_sb->root_node,
new_root->node_id());
m_sb->root_node = new_root->node_id();
m_sb->root_link_version = new_root->link_version();

Expand All @@ -240,7 +271,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

auto& root_buf = static_cast< IndexBtreeNode* >(new_root.get())->m_idx_buf;
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast<CPContext *>(context));
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast< CPContext* >(context));
return btree_status_t::success;
}

Expand All @@ -257,7 +288,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

// Get all original child ids as a support to check if we are beyond the last child node
std::set<bnodeid_t> orig_child_ids;
std::set< bnodeid_t > orig_child_ids;
for (uint32_t i = 0; i < parent_node->total_entries(); ++i) {
BtreeLinkInfo link_info;
parent_node->get_nth_value(i, &link_info, true);
Expand Down Expand Up @@ -391,9 +422,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}
} while (true);

if (child_node) {
this->unlock_node(child_node, locktype_t::READ);
}
if (child_node) { this->unlock_node(child_node, locktype_t::READ); }

if (parent_node->total_entries() == 0 && !parent_node->has_valid_edge()) {
// We shouldn't have an empty interior node in the tree, let's delete it.
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/index_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class IndexService {
uint64_t used_size() const;
uint32_t node_size() const;
void repair_index_node(uint32_t ordinal, IndexBufferPtr const& node_buf);
void update_root(uint32_t ordinal, IndexBufferPtr const& node_buf);

IndexWBCacheBase& wb_cache() {
if (!m_wb_cache) { throw std::runtime_error("Attempted to access a null pointer wb_cache"); }
Expand Down
23 changes: 11 additions & 12 deletions src/lib/index/index_cp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void IndexCPContext::to_string_dot(const std::string& filename) {
LOGINFO("cp dag is stored in file {}", filename);
}

uint16_t IndexCPContext::num_dags() {
uint16_t IndexCPContext::num_dags() {
// count number of buffers whose up_buffers are nullptr
uint16_t count = 0;
std::unique_lock lg{m_flush_buffer_mtx};
Expand Down Expand Up @@ -190,15 +190,18 @@ std::string IndexCPContext::to_string_with_dags() {
// Now walk through the list of graphs and prepare formatted string
std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} #_of_dags={}\n",
m_cp->id(), m_dirty_buf_count.get(), m_dirty_buf_list.size(), group_roots.size())};
int cnt = 1;
for (const auto& root : group_roots) {
std::vector< std::pair< std::shared_ptr< DagNode >, int > > stack;
stack.emplace_back(root, 0);
std::vector< std::tuple< std::shared_ptr< DagNode >, int, int > > stack;
stack.emplace_back(root, 0, cnt++);
while (!stack.empty()) {
auto [node, level] = stack.back();
auto [node, level, index] = stack.back();
stack.pop_back();
fmt::format_to(std::back_inserter(str), "{}{} \n", std::string(level * 4, ' '), node->buf->to_string());
fmt::format_to(std::back_inserter(str), "{}{}-{} \n", std::string(level * 4, ' '), index,
node->buf->to_string());
int c = node->down_nodes.size();
for (const auto& d : node->down_nodes) {
stack.emplace_back(d, level + 1);
stack.emplace_back(d, level + 1, c--);
}
}
}
Expand Down Expand Up @@ -266,15 +269,11 @@ void IndexCPContext::process_txn_record(txn_record const* rec, std::map< BlkId,
#ifndef NDEBUG
// if (!is_sibling_link || (buf->m_up_buffer == real_up_buf)) { return buf;}
// Already linked with same buf or its not a sibling link to override
if (real_up_buf->is_in_down_buffers(buf)) {
return buf;
}
if (real_up_buf->is_in_down_buffers(buf)) { return buf; }
#endif

if (buf->m_up_buffer != real_up_buf) {
if (buf->m_up_buffer) {
buf->m_up_buffer->remove_down_buffer(buf);
}
if (buf->m_up_buffer) { buf->m_up_buffer->remove_down_buffer(buf); }
real_up_buf->add_down_buffer(buf);
buf->m_up_buffer = real_up_buf;
}
Expand Down
68 changes: 42 additions & 26 deletions src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ void IndexService::repair_index_node(uint32_t ordinal, IndexBufferPtr const& nod
}
}

void IndexService::update_root(uint32_t ordinal, IndexBufferPtr const& node_buf) {
auto tbl = get_index_table(ordinal);
if (tbl) {
tbl->repair_root_node(node_buf);
} else {
HS_DBG_ASSERT(false, "Index corresponding to ordinal={} has not been loaded yet, unexpected", ordinal);
}
}

uint32_t IndexService::node_size() const { return m_vdev->atomic_page_size(); }

uint64_t IndexService::used_size() const {
Expand All @@ -154,31 +163,39 @@ IndexBuffer::~IndexBuffer() {
}

std::string IndexBuffer::to_string() const {
if (m_is_meta_buf) {
return fmt::format("Buf={} [Meta] index={} state={} create/dirty_cp={}/{} down_wait#={} freed={}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()),
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed);
} else {
// store m_down_buffers in a string
std::string down_bufs = "";
static std::vector< std::string > state_str = {"CLEAN", "DIRTY", "FLUSHING"};
// store m_down_buffers in a string
std::string down_bufs = "";
#ifndef NDEBUG
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto const &down_buf: m_down_buffers) {
{
std::lock_guard lg(m_down_buffers_mtx);
if (m_down_buffers.empty()) {
fmt::format_to(std::back_inserter(down_bufs), "EMPTY");
} else {
for (auto const& down_buf : m_down_buffers) {
if (auto ptr = down_buf.lock()) {
fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get()));
}
}
fmt::format_to(std::back_inserter(down_bufs), " #down bufs={}", m_down_buffers.size());
}
}
#endif

return fmt::format("Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down=[{}]",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()),
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(),
m_node_freed ? " Freed" : "", voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())),
(m_bytes == nullptr) ? "not attached yet"
: r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(),
down_bufs);
if (m_is_meta_buf) {
return fmt::format("[Meta] Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} down={{{}}}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal,
state_str[int_cast(state())], m_created_cp_id, m_dirtied_cp_id,
m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "", down_bufs);
} else {

return fmt::format(
"Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down={{{}}}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, state_str[int_cast(state())],
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "",
voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())),
(m_bytes == nullptr) ? "not attached yet" : r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(),
down_bufs);
}
}

Expand All @@ -194,7 +211,7 @@ std::string IndexBuffer::to_string_dot() const {
return str;
}

void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
void IndexBuffer::add_down_buffer(const IndexBufferPtr& buf) {
m_wait_for_down_buffers.increment();
#ifndef NDEBUG
{
Expand All @@ -204,10 +221,11 @@ void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
#endif
}

void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
void IndexBuffer::remove_down_buffer(const IndexBufferPtr& buf) {
m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false}; {
bool found{false};
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
if (it->lock() == buf) {
Expand All @@ -222,12 +240,10 @@ void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
}

#ifndef NDEBUG
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr &buf) {
std::lock_guard<std::mutex> lg(m_down_buffers_mtx);
for (auto const &dbuf: m_down_buffers) {
if (dbuf.lock() == buf) {
return true;
}
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr& buf) {
std::lock_guard< std::mutex > lg(m_down_buffers_mtx);
for (auto const& dbuf : m_down_buffers) {
if (dbuf.lock() == buf) { return true; }
}
return false;
}
Expand Down
Loading

0 comments on commit 877c041

Please sign in to comment.