Skip to content

Commit

Permalink
storage: add log lines about removed records
Browse files Browse the repository at this point in the history
CONFLICT:
- conflict because this branch doesn't have
  pb.add_segment_marked_tombstone_free()

Removing records from the storage layer is currently very silent, making
it difficult to debug. This commit exposes the new
copy_data_segment_reducer stats as a log message, at info level when
data is removed, and debug level when not (presumably we mostly care
about these stats when data is actually being removed).

(cherry picked from commit 8c84ffd)
  • Loading branch information
andrwng committed Jan 9, 2025
1 parent b0b4545 commit f125faf
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 15 deletions.
11 changes: 10 additions & 1 deletion src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ class copy_data_segment_reducer : public compaction_reducer {
// of a compactible type.
size_t non_compactible_batches{0};

// Reports whether this reducer dropped anything: true when at least one
// batch or at least one record has been counted as discarded.
bool has_removed_data() const {
    const bool dropped_batches = batches_discarded > 0;
    const bool dropped_records = records_discarded > 0;
    return dropped_batches || dropped_records;
}

friend std::ostream& operator<<(std::ostream& os, const stats& s) {
fmt::print(
os,
Expand All @@ -145,6 +150,10 @@ class copy_data_segment_reducer : public compaction_reducer {
return os;
}
};
// Pairs the index state produced by the reducer with the stats it gathered,
// so that end_of_stream() can hand both back to the caller in one value.
struct idx_and_stats {
// Index state moved out of the reducer by end_of_stream().
index_state new_idx;
// Copy of the reducer's stats taken when end_of_stream() is called.
stats reducer_stats;
};

copy_data_segment_reducer(
filter_t f,
Expand All @@ -165,7 +174,7 @@ class copy_data_segment_reducer : public compaction_reducer {
, _as(as) {}

ss::future<ss::stop_iteration> operator()(model::record_batch);
storage::index_state end_of_stream() { return std::move(_idx); }
idx_and_stats end_of_stream() { return {std::move(_idx), _stats}; }

private:
ss::future<ss::stop_iteration>
Expand Down
19 changes: 17 additions & 2 deletions src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,32 @@ ss::future<index_state> deduplicate_segment(
inject_reader_failure,
cfg.asrc);

auto new_idx = co_await std::move(rdr).consume(
auto res = co_await std::move(rdr).consume(
std::move(copy_reducer), model::no_timeout);
const auto& stats = res.reducer_stats;
if (stats.has_removed_data()) {
vlog(
gclog.info,
"Windowed compaction filtering removing data from {}: {}",
seg->filename(),
stats);
} else {
vlog(
gclog.debug,
"Windowed compaction filtering not removing any records from {}: {}",
seg->filename(),
stats);
}

// restore broker timestamp and clean compact timestamp
auto& new_idx = res.new_idx;
new_idx.broker_timestamp = seg->index().broker_timestamp();
new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp();

// Set may_have_tombstone_records
new_idx.may_have_tombstone_records = may_have_tombstone_records;

co_return new_idx;
co_return std::move(new_idx);
}

} // namespace storage
39 changes: 27 additions & 12 deletions src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,18 +440,33 @@ ss::future<storage::index_state> do_copy_segment_data(
cfg.asrc);

// create the segment, get the in-memory index for the new segment
auto new_index = co_await create_segment_full_reader(
seg, cfg, pb, std::move(rw_lock_holder))
.consume(std::move(copy_reducer), model::no_timeout)
.finally([&] {
return appender->close().handle_exception(
[](std::exception_ptr e) {
vlog(
gclog.error,
"Error copying index to new segment:{}",
e);
});
auto res = co_await create_segment_full_reader(
seg, cfg, pb, std::move(rw_lock_holder))
.consume(std::move(copy_reducer), model::no_timeout)
.finally([&] {
return appender->close().handle_exception(
[](std::exception_ptr e) {
vlog(
gclog.error,
"Error copying index to new segment:{}",
e);
});
});
const auto& stats = res.reducer_stats;
if (stats.has_removed_data()) {
vlog(
gclog.info,
"Self compaction filtering removing data from {}: {}",
seg->filename(),
stats);
} else {
vlog(
gclog.debug,
"Self compaction filtering not removing any records from {}: {}",
seg->filename(),
stats);
}
auto& new_index = res.new_idx;

// restore broker timestamp and clean compact timestamp
new_index.broker_timestamp = old_broker_timestamp;
Expand All @@ -460,7 +475,7 @@ ss::future<storage::index_state> do_copy_segment_data(
// Set may_have_tombstone_records
new_index.may_have_tombstone_records = may_have_tombstone_records;

co_return new_index;
co_return std::move(new_index);
}

model::record_batch_reader create_segment_full_reader(
Expand Down

0 comments on commit f125faf

Please sign in to comment.