Skip to content

Commit

Permalink
Add. skt:poll() poll method.
Browse files Browse the repository at this point in the history
  • Loading branch information
moteus committed Dec 13, 2013
1 parent 8e85963 commit c83a0fd
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 6 deletions.
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ env:
# - LUAROCKS_GITTAG=v$LUAROCKS_VER
matrix:
# todo: install libffi and test ffi binding on Lua 5.1/5.2
# - PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq3 LUA=lua5.1 LUA_DEV=liblua5.1-dev LUA_VER=5.1 LUA_SFX=5.1 LUA_INCDIR=/usr/include/lua5.1
# - PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq4 LUA=lua5.1 LUA_DEV=liblua5.1-dev LUA_VER=5.1 LUA_SFX=5.1 LUA_INCDIR=/usr/include/lua5.1
# - PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq3 LUA=lua5.2 LUA_DEV=liblua5.2-dev LUA_VER=5.2 LUA_SFX=5.2 LUA_INCDIR=/usr/include/lua5.2
# - PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq3 LUA=luajit LUA_DEV=libluajit-5.1-dev LUA_VER=5.1 LUA_SFX=jit LUA_INCDIR=/usr/include/luajit-2.0
# - PERF="NO" LZMQ=ffi ZMQ_VER=zeromq3 LUA=luajit LUA_DEV=libluajit-5.1-dev LUA_VER=5.1 LUA_SFX=jit LUA_INCDIR=/usr/include/luajit-2.0
- PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq3 LUA=lua5.1 LUA_DEV=liblua5.1-dev LUA_VER=5.1 LUA_SFX=5.1 LUA_INCDIR=/usr/include/lua5.1
- PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq4 LUA=lua5.1 LUA_DEV=liblua5.1-dev LUA_VER=5.1 LUA_SFX=5.1 LUA_INCDIR=/usr/include/lua5.1
- PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq3 LUA=lua5.2 LUA_DEV=liblua5.2-dev LUA_VER=5.2 LUA_SFX=5.2 LUA_INCDIR=/usr/include/lua5.2
- PERF="NO" LZMQ=lzmq ZMQ_VER=zeromq3 LUA=luajit LUA_DEV=libluajit-5.1-dev LUA_VER=5.1 LUA_SFX=jit LUA_INCDIR=/usr/include/luajit-2.0
- PERF="NO" LZMQ=ffi ZMQ_VER=zeromq3 LUA=luajit LUA_DEV=libluajit-5.1-dev LUA_VER=5.1 LUA_SFX=jit LUA_INCDIR=/usr/include/luajit-2.0
- PERF="NO" LZMQ=ffi ZMQ_VER=zeromq4 LUA=luajit LUA_DEV=libluajit-5.1-dev LUA_VER=5.1 LUA_SFX=jit LUA_INCDIR=/usr/include/luajit-2.0
- PERF="NO" LZMQ=ffi-lua ZMQ_VER=zeromq3 LUA=luajit LUA_DEV=libluajit-5.1-dev LUA_VER=5.1 LUA_SFX=jit LUA_INCDIR=/usr/include/luajit-2.0

Expand Down
13 changes: 13 additions & 0 deletions doc/lzmq.ldoc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ function socket:unbind () end
-- @usage skt:bind{"tcp://*:5555","ipc://test.pub.ipc"}
function socket:unbind () end

--- Poll specific ZMQ Socket.
--
-- @tparam[opt=-1] number timeout timeout in milliseconds (-1 - infinity)
-- @tparam[opt=zmq.POLLIN] number events poll events.
-- @treturn[1] bool poll result (false if timeout)
-- @treturn[1] number envents
-- @treturn[2] nil
-- @treturn[2] error
--
-- @usage
-- if skt:poll(100) then return skt:recv() end
function socket:poll () end

--- Send message over Socket.
--
-- @tparam string message content of message.
Expand Down
22 changes: 22 additions & 0 deletions src/lua/lzmq/ffi.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ end
local FLAGS = api.FLAGS
local ERRORS = api.ERRORS
local ZMQ_LINGER = api.SOCKET_OPTIONS.ZMQ_LINGER
local ZMQ_POLLIN = FLAGS.ZMQ_POLLIN


local unpack = unpack or table.unpack
Expand Down Expand Up @@ -637,6 +638,27 @@ function Socket:monitor(addr, events)
return addr
end

local poll_item = ffi.new(api.vla_pollitem_t, 1)

function Socket:poll(timeout, events)
timeout = timeout or -1
events = mask or ZMQ_POLLIN

poll_item[0].socket = self._private.skt
poll_item[0].fd = 0
poll_item[0].events = events
poll_item[0].revents = 0

local ret = api.zmq_poll(poll_item, 1, timeout)

poll_item[0].socket = api.NULL
local revents = poll_item[0].revents

if ret == -1 then return nil, zerror() end

return (bit.band(events, revents) ~= 0), revents
end

end

do -- Message
Expand Down
16 changes: 16 additions & 0 deletions src/zsocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,21 @@ static int luazmq_skt_recvx (lua_State *L) {
return i;
}

static int luazmq_skt_poll (lua_State *L) {
zsocket *skt = luazmq_getsocket(L);
int timeout = luaL_optint(L, 2, -1);
int mask = luaL_optint(L, 3, ZMQ_POLLIN);
zmq_pollitem_t items [] = { { skt->skt, 0, mask, 0 } };

if(-1 == zmq_poll (items, 1, timeout)){
return luazmq_fail(L, skt);
}

lua_pushboolean(L, (items[0].revents & mask)?1:0);
lua_pushinteger(L, items[0].revents);
return 2;
}

static int luazmq_skt_monitor (lua_State *L) {
zsocket *skt = luazmq_getsocket(L);
char endpoint[128];
Expand Down Expand Up @@ -810,6 +825,7 @@ static const struct luaL_Reg luazmq_skt_methods[] = {
{"unbind", luazmq_skt_unbind },
{"connect", luazmq_skt_connect },
{"disconnect", luazmq_skt_disconnect },
{"poll", luazmq_skt_poll },
{"send", luazmq_skt_send },
{"send_msg", luazmq_skt_send_msg },
{"sendx", luazmq_skt_sendx },
Expand Down
7 changes: 6 additions & 1 deletion test/lunit/console.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ local table = require "table"

local _M = {}

local function rfill(str, wdt, ch)
if wdt > #str then str = str .. (ch or ' '):rep(wdt - #str) end
return str
end

local function printformat(format, ...)
io.write( string.format(format, ...) )
end
Expand Down Expand Up @@ -94,7 +99,7 @@ function _M.begin()
end

function _M.run(testcasename, testname)
io.write(testcasename, '.', testname) io.flush()
io.write(rfill(testcasename .. '.' .. testname, 70)) io.flush()
end

function _M.err(fullname, message, traceback)
Expand Down
31 changes: 31 additions & 0 deletions test/utest.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,36 @@ end

end

local _ENV = TEST_CASE'socket poll' if true then

local ctx, req, rep, timer

function setup()
ctx = assert(zmq.context())
rep = assert(ctx:socket{zmq.REP, bind = ECHO_ADDR, rcvtimeo = 100})
req = assert(ctx:socket{zmq.REQ, connect = ECHO_ADDR})
timer = ztimer.monotonic()
end

function teardown()
if ctx then ctx:destroy() end
timer:close()
end

function test_timeout()
timer:start()
assert_false(rep:poll(2000))
assert_true(ge(1900, timer:stop()))
end

function test_recv()
req:send("HELLO")
assert_true(rep:poll(2000))
assert_equal("HELLO", rep:recv())
end

end

local _ENV = TEST_CASE'loop' if true then

local loop, timer
Expand All @@ -1101,6 +1131,7 @@ end

function teardown()
loop:destroy()
timer:close()
wait(500) -- for TCP time to release IP address
end

Expand Down

0 comments on commit c83a0fd

Please sign in to comment.