diff --git a/src/Makefile b/src/Makefile index d4551d92..e0592710 100644 --- a/src/Makefile +++ b/src/Makefile @@ -103,7 +103,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o +REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o REDIS_BENCHMARK_NAME=redis-benchmark diff --git a/src/aof.c b/src/aof.c index dcbcc62e..8cbdec88 100644 --- a/src/aof.c +++ b/src/aof.c @@ -449,6 +449,7 @@ struct redisClient *createFakeClient(void) { c->argv = NULL; c->bufpos = 0; c->flags = 0; + c->btype = REDIS_BLOCKED_NONE; /* We set the fake client as a slave waiting for the synchronization * so that Redis will not try to send replies to this client. */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; diff --git a/src/blocked.c b/src/blocked.c new file mode 100644 index 00000000..3f4dd6e8 --- /dev/null +++ b/src/blocked.c @@ -0,0 +1,121 @@ +/* blocked.c - generic support for blocking operations like BLPOP & WAIT. + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "redis.h" + +/* Get a timeout value from an object and store it into 'timeout'. + * The final timeout is always stored as milliseconds as a time where the + * timeout will expire, however the parsing is performed according to + * the 'unit' that can be seconds or milliseconds. + * + * Note that if the timeout is zero (usually from the point of view of + * commands API this means no timeout) the value stored into 'timeout' + * is zero. */ +int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit) { + long long tval; + + if (getLongLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != REDIS_OK) + return REDIS_ERR; + + if (tval < 0) { + addReplyError(c,"timeout is negative"); + return REDIS_ERR; + } + + if (tval > 0) { + if (unit == UNIT_SECONDS) tval *= 1000; + tval += mstime(); + } + *timeout = tval; + + return REDIS_OK; +} + +/* Block a client for the specific operation type. Once the REDIS_BLOCKED + * flag is set client query buffer is not longer processed, but accumulated, + * and will be processed when the client is unblocked. */ +void blockClient(redisClient *c, int btype) { + c->flags |= REDIS_BLOCKED; + c->btype = btype; + server.bpop_blocked_clients++; +} + +/* This function is called in the beforeSleep() function of the event loop + * in order to process the pending input buffer of clients that were + * unblocked after a blocking operation. */ +void processUnblockedClients(void) { + listNode *ln; + redisClient *c; + + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + redisAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + c->flags &= ~REDIS_UNBLOCKED; + c->btype = REDIS_BLOCKED_NONE; + + /* Process remaining data in the input buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) { + server.current_client = c; + processInputBuffer(c); + server.current_client = NULL; + } + } +} + +/* Unblock a client calling the right function depending on the kind + * of operation the client is blocking for. */ +void unblockClient(redisClient *c) { + if (c->btype == REDIS_BLOCKED_LIST) { + unblockClientWaitingData(c); + } else { + redisPanic("Unknown btype in unblockClient()."); + } + /* Clear the flags, and put the client in the unblocked list so that + * we'll process new commands in its query buffer ASAP. */ + c->flags &= ~REDIS_BLOCKED; + c->flags |= REDIS_UNBLOCKED; + c->btype = REDIS_BLOCKED_NONE; + server.bpop_blocked_clients--; + listAddNodeTail(server.unblocked_clients,c); +} + +/* This function gets called when a blocked client timed out in order to + * send it a reply of some kind. */ +void replyToBlockedClientTimedOut(redisClient *c) { + if (c->btype == REDIS_BLOCKED_LIST) { + addReply(c,shared.nullmultibulk); + } else { + redisPanic("Unknown btype in replyToBlockedClientTimedOut()."); + } +} + diff --git a/src/networking.c b/src/networking.c index 0582789f..7ed8d2c0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -108,9 +108,12 @@ redisClient *createClient(int fd) { c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,decrRefCountVoid); listSetDupMethod(c->reply,dupClientReplyValue); - c->bpop.keys = dictCreate(&setDictType,NULL); + c->btype = REDIS_BLOCKED_NONE; c->bpop.timeout = 0; + c->bpop.keys = dictCreate(&setDictType,NULL); c->bpop.target = NULL; + c->bpop.numreplicas = 0; + c->bpop.reploffset = 0; c->watched_keys = listCreate(); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); @@ -666,8 +669,7 @@ void freeClient(redisClient *c) { c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ - if (c->flags & REDIS_BLOCKED) - unblockClientWaitingData(c); + if (c->flags & REDIS_BLOCKED) unblockClient(c); dictRelease(c->bpop.keys); /* UNWATCH all the keys */ diff --git a/src/redis.c b/src/redis.c index e112f1b5..f2939776 100644 --- a/src/redis.c +++ b/src/redis.c @@ -871,7 +871,7 @@ long long getOperationsPerSecond(void) { /* Check for timeouts. Returns non-zero if the client was terminated */ int clientsCronHandleTimeout(redisClient *c) { - time_t now = server.unixtime; + mstime_t now = mstime(); if (server.maxidletime && !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ @@ -886,8 +886,8 @@ int clientsCronHandleTimeout(redisClient *c) { return 1; } else if (c->flags & REDIS_BLOCKED) { if (c->bpop.timeout != 0 && c->bpop.timeout < now) { - addReply(c,shared.nullmultibulk); - unblockClientWaitingData(c); + replyToBlockedClientTimedOut(c); + unblockClient(c); } } return 0; @@ -1194,8 +1194,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); - listNode *ln; - redisClient *c; /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ @@ -1203,20 +1201,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Try to process pending commands for clients that were just unblocked. */ - while (listLength(server.unblocked_clients)) { - ln = listFirst(server.unblocked_clients); - redisAssert(ln != NULL); - c = ln->value; - listDelNode(server.unblocked_clients,ln); - c->flags &= ~REDIS_UNBLOCKED; - - /* Process remaining data in the input buffer. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - server.current_client = c; - processInputBuffer(c); - server.current_client = NULL; - } - } + processUnblockedClients(); /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); diff --git a/src/redis.h b/src/redis.h index e4413d28..bf968e3c 100644 --- a/src/redis.h +++ b/src/redis.h @@ -232,6 +232,12 @@ #define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */ #define REDIS_PRE_PSYNC_SLAVE (1<<16) /* Slave don't understand PSYNC. */ +/* Client block type (btype field in client structure) + * if REDIS_BLOCKED flag is set. */ +#define REDIS_BLOCKED_NONE 0 /* Not blocked, no REDIS_BLOCKED flag set. */ +#define REDIS_BLOCKED_LIST 1 /* BLPOP & co. */ +#define REDIS_BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ + /* Client request types */ #define REDIS_REQ_INLINE 1 #define REDIS_REQ_MULTIBULK 2 @@ -419,13 +425,22 @@ typedef struct multiState { time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */ } multiState; +/* This structure holds the blocking operation state for a client. + * The fields used depend on client->btype. */ typedef struct blockingState { + /* Generic fields. */ + mstime_t timeout; /* Blocking operation timeout. If UNIX current time + * is > timeout then the operation timed out. */ + + /* REDIS_BLOCK_LIST */ dict *keys; /* The keys we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ - time_t timeout; /* Blocking operation timeout. If UNIX current time - * is > timeout then the operation timed out. */ robj *target; /* The key that should receive the element, * for BRPOPLPUSH. */ + + /* REDIS_BLOCK_WAIT */ + int numreplicas; /* Number of replicas we are waiting for ACK. */ + long long reploffset; /* Replication offset to reach. */ } blockingState; /* The following structure represents a node in the server.ready_keys list, @@ -479,6 +494,7 @@ typedef struct redisClient { char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */ int slave_listening_port; /* As configured with: SLAVECONF listening-port */ multiState mstate; /* MULTI/EXEC state */ + int btype; /* Type of blocking op if REDIS_BLOCKED. */ blockingState bpop; /* blocking state */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ @@ -1227,6 +1243,13 @@ void sentinelIsRunning(void); /* Scripting */ void scriptingInit(void); +/* Blocked clients */ +void processUnblockedClients(void); +void blockClient(redisClient *c, int btype); +void unblockClient(redisClient *c); +void replyToBlockedClientTimedOut(redisClient *c); +int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit); + /* Git SHA1 */ char *redisGitSHA1(void); char *redisGitDirty(void); diff --git a/src/t_list.c b/src/t_list.c index a8ce9b97..555cb31e 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -778,7 +778,7 @@ void rpoplpushCommand(redisClient *c) { /* Set a client in blocking mode for the specified key, with the specified * timeout */ -void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { +void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { dictEntry *de; list *l; int j; @@ -808,13 +808,11 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj } listAddNodeTail(l,c); } - - /* Mark the client as a blocked client */ - c->flags |= REDIS_BLOCKED; - server.bpop_blocked_clients++; + blockClient(c,REDIS_BLOCKED_LIST); } -/* Unblock a client that's waiting in a blocking operation such as BLPOP */ +/* Unblock a client that's waiting in a blocking operation such as BLPOP. + * You should never call this function directly, but unblockClient() instead. */ void unblockClientWaitingData(redisClient *c) { dictEntry *de; dictIterator *di; @@ -842,10 +840,6 @@ void unblockClientWaitingData(redisClient *c) { decrRefCount(c->bpop.target); c->bpop.target = NULL; } - c->flags &= ~REDIS_BLOCKED; - c->flags |= REDIS_UNBLOCKED; - server.bpop_blocked_clients--; - listAddNodeTail(server.unblocked_clients,c); } /* If the specified key has clients blocked waiting for list pushes, this @@ -1000,10 +994,10 @@ void handleClientsBlockedOnLists(void) { if (value) { /* Protect receiver->bpop.target, that will be - * freed by the next unblockClientWaitingData() + * freed by the next unblockClient() * call. */ if (dstkey) incrRefCount(dstkey); - unblockClientWaitingData(receiver); + unblockClient(receiver); if (serveClientBlockedOnList(receiver, rl->key,dstkey,rl->db,value, @@ -1036,32 +1030,14 @@ void handleClientsBlockedOnLists(void) { } } -int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { - long tval; - - if (getLongFromObjectOrReply(c,object,&tval, - "timeout is not an integer or out of range") != REDIS_OK) - return REDIS_ERR; - - if (tval < 0) { - addReplyError(c,"timeout is negative"); - return REDIS_ERR; - } - - if (tval > 0) tval += server.unixtime; - *timeout = tval; - - return REDIS_OK; -} - /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; - time_t timeout; + mstime_t timeout; int j; - if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK) - return; + if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) + != REDIS_OK) return; for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]); @@ -1120,10 +1096,10 @@ void brpopCommand(redisClient *c) { } void brpoplpushCommand(redisClient *c) { - time_t timeout; + mstime_t timeout; - if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK) - return; + if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS) + != REDIS_OK) return; robj *key = lookupKeyWrite(c->db, c->argv[1]);