Skip to content

Commit

Permalink
program.ipfix.probe_rss: restart dead workers
Browse files Browse the repository at this point in the history
Add a timer in the control process to check for dead workers and
restart them if necessary.  Fail hard if a worker dies in more than 5
consecutive checks.
  • Loading branch information
alexandergall committed Oct 27, 2020
1 parent 25bc9c9 commit b9c499e
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 19 deletions.
4 changes: 4 additions & 0 deletions src/program/ipfix/probe_rss/README.inc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Available options:
uses sleep(3) during periods of little or no traffic to
reduce CPU usage.
-L, --log-date Include date and time in log messages. Off by default.
-c, --worker-check-interval
Interval in seconds at which the master process checks if
its worker processes are still alive. Dead workers are
restarted.
-j CMD, --jit CMD Control LuaJIT behavior. Available commands:
-jv=FILE, --jit v=FILE
Write verbose JIT trace output to FILE.
Expand Down
107 changes: 88 additions & 19 deletions src/program/ipfix/probe_rss/probe_rss.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local lib = require("core.lib")
local app_graph = require("core.config")
local worker = require("core.worker")
local shm = require("core.shm")
local timer = require("core.timer")
local pci = require("lib.hardware.pci")
local logger = require("lib.logger")
local probe = require("program.ipfix.lib")
Expand Down Expand Up @@ -130,10 +131,22 @@ local function create_workers (probe_config, duration, busywait, jit, logger, lo
return class.."_"..instance..(weight > 1 and "_"..weight or '')
end

local workers = {}
local function add_worker (name, expr, create_fn, remove_fn)
table.insert(workers,
{
name = name,
expr = expr,
create_fn = create_fn,
remove_fn = remove_fn,
restarts = 0
}
)
end

assert(type(main.interfaces) == "table")

local mellanox = {}
local rss_workers = {}
local observation_domain = ipfix.observation_domain_base

for rssq = 0, main.hw_rss_scaling - 1 do
Expand Down Expand Up @@ -221,15 +234,21 @@ local function create_workers (probe_config, duration, busywait, jit, logger, lo
probe.value_to_string(iconfig), tostring(duration),
tostring(instance.busywait), probe.value_to_string(jit)
)
local child_pid = worker.start(rss_link, worker_expr)
logger:log(string.format("Launched IPFIX worker process #%d, "..
"observation domain %d",
child_pid, od))
logger:log(string.format("Selected collector %s:%d from pool %s "
.."for process #%d ",
collector.ip, collector.port, pool,
child_pid))
shm.create("ipfix_workers/"..child_pid, "uint64_t")
add_worker(rss_link, worker_expr,
function(pid)
logger:log(string.format("Launched IPFIX worker process #%d, "..
"observation domain %d",
pid, od))
logger:log(string.format("Selected collector %s:%d from pool %s "
.."for process #%d ",
collector.ip, collector.port, pool,
pid))
shm.create("ipfix_workers/"..pid, "uint64_t")
end,
function(pid)
shm.unlink("ipfix_workers/"..pid)
end
)
end
table.insert(outputs, output)
end
Expand All @@ -242,7 +261,15 @@ local function create_workers (probe_config, duration, busywait, jit, logger, lo
tostring(busywait), probe.value_to_string(override_jit(jit, main.rss_jit, rssq)),
log_date
)
rss_workers["rss"..rssq] = worker_expr
add_worker("rss"..rssq, worker_expr,
function(pid)
logger:log("Launched RSS worker process #"..pid)
shm.create("rss_workers/"..pid, "uint64_t")
end,
function(pid)
shm.unlink("rss_workers/"..pid)
end
)

end

Expand All @@ -261,13 +288,12 @@ local function create_workers (probe_config, duration, busywait, jit, logger, lo
require(driver).ConnectX4, conf)
end

for name, expr in pairs(rss_workers) do
local child_pid = worker.start(name, expr)
logger:log("Launched RSS worker process #"..child_pid)
shm.create("rss_workers/"..child_pid, "uint64_t")
for _, spec in ipairs(workers) do
local child_pid = worker.start(spec.name, spec.expr)
spec.create_fn(child_pid)
end

return ctrl_graph, mellanox
return workers, ctrl_graph, mellanox
end

local long_opts = {
Expand All @@ -276,7 +302,8 @@ local long_opts = {
jit = "j",
help = "h",
["busy-wait"] = "b",
["log-date"] = 'L'
["log-date"] = 'L',
["worker-check-interval"] = 'c'
}

local function usage(exit_code)
Expand All @@ -285,9 +312,14 @@ local function usage(exit_code)
end

function run (parameters)
-- Limit for the number of restarts of a worker. If exceeded, the
-- entire probe is terminated. Currently not configurable.
local restart_limit = 5

local duration
local busywait = false
local log_date = false
local worker_check_interval = 5
local profiling, traceprofiling
local jit = { opts = {} }
local log_pid = string.format("[%5d]", S.getpid())
Expand All @@ -307,20 +339,57 @@ function run (parameters)
j = probe.parse_jit_option_fn(jit),
L = function(arg)
log_date = true
end,
c = function(arg)
if arg:match("^[0-9]+$") then
worker_check_interval = tonumber(arg)
end
end
}

-- Parse command line arguments
parameters = lib.dogetopt(parameters, opt, "hdj:D:l:bL", long_opts)
parameters = lib.dogetopt(parameters, opt, "hdj:D:l:bLc:", long_opts)
if #parameters ~= 1 then usage (1) end

local logger = logger.new({ rate = 30, date = log_date,
module = log_pid.." RSS master" })
local file = table.remove(parameters, 1)
local probe_config = assert(loadfile(file))()
local ctrl_graph, mellanox =
local workers, ctrl_graph, mellanox =
create_workers(probe_config, duration, busywait, jit, logger, log_date)

if worker_check_interval > 0 then
local workers_by_name = {}
for _, spec in ipairs(workers) do
workers_by_name[spec.name] = spec
end

local worker_check = function()
for n, s in pairs(worker.status()) do
if not s.alive then
logger:log(string.format("Worker process %d died (status %d)",
s.pid, s.status))
local spec = workers_by_name[n]
spec.restarts = spec.restarts + 1
if spec.restarts > restart_limit then
logger:log(string.format("Too many restarts (>%d), "
.."terminating", restart_limit))
S.exit(1)
end
logger:log(string.format("Restarting process (attempt #%d)",
spec.restarts))
spec.remove_fn(s.pid)
local new_pid = worker.start(n, spec.expr)
spec.create_fn(new_pid)
else
workers_by_name[n].restarts = 0
end
end
end
timer.activate(timer.new("workermon", worker_check,
worker_check_interval * 10e8, "repeating"))
end

engine.busywait = false
engine.Hz = 10
engine.configure(ctrl_graph)
Expand Down

0 comments on commit b9c499e

Please sign in to comment.