Skip to content

Commit

Permalink
lib.interlink: make queue size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Oct 17, 2022
1 parent 4eca2e7 commit f948fa6
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 45 deletions.
19 changes: 15 additions & 4 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ module(...,package.seeall)
local shm = require("core.shm")
local interlink = require("lib.interlink")

local Receiver = {name="apps.interlink.Receiver"}
local Receiver = {
name = "apps.interlink.Receiver",
config = {
queue = {},
size = {default=1024}
}
}

function Receiver:new (queue)
function Receiver:new (conf)
local self = {
attached = false,
queue = conf.queue,
size = conf.size
}
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Receiver})
return setmetatable(self, {__index=Receiver})
end

function Receiver:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/receiver/"..queue..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
self.interlink = interlink.attach_receiver(self.shm_name, self.size)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
Expand Down
19 changes: 15 additions & 4 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ module(...,package.seeall)
local shm = require("core.shm")
local interlink = require("lib.interlink")

local Transmitter = {name="apps.interlink.Transmitter"}
local Transmitter = {
name = "apps.interlink.Transmitter",
config = {
queue = {},
size = {default=1024}
}
}

function Transmitter:new (queue)
function Transmitter:new (conf)
local self = {
attached = false,
queue = conf.queue,
size = conf.size
}
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Transmitter})
return setmetatable(self, {__index=Transmitter})
end

function Transmitter:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/transmitter/"..queue..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
self.interlink = interlink.attach_transmitter(self.shm_name, self.size)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
Expand Down
89 changes: 52 additions & 37 deletions src/lib/interlink.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,25 @@ local band = require("bit").band
local waitfor = require("core.lib").waitfor
local sync = require("core.sync")

local SIZE = 1024
local CACHELINE = 64 -- XXX - make dynamic
local INT = ffi.sizeof("int")

assert(band(SIZE, SIZE-1) == 0, "SIZE is not a power of two")

-- Based on MCRingBuffer, see
-- http://www.cse.cuhk.edu.hk/%7Epclee/www/pubs/ipdps10.pdf

ffi.cdef([[ struct interlink {
int read, write, state[1];
char pad1[]]..CACHELINE-3*INT..[[];
int lwrite, nread;
char pad2[]]..CACHELINE-2*INT..[[];
int lread, nwrite;
char pad3[]]..CACHELINE-2*INT..[[];
struct packet *packets[]]..SIZE..[[];
} __attribute__((packed, aligned(]]..CACHELINE..[[)))]])
local interlink_t = ffi.typeof([[
struct {
int size;
char pad0[]]..CACHELINE-1*INT..[[];
int read, write, state[1];
char pad1[]]..CACHELINE-3*INT..[[];
int lwrite, nread;
char pad2[]]..CACHELINE-2*INT..[[];
int lread, nwrite;
char pad3[]]..CACHELINE-2*INT..[[];
struct packet *packets[?];
} __attribute__((packed))
]])

-- The life cycle of an interlink is managed using a state machine. This is
-- necessary because we allow receiving and transmitting processes to attach
Expand All @@ -92,13 +93,14 @@ ffi.cdef([[ struct interlink {
-- once the former receiver has detached while the transmitter stays attached
-- throughout, and vice-versa.
--
-- Interlinks can be in one of five states:
-- Interlinks can be in one of six states:

local FREE = 0 -- Implicit initial state due to 0 value.
local RXUP = 1 -- Receiver has attached.
local TXUP = 2 -- Transmitter has attached.
local DXUP = 3 -- Both ends have attached.
local DOWN = 4 -- Both ends have detached; must be re-allocated.
local INIT = 0 -- Implicit initial state due to 0 value.
local FREE = 1 -- Queue is in free state, ready to attach.
local RXUP = 2 -- Receiver has attached.
local TXUP = 3 -- Transmitter has attached.
local DXUP = 4 -- Both ends have attached.
local DOWN = 5 -- Both ends have detached; must be re-allocated.

-- If at any point both ends have detached from an interlink it stays in the
-- DOWN state until it is deallocated.
Expand All @@ -107,7 +109,8 @@ local DOWN = 4 -- Both ends have detached; must be re-allocated.
--
-- Who Change Why
-- ------ ------------- ---------------------------------------------------
-- (any) none -> FREE A process creates the queue (initial state).
-- (any) none -> INIT A process creates the queue (initial state).
-- (any) INIT -> FREE A process has initialized the queue.
-- recv. FREE -> RXUP Receiver attaches to free queue.
-- recv. TXUP -> DXUP Receiver attaches to queue with ready transmitter.
-- recv. DXUP -> TXUP Receiver detaches from queue.
Expand All @@ -121,6 +124,8 @@ local DOWN = 4 -- Both ends have detached; must be re-allocated.
--
-- Who Change Why *PROHIBITED*
-- ------ ----------- --------------------------------------------------------
-- recv. INIT->RXUP Can not attach to uninitialized queue.
-- trans. INIT->TXUP Can not attach to uninitialized queue.
-- (any) FREE->DEAD Cannot shutdown before having attached.
-- (any) *->FREE Cannot transition to FREE except by reallocating.
-- recv. TXUP->DEAD Receiver cannot mutate queue after it has detached.
Expand All @@ -130,15 +135,20 @@ local DOWN = 4 -- Both ends have detached; must be re-allocated.
-- (any) DXUP->DOWN Cannot shutdown queue while it is in use.
-- (any) DOWN->* Cannot transition from DOWN (must create new queue.)

local function attach (name, initialize)
local function attach (name, size, transitions)
assert(band(size, size-1) == 0, "size is not a power of two")
local r
local first_try = true
waitfor(
function ()
-- Create/open the queue.
r = shm.create(name, "struct interlink")
-- Return if we succeed to initialize it.
if initialize(r) then return true end
r = shm.create(name, interlink_t, size)
-- Initialize queue (only one process can set size).
if sync.cas(r.state, INIT, FREE) then
r.size = size
end
-- Return if we succeed to attach.
if transitions(r) then return true end
-- We failed; handle error and try again.
shm.unmap(r)
if first_try then
Expand All @@ -147,20 +157,22 @@ local function attach (name, initialize)
end
end
)
-- Make sure we agree on the queue size.
assert(r.size == size, "interlink: queue size mismatch on: "..name)
-- Ready for action :)
return r
end

function attach_receiver (name)
return attach(name,
function attach_receiver (name, size)
return attach(name, size,
-- Attach to free queue as receiver (FREE -> RXUP)
-- or queue with ready transmitter (TXUP -> DXUP.)
function (r) return sync.cas(r.state, FREE, RXUP)
or sync.cas(r.state, TXUP, DXUP) end)
end

function attach_transmitter (name)
return attach(name,
function attach_transmitter (name, size)
return attach(name, size,
-- Attach to free queue as transmitter (FREE -> TXUP)
-- or queue with ready receiver (RXUP -> DXUP.)
function (r) return sync.cas(r.state, FREE, TXUP)
Expand Down Expand Up @@ -206,12 +218,12 @@ end

-- Queue operations follow below.

local function NEXT (i)
return band(i + 1, SIZE - 1)
local function NEXT (size, i)
return band(i + 1, size - 1)
end

function full (r)
local after_nwrite = NEXT(r.nwrite)
local after_nwrite = NEXT(r.size, r.nwrite)
if after_nwrite == r.lread then
if after_nwrite == r.read then
return true
Expand All @@ -222,7 +234,7 @@ end

function insert (r, p)
r.packets[r.nwrite] = p
r.nwrite = NEXT(r.nwrite)
r.nwrite = NEXT(r.size, r.nwrite)
end

function push (r)
Expand All @@ -241,7 +253,7 @@ end

function extract (r)
local p = r.packets[r.nread]
r.nread = NEXT(r.nread)
r.nread = NEXT(r.size, r.nread)
return p
end

Expand All @@ -258,13 +270,16 @@ end
shm.register('interlink', getfenv())

function open (name, readonly)
return shm.open(name, "struct interlink", readonly)
local r = shm.open(name, interlink_t, 'read-only', 1)
local size = r.size
shm.unmap(r)
return shm.open(name, interlink_t, readonly, size)
end

local function describe (r)
local function queue_fill (r)
local read, write = r.read, r.write
return read > write and write + SIZE - read or write - read
local read, write, size = r.read, r.write, r.size
return read > write and write + size - read or write - read
end
local function status (r)
return ({
Expand All @@ -275,7 +290,7 @@ local function describe (r)
[DOWN] = "deallocating"
})[r.state[0]]
end
return ("%d/%d (%s)"):format(queue_fill(r), SIZE - 1, status(r))
return ("%d/%d (%s)"):format(queue_fill(r), size - 1, status(r))
end

ffi.metatype(ffi.typeof("struct interlink"), {__tostring=describe})
ffi.metatype(interlink_t, {__tostring=describe})

0 comments on commit f948fa6

Please sign in to comment.