Skip to content

Commit

Permalink
apps.xdp: do not leak packets in XDP:stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Nov 20, 2019
1 parent d225e75 commit d44fb1d
Showing 1 changed file with 75 additions and 2 deletions.
77 changes: 75 additions & 2 deletions src/apps/xdp/xdp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,25 @@ function needs_wakeup (r)
return band(r.flags[0], bits{XDP_RING_NEED_WAKEUP=1})
end

-- Rewind routines for transmit/fill. These are used by XDP:stop() to reclaim
-- packet buffers left in-fight after shutdown.

function rewind_transmit (r)
r.write = tobit(r.write - 1)
local desc = ffi.cast(xdp_desc_ptr_t, r.desc)
local idx = mask(r.write)
return ffi.cast("struct packet *",
-- packet struct begins at payload - packet_overhead
from_umem(desc[idx].addr) - packet_overhead)
end

function rewind_fill (r)
r.write = tobit(r.write - 1)
local desc = ffi.cast("uint64_t *", r.desc)
local idx = mask(r.write)
return ffi.cast("struct packet *", from_umem(desc[idx]))
end


-- ---- XDP App ---------------------------------------------------------

Expand Down Expand Up @@ -480,6 +499,19 @@ function XDP:create_xsk (ifname, queue)
-- relative UMEM offsets (addr).
xsk.fr = self:xdp_map_ring(xsk.sock, layouts.fr, "uint64_t", 0x100000000ULL) -- XDP_UMEM_PGOFF_FILL_RING
xsk.cr = self:xdp_map_ring(xsk.sock, layouts.cr, "uint64_t", 0x180000000ULL) -- XDP_UMEM_PGOFF_COMPLETION_RING
-- Counters to track packets in-flight through kernel.
-- - rxq is incremented when a packet buffer is enqueued onto the
-- fill ring and decremented when a packet buffer is dequeued from the
-- tx ring. I.e., it tracks the number of unused buffers currently left
-- on the fill ring.
-- - txq is incremented when a packet buffer is enqueued onto the tx ring
-- and decremented then a packet buffer is dequeued from the
-- completion ring. I.e, it tracks number of unused buffers currently
-- left on the tx ring.
-- The rxq and txq tallies are used by XDP:stop() to perform a clean
-- socket shutdown without leaking packet buffers.
xsk.rxq = 0
xsk.txq = 0
-- Bind socket to interface
local sa = ffi.new(
sockaddr_xdp_t,
Expand Down Expand Up @@ -522,28 +554,65 @@ end
-- Instance methods

function XDP:stop ()
-- Close socket.
self.sock:close()
-- Reclaim packet buffers left on rings.
--
-- Problem: we need a way to tell apart which packets buffers on the
-- (write-only) tx and fill rings need to be freed, and which packet buffers
-- were already enqueued to the (read-only) rx and completions rings.
-- Otherwise, we might cause memory corruption by double-freeing packets.
--
-- We can not however reliably inspect the kernel's internal read cursors
-- for the tx and fill rings. Instead we solve this with a *hack* based on
-- the assumptions that 1) the kernel does not modify the rings after
-- closing the XDP socket; 2) the kernel moves packets from fill to rx rings
-- and tx to completion rings *in-order*; 3) the kernel does not clobber
-- descriptors that have not yet moved to an rx or completion ring.
--
-- First we flush the rx and completion rings, freeing any dequeued packets,
-- while updating the rxq and txq tallies (see XDP:create_xsk()).
while not empty(self.rx) do
packet.free_internal(receive(self.rx))
self.rxq = self.rxq - 1
end
while not empty(self.cr) do
packet.free_internal(reclaim(self.cr))
self.txq = self.txq - 1
end
-- Then, we use the final rxq/txq tallies to infer how many packets on the
-- transmit and fill rings are left dangling, and free those amounts of
-- packets (starting from the most recently enqueued, going backwards) from
-- each ring individually.
for _ = 1, self.txq do
packet.free_internal(rewind_transmit(self.tx))
end
for _ = 1, self.rxq do
packet.free_internal(rewind_fill(self.fr))
end
end

function XDP:pull ()
local output = self.output.output
local rx, fr = self.rx, self.fr
local rx = self.rx
self:refill()
if not output then return end
for _ = 1, engine.pull_npackets do
if empty(rx) then break end
link.transmit(output, receive(rx))
self.rxq = self.rxq - 1
end
pull(rx)
end

function XDP:push ()
local input = self.input.input
local tx, cr = self.tx, self.cr
local tx = self.tx
if not input then return end
while not link.empty(input) and not full(tx) do
local p = link.receive(input)
transmit(tx, p)
self.txq = self.txq + 1
-- Stimulate breathing: after the kernel is done with the packet buffer
-- it will either be fed back from the completion ring onto the free
-- ring, or put back onto the freelist via packet.free_internal; hence,
Expand All @@ -568,6 +637,8 @@ function XDP:refill ()
if input and output then
while not (empty(cr) or full(fr)) do
fill(fr, reclaim(cr))
self.txq = self.txq - 1
self.rxq = self.rxq + 1
end
end
-- If the queue has its output attached we make sure that the kernel does
Expand All @@ -578,6 +649,7 @@ function XDP:refill ()
if output then
while not full(fr) do
fill(fr, packet.allocate())
self.rxq = self.rxq + 1
end
end
-- If the queue has its input attached we release any packet buffers
Expand All @@ -589,6 +661,7 @@ function XDP:refill ()
-- NB: mandatory free_internal since we do not know the payload length
-- of reclaimed packets.
packet.free_internal(reclaim(cr))
self.txq = self.txq - 1
end
end
push(fr)
Expand Down

0 comments on commit d44fb1d

Please sign in to comment.