Skip to content

Commit

Permalink
Fix: liburing example
Browse files Browse the repository at this point in the history
Current variant results in a very high drop
rate, but still hash a great throughput.

We had to temporarily replace batch
capable `io_uring_wait_cqes` with a much
simpler blocking `io_uring_wait_cqe`.
  • Loading branch information
ashvardanian committed Jan 25, 2025
1 parent cae4175 commit a2a9d6c
Showing 1 changed file with 39 additions and 38 deletions.
77 changes: 39 additions & 38 deletions less_slow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5210,7 +5210,7 @@ enum class networking_route_t { loopback_k, public_k };
*/
constexpr std::size_t rpc_mtu_k = 1460;
using rpc_buffer_t = std::array<char, rpc_mtu_k>;
constexpr uint16_t rpc_server_port_k = 12345;
constexpr uint16_t rpc_port_k = 12345;

auto to_microseconds(auto duration) { return std::chrono::duration_cast<std::chrono::microseconds>(duration); }

Expand Down Expand Up @@ -5464,8 +5464,8 @@ static void rpc(bm::State &state, networking_route_t route, std::size_t batch_si
std::string address_to_talk = route == networking_route_t::loopback_k ? "127.0.0.1" : fetch_public_ip();

// Create server and client
server_t server(address_to_listen, rpc_server_port_k, batch_size);
client_t client(address_to_talk, rpc_server_port_k, batch_size);
server_t server(address_to_listen, rpc_port_k, batch_size);
client_t client(address_to_talk, rpc_port_k, batch_size);

std::thread server_thread(std::ref(server));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Expand Down Expand Up @@ -5519,12 +5519,15 @@ BENCHMARK_CAPTURE(rpc_libc, public, networking_route_t::public_k, //
* Uses:
* - `IORING_OP_RECVMSG`, `IORING_OP_SENDMSG`: since 5.3
* - `IORING_OP_LINK_TIMEOUT`: since 5.5
* - `IORING_OP_TIMEOUT`: since 5.4
*
* @see Opcode docs: https://man7.org/linux/man-pages/man2/io_uring_enter2.2.html
*
* TODO: Future work may include:
* - `io_uring_prep_recvmsg_multishot`
* - `io_uring_prep_sendmsg_zc`
* - IOSQE_IO_DRAIN - put barrier between batches
* - IOSQE_CQE_SKIP_SUCCESS - to skip timeouts?
*/
#include <liburing.h> // `io_uring`

Expand Down Expand Up @@ -5626,7 +5629,6 @@ class rpc_uring_server {
io_uring_sqe_set_flags(send_entry, IOSQE_IO_LINK); // Don't receive next before we send :)

// Prepare next receive
memset(&addressed_buffer.header, 0, sizeof(addressed_buffer.header));
struct io_uring_sqe *receive_entry = io_uring_get_sqe(&ring_);
io_uring_prep_recvmsg(receive_entry, socket_descriptor_, &addressed_buffer.header, 0);
io_uring_sqe_set_data(receive_entry, &addressed_buffer);
Expand Down Expand Up @@ -5658,7 +5660,8 @@ class rpc_uring_client {

// Pre-allocated resources
std::vector<addressed_buffer_t> addressed_buffers_;
addressed_buffer_t timeout_resource_;
addressed_buffer_t packet_timeout_handle_;
addressed_buffer_t batch_timeout_handle_;

public:
rpc_uring_client(std::string const &server_addr, std::uint16_t port, std::size_t concurrency)
Expand All @@ -5673,7 +5676,8 @@ class rpc_uring_client {
server_address_.sin_port = htons(port);

// Initialize io_uring with one slot for each send/receive/timeout operation
if (io_uring_queue_init(concurrency * 3, &ring_, 0) < 0) raise_system_error("Failed to initialize io_uring");
if (io_uring_queue_init(concurrency * 3 + 1, &ring_, 0) < 0)
raise_system_error("Failed to initialize io_uring");

// Initialize message resources
for (addressed_buffer_t &addressed_buffer : addressed_buffers_) {
Expand Down Expand Up @@ -5715,8 +5719,6 @@ class rpc_uring_client {
packet_timeout.tv_sec = static_cast<__s64>(packet_ns.count() / 1'000'000'000);
packet_timeout.tv_nsec = static_cast<__s64>(packet_ns.count() % 1'000'000'000);
}
batch_timeout.tv_sec = 1;
packet_timeout.tv_sec = 1;

// Submit tasks
for (auto &res : addressed_buffers_) {
Expand All @@ -5738,43 +5740,42 @@ class rpc_uring_client {
// Timeout operation
submitted_entry = io_uring_get_sqe(&ring_);
io_uring_prep_link_timeout(submitted_entry, &packet_timeout, IORING_TIMEOUT_BOOTTIME);
io_uring_sqe_set_data(submitted_entry, &timeout_resource_);
io_uring_sqe_set_data(submitted_entry, &packet_timeout_handle_);
result.sent_packets++;
}
// A single timeout for all requests
{
auto *submitted_entry = io_uring_get_sqe(&ring_);
io_uring_prep_timeout(submitted_entry, &batch_timeout, 0, IORING_TIMEOUT_BOOTTIME);
io_uring_sqe_set_data(submitted_entry, &batch_timeout_handle_);
}
io_uring_submit(&ring_);

// Wait until all packets are received or the batch times out
std::size_t failed_packets = 0;
while (result.received_packets + failed_packets < result.sent_packets &&
std::chrono::steady_clock::now() < batch_deadline) {

// Process the replies in batches to minimize the cost of synchronization and timing
constexpr std::size_t completed_entries_batch_k = 64;
struct io_uring_cqe *completed_entries[completed_entries_batch_k];
int completed_count =
io_uring_wait_cqes(&ring_, &completed_entries[0], completed_entries_batch_k, &batch_timeout, NULL);
if (completed_count < 0) continue;

for (int i = 0; i != completed_count; ++i) {
struct io_uring_cqe *completed_entry = completed_entries[i];
addressed_buffer_t &addressed_buffer =
*static_cast<addressed_buffer_t *>(io_uring_cqe_get_data(completed_entry));
int completion_code = completed_entry->res;
if (&addressed_buffer == &timeout_resource_) { continue; } // We don't care about timeouts
else if (completion_code < 0) { failed_packets++; } // Failed operation
else { // Successful operation
if (addressed_buffer.status == message_status_t::pending) {
addressed_buffer.status = message_status_t::sent;
}
else { // Received a reply:
auto now = std::chrono::steady_clock::now();
auto diff = now - addressed_buffer.send_time;
result.batch_latency += diff;
result.max_packet_latency = std::max(result.max_packet_latency, diff);
result.received_packets++;
}
struct io_uring_cqe *completed_entry;
int completion_code = io_uring_wait_cqe(&ring_, &completed_entry);
addressed_buffer_t &addressed_buffer =
*static_cast<addressed_buffer_t *>(io_uring_cqe_get_data(completed_entry));
io_uring_cqe_seen(&ring_, completed_entry);

if (&addressed_buffer == &packet_timeout_handle_) { continue; } // We don't care about timeouts
else if (&addressed_buffer == &batch_timeout_handle_) { break; } // Time to exit!
else if (completion_code < 0) { failed_packets++; } // Failed operation
else { // Successful operation
if (addressed_buffer.status == message_status_t::pending) {
addressed_buffer.status = message_status_t::sent;
}
else { // Received a reply:
auto now = std::chrono::steady_clock::now();
auto diff = now - addressed_buffer.send_time;
result.batch_latency += diff;
result.max_packet_latency = std::max(result.max_packet_latency, diff);
result.received_packets++;
}
io_uring_cqe_seen(&ring_, completed_entries[i]);
}
}

Expand Down Expand Up @@ -5928,14 +5929,14 @@ static void rpc_asio( //
bm::State &state, std::string const &address, //
std::size_t batch_size, std::size_t packet_size, std::chrono::microseconds timeout) {

constexpr std::uint16_t rpc_server_port_k = 12345;
constexpr std::uint16_t rpc_port_k = 12345;

// Create server and client
asio::io_context server_context;
asio::io_context client_context;

rpc_asio_server server(server_context, address, rpc_server_port_k, batch_size, timeout);
rpc_asio_client client(client_context, address, rpc_server_port_k, batch_size, timeout);
rpc_asio_server server(server_context, address, rpc_port_k, batch_size, timeout);
rpc_asio_client client(client_context, address, rpc_port_k, batch_size, timeout);

// The order of the following thread-initializations is important
std::thread server_context_thread([&]() { server_context.run(); });
Expand Down

0 comments on commit a2a9d6c

Please sign in to comment.