Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout support for libuv adapter #1016

Merged
merged 9 commits into from
Jan 18, 2022
Merged
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)
hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS)


ifndef AE_DIR
hiredis-example-ae:
@echo "Please specify AE_DIR (e.g. <redis repository>/src)"
Expand All @@ -177,10 +176,11 @@ hiredis-example-ae: examples/example-ae.c adapters/ae.h $(STLIBNAME)
endif

ifndef LIBUV_DIR
hiredis-example-libuv:
@echo "Please specify LIBUV_DIR (e.g. ../libuv/)"
@false
# dynamic link libuv.so
hiredis-example-libuv: examples/example-libuv.c adapters/libuv.h $(STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. -I$(LIBUV_DIR)/include $< -luv -lpthread -lrt $(STLIBNAME) $(REAL_LDFLAGS)
else
# use user provided static lib
hiredis-example-libuv: examples/example-libuv.c adapters/libuv.h $(STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. -I$(LIBUV_DIR)/include $< $(LIBUV_DIR)/.libs/libuv.a -lpthread -lrt $(STLIBNAME) $(REAL_LDFLAGS)
endif
Expand Down
169 changes: 111 additions & 58 deletions adapters/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,111 +7,164 @@
#include <string.h>

typedef struct redisLibuvEvents {
redisAsyncContext* context;
uv_poll_t handle;
int events;
redisAsyncContext* context;
uv_poll_t handle;
uv_timer_t timer;
int events;
} redisLibuvEvents;


static void redisLibuvPoll(uv_poll_t* handle, int status, int events) {
redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
int ev = (status ? p->events : events);

if (p->context != NULL && (ev & UV_READABLE)) {
redisAsyncHandleRead(p->context);
}
if (p->context != NULL && (ev & UV_WRITABLE)) {
redisAsyncHandleWrite(p->context);
}
redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
int ev = (status ? p->events : events);

if (p->context != NULL && (ev & UV_READABLE)) {
redisAsyncHandleRead(p->context);
}
if (p->context != NULL && (ev & UV_WRITABLE)) {
redisAsyncHandleWrite(p->context);
}
}


static void redisLibuvAddRead(void *privdata) {
redisLibuvEvents* p = (redisLibuvEvents*)privdata;
redisLibuvEvents* p = (redisLibuvEvents*)privdata;

p->events |= UV_READABLE;
p->events |= UV_READABLE;

uv_poll_start(&p->handle, p->events, redisLibuvPoll);
uv_poll_start(&p->handle, p->events, redisLibuvPoll);
}


static void redisLibuvDelRead(void *privdata) {
redisLibuvEvents* p = (redisLibuvEvents*)privdata;
redisLibuvEvents* p = (redisLibuvEvents*)privdata;

p->events &= ~UV_READABLE;
p->events &= ~UV_READABLE;

if (p->events) {
uv_poll_start(&p->handle, p->events, redisLibuvPoll);
} else {
uv_poll_stop(&p->handle);
}
if (p->events) {
uv_poll_start(&p->handle, p->events, redisLibuvPoll);
} else {
uv_poll_stop(&p->handle);
}
}


static void redisLibuvAddWrite(void *privdata) {
redisLibuvEvents* p = (redisLibuvEvents*)privdata;
redisLibuvEvents* p = (redisLibuvEvents*)privdata;

p->events |= UV_WRITABLE;
p->events |= UV_WRITABLE;

uv_poll_start(&p->handle, p->events, redisLibuvPoll);
uv_poll_start(&p->handle, p->events, redisLibuvPoll);
}


static void redisLibuvDelWrite(void *privdata) {
redisLibuvEvents* p = (redisLibuvEvents*)privdata;
redisLibuvEvents* p = (redisLibuvEvents*)privdata;

p->events &= ~UV_WRITABLE;
p->events &= ~UV_WRITABLE;

if (p->events) {
uv_poll_start(&p->handle, p->events, redisLibuvPoll);
} else {
uv_poll_stop(&p->handle);
}
if (p->events) {
uv_poll_start(&p->handle, p->events, redisLibuvPoll);
} else {
uv_poll_stop(&p->handle);
}
}

static void on_timer_close(uv_handle_t *handle) {
redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
p->timer.data = NULL;
if (!p->handle.data) {
// both timer and handle are closed
hi_free(p);
}
// else, wait for `on_handle_close`
}

static void on_close(uv_handle_t* handle) {
redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
static void on_handle_close(uv_handle_t *handle) {
redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
p->handle.data = NULL;
if (!p->timer.data) {
// timer never started, or timer already destroyed
hi_free(p);
}
// else, wait for `on_timer_close`
}

hi_free(p);
// libuv removed `status` parameter since v0.11.23
// see: /~https://github.com/libuv/libuv/blob/v0.11.23/include/uv.h
// explain:
// 1. !defined(UV_VERSION_PATCH), must < 0.10.2
// 2. major.minor.patch < 0.11.23
// => (major < 0) || (major == 0 && minor < 11) || (minor == 11 && patch < 23)
// major < 0 can't happen
// => (major == 0 && minor < 11) || (minor == 11 && patch < 23)

// < 0.10.2 goes here < 0.10.2~x goes here 0.11.0~22 goes here
MichaelSuen-thePointer marked this conversation as resolved.
Show resolved Hide resolved
#if !defined(UV_VERSION_PATCH) || (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR < 11) || (UV_VERSION_MINOR == 11 && UV_VERSION_PATCH < 23)
MichaelSuen-thePointer marked this conversation as resolved.
Show resolved Hide resolved
static void redisLibuvTimeout(uv_timer_t *timer, int status) {
(void)status; // unused
#else
static void redisLibuvTimeout(uv_timer_t *timer) {
#endif
redisLibuvEvents *e = (redisLibuvEvents*)timer->data;
redisAsyncHandleTimeout(e->context);
}

static void redisLibuvSetTimeout(void *privdata, struct timeval tv) {
redisLibuvEvents* p = (redisLibuvEvents*)privdata;

uint64_t millsec = tv.tv_sec * 1000 + tv.tv_usec / 1000.0;
if (!p->timer.data) {
// timer is uninitialized
if (uv_timer_init(p->handle.loop, &p->timer) != 0) {
return;
}
p->timer.data = p;
}
// updates the timeout if the timer has already started
// or start the timer
uv_timer_start(&p->timer, redisLibuvTimeout, millsec, 0);
}

static void redisLibuvCleanup(void *privdata) {
redisLibuvEvents* p = (redisLibuvEvents*)privdata;
redisLibuvEvents* p = (redisLibuvEvents*)privdata;

p->context = NULL; // indicate that context might no longer exist
uv_close((uv_handle_t*)&p->handle, on_close);
p->context = NULL; // indicate that context might no longer exist
if (p->timer.data) {
uv_close((uv_handle_t*)&p->timer, on_timer_close);
}
uv_close((uv_handle_t*)&p->handle, on_handle_close);
}


static int redisLibuvAttach(redisAsyncContext* ac, uv_loop_t* loop) {
redisContext *c = &(ac->c);
redisContext *c = &(ac->c);

if (ac->ev.data != NULL) {
return REDIS_ERR;
}
if (ac->ev.data != NULL) {
return REDIS_ERR;
}

ac->ev.addRead = redisLibuvAddRead;
ac->ev.delRead = redisLibuvDelRead;
ac->ev.addWrite = redisLibuvAddWrite;
ac->ev.delWrite = redisLibuvDelWrite;
ac->ev.cleanup = redisLibuvCleanup;
ac->ev.addRead = redisLibuvAddRead;
ac->ev.delRead = redisLibuvDelRead;
ac->ev.addWrite = redisLibuvAddWrite;
ac->ev.delWrite = redisLibuvDelWrite;
ac->ev.cleanup = redisLibuvCleanup;
ac->ev.scheduleTimer = redisLibuvSetTimeout;

redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p));
if (p == NULL)
return REDIS_ERR;
redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p));
if (p == NULL)
return REDIS_ERR;

memset(p, 0, sizeof(*p));
memset(p, 0, sizeof(*p));

if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) {
return REDIS_ERR;
}
if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) {
return REDIS_ERR;
}

ac->ev.data = p;
p->handle.data = p;
p->context = ac;
ac->ev.data = p;
p->handle.data = p;
p->context = ac;

return REDIS_OK;
return REDIS_OK;
}
#endif
37 changes: 31 additions & 6 deletions examples/example-libuv.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,41 @@
#include <async.h>
#include <adapters/libuv.h>

void debugCallback(redisAsyncContext *c, void *r, void *privdata) {
(void)privdata; //unused
redisReply *reply = r;
if (reply == NULL) {
/* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error");
return;
}
/* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/
redisAsyncDisconnect(c);
}

void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) return;
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
if (reply == NULL) {
printf("`GET key` error: %s\n", c->errstr ? c->errstr : "unknown error");
return;
}
printf("`GET key` result: argv[%s]: %s\n", (char*)privdata, reply->str);

/* Disconnect after receiving the reply to GET */
redisAsyncDisconnect(c);
/* start another request that demonstrate timeout */
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5);
}

void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
printf("connect error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
printf("disconnect because of error: %s\n", c->errstr);
return;
}
printf("Disconnected...\n");
Expand All @@ -49,8 +64,18 @@ int main (int argc, char **argv) {
redisLibuvAttach(c,loop);
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0});

/*
In this demo, we first `set key`, then `get key` to demonstrate the basic usage of libuv adapter.
Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request.
Because we have set a 1 second timeout to the connection, the command will always fail with a
timeout error, which is shown in the `debugCallback`.
*/

redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");

uv_run(loop, UV_RUN_DEFAULT);
return 0;
}