Skip to content

Commit

Permalink
Improved logger
Browse files Browse the repository at this point in the history
Factor out token buckets and add auto-throttling.
  • Loading branch information
alexandergall committed Jun 1, 2015
1 parent 8335386 commit efc1344
Showing 1 changed file with 195 additions and 36 deletions.
231 changes: 195 additions & 36 deletions src/core/lib.lua
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,83 @@ function root_check (message)
end
end

-- Simple token bucket for rate-limiting of events. A token bucket is
-- create through
--
-- local tb = token_bucket_new({ rate = <rate> })
--
-- where <rate> is the maximum allowed rate in Hz, which defaults to
-- 10. Conceptually, <rate> tokens are added to the bucket each
-- second and the bucket can hold no more than <rate> tokens but at
-- least one.
--

local token_bucket = {}
token_bucket.mt = { __index = token_bucket }
token_bucket.default = { rate = 10 }
function token_bucket_new (config)
local config = config or token_bucket.default
local tb = setmetatable({}, token_bucket.mt)
tb:rate(config.rate or token_bucket.default.rate)
tb._tstamp = C.get_monotonic_time()
return tb
end

-- The rate can be set with the rate() method at any time, which
-- empties the token bucket an also returns the previous value. If
-- called with a nil argument, returns the currently configured rate.
function token_bucket:rate (rate)
if rate ~= nil then
self._rate = rate
self._max_tokens = math.max(rate, 1)
self._tokens = 0
end
return self._rate
end

function token_bucket:_update (tokens)
local now = C.get_monotonic_time()
local tokens = math.min(self._max_tokens, tokens + self._rate*(now-self._tstamp))
self._tstamp = now
return tokens
end

-- The take() method tries to remove <n> tokens from the bucket. If
-- enough tokens are available, they are subtracted from the bucket
-- and a true value is returned. Otherwise, the bucket remains
-- unchanged and a false value is returned. For efficiency, the
-- tokens accumulated since the last call to take() or can_take() are
-- only added if the request can not be fulfilled by the state of the
-- bucket when the method is called.
function token_bucket:take (n)
local n = n or 1
local result = false
local tokens = self._tokens
if n > tokens then
tokens = self:_update(tokens)
end
if n <= tokens then
tokens = tokens - n
result = true
end
self._tokens = tokens
return result
end

-- The can_take() method returns a true value if the bucket contains
-- at least <n> tokens, false otherwise. The bucket is updated in a
-- layz fashion as described for the take() method.
function token_bucket:can_take (n)
local n = n or 1
local tokens = self._tokens
if n <= tokens then
return true
end
tokens = self:_update(tokens)
self._tokens = tokens
return n <= tokens
end

-- Simple rate-limited logging facility. Usage:
--
-- local logger = lib.logger_new({ rate = <rate>,
Expand All @@ -451,53 +528,135 @@ end
-- The output format is
-- <date> <module>: message
--
-- The logger uses an automatic throttling mechanism to dynamically
-- lower the logging rate when the rate of discarded messages exceeds
-- the maximum log rate by a factor of 5 over one or multiple adacent
-- intervals of 10 seconds. For each such interval, the logging rate
-- is reduced by a factor of 2 with a lower bound of 0.1 Hz (i.e. one
-- message per 10 seconds). For each 10-second interval for which the
-- rate of discarded messages is below the threshold, the logging rate
-- is increased by 1/4 of the original rate, i.e. it takes at least 40
-- seconds to ramp back up to the original rate.
--
local logger = {}
-- Default configuration
logger.config = { rate = 10,
fh = io.stdout,
flush = true,
module = '',
date = true }
logger.default = {
rate = 10,
fh = io.stdout,
flush = true,
module = '',
date = true,
date_fmt = "%b %Y %H:%M:%S ",
}
-- Auto-throttle configuration
logger.throttle = {
interval = 10, -- Sampling interval for discard rate
excess = 5, -- Multiple of rate at which to start throttling
increment = 4, -- Fraction of rate to increase for un-throttling
min_rate = 0.1, -- Minimum throttled rate
}
logger.mt = { __index = logger }

function logger_new (config)
local config = config or {}
local l = { config = {} }
setmetatable(l.config, { __index = logger.config })
local config = config or logger.default
local l = setmetatable({}, logger.mt)
_config = setmetatable({}, { __index = logger.default })
for k, v in pairs(config) do
assert(logger.config[k], "Unkown logger configuration "..k)
l.config[k] = v
assert(_config[k], "Logger: unknown configuration option "..k)
_config[k] = v
end
l.tstamp = C.get_unix_time()
l.token_bucket = l.config.rate
l.discard = 0
return setmetatable(l, { __index = logger })
end
l._config = _config
l._tb = token_bucket_new({ rate = _config.rate })
local _throttle = {
discards = 0,
tstamp = C.get_monotonic_time(),
rate = _config.rate * logger.throttle.excess,
increment = _config.rate/logger.throttle.increment,
}
l._throttle = setmetatable(_throttle, { __index = logger.throttle })
l._preamble = (l._config.module and l._config.module..': ') or ''
return l
end

-- Log message <msg> unless the rate limit is exceeded. Note that
-- <msg> is evaluated upon the method call in any case, which can have
-- a performance impact even when the message is discarded. This can
-- be avoided by calling the can_log() method first, i.e.
--
-- if logger:can_log() then
-- logger:log('foo')
-- end
--
-- This framework should have very low processing overhead and should
-- be safe to call even form within packet-processing loops. The
-- bottleneck currently is the call to clock_gettime(). Care has been
-- taken to make sure that this call is executed at most once in the
-- non-rate limited code path.

function logger:log (msg)
local rate = self.config.rate
local fh = self.config.fh
local flush = self.config.flush
local date = ''
if self.config.date then
date = os.date("%b %Y %H:%M:%S ")
end
local now = C.get_unix_time()
local new_tokens = rate*(now-self.tstamp)
self.token_bucket = math.min(rate, self.token_bucket+new_tokens)
if self.token_bucket >= 1 then
if self.discard > 0 then
fh:write(date..self.discard.." messages discarded\n")
self.discard = 0
if self._tb:take(1) then
local config = self._config
local throttle = self._throttle
local date = ''
if config.date then
date = os.date(config.date_fmt)
end
msg = date..(self.config.module and self.config.module..': '
or '')..msg..'\n'
fh:write(msg)
if flush then fh:flush() end
self.token_bucket = self.token_bucket-1
local preamble = date..self._preamble
local fh = config.fh
now = C.get_monotonic_time()
local interval = now-throttle.tstamp
local samples = interval/throttle.interval
local drate = throttle.discards/interval
local current_rate = self._tb:rate()
if samples >= 1 then
if throttle.discards > 0 then
fh:write(string.format(preamble.."%d messages discarded in %d seconds\n",
throttle.discards, now-throttle.tstamp))
end
if drate > throttle.rate then
local min_rate = throttle.min_rate
if current_rate > min_rate then
local throttle_rate = math.max(min_rate,
current_rate/2^samples)
fh:write(string.format(preamble.."throttling logging rate to "
.."%.2f Hz%s\n",
throttle_rate,
(throttle_rate == min_rate and ' (minimum)') or ''))
self._tb:rate(throttle_rate)
end
else
local configured_rate = config.rate
if current_rate < configured_rate then
local throttle_rate = math.min(configured_rate,
current_rate + throttle.increment*samples)
fh:write(string.format(preamble.."unthrottling logging rate to "
.."%.2f Hz%s\n",
throttle_rate,
(throttle_rate == configured_rate and ' (maximum)') or ''))
self._tb:rate(throttle_rate)
end
end
throttle.discards = 0
throttle.tstamp = now
end
fh:write(preamble..msg..'\n')
if config.flush then fh:flush() end
else
self.discard = self.discard+1
self._throttle.discards = self._throttle.discards + 1
end
end

-- Return true if a message can be logged without being discarded,
-- false otherwise. In the first case, it is guaranteed that the
-- token bucket for the logging rate-limiter contains at least one
-- token. In the second case, the rate-limit is hit and the counter
-- of discarded messages is increased.
function logger:can_log ()
if self._tb:can_take(1) then
return true
end
self.tstamp = now
self._throttle.discards = self._throttle.discards + 1
return false
end

function selftest ()
Expand Down

0 comments on commit efc1344

Please sign in to comment.