diff --git a/deps/hiredis/README.md b/deps/hiredis/README.md index a58101cc..62fe1067 100644 --- a/deps/hiredis/README.md +++ b/deps/hiredis/README.md @@ -73,7 +73,7 @@ convert it to the protocol used to communicate with Redis. One or more spaces separates arguments, so you can use the specifiers anywhere in an argument: - reply = redisCommand("SET key:%s %s", myid, value); + reply = redisCommand(context, "SET key:%s %s", myid, value); ### Using replies @@ -320,6 +320,10 @@ The reply parsing API consists of the following functions: int redisReaderFeed(redisReader *reader, const char *buf, size_t len); int redisReaderGetReply(redisReader *reader, void **reply); +The same set of functions are used internally by hiredis when creating a +normal Redis context, the above API just exposes it to the user for a direct +usage. + ### Usage The function `redisReaderCreate` creates a `redisReader` structure that holds a @@ -346,6 +350,29 @@ immediately after creating the `redisReader`. For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c) uses customized reply object functions to create Ruby objects. +### Reader max buffer + +Both when using the Reader API directly or when using it indirectly via a +normal Redis context, the redisReader structure uses a buffer in order to +accumulate data from the server. +Usually this buffer is destroyed when it is empty and is larger than 16 +kb in order to avoid wasting memory in unused buffers + +However when working with very big payloads destroying the buffer may slow +down performances considerably, so it is possible to modify the max size of +an idle buffer changing the value of the `maxbuf` field of the reader structure +to the desired value. The special value of 0 means that there is no maximum +value for an idle buffer, so the buffer will never get freed. + +For instance if you have a normal Redis context you can set the maximum idle +buffer to zero (unlimited) just with: + + context->reader->maxbuf = 0; + +This should be done only in order to maximize performances when working with +large payloads. The context should be set back to `REDIS_READER_MAX_BUF` again +as soon as possible in order to prevent allocation of useless memory. + ## AUTHORS Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and diff --git a/deps/hiredis/async.c b/deps/hiredis/async.c index f83e2f51..f65f8694 100644 --- a/deps/hiredis/async.c +++ b/deps/hiredis/async.c @@ -372,6 +372,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } + + /* If monitor mode, repush callback */ + if(c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ @@ -381,22 +386,31 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular callbacks will * get a reply before pub/sub messages arrive. */ if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { - /* A spontaneous reply in a not-subscribed context can only be the - * error reply that is sent when a new connection exceeds the - * maximum number of allowed connections on the server side. This - * is seen as an error instead of a regular reply because the - * server closes the connection after sending it. To prevent the - * error from being overwritten by an EOF error the connection is - * closed here. See issue #43. */ - if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) { + /* + * A spontaneous reply in a not-subscribed context can be the error + * reply that is sent when a new connection exceeds the maximum + * number of allowed connections on the server side. + * + * This is seen as an error instead of a regular reply because the + * server closes the connection after sending it. + * + * To prevent the error from being overwritten by an EOF error the + * connection is closed here. See issue #43. + * + * Another possibility is that the server is loading its dataset. + * In this case we also want to close the connection, and have the + * user wait until the server is ready to take our request. + */ + if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) { c->err = REDIS_ERR_OTHER; snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str); __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed. */ - assert(c->flags & REDIS_SUBSCRIBED); - __redisGetSubscribeCallback(ac,reply,&cb); + /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ + assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); + if(c->flags & REDIS_SUBSCRIBED) + __redisGetSubscribeCallback(ac,reply,&cb); } if (cb.fn != NULL) { @@ -557,6 +571,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ + } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + __redisPushCallback(&ac->replies,&cb); } else { if (c->flags & REDIS_SUBSCRIBED) /* This will likely result in an error reply, but it needs to be diff --git a/deps/hiredis/hiredis.c b/deps/hiredis/hiredis.c index e6109db8..4709ee32 100644 --- a/deps/hiredis/hiredis.c +++ b/deps/hiredis/hiredis.c @@ -446,7 +446,7 @@ static int processMultiBulkItem(redisReader *r) { long elements; int root = 0; - /* Set error for nested multi bulks with depth > 2 */ + /* Set error for nested multi bulks with depth > 7 */ if (r->ridx == 8) { __redisReaderSetError(r,REDIS_ERR_PROTOCOL, "No support for nested multi bulk replies with depth > 7"); @@ -564,6 +564,7 @@ redisReader *redisReaderCreate(void) { r->errstr[0] = '\0'; r->fn = &defaultFunctions; r->buf = sdsempty(); + r->maxbuf = REDIS_READER_MAX_BUF; if (r->buf == NULL) { free(r); return NULL; @@ -590,9 +591,8 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { /* Copy the provided buffer. */ if (buf != NULL && len >= 1) { -#if 0 /* Destroy internal buffer when it is empty and is quite large. */ - if (r->len == 0 && sdsavail(r->buf) > 16*1024) { + if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) { sdsfree(r->buf); r->buf = sdsempty(); r->pos = 0; @@ -600,7 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { /* r->buf should not be NULL since we just free'd a larger one. */ assert(r->buf != NULL); } -#endif newbuf = sdscatlen(r->buf,buf,len); if (newbuf == NULL) { diff --git a/deps/hiredis/hiredis.h b/deps/hiredis/hiredis.h index a73f50e9..b922831e 100644 --- a/deps/hiredis/hiredis.h +++ b/deps/hiredis/hiredis.h @@ -76,6 +76,9 @@ /* Flag that is set when the async context has one or more subscriptions. */ #define REDIS_SUBSCRIBED 0x20 +/* Flag that is set when monitor mode is active */ +#define REDIS_MONITORING 0x40 + #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_INTEGER 3 @@ -83,6 +86,8 @@ #define REDIS_REPLY_STATUS 5 #define REDIS_REPLY_ERROR 6 +#define REDIS_READER_MAX_BUF (1024*16) /* Default max unused reader buffer. */ + #ifdef __cplusplus extern "C" { #endif @@ -122,6 +127,7 @@ typedef struct redisReader { char *buf; /* Read buffer */ size_t pos; /* Buffer cursor */ size_t len; /* Buffer length */ + size_t maxbuf; /* Max length of unused buffer */ redisReadTask rstack[9]; int ridx; /* Index of current read task */ diff --git a/deps/hiredis/net.c b/deps/hiredis/net.c index 158e1dd8..82ab2b46 100644 --- a/deps/hiredis/net.c +++ b/deps/hiredis/net.c @@ -45,6 +45,8 @@ #include #include #include +#include +#include #include "net.h" #include "sds.h" @@ -121,28 +123,38 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) { return REDIS_OK; } +#define __MAX_MSEC (((LONG_MAX) - 999) / 1000) + static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) { - struct timeval to; - struct timeval *toptr = NULL; - fd_set wfd; + struct pollfd wfd[1]; + long msec; + + msec = -1; + wfd[0].fd = fd; + wfd[0].events = POLLOUT; /* Only use timeout when not NULL. */ if (timeout != NULL) { - to = *timeout; - toptr = &to; - } - - if (errno == EINPROGRESS) { - FD_ZERO(&wfd); - FD_SET(fd, &wfd); - - if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) { - __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)"); + if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) { close(fd); return REDIS_ERR; } - if (!FD_ISSET(fd, &wfd)) { + msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000); + + if (msec < 0 || msec > INT_MAX) { + msec = INT_MAX; + } + } + + if (errno == EINPROGRESS) { + int res; + + if ((res = poll(wfd, 1, msec)) == -1) { + __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)"); + close(fd); + return REDIS_ERR; + } else if (res == 0) { errno = ETIMEDOUT; __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); close(fd);