Skip to content

Commit

Permalink
cluster: add parse_self_test_checks()
Browse files Browse the repository at this point in the history
Clean up the `self_test_backend` and `admin/server` parse sites
for self-test JSON parsing by adding `parse_self_test_checks()`.

This will now be the function called from `server::self_test_start_handler()`
as well as `self_test_backend::do_start_test()`.

Also, rename `unknown_check` to `unparsed_check` for clarity. These
unparsed checks are still considered "unknown" if they exist by the
end of the self-test.
  • Loading branch information
WillemKauf committed Aug 16, 2024
1 parent a33abd7 commit bbd6445
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 87 deletions.
64 changes: 21 additions & 43 deletions src/v/cluster/self_test_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,17 @@ ss::future<> self_test_backend::stop() {
co_await std::move(f);
}

void self_test_backend::parse_unknown_checks(
std::vector<cloudcheck_opts>& ctos,
std::vector<unknown_check>& unknown_checks) {
for (auto it = unknown_checks.begin(); it != unknown_checks.end();) {
if (it->test_type == "cloud") {
json::Document doc;
if (doc.Parse(it->test_json.c_str()).HasParseError()) {
++it;
continue;
}
const auto& obj = doc.GetObject();
ctos.push_back(cluster::cloudcheck_opts::from_json(obj));
it = unknown_checks.erase(it);
} else {
++it;
}
}
}

ss::future<std::vector<self_test_result>> self_test_backend::do_start_test(
std::vector<diskcheck_opts> dtos,
std::vector<netcheck_opts> ntos,
std::vector<cloudcheck_opts> ctos,
std::vector<unknown_check> unknown_checks) {
ss::future<std::vector<self_test_result>>
self_test_backend::do_start_test(start_test_request r) {
auto gate_holder = _gate.hold();
std::vector<self_test_result> results;

parse_unknown_checks(ctos, unknown_checks);
parse_self_test_checks(r);

auto dtos = std::move(r.dtos);
auto ntos = std::move(r.ntos);
auto ctos = std::move(r.ctos);
auto unparsed_checks = std::move(r.unparsed_checks);

_stage = self_test_stage::disk;
for (auto& dto : dtos) {
Expand Down Expand Up @@ -173,13 +156,13 @@ ss::future<std::vector<self_test_result>> self_test_backend::do_start_test(
}
}

for (const auto& unknown_check : unknown_checks) {
for (const auto& unparsed_check : unparsed_checks) {
results.push_back(self_test_result{
.name = "Unknown",
.test_type = unknown_check.test_type,
.test_type = unparsed_check.test_type,
.error = fmt::format(
"Unknown test type {} requested on node {}",
unknown_check.test_type,
unparsed_check.test_type,
_self)});
}

Expand All @@ -195,21 +178,16 @@ get_status_response self_test_backend::start_test(start_test_request req) {
clusterlog.debug, "Request to start self-tests with id: {}", req.id);
ssx::background
= ssx::spawn_with_gate_then(_gate, [this, req = std::move(req)]() {
return do_start_test(
std::move(req.dtos),
std::move(req.ntos),
std::move(req.ctos),
std::move(req.unknown_checks))
.then([this, id = req.id](auto results) {
for (auto& r : results) {
r.test_id = id;
}
_prev_run = get_status_response{
.id = id,
.status = self_test_status::idle,
.results = std::move(results),
.stage = _stage};
});
return do_start_test(std::move(req)).then([this](auto results) {
for (auto& r : results) {
r.test_id = _id;
}
_prev_run = get_status_response{
.id = _id,
.status = self_test_status::idle,
.results = std::move(results),
.stage = _stage};
});
}).finally([units = std::move(units)] {});
} else {
vlog(
Expand Down
18 changes: 2 additions & 16 deletions src/v/cluster/self_test_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,8 @@ class self_test_backend {
ss::future<netcheck_response> netcheck(model::node_id, iobuf&&);

private:
// In the case that the controller node is of a redpanda version lower than
// the current node, some self-test checks may have been pushed back into
// the "unknown_checks" vector in the request. We will attempt to parse the
// test json on this node instead, if we recognize it. As of versions >=
// v24.2.x, this will only affect the cloudcheck options. This function
// modifies the in/out parameters: [ctos, unknown_checks]. This function
// will need amending in the event that a new self-test is added.
void parse_unknown_checks(
std::vector<cloudcheck_opts>& ctos,
std::vector<unknown_check>& unknown_checks);

ss::future<std::vector<self_test_result>> do_start_test(
std::vector<diskcheck_opts> dtos,
std::vector<netcheck_opts> ntos,
std::vector<cloudcheck_opts> ctos,
std::vector<unknown_check> unknown_checks);
ss::future<std::vector<self_test_result>>
do_start_test(start_test_request r);

struct previous_netcheck_entity {
static const inline model::node_id unassigned{-1};
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/self_test_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ ss::future<uuid_t> self_test_frontend::start_test(
}
if (
req.dtos.empty() && req.ntos.empty() && req.ctos.empty()
&& req.unknown_checks.empty()) {
&& req.unparsed_checks.empty()) {
throw self_test_exception("No tests specified to run");
}
/// Validate input
Expand Down Expand Up @@ -197,7 +197,7 @@ ss::future<uuid_t> self_test_frontend::start_test(
.id = test_id,
.dtos = std::move(req.dtos),
.ntos = std::move(new_ntos),
.unknown_checks = std::move(req.unknown_checks),
.unparsed_checks = std::move(req.unparsed_checks),
.ctos = std::move(req.ctos),
});
});
Expand Down
28 changes: 28 additions & 0 deletions src/v/cluster/self_test_rpc_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,32 @@ make_netcheck_request(model::node_id src, size_t sz) {
co_return cluster::netcheck_request{.source = src, .buf = std::move(iob)};
}

void parse_self_test_checks(start_test_request& r) {
static constexpr auto known_checks = std::to_array(
{"disk", "network", "cloud"});
for (auto it = r.unparsed_checks.begin(); it != r.unparsed_checks.end();) {
const auto& test_type = it->test_type;
if (
std::find(known_checks.begin(), known_checks.end(), test_type)
!= known_checks.end()) {
json::Document doc;
if (doc.Parse(it->test_json.c_str()).HasParseError()) {
++it;
continue;
}
const auto& obj = doc.GetObject();
if (test_type == "disk") {
r.dtos.push_back(cluster::diskcheck_opts::from_json(obj));
} else if (test_type == "network") {
r.ntos.push_back(cluster::netcheck_opts::from_json(obj));
} else if (test_type == "cloud") {
r.ctos.push_back(cluster::cloudcheck_opts::from_json(obj));
}
it = r.unparsed_checks.erase(it);
} else {
++it;
}
}
}

} // namespace cluster
30 changes: 19 additions & 11 deletions src/v/cluster/self_test_rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,21 @@ struct cloudcheck_opts
}
};

// Captures unknown test types passed to self test frontend for logging
// purposes.
struct unknown_check
// Captures unparsed test types passed to self test backend.
struct unparsed_check
: serde::
envelope<unknown_check, serde::version<0>, serde::compat_version<0>> {
envelope<unparsed_check, serde::version<0>, serde::compat_version<0>> {
ss::sstring test_type;
ss::sstring test_json;
auto serde_fields() { return std::tie(test_type, test_json); }

friend std::ostream&
operator<<(std::ostream& o, const unknown_check& unknown_check) {
operator<<(std::ostream& o, const unparsed_check& unparsed_check) {
fmt::print(
o,
"{{test_type: {}, test_json: {}}}",
unknown_check.test_type,
unknown_check.test_json);
unparsed_check.test_type,
unparsed_check.test_json);
return o;
}
};
Expand Down Expand Up @@ -348,11 +347,11 @@ struct start_test_request
uuid_t id;
std::vector<diskcheck_opts> dtos;
std::vector<netcheck_opts> ntos;
std::vector<unknown_check> unknown_checks;
std::vector<unparsed_check> unparsed_checks;
std::vector<cloudcheck_opts> ctos;

auto serde_fields() {
return std::tie(id, dtos, ntos, unknown_checks, ctos);
return std::tie(id, dtos, ntos, unparsed_checks, ctos);
}

friend std::ostream&
Expand All @@ -367,8 +366,8 @@ struct start_test_request
for (const auto& v : r.ctos) {
fmt::print(ss, "cloudcheck_opts: {}", v);
}
for (const auto& v : r.unknown_checks) {
fmt::print(ss, "unknown_check: {}", v);
for (const auto& v : r.unparsed_checks) {
fmt::print(ss, "unparsed_check: {}", v);
}
fmt::print(o, "{{id: {} {}}}", r.id, ss.str());
return o;
Expand Down Expand Up @@ -430,4 +429,13 @@ struct netcheck_response
ss::future<cluster::netcheck_request>
make_netcheck_request(model::node_id src, size_t sz);

// Parses the raw json out of the start_test_request::unparsed_checks vector
// into self-test options for the various tests, utilizing `opt_t::from_json`.
// In the case that the controller node is of a redpanda version lower than
// the current node, some self-test checks may have been left in
// the "unparsed_checks" vector in the request when the server first processes
// the self test request. We will attempt to parse the test json in the
// self_test_backend of the follower node instead, if we recognize it.
void parse_self_test_checks(start_test_request& r);

} // namespace cluster
23 changes: 8 additions & 15 deletions src/v/redpanda/admin/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2821,22 +2821,15 @@ admin_server::self_test_start_handler(std::unique_ptr<ss::http::request> req) {
for (const auto& element : params) {
const auto& obj = element.GetObject();
const ss::sstring test_type(obj["type"].GetString());
if (test_type == "disk") {
r.dtos.push_back(cluster::diskcheck_opts::from_json(obj));
} else if (test_type == "network") {
r.ntos.push_back(cluster::netcheck_opts::from_json(obj));
} else if (test_type == "cloud") {
r.ctos.push_back(cluster::cloudcheck_opts::from_json(obj));
} else {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
element.Accept(writer);
r.unknown_checks.push_back(cluster::unknown_check{
.test_type = test_type,
.test_json = ss::sstring{
buffer.GetString(), buffer.GetSize()}});
}
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
element.Accept(writer);
r.unparsed_checks.push_back(cluster::unparsed_check{
.test_type = test_type,
.test_json = ss::sstring{
buffer.GetString(), buffer.GetSize()}});
}
cluster::parse_self_test_checks(r);
} else {
/// Default test run is to start 1 disk and 1 network test with
/// default arguments
Expand Down

0 comments on commit bbd6445

Please sign in to comment.