From a2a9d6c50c2d730be947b860614f371624a6f0b7 Mon Sep 17 00:00:00 2001
From: Ash Vardanian <1983160+ashvardanian@users.noreply.github.com>
Date: Sat, 25 Jan 2025 19:26:46 +0000
Subject: [PATCH] Fix: `liburing` example

The current variant results in a very high drop rate, but still has
great throughput. We had to temporarily replace the batch-capable
`io_uring_wait_cqes` with a much simpler blocking `io_uring_wait_cqe`.
---
 less_slow.cpp | 77 ++++++++++++++++++++++++++-------------------------
 1 file changed, 39 insertions(+), 38 deletions(-)

diff --git a/less_slow.cpp b/less_slow.cpp
index bf1d1e1..d834935 100644
--- a/less_slow.cpp
+++ b/less_slow.cpp
@@ -5210,7 +5210,7 @@ enum class networking_route_t { loopback_k, public_k };
  */
 constexpr std::size_t rpc_mtu_k = 1460;
 using rpc_buffer_t = std::array<char, rpc_mtu_k>;
-constexpr uint16_t rpc_server_port_k = 12345;
+constexpr uint16_t rpc_port_k = 12345;
 
 auto to_microseconds(auto duration) { return std::chrono::duration_cast<std::chrono::microseconds>(duration); }
 
@@ -5464,8 +5464,8 @@ static void rpc(bm::State &state, networking_route_t route, std::size_t batch_si
     std::string address_to_talk = route == networking_route_t::loopback_k ? "127.0.0.1" : fetch_public_ip();
 
     // Create server and client
-    server_t server(address_to_listen, rpc_server_port_k, batch_size);
-    client_t client(address_to_talk, rpc_server_port_k, batch_size);
+    server_t server(address_to_listen, rpc_port_k, batch_size);
+    client_t client(address_to_talk, rpc_port_k, batch_size);
     std::thread server_thread(std::ref(server));
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
 
@@ -5519,12 +5519,15 @@ BENCHMARK_CAPTURE(rpc_libc, public, networking_route_t::public_k, //
  * Uses:
  *  - `IORING_OP_RECVMSG`, `IORING_OP_SENDMSG`: since 5.3
  *  - `IORING_OP_LINK_TIMEOUT`: since 5.5
+ *  - `IORING_OP_TIMEOUT`: since 5.4
  *
  * @see Opcode docs: https://man7.org/linux/man-pages/man2/io_uring_enter2.2.html
  *
  * TODO: Future work may include:
  *  - `io_uring_prep_recvmsg_multishot`
  *  - `io_uring_prep_sendmsg_zc`
+ *  - IOSQE_IO_DRAIN - put barrier between batches
+ *  - IOSQE_CQE_SKIP_SUCCESS - to skip timeouts?
  */
 #include <liburing.h> // `io_uring`
 
@@ -5626,7 +5629,6 @@ class rpc_uring_server {
             io_uring_sqe_set_flags(send_entry, IOSQE_IO_LINK); // Don't receive next before we send :)
 
             // Prepare next receive
-            memset(&addressed_buffer.header, 0, sizeof(addressed_buffer.header));
             struct io_uring_sqe *receive_entry = io_uring_get_sqe(&ring_);
             io_uring_prep_recvmsg(receive_entry, socket_descriptor_, &addressed_buffer.header, 0);
             io_uring_sqe_set_data(receive_entry, &addressed_buffer);
@@ -5658,7 +5660,8 @@ class rpc_uring_client {
 
     // Pre-allocated resources
     std::vector<addressed_buffer_t> addressed_buffers_;
-    addressed_buffer_t timeout_resource_;
+    addressed_buffer_t packet_timeout_handle_;
+    addressed_buffer_t batch_timeout_handle_;
 
   public:
     rpc_uring_client(std::string const &server_addr, std::uint16_t port, std::size_t concurrency)
@@ -5673,7 +5676,8 @@ class rpc_uring_client {
         server_address_.sin_port = htons(port);
 
         // Initialize io_uring with one slot for each send/receive/timeout operation
-        if (io_uring_queue_init(concurrency * 3, &ring_, 0) < 0) raise_system_error("Failed to initialize io_uring");
+        if (io_uring_queue_init(concurrency * 3 + 1, &ring_, 0) < 0)
+            raise_system_error("Failed to initialize io_uring");
 
         // Initialize message resources
         for (addressed_buffer_t &addressed_buffer : addressed_buffers_) {
@@ -5715,8 +5719,6 @@ class rpc_uring_client {
             packet_timeout.tv_sec = static_cast<__s64>(packet_ns.count() / 1'000'000'000);
             packet_timeout.tv_nsec = static_cast<__s64>(packet_ns.count() % 1'000'000'000);
         }
-        batch_timeout.tv_sec = 1;
-        packet_timeout.tv_sec = 1;
 
         // Submit tasks
         for (auto &res : addressed_buffers_) {
@@ -5738,9 +5740,15 @@ class rpc_uring_client {
             // Timeout operation
             submitted_entry = io_uring_get_sqe(&ring_);
             io_uring_prep_link_timeout(submitted_entry, &packet_timeout, IORING_TIMEOUT_BOOTTIME);
-            io_uring_sqe_set_data(submitted_entry, &timeout_resource_);
+            io_uring_sqe_set_data(submitted_entry, &packet_timeout_handle_);
             result.sent_packets++;
         }
+        // A single timeout for all requests
+        {
+            auto *submitted_entry = io_uring_get_sqe(&ring_);
+            io_uring_prep_timeout(submitted_entry, &batch_timeout, 0, IORING_TIMEOUT_BOOTTIME);
+            io_uring_sqe_set_data(submitted_entry, &batch_timeout_handle_);
+        }
         io_uring_submit(&ring_);
 
         // Wait until all packets are received or the batch times out
@@ -5748,33 +5756,26 @@ class rpc_uring_client {
         while (result.received_packets + failed_packets < result.sent_packets &&
                std::chrono::steady_clock::now() < batch_deadline) {
 
-            // Process the replies in batches to minimize the cost of synchronization and timing
-            constexpr std::size_t completed_entries_batch_k = 64;
-            struct io_uring_cqe *completed_entries[completed_entries_batch_k];
-            int completed_count =
-                io_uring_wait_cqes(&ring_, &completed_entries[0], completed_entries_batch_k, &batch_timeout, NULL);
-            if (completed_count < 0) continue;
-
-            for (int i = 0; i != completed_count; ++i) {
-                struct io_uring_cqe *completed_entry = completed_entries[i];
-                addressed_buffer_t &addressed_buffer =
-                    *static_cast<addressed_buffer_t *>(io_uring_cqe_get_data(completed_entry));
-                int completion_code = completed_entry->res;
-                if (&addressed_buffer == &timeout_resource_) { continue; } // We don't care about timeouts
-                else if (completion_code < 0) { failed_packets++; }        // Failed operation
-                else {                                                     // Successful operation
-                    if (addressed_buffer.status == message_status_t::pending) {
-                        addressed_buffer.status = message_status_t::sent;
-                    }
-                    else { // Received a reply:
-                        auto now = std::chrono::steady_clock::now();
-                        auto diff = now - addressed_buffer.send_time;
-                        result.batch_latency += diff;
-                        result.max_packet_latency = std::max(result.max_packet_latency, diff);
-                        result.received_packets++;
-                    }
+            struct io_uring_cqe *completed_entry;
+            int completion_code = io_uring_wait_cqe(&ring_, &completed_entry);
+            addressed_buffer_t &addressed_buffer =
+                *static_cast<addressed_buffer_t *>(io_uring_cqe_get_data(completed_entry));
+            io_uring_cqe_seen(&ring_, completed_entry);
+
+            if (&addressed_buffer == &packet_timeout_handle_) { continue; }  // We don't care about timeouts
+            else if (&addressed_buffer == &batch_timeout_handle_) { break; } // Time to exit!
+            else if (completion_code < 0) { failed_packets++; }              // Failed operation
+            else {                                                           // Successful operation
+                if (addressed_buffer.status == message_status_t::pending) {
+                    addressed_buffer.status = message_status_t::sent;
+                }
+                else { // Received a reply:
+                    auto now = std::chrono::steady_clock::now();
+                    auto diff = now - addressed_buffer.send_time;
+                    result.batch_latency += diff;
+                    result.max_packet_latency = std::max(result.max_packet_latency, diff);
+                    result.received_packets++;
                 }
-                io_uring_cqe_seen(&ring_, completed_entries[i]);
             }
         }
 
@@ -5928,14 +5929,14 @@ static void rpc_asio( //
     bm::State &state, std::string const &address, //
     std::size_t batch_size, std::size_t packet_size, std::chrono::microseconds timeout) {
 
-    constexpr std::uint16_t rpc_server_port_k = 12345;
+    constexpr std::uint16_t rpc_port_k = 12345;
 
     // Create server and client
     asio::io_context server_context;
     asio::io_context client_context;
-    rpc_asio_server server(server_context, address, rpc_server_port_k, batch_size, timeout);
-    rpc_asio_client client(client_context, address, rpc_server_port_k, batch_size, timeout);
+    rpc_asio_server server(server_context, address, rpc_port_k, batch_size, timeout);
+    rpc_asio_client client(client_context, address, rpc_port_k, batch_size, timeout);
 
     // The order of the following thread-initializations is important
     std::thread server_context_thread([&]() { server_context.run(); });
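
For reference: the removed code treated the return value of `io_uring_wait_cqes`
as a completion count, but per the liburing man pages it returns 0 or `-errno`
and fills a single CQE pointer, so the old loop never observed real batches.
Below is a minimal standalone sketch of the two waiting styles, not part of the
applied diff: `process_one` and the depth of 64 are hypothetical placeholders,
and the batched variant drains with `io_uring_peek_batch_cqe` plus
`io_uring_cq_advance` rather than this patch's per-CQE loop.

#include <liburing.h>

// Hypothetical completion handler, standing in for the benchmark's bookkeeping
static void process_one(struct io_uring_cqe *cqe) {
    void *user_data = io_uring_cqe_get_data(cqe); // tag attached via `io_uring_sqe_set_data`
    int operation_result = cqe->res;              // per-operation result, not the wait status
    (void)user_data, (void)operation_result;
}

// Simple style, as adopted by this patch: one blocking wait per completion
static void drain_simple(struct io_uring *ring, unsigned expected) {
    for (unsigned i = 0; i != expected; ++i) {
        struct io_uring_cqe *cqe = nullptr;
        if (io_uring_wait_cqe(ring, &cqe) < 0) break; // the wait itself failed, `cqe` is not valid
        process_one(cqe);
        io_uring_cqe_seen(ring, cqe); // consume exactly this CQE
    }
}

// Batched style: block once for the first CQE, then drain the rest syscall-free
static void drain_batched(struct io_uring *ring, unsigned expected) {
    for (unsigned received = 0; received < expected;) {
        struct io_uring_cqe *first = nullptr;
        if (io_uring_wait_cqe(ring, &first) < 0) break;             // block until at least one arrives
        struct io_uring_cqe *batch[64];                             // placeholder depth
        unsigned count = io_uring_peek_batch_cqe(ring, batch, 64);  // returns a count, never blocks
        for (unsigned i = 0; i != count; ++i) process_one(batch[i]);
        io_uring_cq_advance(ring, count); // consume the whole batch at once
        received += count;
    }
}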