Skip to content

Commit

Permalink
skip appending dummy logs to log dev
Browse files Browse the repository at this point in the history
1. directly update indices when there are holes in baseline resync
2. update definition of logdev_key::out_of_bound_ld_key and use it to imply log dev can truncate freely.
3. remove is_active check in HomeLogStore::flush to unblock flush when there are holes in m_records
  • Loading branch information
yawzhang committed Jan 14, 2025
1 parent 96f8e43 commit b8f76c6
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 33 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.6.8"
version = "6.6.10"

homepage = "/~https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
10 changes: 10 additions & 0 deletions src/include/homestore/logstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {

logdev_key get_trunc_ld_key() const { return m_trunc_ld_key; }

/**
* @brief Get the truncation information for this log store. It is called during log device truncation
*
* @return tuple of (start_lsn, trunc_ld_key, tail_lsn) If the log store is empty, it will return
* an out_of_bound_ld_key as trunc_ld_key.
*
* @note ensure that no new logs are flushed between calling this function and completing the truncation,
* as this could result in an inaccurate out_of_bound_ld_key.
* */
std::tuple< logstore_seq_num_t, logdev_key, logstore_seq_num_t > truncate_info() const;

sisl::StreamTracker< logstore_record >& log_records() { return m_records; }
Expand Down Expand Up @@ -232,6 +241,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {

auto start_lsn() const { return m_start_lsn.load(std::memory_order_acquire); }
auto tail_lsn() const { return m_tail_lsn.load(std::memory_order_acquire); }
auto next_lsn() const { return m_next_lsn.load(std::memory_order_acquire); }

nlohmann::json dump_log_store(const log_dump_req& dump_req = log_dump_req());

Expand Down
3 changes: 2 additions & 1 deletion src/include/homestore/logstore/log_store_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ struct logdev_key {
std::string to_string() const { return fmt::format("Logid={} devoffset={}", idx, dev_offset); }

static const logdev_key& out_of_bound_ld_key() {
static constexpr logdev_key s_out_of_bound_ld_key{std::numeric_limits< logid_t >::max(), 0};
static constexpr logdev_key s_out_of_bound_ld_key{std::numeric_limits< logid_t >::max(),
std::numeric_limits< off_t >::max()};
return s_out_of_bound_ld_key;
}
};
Expand Down
18 changes: 9 additions & 9 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ void LogDev::stop() {
m_log_idx.store(0);
m_pending_flush_size.store(0);
m_last_flush_idx = -1;
m_last_flush_ld_key = logdev_key{0, 0};
m_last_truncate_idx = -1;
m_last_crc = INVALID_CRC32_VALUE;

Expand Down Expand Up @@ -501,6 +502,7 @@ void LogDev::on_flush_completion(LogGroup* lg) {
free_log_group(lg);
m_log_records->truncate(upto_indx);
m_last_flush_idx = upto_indx;
m_last_flush_ld_key = logdev_key{from_indx, dev_offset};

// since we support out-of-order lsn write, so no need to guarantee the order of logstore write completion
for (auto const& [idx, req] : req_map) {
Expand Down Expand Up @@ -530,20 +532,18 @@ uint64_t LogDev::truncate() {
auto lstore = store.log_store;
if (lstore == nullptr) { continue; }
auto const [trunc_lsn, trunc_ld_key, tail_lsn] = lstore->truncate_info();
if (trunc_lsn == tail_lsn) {
THIS_LOGDEV_LOG(DEBUG, "Store_id={} didn't have any writes since last truncation, skipping ", store_id);
m_logdev_meta.remove_all_rollback_records(store_id, m_stopped /* persist_now */);
continue;
}
HS_DBG_ASSERT_GE(trunc_ld_key.idx, m_last_truncate_idx, "Trying to truncate logid which is already truncated");
m_logdev_meta.update_store_superblk(store_id, logstore_superblk(trunc_lsn + 1), m_stopped /* persist_now */);

// We found a new minimum logdev_key that we can truncate to
if (trunc_ld_key.idx > 0 && trunc_ld_key.idx < min_safe_ld_key.idx) { min_safe_ld_key = trunc_ld_key; }
if (trunc_ld_key.idx < min_safe_ld_key.idx) { min_safe_ld_key = trunc_ld_key; }
}

// All log stores are empty, we can truncate logs depends on the last flushed logdev_key
if (min_safe_ld_key == logdev_key::out_of_bound_ld_key()) {
min_safe_ld_key = m_last_flush_ld_key;
}

// There are no writes or no truncation called for any of the store, so we can't truncate anything
if (min_safe_ld_key == logdev_key::out_of_bound_ld_key() || min_safe_ld_key.idx <= m_last_truncate_idx) return 0;
if (min_safe_ld_key.idx <= 0 || min_safe_ld_key.idx <= m_last_truncate_idx) return 0;

uint64_t const num_records_to_truncate = uint64_cast(min_safe_ld_key.idx - m_last_truncate_idx);

Expand Down
5 changes: 3 additions & 2 deletions src/lib/logstore/log_dev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,9 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
std::multimap< logid_t, logstore_id_t > m_garbage_store_ids;
Clock::time_point m_last_flush_time;

logid_t m_last_flush_idx{-1}; // Track last flushed, last device offset and truncated log idx
logid_t m_last_truncate_idx{std::numeric_limits< logid_t >::min()}; // logdev truncate up to this idx
logid_t m_last_flush_idx{-1}; // Track last flushed, last device offset and truncated log idx
logdev_key m_last_flush_ld_key{0,0}; // Left interval of the last flush, 0 indicates the very beginning of logdev
logid_t m_last_truncate_idx{-1}; // Logdev truncate up to this idx
crc32_t m_last_crc{INVALID_CRC32_VALUE};

// LogDev Info block related fields
Expand Down
35 changes: 25 additions & 10 deletions src/lib/logstore/log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,25 +189,43 @@ void HomeLogStore::truncate(logstore_seq_num_t upto_lsn, bool in_memory_truncate

#endif

// In normal write and compact path, upto_lsn is expected to be no larger than m_tail_lsn after the flush.
// So upto_lsn > m_tail_lsn is expected to exist only in baseline resync path.
// In baseline resync path, we truncate all entries up to upto_lsn, and update m_tail_lsn and m_next_lsn
// to make sure logstore's idx is always = raft's idx - 1.
if (upto_lsn > m_tail_lsn) {
THIS_LOGSTORE_LOG(WARN,
"Truncating issued on lsn={} which is greater than tail_lsn={}, truncating upto tail_lsn",
"Truncating issued on lsn={} which is greater than tail_lsn={}",
upto_lsn, m_tail_lsn.load(std::memory_order_relaxed));
m_trunc_ld_key = m_records.at(m_tail_lsn).m_trunc_key;
upto_lsn = m_tail_lsn;
// update m_tail_lsn if it is less than upto_lsn
auto current_tail_lsn = m_tail_lsn.load(std::memory_order_relaxed);
while (current_tail_lsn < upto_lsn &&
!m_tail_lsn.compare_exchange_weak(current_tail_lsn, upto_lsn, std::memory_order_relaxed)) {}

// update m_next_lsn if it is less than upto_lsn + 1
auto current_next_lsn = m_next_lsn.load(std::memory_order_relaxed);
while (current_next_lsn < upto_lsn + 1 &&
!m_next_lsn.compare_exchange_weak(current_next_lsn, upto_lsn + 1, std::memory_order_relaxed)) {}

// insert an empty record to make sure m_records has enough size to truncate
logdev_key empty_ld_key;
m_records.create_and_complete(upto_lsn, logstore_record(empty_ld_key, empty_ld_key));
} else {
m_trunc_ld_key = m_records.at(upto_lsn).m_trunc_key;
THIS_LOGSTORE_LOG(TRACE, "Truncating logstore upto lsn={} , m_trunc_ld_key index {} offset {}", upto_lsn,
m_trunc_ld_key.idx, m_trunc_ld_key.dev_offset);
}
THIS_LOGSTORE_LOG(TRACE, "Truncating logstore upto lsn={} , m_trunc_ld_key index {} offset {}", upto_lsn,
m_trunc_ld_key.idx, m_trunc_ld_key.dev_offset);
m_records.truncate(upto_lsn);
m_start_lsn.store(upto_lsn + 1);
if (!in_memory_truncate_only) { m_logdev->truncate(); }
}

std::tuple< logstore_seq_num_t, logdev_key, logstore_seq_num_t > HomeLogStore::truncate_info() const {
auto const trunc_lsn = m_start_lsn.load(std::memory_order_relaxed) - 1;
return std::make_tuple(trunc_lsn, m_trunc_ld_key, m_tail_lsn.load(std::memory_order_relaxed));
auto const tail_lsn = m_tail_lsn.load(std::memory_order_relaxed);

return (trunc_lsn == tail_lsn) ? std::make_tuple(trunc_lsn, logdev_key::out_of_bound_ld_key(), tail_lsn)
: std::make_tuple(trunc_lsn, m_trunc_ld_key, tail_lsn);
}

void HomeLogStore::fill_gap(logstore_seq_num_t seq_num) {
Expand Down Expand Up @@ -277,10 +295,7 @@ void HomeLogStore::flush(logstore_seq_num_t upto_lsn) {
return;
}

if (upto_lsn == invalid_lsn()) { upto_lsn = m_records.active_upto(); }

// if we have flushed already, we are done, else issue a flush
if (m_records.status(upto_lsn).is_active) m_logdev->flush_under_guard();
m_logdev->flush_under_guard();
}

bool HomeLogStore::rollback(logstore_seq_num_t to_lsn) {
Expand Down
10 changes: 4 additions & 6 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,10 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
// release this assert if for some use case, we should tolorant this case;
// for now, don't expect this case to happen.
// RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);
REPL_STORE_LOG(DEBUG, "Adding dummy entries during compact from={} upto={}", cur_max_lsn + 1,
to_store_lsn(compact_lsn));
// We need to fill the remaining entries with dummy data.
for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) {
append(m_dummy_log_entry);
}

// if compact_lsn is beyond the current max_lsn, it indicates a hole from cur_max_lsn to compact_lsn.
// we directly compact and truncate up to compact_lsn assuming there are dummy logs.
REPL_STORE_LOG(DEBUG, "Compact with log holes from {} to={}", cur_max_lsn + 1, to_store_lsn(compact_lsn));
}
m_log_store->truncate(to_store_lsn(compact_lsn));
return true;
Expand Down
103 changes: 99 additions & 4 deletions src/tests/test_log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,11 @@ class LogDevTest : public ::testing::Test {
read_all_verify(log_store);
}

void truncate_validate(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t* last_lsn = nullptr) {
void truncate_validate(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t* trunc_lsn = nullptr) {
auto upto = log_store->get_contiguous_completed_seq_num(-1);
if (last_lsn) {
ASSERT_EQ(upto, *last_lsn);
if (trunc_lsn && *trunc_lsn != upto) {
LOGWARN("Truncate issued upto {} but real upto lsn in log store is {}", *trunc_lsn, upto);
upto = *trunc_lsn;
}

LOGINFO("truncate_validate upto {}", upto);
Expand Down Expand Up @@ -314,7 +315,6 @@ TEST_F(LogDevTest, ReTruncate) {
auto logdev_id = logstore_service().create_new_logdev();
s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple();
auto log_store = logstore_service().create_new_log_store(logdev_id, false);
auto store_id = log_store->get_store_id();

LOGINFO("Step 2: Issue sequential inserts with q depth of 10");
logstore_seq_num_t cur_lsn = 0;
Expand All @@ -337,6 +337,101 @@ TEST_F(LogDevTest, ReTruncate) {
read_all_verify(log_store);
}

TEST_F(LogDevTest, TruncateWithOverlappingLSN) {
LOGINFO("Step 1: Create a single logstore to start truncate with overlapping LSN test");
auto logdev_id = logstore_service().create_new_logdev();
s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple();
auto log_store = logstore_service().create_new_log_store(logdev_id, false);

LOGINFO("Step 2: Insert 500 entries");
logstore_seq_num_t cur_lsn = 0;
kickstart_inserts(log_store, cur_lsn, 500);

LOGINFO("Step 3: Read and verify all entries");
read_all_verify(log_store);

LOGINFO("Step 4: Truncate 100 entries");
logstore_seq_num_t trunc_lsn = 99;
truncate_validate(log_store, &trunc_lsn);
ASSERT_EQ(log_store->start_lsn(), trunc_lsn + 1);
ASSERT_EQ(log_store->tail_lsn(), 499);
ASSERT_EQ(log_store->next_lsn(), 500);
ASSERT_EQ(log_store->truncated_upto(), trunc_lsn);

LOGINFO("Step 5: Read and verify all entries");
read_all_verify(log_store);

LOGINFO("Step 6: Truncate all with overlapping lsn");
trunc_lsn = 1999999;
truncate_validate(log_store, &trunc_lsn);
ASSERT_EQ(log_store->start_lsn(), trunc_lsn + 1);
ASSERT_EQ(log_store->tail_lsn(), trunc_lsn);
ASSERT_EQ(log_store->next_lsn(), 2000000);
ASSERT_EQ(log_store->truncated_upto(), trunc_lsn);

LOGINFO("Step 7 Read and verify all entries");
read_all_verify(log_store);

LOGINFO("Step 8: Append 500 entries")
cur_lsn = log_store->next_lsn();
kickstart_inserts(log_store, cur_lsn, 500);
ASSERT_EQ(log_store->next_lsn(), 2000500);

LOGINFO("Step 9: Read and verify all entries");
read_all_verify(log_store);
}

TEST_F(LogDevTest, TruncateAfterRestart) {
LOGINFO("Step 1: Create a single logstore to start truncate with overlapping LSN test");
auto logdev_id = logstore_service().create_new_logdev();
s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple();
auto log_store = logstore_service().create_new_log_store(logdev_id, false);
auto store_id = log_store->get_store_id();

auto restart = [&]() {
std::promise< bool > p;
auto starting_cb = [&]() {
logstore_service().open_logdev(logdev_id);
logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) {
log_store = store;
p.set_value(true);
});
};
start_homestore(true /* restart */, starting_cb);
p.get_future().get();
};

LOGINFO("Step 2: Insert 500 entries");
logstore_seq_num_t cur_lsn = 0;
kickstart_inserts(log_store, cur_lsn, 500);

LOGINFO("Step 3: Read and verify all entries");
read_all_verify(log_store);

LOGINFO("Step 4: Truncate 100 entries");
logstore_seq_num_t trunc_lsn = 99;
truncate_validate(log_store, &trunc_lsn);
ASSERT_EQ(log_store->start_lsn(), trunc_lsn + 1);
ASSERT_EQ(log_store->tail_lsn(), 499);
ASSERT_EQ(log_store->next_lsn(), 500);
ASSERT_EQ(log_store->truncated_upto(), trunc_lsn);

LOGINFO("Step 5: Read and verify all entries");
read_all_verify(log_store);

LOGINFO("Step 6: Restart and verify all entries");
restart();
read_all_verify(log_store);
auto const [last_trunc_lsn, trunc_ld_key, tail_lsn] = log_store->truncate_info();
ASSERT_EQ(last_trunc_lsn, trunc_lsn);
ASSERT_EQ(trunc_ld_key.idx, 0);
ASSERT_EQ(tail_lsn, log_store->tail_lsn());

LOGINFO("Step 7: call log dev truncate again and read verify")
logstore_service().device_truncate();
read_all_verify(log_store);
}

TEST_F(LogDevTest, CreateRemoveLogDev) {
auto num_logdev = SISL_OPTIONS["num_logdevs"].as< uint32_t >();
std::vector< std::shared_ptr< HomeLogStore > > log_stores;
Expand Down

0 comments on commit b8f76c6

Please sign in to comment.