From 7780c8e4cb3176e4247fb18d37640db654152dc2 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 2 Aug 2024 21:37:46 +0000 Subject: [PATCH] serde/proto: seastar friendly protobuf parser TODO explain yourself --- MODULE.bazel | 3 +- MODULE.bazel.lock | 93 +-- bazel/repositories.bzl | 8 + bazel/thirdparty/libprotobuf-mutator.BUILD | 25 + src/v/serde/protobuf/BUILD | 21 + src/v/serde/protobuf/README.md | 0 src/v/serde/protobuf/parser.cc | 522 +++++++++++++++++ src/v/serde/protobuf/parser.h | 109 ++++ src/v/serde/protobuf/tests/BUILD | 57 ++ src/v/serde/protobuf/tests/parser_test.cc | 548 ++++++++++++++++++ .../tests/test_messages_edition2023.proto | 183 ++++++ src/v/serde/protobuf/tests/three.proto | 66 +++ src/v/serde/protobuf/tests/two.proto | 20 + 13 files changed, 1617 insertions(+), 38 deletions(-) create mode 100644 bazel/thirdparty/libprotobuf-mutator.BUILD create mode 100644 src/v/serde/protobuf/BUILD create mode 100644 src/v/serde/protobuf/README.md create mode 100644 src/v/serde/protobuf/parser.cc create mode 100644 src/v/serde/protobuf/parser.h create mode 100644 src/v/serde/protobuf/tests/BUILD create mode 100644 src/v/serde/protobuf/tests/parser_test.cc create mode 100644 src/v/serde/protobuf/tests/test_messages_edition2023.proto create mode 100644 src/v/serde/protobuf/tests/three.proto create mode 100644 src/v/serde/protobuf/tests/two.proto diff --git a/MODULE.bazel b/MODULE.bazel index 2352c1cc07233..0131048d546a9 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -33,7 +33,7 @@ bazel_dep(name = "googletest", version = "1.14.0.bcr.1") bazel_dep(name = "liburing", version = "2.5") bazel_dep(name = "lz4", version = "1.9.4") bazel_dep(name = "platforms", version = "0.0.10") -bazel_dep(name = "protobuf", version = "24.4") +bazel_dep(name = "protobuf", version = "27.3") bazel_dep(name = "re2", version = "2023-09-01") bazel_dep(name = "rules_foreign_cc", version = "0.10.1") bazel_dep(name = "rules_go", version = "0.49.0") @@ -71,6 +71,7 @@ use_repo(non_module_dependencies, "hwloc") use_repo(non_module_dependencies, "jsoncons") use_repo(non_module_dependencies, "krb5") use_repo(non_module_dependencies, "libpciaccess") +use_repo(non_module_dependencies, "libprotobuf_mutator") use_repo(non_module_dependencies, "libxml2") use_repo(non_module_dependencies, "lksctp") use_repo(non_module_dependencies, "numactl") diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 84b0a817c59cc..ef2e5305c512a 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -50,6 +50,8 @@ "https://bcr.bazel.build/modules/googletest/1.14.0.bcr.1/MODULE.bazel": "22c31a561553727960057361aa33bf20fb2e98584bc4fec007906e27053f80c6", "https://bcr.bazel.build/modules/googletest/1.14.0.bcr.1/source.json": "41e9e129f80d8c8bf103a7acc337b76e54fad1214ac0a7084bf24f4cd924b8b4", "https://bcr.bazel.build/modules/googletest/1.14.0/MODULE.bazel": "cfbcbf3e6eac06ef9d85900f64424708cc08687d1b527f0ef65aa7517af8118f", + "https://bcr.bazel.build/modules/jsoncpp/1.9.5/MODULE.bazel": "31271aedc59e815656f5736f282bb7509a97c7ecb43e927ac1a37966e0578075", + "https://bcr.bazel.build/modules/jsoncpp/1.9.5/source.json": "4108ee5085dd2885a341c7fab149429db457b3169b86eb081fa245eadf69169d", "https://bcr.bazel.build/modules/libpfm/4.11.0/MODULE.bazel": "45061ff025b301940f1e30d2c16bea596c25b176c8b6b3087e92615adbd52902", "https://bcr.bazel.build/modules/liburing/2.5/MODULE.bazel": "51768194b0b344123b2d7237b65928c9c119e7fc18a3f86f4870e83c0b71c00a", "https://bcr.bazel.build/modules/liburing/2.5/source.json": "1ff3e7c04563757f99cc7d33c2d4cb4900c45d6aca2cf54d58a1b199e66460ec", @@ -64,9 +66,8 @@ "https://bcr.bazel.build/modules/platforms/0.0.8/MODULE.bazel": "9f142c03e348f6d263719f5074b21ef3adf0b139ee4c5133e2aa35664da9eb2d", "https://bcr.bazel.build/modules/platforms/0.0.9/MODULE.bazel": "4a87a60c927b56ddd67db50c89acaa62f4ce2a1d2149ccb63ffd871d5ce29ebc", "https://bcr.bazel.build/modules/protobuf/21.7/MODULE.bazel": "a5a29bb89544f9b97edce05642fac225a808b5b7be74038ea3640fae2f8e66a7", - "https://bcr.bazel.build/modules/protobuf/23.1/MODULE.bazel": "88b393b3eb4101d18129e5db51847cd40a5517a53e81216144a8c32dfeeca52a", - "https://bcr.bazel.build/modules/protobuf/24.4/MODULE.bazel": "7bc7ce5f2abf36b3b7b7c8218d3acdebb9426aeb35c2257c96445756f970eb12", - "https://bcr.bazel.build/modules/protobuf/24.4/source.json": "ace4b8c65d4cfe64efe544f09fc5e5df77faf3a67fbb29c5341e0d755d9b15d6", + "https://bcr.bazel.build/modules/protobuf/27.3/MODULE.bazel": "d94898cbf9d6d25c0edca2521211413506b68a109a6b01776832ed25154d23d7", + "https://bcr.bazel.build/modules/protobuf/27.3/source.json": "d6fdc641a99c600df6eb0fa5b99879ca497dbcf6fd1287372576a83f82dd93b6", "https://bcr.bazel.build/modules/protobuf/3.19.0/MODULE.bazel": "6b5fbb433f760a99a22b18b6850ed5784ef0e9928a72668b66e4d7ccd47db9b0", "https://bcr.bazel.build/modules/protobuf/3.19.2/MODULE.bazel": "532ffe5f2186b69fdde039efe6df13ba726ff338c6bc82275ad433013fa10573", "https://bcr.bazel.build/modules/protobuf/3.19.6/MODULE.bazel": "9233edc5e1f2ee276a60de3eaa47ac4132302ef9643238f23128fea53ea12858", @@ -89,7 +90,7 @@ "https://bcr.bazel.build/modules/rules_go/0.49.0/MODULE.bazel": "61cfc1ba17123356d1b12b6c50f6e0162b2cc7fd6f51753c12471e973a0f72a5", "https://bcr.bazel.build/modules/rules_go/0.49.0/source.json": "ab2261ea5e29d29a41c8e5c67896f946ab7855b786d28fe25d74987b84e5e85d", "https://bcr.bazel.build/modules/rules_java/4.0.0/MODULE.bazel": "5a78a7ae82cd1a33cef56dc578c7d2a46ed0dca12643ee45edbb8417899e6f74", - "https://bcr.bazel.build/modules/rules_java/7.1.0/MODULE.bazel": "30d9135a2b6561c761bd67bd4990da591e6bdc128790ce3e7afd6a3558b2fb64", + "https://bcr.bazel.build/modules/rules_java/5.3.5/MODULE.bazel": "a4ec4f2db570171e3e5eb753276ee4b389bae16b96207e9d3230895c99644b86", "https://bcr.bazel.build/modules/rules_java/7.6.1/MODULE.bazel": "2f14b7e8a1aa2f67ae92bc69d1ec0fa8d9f827c4e17ff5e5f02e91caa3b2d0fe", "https://bcr.bazel.build/modules/rules_java/7.6.1/source.json": "8f3f3076554e1558e8e468b2232991c510ecbcbed9e6f8c06ac31c93bcf38362", "https://bcr.bazel.build/modules/rules_jvm_external/4.4.2/MODULE.bazel": "a56b85e418c83eb1839819f0b515c431010160383306d13ec21959ac412d2fe7", @@ -117,8 +118,6 @@ "https://bcr.bazel.build/modules/stardoc/0.5.3/MODULE.bazel": "c7f6948dae6999bf0db32c1858ae345f112cacf98f174c7a8bb707e41b974f1c", "https://bcr.bazel.build/modules/stardoc/0.5.3/source.json": "cd53fe968dc8cd98197c052db3db6d82562960c87b61e7a90ee96f8e4e0dda97", "https://bcr.bazel.build/modules/upb/0.0.0-20220923-a547704/MODULE.bazel": "7298990c00040a0e2f121f6c32544bab27d4452f80d9ce51349b1a28f3005c43", - "https://bcr.bazel.build/modules/upb/0.0.0-20230516-61a97ef/MODULE.bazel": "c0df5e35ad55e264160417fd0875932ee3c9dda63d9fccace35ac62f45e1b6f9", - "https://bcr.bazel.build/modules/upb/0.0.0-20230516-61a97ef/source.json": "b2150404947339e8b947c6b16baa39fa75657f4ddec5e37272c7b11c7ab533bc", "https://bcr.bazel.build/modules/xz/5.4.5.bcr.3/MODULE.bazel": "1f4f514bd40df12c54548bb7df07643d65e859b272e3b76e90181dfc1a55cb1e", "https://bcr.bazel.build/modules/xz/5.4.5.bcr.3/source.json": "58cfd8a917944f183a3a808edb2e87de9e60b6269b643d7e3d1597267eaad6a2", "https://bcr.bazel.build/modules/yaml-cpp/0.8.0/MODULE.bazel": "879443fbbf128457a187bea6f278d05789f3fc465bb22c2e0fe7fdb52e45eef0", @@ -135,7 +134,7 @@ "moduleExtensions": { "//bazel:extensions.bzl%non_module_dependencies": { "general": { - "bzlTransitiveDigest": "HBFEkvJc7CQGKz81eF4jaFNMXECW7gQi083SNLiRGvQ=", + "bzlTransitiveDigest": "deAmSqwFISNBXDJUZUTguITT2GEhobXeWI8NguILvqI=", "usagesDigest": "bsXDsdl5Gq0iZDf6R9bhf3oHUM35HAGF1usXoFeQrF0=", "recordedFileInputs": {}, "recordedDirentsInputs": {}, @@ -311,6 +310,16 @@ "url": "/~https://github.com/danielaparker/jsoncons/archive/ffd2540bc9cfb54c16ef4d29d80622605d8dfbe8.tar.gz" } }, + "libprotobuf_mutator": { + "bzlFile": "@@bazel_tools//tools/build_defs/repo:http.bzl", + "ruleClassName": "http_archive", + "attributes": { + "build_file": "@@//bazel/thirdparty:libprotobuf-mutator.BUILD", + "integrity": "sha256-KWUbFgNpDJtAO6Kr0eTo1v6iczEOta72jSle9oivFhg=", + "strip_prefix": "libprotobuf-mutator-b922c8ab9004ef9944982e4f165e2747b13223fa", + "url": "/~https://github.com/google/libprotobuf-mutator/archive/b922c8ab9004ef9944982e4f165e2747b13223fa.zip" + } + }, "openssl": { "bzlFile": "@@bazel_tools//tools/build_defs/repo:http.bzl", "ruleClassName": "http_archive", @@ -616,35 +625,6 @@ "recordedRepoMappingEntries": [] } }, - "@@protobuf~//:non_module_deps.bzl%non_module_deps": { - "general": { - "bzlTransitiveDigest": "jsbfONl9OksDWiAs7KDFK5chH/tYI3DngdM30NKdk5Y=", - "usagesDigest": "eVrT3hFCIZNRuTKpfWDzSIwTi2p6U6PWbt+tNWl/Tqk=", - "recordedFileInputs": {}, - "recordedDirentsInputs": {}, - "envVariables": {}, - "generatedRepoSpecs": { - "utf8_range": { - "bzlFile": "@@bazel_tools//tools/build_defs/repo:http.bzl", - "ruleClassName": "http_archive", - "attributes": { - "urls": [ - "/~https://github.com/protocolbuffers/utf8_range/archive/de0b4a8ff9b5d4c98108bdfe723291a33c52c54f.zip" - ], - "strip_prefix": "utf8_range-de0b4a8ff9b5d4c98108bdfe723291a33c52c54f", - "sha256": "5da960e5e5d92394c809629a03af3c7709d2d3d0ca731dacb3a9fb4bf28f7702" - } - } - }, - "recordedRepoMappingEntries": [ - [ - "protobuf~", - "bazel_tools", - "bazel_tools" - ] - ] - } - }, "@@pybind11_bazel~//:python_configure.bzl%extension": { "general": { "bzlTransitiveDigest": "3LPSHhLo7VQLO+x5c48KQmJdPDwEMqMdeng5XVAZm4Y=", @@ -1069,7 +1049,7 @@ "@@rules_jvm_external~//:extensions.bzl%maven": { "general": { "bzlTransitiveDigest": "4ijz6uc3T4E+d+U8LQv4EAt+8OqZNVY/lzvhLx3y1yg=", - "usagesDigest": "OjLvK9v56sSYg9fWBGDp03uaz8IwSP9Vg23Iv73BRdY=", + "usagesDigest": "WfVTcbopbu3jyxPgDWx1iqIv1QV6L/T7utvDxAj5k84=", "recordedFileInputs": { "@@rules_jvm_external~//rules_jvm_external_deps_install.json": "3ab1f67b0de4815df110bc72ccd6c77882b3b21d3d1e0a84445847b6ce3235a3" }, @@ -1426,6 +1406,45 @@ "downloaded_file_path": "com/google/oauth-client/google-oauth-client/1.31.1/google-oauth-client-1.31.1.jar" } }, + "maven": { + "bzlFile": "@@rules_jvm_external~//:coursier.bzl", + "ruleClassName": "coursier_fetch", + "attributes": { + "repositories": [ + "{ \"repo_url\": \"https://repo1.maven.org/maven2\" }" + ], + "artifacts": [ + "{ \"group\": \"com.google.code.findbugs\", \"artifact\": \"jsr305\", \"version\": \"3.0.2\" }", + "{ \"group\": \"com.google.code.gson\", \"artifact\": \"gson\", \"version\": \"2.8.9\" }", + "{ \"group\": \"com.google.errorprone\", \"artifact\": \"error_prone_annotations\", \"version\": \"2.3.2\" }", + "{ \"group\": \"com.google.j2objc\", \"artifact\": \"j2objc-annotations\", \"version\": \"1.3\" }", + "{ \"group\": \"com.google.guava\", \"artifact\": \"guava\", \"version\": \"31.1-jre\" }", + "{ \"group\": \"com.google.guava\", \"artifact\": \"guava-testlib\", \"version\": \"31.1-jre\" }", + "{ \"group\": \"com.google.truth\", \"artifact\": \"truth\", \"version\": \"1.1.2\" }", + "{ \"group\": \"junit\", \"artifact\": \"junit\", \"version\": \"4.13.2\" }", + "{ \"group\": \"org.mockito\", \"artifact\": \"mockito-core\", \"version\": \"4.3.1\" }" + ], + "fail_on_missing_checksum": true, + "fetch_sources": true, + "fetch_javadoc": false, + "excluded_artifacts": [], + "generate_compat_repositories": false, + "version_conflict_policy": "default", + "override_targets": {}, + "strict_visibility": false, + "strict_visibility_value": [ + "@@//visibility:private" + ], + "resolve_timeout": 600, + "jetify": false, + "jetify_include_list": [ + "*" + ], + "use_starlark_android_rules": false, + "aar_import_bzl_label": "@build_bazel_rules_android//android:rules.bzl", + "duplicate_version_warning": "warn" + } + }, "software_amazon_awssdk_aws_xml_protocol_2_17_183": { "bzlFile": "@@bazel_tools//tools/build_defs/repo:http.bzl", "ruleClassName": "http_file", diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 4e1a579f36788..71b357c05ea09 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -80,6 +80,14 @@ def data_dependency(): url = "https://gitlab.freedesktop.org/xorg/lib/libpciaccess/-/archive/2ec2576cabefef1eaa5dd9307c97de2e887fc347/libpciaccess-2ec2576cabefef1eaa5dd9307c97de2e887fc347.tar.gz", ) + http_archive( + name = "libprotobuf_mutator", + build_file = "//bazel/thirdparty:libprotobuf-mutator.BUILD", + integrity = "sha256-KWUbFgNpDJtAO6Kr0eTo1v6iczEOta72jSle9oivFhg=", + strip_prefix = "libprotobuf-mutator-b922c8ab9004ef9944982e4f165e2747b13223fa", + url = "/~https://github.com/google/libprotobuf-mutator/archive/b922c8ab9004ef9944982e4f165e2747b13223fa.zip", + ) + http_archive( name = "libxml2", build_file = "//bazel/thirdparty:libxml2.BUILD", diff --git a/bazel/thirdparty/libprotobuf-mutator.BUILD b/bazel/thirdparty/libprotobuf-mutator.BUILD new file mode 100644 index 0000000000000..8bccd1f8cb974 --- /dev/null +++ b/bazel/thirdparty/libprotobuf-mutator.BUILD @@ -0,0 +1,25 @@ +# See google/libprotobuf-mutator#91 + +cc_library( + name = "libprotobuf_mutator", + testonly = 1, + srcs = glob( + [ + "src/*.cc", + "src/*.h", + ], + exclude = [ + "**/*_test.cc", + "src/mutator.h", + ], + ) + [ + "port/protobuf.h", + ], + hdrs = [ + "src/mutator.h", + ], + include_prefix = "protobuf_mutator", + strip_include_prefix = "src", + visibility = ["//visibility:public"], + deps = ["@protobuf"], +) diff --git a/src/v/serde/protobuf/BUILD b/src/v/serde/protobuf/BUILD new file mode 100644 index 0000000000000..bc9c4147d3ebd --- /dev/null +++ b/src/v/serde/protobuf/BUILD @@ -0,0 +1,21 @@ +load("//bazel:build.bzl", "redpanda_cc_library") + +redpanda_cc_library( + name = "protobuf", + srcs = [ + "parser.cc", + ], + hdrs = [ + "parser.h", + ], + include_prefix = "serde/protobuf", + visibility = ["//visibility:public"], + deps = [ + "//src/v/bytes:iobuf", + "//src/v/bytes:iobuf_parser", + "//src/v/utils:vint", + "//src/v/container:chunked_hash_map", + "//src/v/container:fragmented_vector", + "@protobuf", + ], +) diff --git a/src/v/serde/protobuf/README.md b/src/v/serde/protobuf/README.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/src/v/serde/protobuf/parser.cc b/src/v/serde/protobuf/parser.cc new file mode 100644 index 0000000000000..2624f0d6347b0 --- /dev/null +++ b/src/v/serde/protobuf/parser.cc @@ -0,0 +1,522 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "serde/protobuf/parser.h" + +#include "bytes/iobuf_parser.h" +#include "utils/vint.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace serde::pb { + +namespace pb = google::protobuf; + +enum class wire_type : uint8_t { + variant = 0, + i64 = 1, + length = 2, + group_start = 3, + group_end = 4, + i32 = 5, + max = 5, +}; + +struct tag { + wire_type wire_type; + int32_t field_number; +}; + +int32_t zig_zag_decode(uint32_t n) { + // Note: Using unsigned types prevent undefined behavior + return static_cast((n >> 1U) ^ (~(n & 1U) + 1)); +} + +template +T read_varint(iobuf_parser_base& parser) { + constexpr size_t limit = sizeof(T) == 8 ? 10 : 5; + auto buf = parser.peek_bytes(std::min(limit, parser.bytes_left())); + if constexpr (std::is_same_v) { + auto [v, len] = unsigned_vint::detail::deserialize(buf, limit); + parser.skip(len); + return static_cast(v); + } else if constexpr (std::is_same_v) { + auto [v, len] = unsigned_vint::detail::deserialize(buf, limit); + parser.skip(len); + return zig_zag_decode(static_cast(v)); + } else if constexpr (std::is_same_v) { + auto [v, len] = unsigned_vint::detail::deserialize(buf, limit); + parser.skip(len); + return v; + } else { + static_assert(std::is_same_v); + auto [v, len] = unsigned_vint::detail::deserialize(buf, limit); + parser.skip(len); + return vint::decode_zigzag(v); + } +} + +class parser { + static constexpr int32_t top_level_field_number = -1; + +public: + ss::future> + parse(iobuf iobuf, const pb::Descriptor& descriptor) { + stage(top_level_field_number, std::move(iobuf), descriptor); + while (true) { + co_await parse_until_current_done(); + auto field_number = current_->field_number; + auto completed = std::move(current_->message); + state_.pop_back(); + if (state_.empty()) { + co_return completed; + } + current_ = &state_.back(); + update_field(field_number, std::move(completed)); + } + } + +private: + // Parse the current message (which can change during this method) until + // it's complete and we need to pop our stack. + // + // This method will descend into the serialized protobuf until it reaches a + // leaf message or completes message. + // + // When this method returns current_ points to a message that has been + // finished parsed. + ss::future<> parse_until_current_done() { + if (current_->parser.bytes_left() == 0) { + return ss::now(); + } + // Wrap in ss::repeat so that preemption is possible for big/complex + // protocol buffers + return ss::repeat([this] { + auto tag = read_tag(); + switch (tag.wire_type) { + case wire_type::variant: + read_variant_field(tag.field_number); + break; + case wire_type::i64: + read_fixed64_field(tag.field_number); + break; + case wire_type::length: + read_length_field(tag.field_number); + break; + case wire_type::i32: + read_fixed32_field(tag.field_number); + break; + case wire_type::group_start: + case wire_type::group_end: + throw std::runtime_error(fmt::format( + "legacy proto2 groups not supported (field={})", + tag.field_number)); + } + return current_->parser.bytes_left() > 0 ? ss::stop_iteration::no + : ss::stop_iteration::yes; + }); + } + + void read_variant_field(int32_t field_number) { + auto* field_descriptor = current_->descriptor->FindFieldByNumber( + field_number); + if (field_descriptor == nullptr) { + std::ignore = read_varint(current_->parser); + return; + } + switch (field_descriptor->type()) { + case google::protobuf::FieldDescriptor::TYPE_INT64: + update_field( + *field_descriptor, + static_cast(read_varint(current_->parser))); + break; + case google::protobuf::FieldDescriptor::TYPE_UINT64: + update_field( + *field_descriptor, read_varint(current_->parser)); + break; + case google::protobuf::FieldDescriptor::TYPE_INT32: + update_field( + *field_descriptor, + static_cast(read_varint(current_->parser))); + break; + case google::protobuf::FieldDescriptor::TYPE_SINT32: + update_field( + *field_descriptor, read_varint(current_->parser)); + break; + case google::protobuf::FieldDescriptor::TYPE_SINT64: + update_field( + *field_descriptor, read_varint(current_->parser)); + break; + case google::protobuf::FieldDescriptor::TYPE_BOOL: + update_field( + *field_descriptor, + static_cast(read_varint(current_->parser))); + break; + case google::protobuf::FieldDescriptor::TYPE_UINT32: + update_field( + *field_descriptor, read_varint(current_->parser)); + break; + case google::protobuf::FieldDescriptor::TYPE_ENUM: + update_field( + *field_descriptor, + static_cast(read_varint(current_->parser))); + break; + case google::protobuf::FieldDescriptor::TYPE_STRING: + case google::protobuf::FieldDescriptor::TYPE_GROUP: + case google::protobuf::FieldDescriptor::TYPE_MESSAGE: + case google::protobuf::FieldDescriptor::TYPE_BYTES: + case google::protobuf::FieldDescriptor::TYPE_FIXED64: + case google::protobuf::FieldDescriptor::TYPE_FIXED32: + case google::protobuf::FieldDescriptor::TYPE_SFIXED32: + case google::protobuf::FieldDescriptor::TYPE_SFIXED64: + case google::protobuf::FieldDescriptor::TYPE_DOUBLE: + case google::protobuf::FieldDescriptor::TYPE_FLOAT: + throw std::runtime_error(fmt::format( + "invalid variant type: {}", field_descriptor->type_name())); + } + } + + void read_fixed64_field(int32_t field_number) { + auto* field_descriptor = current_->descriptor->FindFieldByNumber( + field_number); + if (field_descriptor == nullptr) { + current_->parser.skip(sizeof(uint64_t)); + return; + } + switch (field_descriptor->type()) { + case google::protobuf::FieldDescriptor::TYPE_DOUBLE: + update_field( + *field_descriptor, current_->parser.consume_type()); + break; + case google::protobuf::FieldDescriptor::TYPE_FIXED64: + update_field( + *field_descriptor, current_->parser.consume_type()); + break; + case google::protobuf::FieldDescriptor::TYPE_SFIXED64: + update_field( + *field_descriptor, current_->parser.consume_type()); + break; + case google::protobuf::FieldDescriptor::TYPE_FLOAT: + case google::protobuf::FieldDescriptor::TYPE_INT64: + case google::protobuf::FieldDescriptor::TYPE_UINT64: + case google::protobuf::FieldDescriptor::TYPE_INT32: + case google::protobuf::FieldDescriptor::TYPE_FIXED32: + case google::protobuf::FieldDescriptor::TYPE_BOOL: + case google::protobuf::FieldDescriptor::TYPE_STRING: + case google::protobuf::FieldDescriptor::TYPE_GROUP: + case google::protobuf::FieldDescriptor::TYPE_MESSAGE: + case google::protobuf::FieldDescriptor::TYPE_BYTES: + case google::protobuf::FieldDescriptor::TYPE_UINT32: + case google::protobuf::FieldDescriptor::TYPE_ENUM: + case google::protobuf::FieldDescriptor::TYPE_SFIXED32: + case google::protobuf::FieldDescriptor::TYPE_SINT32: + case google::protobuf::FieldDescriptor::TYPE_SINT64: + throw std::runtime_error(fmt::format( + "invalid fixed64 type: {}", field_descriptor->type_name())); + } + } + + void read_fixed32_field(int32_t field_number) { + auto* field_descriptor = current_->descriptor->FindFieldByNumber( + field_number); + if (field_descriptor == nullptr) { + current_->parser.skip(sizeof(uint32_t)); + return; + } + switch (field_descriptor->type()) { + case google::protobuf::FieldDescriptor::TYPE_FLOAT: + update_field( + *field_descriptor, current_->parser.consume_type()); + break; + case google::protobuf::FieldDescriptor::TYPE_FIXED32: + update_field( + *field_descriptor, current_->parser.consume_type()); + break; + case google::protobuf::FieldDescriptor::TYPE_SFIXED32: + update_field( + *field_descriptor, current_->parser.consume_type()); + break; + case google::protobuf::FieldDescriptor::TYPE_DOUBLE: + case google::protobuf::FieldDescriptor::TYPE_INT64: + case google::protobuf::FieldDescriptor::TYPE_UINT64: + case google::protobuf::FieldDescriptor::TYPE_INT32: + case google::protobuf::FieldDescriptor::TYPE_FIXED64: + case google::protobuf::FieldDescriptor::TYPE_BOOL: + case google::protobuf::FieldDescriptor::TYPE_STRING: + case google::protobuf::FieldDescriptor::TYPE_GROUP: + case google::protobuf::FieldDescriptor::TYPE_MESSAGE: + case google::protobuf::FieldDescriptor::TYPE_BYTES: + case google::protobuf::FieldDescriptor::TYPE_UINT32: + case google::protobuf::FieldDescriptor::TYPE_ENUM: + case google::protobuf::FieldDescriptor::TYPE_SFIXED64: + case google::protobuf::FieldDescriptor::TYPE_SINT32: + case google::protobuf::FieldDescriptor::TYPE_SINT64: + throw std::runtime_error(fmt::format( + "invalid fixed32 type: {}", field_descriptor->type_name())); + } + } + + template + void update_field(int32_t field_number, T&& value) { + auto* field = current_->descriptor->FindFieldByNumber(field_number); + if (field == nullptr) { + throw std::runtime_error( + fmt::format("unknown field number: {}", field_number)); + } else { + update_field(*field, std::forward(value)); + } + } + + template + void update_field(const pb::FieldDescriptor& field, T&& value) { + if (field.is_repeated()) { + append_repeated_field(field.number(), std::forward(value)); + } else { + set_field(field, std::forward(value)); + } + } + + template + void set_field(const pb::FieldDescriptor& field, T&& value) { + current_->message->fields.insert_or_assign( + field.number(), std::forward(value)); + // Erase all but the last oneof field when there are multiple. + auto* oneof = field.containing_oneof(); + if (oneof == nullptr) { + return; + } + for (int i = 0; i < oneof->field_count(); ++i) { + auto oneof_field = oneof->field(i); + if (oneof_field->number() == field.number()) { + continue; + } + current_->message->fields.erase(oneof_field->number()); + } + } + + template + void append_repeated_field(int32_t field_number, T&& value) { + auto it = current_->message->fields.find(field_number); + if (it == current_->message->fields.end()) { + if constexpr ( + std::is_same_v + || std::is_same_v) { + current_->message->fields.emplace( + field_number, std::forward(value)); + } else { + chunked_vector vec; + vec.push_back(std::forward(value)); + parsed::repeated repeated{std::move(vec)}; + current_->message->fields.emplace( + field_number, std::move(repeated)); + } + } else if constexpr (std::is_same_v) { + auto& map = std::get(it->second); + for (auto& [key, val] : value) { + map.entries.insert_or_assign(std::move(key), std::move(val)); + } + } else { + auto& repeated = std::get(it->second); + if constexpr (std::is_same_v) { + std::visit( + [&value](auto& vec) { + using vec_t = std::decay_t; + auto& elems = std::get(value.elements); + std::move( + std::make_move_iterator(elems.begin()), + std::make_move_iterator(elems.end()), + std::back_inserter(vec)); + }, + repeated.elements); + } else { + auto& vec = std::get>(repeated.elements); + vec.push_back(std::forward(value)); + } + } + } + + void + stage(int32_t field_number, iobuf iobuf, const pb::Descriptor& descriptor) { + current_ = &state_.emplace_back( + field_number, + iobuf_parser(std::move(iobuf)), + std::make_unique(), + &descriptor); + } + + tag read_tag() { + auto tag_value = read_varint(current_->parser); + constexpr uint64_t wire_mask = 0b111; + uint64_t wtype = tag_value & wire_mask; + if (wtype > static_cast(wire_type::max)) { + throw std::runtime_error( + fmt::format("invalid wire type: {}", wtype)); + } + uint64_t field_number = tag_value >> 3ULL; + constexpr auto max_field_number = static_cast( + std::numeric_limits::max()); + if (field_number > max_field_number) { + throw std::runtime_error( + fmt::format("invalid field number: {}", field_number)); + } + return { + static_cast(wtype), static_cast(field_number)}; + } + + void read_packed_elements( + const pb::FieldDescriptor& field, size_t amount, auto reader) { + if (!field.is_packable()) { + throw std::runtime_error( + fmt::format("invalid packed field: {}", field.DebugString())); + } + chunked_vector> vec; + auto target = current_->parser.bytes_consumed() + amount; + while (current_->parser.bytes_consumed() < target) { + vec.push_back(reader()); + } + append_repeated_field(field.number(), parsed::repeated(std::move(vec))); + } + + void read_length_field(int32_t field_number) { + auto length = static_cast( + read_varint(current_->parser)); + if (length < 0) { + throw std::runtime_error( + fmt::format("invalid length field: (length={})", length)); + } + const auto* descriptor = current_->descriptor->FindFieldByNumber( + field_number); + if (!descriptor) { + return; + } + switch (descriptor->type()) { + case pb::FieldDescriptor::TYPE_STRING: + case pb::FieldDescriptor::TYPE_BYTES: { + auto buf = current_->parser.share(length); + update_field(*descriptor, std::move(buf)); + break; + } + case pb::FieldDescriptor::TYPE_MESSAGE: { + auto buf = current_->parser.share(length); + stage( + descriptor->number(), + std::move(buf), + *descriptor->message_type()); + break; + } + // Handle packed fields + case pb::FieldDescriptor::TYPE_FLOAT: + read_packed_elements(*descriptor, length, [this] { + return current_->parser.consume_type(); + }); + break; + case pb::FieldDescriptor::TYPE_DOUBLE: + read_packed_elements(*descriptor, length, [this] { + return current_->parser.consume_type(); + }); + break; + case pb::FieldDescriptor::TYPE_INT64: + read_packed_elements(*descriptor, length, [this] { + return static_cast( + read_varint(current_->parser)); + }); + break; + case pb::FieldDescriptor::TYPE_UINT64: + read_packed_elements(*descriptor, length, [this] { + return read_varint(current_->parser); + }); + break; + case pb::FieldDescriptor::TYPE_INT32: + read_packed_elements(*descriptor, length, [this] { + return static_cast( + read_varint(current_->parser)); + }); + break; + case pb::FieldDescriptor::TYPE_FIXED64: + read_packed_elements(*descriptor, length, [this] { + return current_->parser.consume_type(); + }); + break; + case pb::FieldDescriptor::TYPE_FIXED32: + read_packed_elements(*descriptor, length, [this] { + return current_->parser.consume_type(); + }); + break; + case pb::FieldDescriptor::TYPE_BOOL: + read_packed_elements(*descriptor, length, [this] { + return static_cast( + read_varint(current_->parser)); + }); + break; + case pb::FieldDescriptor::TYPE_UINT32: + read_packed_elements(*descriptor, length, [this] { + return read_varint(current_->parser); + }); + break; + case pb::FieldDescriptor::TYPE_SFIXED32: + read_packed_elements(*descriptor, length, [this] { + return current_->parser.consume_type(); + }); + break; + case pb::FieldDescriptor::TYPE_SFIXED64: + read_packed_elements(*descriptor, length, [this] { + return current_->parser.consume_type(); + }); + break; + case pb::FieldDescriptor::TYPE_SINT32: + read_packed_elements(*descriptor, length, [this] { + return read_varint(current_->parser); + }); + break; + case pb::FieldDescriptor::TYPE_SINT64: + read_packed_elements(*descriptor, length, [this] { + return read_varint(current_->parser); + }); + break; + case pb::FieldDescriptor::TYPE_ENUM: + read_packed_elements(*descriptor, length, [this] { + return static_cast( + read_varint(current_->parser)); + }); + break; + case pb::FieldDescriptor::TYPE_GROUP: + throw std::runtime_error(fmt::format( + "legacy proto2 groups not supported (field={})", + descriptor->number())); + } + } + + struct state { + int32_t field_number; + iobuf_parser parser; + std::unique_ptr message; + const pb::Descriptor* descriptor; + }; + + chunked_vector state_; + state* current_ = nullptr; +}; + +ss::future> +parse(iobuf iobuf, const pb::Descriptor& descriptor) { + parser p; + co_return co_await p.parse(std::move(iobuf), descriptor); +} + +} // namespace serde::pb diff --git a/src/v/serde/protobuf/parser.h b/src/v/serde/protobuf/parser.h new file mode 100644 index 0000000000000..cc93ce867fb8c --- /dev/null +++ b/src/v/serde/protobuf/parser.h @@ -0,0 +1,109 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "bytes/iobuf.h" +#include "container/chunked_hash_map.h" +#include "container/fragmented_vector.h" + +#include + +namespace google::protobuf { +class Descriptor; +} // namespace google::protobuf + +namespace serde::pb { + +namespace parsed { + +struct message; + +/** + * A dynamic representation of a repeated field in a protobuf message. + */ +struct repeated { + std::variant< + chunked_vector, + chunked_vector, + chunked_vector, // Can be an enum value + chunked_vector, + chunked_vector, + chunked_vector, + chunked_vector, + chunked_vector, // string or bytes + chunked_vector>> + elements; +}; + +/** + * A dynamic representation of a proto3 map. + */ +struct map { + using key = std:: + variant; + + using value = std::variant< + double, + float, + int32_t, // Can be an enum value + int64_t, + uint32_t, + uint64_t, + bool, + iobuf, // string or bytes + std::unique_ptr>; + + // Technically every key and value here are the same type, but that would be + // 72 different combinations of maps here, which seems a bit extreme. + chunked_hash_map entries; +}; + +/** + * A dynamic representation of a message. Supports everything except deprecated + * proto2 groups and the unknown fields that the offical protobuf library + * supports. + */ +struct message { + using field = std::variant< + double, + float, + int32_t, // Can be an enum value + int64_t, + uint32_t, + uint64_t, + bool, + iobuf, // string or bytes + std::unique_ptr, + repeated, + map>; + + // Fields of the message keyed by their field number. + chunked_hash_map fields; +}; + +} // namespace parsed + +/** + * Parse a protobuf from the given bytes using the descriptor provided. + * + * The returned message is a dynamic representation of the protocol buffer, and + * the descriptor will still be needed to full decern the types of the protobuf. + * The returned message in the case of strings or bytes fields will hold a + * reference (via share) to the original input data iobuf. + * + * Messages must be fully accumulated (so no streaming parsing) due to last + * field wins semantics within the protocol buffer specification. + * + * Throws if there are invalid messages. + */ +ss::future> +parse(iobuf, const google::protobuf::Descriptor&); + +} // namespace serde::pb diff --git a/src/v/serde/protobuf/tests/BUILD b/src/v/serde/protobuf/tests/BUILD new file mode 100644 index 0000000000000..f70623f09e1d2 --- /dev/null +++ b/src/v/serde/protobuf/tests/BUILD @@ -0,0 +1,57 @@ +load("//bazel:test.bzl", "redpanda_cc_gtest") + +proto_library( + name = "two_proto", + srcs = ["two.proto"], +) + +cc_proto_library( + name = "two_cc_proto", + deps = [":two_proto"], +) + +proto_library( + name = "three_proto", + srcs = ["three.proto"], +) + +cc_proto_library( + name = "three_cc_proto", + deps = [":three_proto"], +) + +proto_library( + name = "test_messages_edition2023_proto", + srcs = ["test_messages_edition2023.proto"], +) + +cc_proto_library( + name = "test_messages_edition2023_cc_proto", + deps = [":test_messages_edition2023_proto"], +) + +redpanda_cc_gtest( + name = "parser_test", + timeout = "short", + srcs = [ + "parser_test.cc", + ], + deps = [ + ":test_messages_edition2023_cc_proto", + ":three_cc_proto", + ":two_cc_proto", + "//src/v/base", + "//src/v/bytes:iobuf", + "//src/v/bytes:iobuf_parser", + "//src/v/random:generators", + "//src/v/serde/protobuf", + "//src/v/test_utils:gtest", + "@fmt", + "@googletest//:gtest", + "@googletest//:gtest_main", + "@libprotobuf_mutator", + "@protobuf", + "@protobuf//:differencer", + "@seastar", + ], +) diff --git a/src/v/serde/protobuf/tests/parser_test.cc b/src/v/serde/protobuf/tests/parser_test.cc new file mode 100644 index 0000000000000..25dc53062f39d --- /dev/null +++ b/src/v/serde/protobuf/tests/parser_test.cc @@ -0,0 +1,548 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "base/units.h" +#include "bytes/iobuf.h" +#include "bytes/iobuf_parser.h" +#include "gtest/gtest.h" +#include "random/generators.h" +#include "serde/protobuf/parser.h" +// TODO: Fix bazelbuild/bazel#4446 +#include "src/v/serde/protobuf/tests/test_messages_edition2023.pb.h" +#include "src/v/serde/protobuf/tests/three.pb.h" +#include "src/v/serde/protobuf/tests/two.pb.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace serde::pb { +namespace { + +namespace pb = google::protobuf; + +class IgnoreUnknownFields + : public pb::util::MessageDifferencer::IgnoreCriteria { + bool IsIgnored( + const pb::Message& /* message1 */, + const pb::Message& /* message2 */, + const pb::FieldDescriptor* /* field */, + const std::vector< + pb::util::MessageDifferencer::SpecificField>& /* parent_fields */) + override { + return false; + } + bool IsUnknownFieldIgnored( + const pb::Message& /* message1 */, + const pb::Message& /* message2 */, + const pb::util::MessageDifferencer::SpecificField& /* field */, + const std::vector< + pb::util::MessageDifferencer::SpecificField>& /* parent_fields */) + override { + return true; + } +}; + +class ProtobufParserFixture : public testing::Test { +public: + std::unique_ptr construct_message(std::string_view txtpb) { + const static std::unordered_map + global_protos = { + {"two.proto", pbtwo::SearchRequest::descriptor()->file()}, + {"three.proto", pbthree::SearchRequest::descriptor()->file()}, + {"test_messages_edition2023.proto", + protobuf_test_messages::editions::TestAllTypesEdition2023:: + descriptor() + ->file()}, + }; + for (const auto& [name, file] : global_protos) { + auto comment = fmt::format("# proto-file: {}", name); + if (txtpb.find(comment) == std::string_view::npos) { + continue; + } + for (int i = 0; i < file->message_type_count(); ++i) { + auto* descriptor = file->message_type(i); + auto comment = fmt::format( + "# proto-message: {}", descriptor->name()); + if (txtpb.find(comment) != std::string_view::npos) { + return construct_message(descriptor, txtpb); + } + } + throw std::runtime_error("unable to find proto message"); + } + throw std::runtime_error("unable to find proto file"); + } + + std::unique_ptr construct_message( + const pb::Descriptor* descriptor, std::string_view txtpb) { + const pb::Message* prototype = factory_.GetPrototype(descriptor); + auto output = std::unique_ptr(prototype->New()); + if (!pb::TextFormat::ParseFromString(txtpb, output.get())) { + throw std::runtime_error("Parsing txtpb failed"); + } + return output; + } + + testing::AssertionResult parse(std::string_view txtpb) { + auto expected = construct_message(txtpb); + iobuf out; + auto binpb = expected->SerializeAsString(); + out.append(binpb.data(), binpb.size()); + auto parsed = ::serde::pb::parse( + std::move(out), *expected->GetDescriptor()) + .get(); + auto actual = convert_to_protobuf(*parsed, expected->GetDescriptor()); + return proto_equals(*actual, *expected); + } + + template + testing::AssertionResult parse_as(std::string_view txtpb) { + auto original = construct_message(txtpb); + iobuf out; + auto binpb = original->SerializeAsString(); + out.append(binpb.data(), binpb.size()); + auto parsed + = ::serde::pb::parse(std::move(out), *T::descriptor()).get(); + auto actual = convert_to_protobuf(*parsed, T::descriptor()); + T expected; + if (!expected.ParseFromString(binpb)) { + return testing::AssertionFailure() << "Not wire compatible"; + } + return proto_equals(*actual, expected); + } + + template + testing::AssertionResult + parse_merged_as(const std::vector& txtpbs) { + std::vector> messages; + std::string binpb; + for (const auto& txtpb : txtpbs) { + auto original = construct_message(txtpb); + binpb += original->SerializeAsString(); + } + iobuf out; + out.append(binpb.data(), binpb.size()); + auto parsed + = ::serde::pb::parse(std::move(out), *T::descriptor()).get(); + auto actual = convert_to_protobuf(*parsed, T::descriptor()); + T expected; + if (!expected.ParseFromString(binpb)) { + return testing::AssertionFailure() << "Not wire compatible"; + } + return proto_equals(*actual, expected); + } + + testing::AssertionResult + proto_equals(const pb::Message& actual, const pb::Message& expected) { + pb::util::MessageDifferencer differ; + std::string diff; + differ.ReportDifferencesToString(&diff); + // Currently, we don't track unknown fields - so ignore them. + differ.AddIgnoreCriteria(std::make_unique()); + if (differ.Compare(actual, expected)) { + return testing::AssertionSuccess(); + } else { + return testing::AssertionFailure() << diff; + } + } + + std::unique_ptr convert_to_protobuf( + const parsed::message& msg, const pb::Descriptor* desc) { + auto output = std::unique_ptr( + factory_.GetPrototype(desc)->New()); + for (const auto& [index, value] : msg.fields) { + validate_oneof(msg, index, desc); + convert_to_protobuf(index, value, output.get()); + } + return output; + } + +private: + void validate_oneof( + const parsed::message& msg, int32_t num, const pb::Descriptor* desc) { + auto field = desc->FindFieldByNumber(num); + auto oneof = field->containing_oneof(); + if (oneof == nullptr) { + return; + } + std::set set; + for (int32_t i = 0; i < oneof->field_count(); ++i) { + auto discrimiant = oneof->field(i); + if (msg.fields.contains(discrimiant->number())) { + set.insert(discrimiant->number()); + } + } + if (set.size() > 1) { + throw std::runtime_error(fmt::format( + "multiple oneof values ({}) set: {}", + oneof->name(), + fmt::join(set, ", "))); + } + } + + void convert_to_protobuf( + int32_t field_number, + const parsed::message::field& value, + pb::Message* output) { + auto* reflect = output->GetReflection(); + auto* field = output->GetDescriptor()->FindFieldByNumber(field_number); + ss::visit( + value, + [=](double v) { reflect->SetDouble(output, field, v); }, + [=](float v) { reflect->SetFloat(output, field, v); }, + [=](int32_t v) { + if (field->enum_type()) { + reflect->SetEnumValue(output, field, v); + } else { + reflect->SetInt32(output, field, v); + } + }, + [=](int64_t v) { reflect->SetInt64(output, field, v); }, + [=](uint32_t v) { reflect->SetUInt32(output, field, v); }, + [=](uint64_t v) { reflect->SetUInt64(output, field, v); }, + [=](bool v) { reflect->SetBool(output, field, v); }, + [=, this](const std::unique_ptr& v) { + auto pbmsg = convert_to_protobuf(*v, field->message_type()); + reflect->SetAllocatedMessage(output, pbmsg.release(), field); + }, + [=, this](const parsed::repeated& v) { + convert_to_protobuf(v, field, output); + }, + [=](const iobuf& v) { + iobuf_const_parser parser(v); + auto str = parser.read_string(v.size_bytes()); + reflect->SetString(output, field, std::move(str)); + }, + [=, this](const parsed::map& v) { + auto pbmap = convert_to_protobuf(v, field->message_type()); + reflect->SetAllocatedMessage(output, pbmap.release(), field); + }); + } + + void convert_to_protobuf( + const parsed::repeated& list, + const pb::FieldDescriptor* field, + pb::Message* output) { + auto* reflect = output->GetReflection(); + ss::visit( + list.elements, + [=](const chunked_vector& vec) { + for (auto v : vec) { + reflect->AddDouble(output, field, v); + } + }, + [=](const chunked_vector& vec) { + for (auto v : vec) { + reflect->AddFloat(output, field, v); + } + }, + [=](const chunked_vector& vec) { + if (field->enum_type()) { + for (auto v : vec) { + reflect->AddEnumValue(output, field, v); + } + } else { + for (auto v : vec) { + reflect->AddInt32(output, field, v); + } + } + }, + [=](const chunked_vector& vec) { + for (auto v : vec) { + reflect->AddInt64(output, field, v); + } + }, + [=](const chunked_vector& vec) { + for (auto v : vec) { + reflect->AddUInt32(output, field, v); + } + }, + [=](const chunked_vector& vec) { + for (auto v : vec) { + reflect->AddUInt64(output, field, v); + } + }, + [=](const chunked_vector& vec) { + for (auto v : vec) { + reflect->AddBool(output, field, v); + } + }, + [=](const chunked_vector& vec) { + for (const auto& v : vec) { + iobuf_const_parser parser(v); + auto str = parser.read_string(v.size_bytes()); + reflect->AddString(output, field, std::move(str)); + } + }, + [=, + this](const chunked_vector>& vec) { + for (const auto& v : vec) { + auto msg = convert_to_protobuf(*v, field->message_type()); + reflect->AddAllocatedMessage(output, field, msg.release()); + } + }); + } + + std::unique_ptr + convert_to_protobuf(const parsed::map& map, const pb::Descriptor* desc) { + auto output = std::unique_ptr( + factory_.GetPrototype(desc)->New()); + for (const auto& [k, v] : map.entries) { + auto k_field = ss::visit( + k, + [](const iobuf& v) { return parsed::message::field(v.copy()); }, + [](const auto& v) { return parsed::message::field(v); }); + convert_to_protobuf(1, k_field, output.get()); + ss::visit( + v, + [&, this](const iobuf& v) { + convert_to_protobuf( + 2, parsed::message::field(v.copy()), output.get()); + }, + [&](const std::unique_ptr& v) { + auto field = desc->FindFieldByNumber(2); + auto msg = convert_to_protobuf(*v, field->message_type()); + output->GetReflection()->SetAllocatedMessage( + output.get(), msg.release(), field); + }, + [&, this](const auto& v) { + convert_to_protobuf( + 2, parsed::message::field(v), output.get()); + }); + } + return output; + } + + pb::DynamicMessageFactory factory_; +}; + +TEST_F(ProtobufParserFixture, Works) { + EXPECT_TRUE(parse(R"( +# proto-file: three.proto +# proto-message: SearchRequest +query: "what's the best kafka alternative?" +page_number: 0 +results_per_page: 100 +)")); +} + +TEST_F(ProtobufParserFixture, EmptyProto) { + EXPECT_TRUE(parse(R"( +# proto-file: three.proto +# proto-message: SearchRequest +)")); +} + +TEST_F(ProtobufParserFixture, NestedProto) { + EXPECT_TRUE(parse(R"( +# proto-file: three.proto +# proto-message: SearchResponse +results: { + url: "http://redpanda.com" + title: "fastest queue in the west" + snippets: "fastest" + snippets: "queue" + snippets: "kafka alternative" +} +results: { + url: "http://docs.redpanda.com" + title: "Redpanda the best kafka alternative" + snippets: "kafka" + snippets: "alternative" +} +results: { + url: "http://redpanda.com/blog/hello-world" + title: "Introducing the best Apache Kafka Alternative - Redpanda" +} +)")); +} + +TEST_F(ProtobufParserFixture, RecursiveProto) { + EXPECT_TRUE(parse(R"( +# proto-file: three.proto +# proto-message: Node +left: { + left: { + value: 99 + } + right: { + value: 101 + } +} +right: { + value: 3000 +} +)")); +} + +TEST_F(ProtobufParserFixture, Compatibility) { + EXPECT_TRUE(parse_as(R"( +# proto-file: three.proto +# proto-message: Version1 +test: 9223372036854775807 +)")); + EXPECT_TRUE(parse_as(R"( +# proto-file: three.proto +# proto-message: Version1 +test: 9223372036854775807 +)")); + EXPECT_TRUE(parse_as(R"( +# proto-file: three.proto +# proto-message: Version1 +test: 9223372036854775807 +)")); + EXPECT_TRUE(parse_as(R"( +# proto-file: three.proto +# proto-message: Version3 +foo: "hello" +data: true +)")); + EXPECT_TRUE(parse_as(R"( +# proto-file: three.proto +# proto-message: Version2 +test: -1 +foo: "hello" +)")); +} + +TEST_F(ProtobufParserFixture, OneOf) { + EXPECT_TRUE(parse_merged_as({ + R"( +# proto-file: three.proto +# proto-message: Version4 +foo: "hello" +)", + R"( +# proto-file: three.proto +# proto-message: Version4 +data: true +)", + })); + EXPECT_TRUE(parse_merged_as({ + R"( +# proto-file: three.proto +# proto-message: Version4 +data: true +)", + R"( +# proto-file: three.proto +# proto-message: Version4 +foo: "hello" +)", + })); +} + +TEST_F(ProtobufParserFixture, NegativeEnums) { + EXPECT_TRUE(parse(R"( +# proto-file: test_messages_edition2023.proto +# proto-message: TestAllTypesEdition2023 +optional_nested_enum: NEG +)")); +} + +TEST_F(ProtobufParserFixture, NegativeInts) { + // Negative numbers are sign extended and written as u64 + EXPECT_TRUE(parse(R"( +# proto-file: test_messages_edition2023.proto +# proto-message: TestAllTypesEdition2023 +optional_int32: -1 +)")); + EXPECT_TRUE(parse(R"( +# proto-file: test_messages_edition2023.proto +# proto-message: TestAllTypesEdition2023 +optional_sint32: -1 +)")); + EXPECT_TRUE(parse(R"( +# proto-file: test_messages_edition2023.proto +# proto-message: TestAllTypesEdition2023 +optional_sfixed32: -1 +)")); +} + +TEST_F(ProtobufParserFixture, RandomData) { + protobuf_test_messages::editions::TestAllTypesEdition2023 kitchen_sink; + constexpr size_t size = 4_KiB; + constexpr int num_iterations = 10; + for (int i = 0; i < num_iterations; ++i) { + std::string buf; + buf.resize(size); + random_generators::fill_buffer_randomchars(buf.data(), buf.size()); + iobuf b; + b.append(buf.data(), buf.size()); + if (kitchen_sink.ParseFromString(buf)) { + EXPECT_NO_THROW( + ::serde::pb::parse(std::move(b), *kitchen_sink.descriptor()) + .get()); + } else { + EXPECT_ANY_THROW( + ::serde::pb::parse(std::move(b), *kitchen_sink.descriptor()) + .get()); + } + } + // TODO: Write more tests on invalid data +} + +class ProtobufParserFuzzer : public ProtobufParserFixture { +public: + ProtobufParserFuzzer() { mutator_.Seed(testing::FLAGS_gtest_random_seed); } + + void mutate(pb::Message* msg) { mutator_.Mutate(msg, 3_MiB); } + +private: + protobuf_mutator::Mutator mutator_; +}; + +TEST_F(ProtobufParserFuzzer, AllTypes) { + protobuf_test_messages::editions::TestAllTypesEdition2023 kitchen_sink; + constexpr int num_iterations = 10; + for (int i = 0; i < num_iterations; ++i) { + mutate(&kitchen_sink); + auto binpb = kitchen_sink.SerializeAsString(); + iobuf buf; + buf.append(binpb.data(), binpb.size()); + auto parsed = ::serde::pb::parse( + std::move(buf), *kitchen_sink.descriptor()) + .get(); + auto msg = convert_to_protobuf(*parsed, kitchen_sink.descriptor()); + EXPECT_TRUE(proto_equals(*msg, kitchen_sink)); + } +} + +TEST_F(ProtobufParserFuzzer, AllTypesMerged) { + protobuf_test_messages::editions::TestAllTypesEdition2023 kitchen_sink; + constexpr int num_iterations = 10; + std::string merged_binpb; + for (int i = 0; i < num_iterations; ++i) { + mutate(&kitchen_sink); + merged_binpb += kitchen_sink.SerializeAsString(); + kitchen_sink.Clear(); + } + iobuf buf; + buf.append(merged_binpb.data(), merged_binpb.size()); + auto parsed + = ::serde::pb::parse(std::move(buf), *kitchen_sink.descriptor()).get(); + auto msg = convert_to_protobuf(*parsed, kitchen_sink.descriptor()); + EXPECT_TRUE(kitchen_sink.ParseFromString(merged_binpb)); + EXPECT_TRUE(proto_equals(*msg, kitchen_sink)); +} + +} // namespace +} // namespace serde::pb diff --git a/src/v/serde/protobuf/tests/test_messages_edition2023.proto b/src/v/serde/protobuf/tests/test_messages_edition2023.proto new file mode 100644 index 0000000000000..c9f194750fcf1 --- /dev/null +++ b/src/v/serde/protobuf/tests/test_messages_edition2023.proto @@ -0,0 +1,183 @@ +// Protocol Buffers - Google's data interchange format +// Copyright 2023 Google LLC. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +// Editions are still experimental, so use proto3 version of this file. +// edition = "2023"; +syntax = "proto3"; + +package protobuf_test_messages.editions; + +option java_package = "com.google.protobuf_test_messages.edition2023"; +option java_multiple_files = true; +option objc_class_prefix = "Editions"; + +message TestAllTypesEdition2023 { + message NestedMessage { + int32 a = 1; + TestAllTypesEdition2023 corecursive = 2; + } + + enum NestedEnum { + FOO = 0; + BAR = 1; + BAZ = 2; + NEG = -1; // Intentionally negative. + } + + // Singular + int32 optional_int32 = 1; + int64 optional_int64 = 2; + uint32 optional_uint32 = 3; + uint64 optional_uint64 = 4; + sint32 optional_sint32 = 5; + sint64 optional_sint64 = 6; + fixed32 optional_fixed32 = 7; + fixed64 optional_fixed64 = 8; + sfixed32 optional_sfixed32 = 9; + sfixed64 optional_sfixed64 = 10; + float optional_float = 11; + double optional_double = 12; + bool optional_bool = 13; + string optional_string = 14; + bytes optional_bytes = 15; + + NestedMessage optional_nested_message = 18; + ForeignMessageEdition2023 optional_foreign_message = 19; + + NestedEnum optional_nested_enum = 21; + ForeignEnumEdition2023 optional_foreign_enum = 22; + + string optional_string_piece = 24 [ctype = STRING_PIECE]; + string optional_cord = 25 [ctype = CORD]; + + TestAllTypesEdition2023 recursive_message = 27; + + // Repeated + repeated int32 repeated_int32 = 31; + repeated int64 repeated_int64 = 32; + repeated uint32 repeated_uint32 = 33; + repeated uint64 repeated_uint64 = 34; + repeated sint32 repeated_sint32 = 35; + repeated sint64 repeated_sint64 = 36; + repeated fixed32 repeated_fixed32 = 37; + repeated fixed64 repeated_fixed64 = 38; + repeated sfixed32 repeated_sfixed32 = 39; + repeated sfixed64 repeated_sfixed64 = 40; + repeated float repeated_float = 41; + repeated double repeated_double = 42; + repeated bool repeated_bool = 43; + repeated string repeated_string = 44; + repeated bytes repeated_bytes = 45; + + repeated NestedMessage repeated_nested_message = 48; + repeated ForeignMessageEdition2023 repeated_foreign_message = 49; + + repeated NestedEnum repeated_nested_enum = 51; + repeated ForeignEnumEdition2023 repeated_foreign_enum = 52; + + repeated string repeated_string_piece = 54 [ctype = STRING_PIECE]; + repeated string repeated_cord = 55 [ctype = CORD]; + + // Packed + repeated int32 packed_int32 = 75 [packed = true]; + repeated int64 packed_int64 = 76 [packed = true]; + repeated uint32 packed_uint32 = 77 [packed = true]; + repeated uint64 packed_uint64 = 78 [packed = true]; + repeated sint32 packed_sint32 = 79 [packed = true]; + repeated sint64 packed_sint64 = 80 [packed = true]; + repeated fixed32 packed_fixed32 = 81 [packed = true]; + repeated fixed64 packed_fixed64 = 82 [packed = true]; + repeated sfixed32 packed_sfixed32 = 83 [packed = true]; + repeated sfixed64 packed_sfixed64 = 84 [packed = true]; + repeated float packed_float = 85 [packed = true]; + repeated double packed_double = 86 [packed = true]; + repeated bool packed_bool = 87 [packed = true]; + repeated NestedEnum packed_nested_enum = 88 [packed = true]; + + // Unpacked + repeated int32 unpacked_int32 = 89 [packed = false]; + repeated int64 unpacked_int64 = 90 [packed = false]; + repeated uint32 unpacked_uint32 = 91 [packed = false]; + repeated uint64 unpacked_uint64 = 92 [packed = false]; + repeated sint32 unpacked_sint32 = 93 [packed = false]; + repeated sint64 unpacked_sint64 = 94 [packed = false]; + repeated fixed32 unpacked_fixed32 = 95 [packed = false]; + repeated fixed64 unpacked_fixed64 = 96 [packed = false]; + repeated sfixed32 unpacked_sfixed32 = 97 [packed = false]; + repeated sfixed64 unpacked_sfixed64 = 98 [packed = false]; + repeated float unpacked_float = 99 [packed = false]; + repeated double unpacked_double = 100 [packed = false]; + repeated bool unpacked_bool = 101 [packed = false]; + repeated NestedEnum unpacked_nested_enum = 102 [packed = false]; + + // Map + map map_int32_int32 = 56; + map map_int64_int64 = 57; + map map_uint32_uint32 = 58; + map map_uint64_uint64 = 59; + map map_sint32_sint32 = 60; + map map_sint64_sint64 = 61; + map map_fixed32_fixed32 = 62; + map map_fixed64_fixed64 = 63; + map map_sfixed32_sfixed32 = 64; + map map_sfixed64_sfixed64 = 65; + map map_int32_float = 66; + map map_int32_double = 67; + map map_bool_bool = 68; + map map_string_string = 69; + map map_string_bytes = 70; + map map_string_nested_message = 71; + map map_string_foreign_message = 72; + map map_string_nested_enum = 73; + map map_string_foreign_enum = 74; + + oneof oneof_field { + uint32 oneof_uint32 = 111; + NestedMessage oneof_nested_message = 112; + string oneof_string = 113; + bytes oneof_bytes = 114; + bool oneof_bool = 115; + uint64 oneof_uint64 = 116; + float oneof_float = 117; + double oneof_double = 118; + NestedEnum oneof_enum = 119; + } + + // extensions + // extensions 120 to 200; + + // groups + message GroupLikeType { + int32 group_int32 = 202; + uint32 group_uint32 = 203; + } + GroupLikeType groupliketype = 201; + GroupLikeType delimited_field = 202; +} + +message ForeignMessageEdition2023 { + int32 c = 1; +} + +enum ForeignEnumEdition2023 { + FOREIGN_FOO = 0; + FOREIGN_BAR = 1; + FOREIGN_BAZ = 2; +} + +// extend TestAllTypesEdition2023 { +// int32 extension_int32 = 120; +// } + +message GroupLikeType { + int32 c = 1; +} + +// extend TestAllTypesEdition2023 { +// GroupLikeType groupliketype = 121; +// GroupLikeType delimited_ext = 122; +// } diff --git a/src/v/serde/protobuf/tests/three.proto b/src/v/serde/protobuf/tests/three.proto new file mode 100644 index 0000000000000..f35a8a1fec458 --- /dev/null +++ b/src/v/serde/protobuf/tests/three.proto @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +syntax = "proto3"; + +package pbthree; + +message SearchRequest { + string query = 1; + int32 page_number = 2; + int32 results_per_page = 3; +} + +message SearchResponse { + message Result { + string url = 1; + string title = 2; + repeated string snippets = 3; + } + repeated Result results = 1; +} + +message Node { + fixed32 value = 1; + Node left = 2; + Node right = 3; +} + +message Message1 {} + +message Message2 { + Message1 foo = 1; +} + +message Message3 { + Message2 bar = 1; +} + +message Version1 { + int64 test = 1; +} + +message Version2 { + int64 test = 1; + string foo = 2; +} + +message Version3 { + string foo = 2; + bool data = 3; +} + +message Version4 { + oneof justone { + string foo = 2; + bool data = 3; + } +} diff --git a/src/v/serde/protobuf/tests/two.proto b/src/v/serde/protobuf/tests/two.proto new file mode 100644 index 0000000000000..c703bc620036f --- /dev/null +++ b/src/v/serde/protobuf/tests/two.proto @@ -0,0 +1,20 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +syntax = "proto2"; + +package pbtwo; + +message SearchRequest { + optional string query = 1; + optional int32 page_number = 2; + optional int32 results_per_page = 3; +}