Skip to content

Commit

Permalink
Merge pull request #23360 from andrwng/v24.1.x-23358
Browse files Browse the repository at this point in the history
[v24.1.x] archival: skip spillover retention if not collectable
  • Loading branch information
lf-rep authored Sep 18, 2024
2 parents 878aa3a + ca63337 commit f300619
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,10 @@ ss::future<> ntp_archiver::apply_archive_retention() {
}

const auto& ntp_conf = _parent.get_ntp_config();
if (!ntp_conf.is_collectable()) {
vlog(_rtclog.trace, "NTP is not collectable");
co_return;
}
std::optional<size_t> retention_bytes = ntp_conf.retention_bytes();
std::optional<std::chrono::milliseconds> retention_ms
= ntp_conf.retention_duration();
Expand Down
45 changes: 45 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,51 @@ class e2e_fixture
scoped_config test_local_cfg;
};

FIXTURE_TEST(test_spillover_retention_compacted_topic, e2e_fixture) {
test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests")
.set_value(true);
test_local_cfg.get("cloud_storage_spillover_manifest_max_segments")
.set_value(std::make_optional<size_t>(5));
test_local_cfg.get("cloud_storage_spillover_manifest_size")
.set_value(std::optional<size_t>{});
test_local_cfg.get("log_retention_ms")
.set_value(std::make_optional<std::chrono::milliseconds>(1ms));
const model::topic topic_name("tapioca");
model::ntp ntp(model::kafka_namespace, topic_name, 0);

cluster::topic_properties props;
props.shadow_indexing = model::shadow_indexing_mode::full;
props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction;
add_topic({model::kafka_namespace, topic_name}, 1, props).get();
wait_for_leader(ntp).get();

const auto records_per_seg = 5;
const auto num_segs = 100;
auto partition = app.partition_manager.local().get(ntp);
auto& archiver = partition->archiver().value().get();
tests::remote_segment_generator gen(make_kafka_client().get(), *partition);
auto total_records = gen.num_segments(num_segs)
.batches_per_segment(records_per_seg)
.produce()
.get();
BOOST_REQUIRE_GE(total_records, 500);
BOOST_REQUIRE(archiver.sync_for_tests().get());
archiver.apply_spillover().get();
ss::sleep(5s).get();
archiver.apply_archive_retention().get();

tests::kafka_list_offsets_transport lister(make_kafka_client().get());
lister.start().get();

auto offset
= lister.start_offset_for_partition(topic_name, model::partition_id(0))
.get();
BOOST_REQUIRE_EQUAL(offset(), 0);
BOOST_REQUIRE_EQUAL(
archiver.manifest().full_log_start_offset().value_or(model::offset{})(),
0);
}

FIXTURE_TEST(test_produce_consume_from_cloud, e2e_fixture) {
test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests")
.set_value(true);
Expand Down

0 comments on commit f300619

Please sign in to comment.