From 82b672f6335ac2db32a724ba5dc10398c949a4a8 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 3 Dec 2013 17:43:53 +0100
Subject: [PATCH] BLPOP blocking code refactored to be generic & reusable.

---
 src/Makefile     |   2 +-
 src/aof.c        |   1 +
 src/blocked.c    | 121 +++++++++++++++++++++++++++++++++++++++++++++++
 src/networking.c |   8 ++--
 src/redis.c      |  23 ++-------
 src/redis.h      |  27 ++++++++++-
 src/t_list.c     |  48 +++++--------------
 7 files changed, 169 insertions(+), 61 deletions(-)
 create mode 100644 src/blocked.c

diff --git a/src/Makefile b/src/Makefile
index d4551d92..e0592710 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -103,7 +103,7 @@ endif
 
 REDIS_SERVER_NAME=redis-server
 REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o
+REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o
 REDIS_CLI_NAME=redis-cli
 REDIS_CLI_OBJ=anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
 REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/aof.c b/src/aof.c
index dcbcc62e..8cbdec88 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -449,6 +449,7 @@ struct redisClient *createFakeClient(void) {
     c->argv = NULL;
     c->bufpos = 0;
     c->flags = 0;
+    c->btype = REDIS_BLOCKED_NONE;
     /* We set the fake client as a slave waiting for the synchronization
      * so that Redis will not try to send replies to this client. */
     c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
diff --git a/src/blocked.c b/src/blocked.c
new file mode 100644
index 00000000..3f4dd6e8
--- /dev/null
+++ b/src/blocked.c
@@ -0,0 +1,121 @@
+/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "redis.h"
+
+/* Get a timeout value from an object and store it into 'timeout'.
+ * The final timeout is always stored as milliseconds as a time where the
+ * timeout will expire, however the parsing is performed according to
+ * the 'unit' that can be seconds or milliseconds.
+ *
+ * Note that if the timeout is zero (usually from the point of view of
+ * commands API this means no timeout) the value stored into 'timeout'
+ * is zero. */
+int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit) {
+    long long tval;
+
+    if (getLongLongFromObjectOrReply(c,object,&tval,
+        "timeout is not an integer or out of range") != REDIS_OK)
+        return REDIS_ERR;
+
+    if (tval < 0) {
+        addReplyError(c,"timeout is negative");
+        return REDIS_ERR;
+    }
+
+    if (tval > 0) {
+        if (unit == UNIT_SECONDS) tval *= 1000;
+        tval += mstime();
+    }
+    *timeout = tval;
+
+    return REDIS_OK;
+}
+
+/* Block a client for the specific operation type. Once the REDIS_BLOCKED
+ * flag is set client query buffer is not longer processed, but accumulated,
+ * and will be processed when the client is unblocked. */
+void blockClient(redisClient *c, int btype) {
+    c->flags |= REDIS_BLOCKED;
+    c->btype = btype;
+    server.bpop_blocked_clients++;
+}
+
+/* This function is called in the beforeSleep() function of the event loop
+ * in order to process the pending input buffer of clients that were
+ * unblocked after a blocking operation. */
+void processUnblockedClients(void) {
+    listNode *ln;
+    redisClient *c;
+
+    while (listLength(server.unblocked_clients)) {
+        ln = listFirst(server.unblocked_clients);
+        redisAssert(ln != NULL);
+        c = ln->value;
+        listDelNode(server.unblocked_clients,ln);
+        c->flags &= ~REDIS_UNBLOCKED;
+        c->btype = REDIS_BLOCKED_NONE;
+
+        /* Process remaining data in the input buffer. */
+        if (c->querybuf && sdslen(c->querybuf) > 0) {
+            server.current_client = c;
+            processInputBuffer(c);
+            server.current_client = NULL;
+        }
+    }
+}
+
+/* Unblock a client calling the right function depending on the kind
+ * of operation the client is blocking for. */
+void unblockClient(redisClient *c) {
+    if (c->btype == REDIS_BLOCKED_LIST) {
+        unblockClientWaitingData(c);
+    } else {
+        redisPanic("Unknown btype in unblockClient().");
+    }
+    /* Clear the flags, and put the client in the unblocked list so that
+     * we'll process new commands in its query buffer ASAP. */
+    c->flags &= ~REDIS_BLOCKED;
+    c->flags |= REDIS_UNBLOCKED;
+    c->btype = REDIS_BLOCKED_NONE;
+    server.bpop_blocked_clients--;
+    listAddNodeTail(server.unblocked_clients,c);
+}
+
+/* This function gets called when a blocked client timed out in order to
+ * send it a reply of some kind. */
+void replyToBlockedClientTimedOut(redisClient *c) {
+    if (c->btype == REDIS_BLOCKED_LIST) {
+        addReply(c,shared.nullmultibulk);
+    } else {
+        redisPanic("Unknown btype in replyToBlockedClientTimedOut().");
+    }
+}
+
diff --git a/src/networking.c b/src/networking.c
index 0582789f..7ed8d2c0 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -108,9 +108,12 @@ redisClient *createClient(int fd) {
     c->obuf_soft_limit_reached_time = 0;
     listSetFreeMethod(c->reply,decrRefCountVoid);
     listSetDupMethod(c->reply,dupClientReplyValue);
-    c->bpop.keys = dictCreate(&setDictType,NULL);
+    c->btype = REDIS_BLOCKED_NONE;
     c->bpop.timeout = 0;
+    c->bpop.keys = dictCreate(&setDictType,NULL);
     c->bpop.target = NULL;
+    c->bpop.numreplicas = 0;
+    c->bpop.reploffset = 0;
     c->watched_keys = listCreate();
     c->pubsub_channels = dictCreate(&setDictType,NULL);
     c->pubsub_patterns = listCreate();
@@ -666,8 +669,7 @@ void freeClient(redisClient *c) {
     c->querybuf = NULL;
 
     /* Deallocate structures used to block on blocking ops. */
-    if (c->flags & REDIS_BLOCKED)
-        unblockClientWaitingData(c);
+    if (c->flags & REDIS_BLOCKED) unblockClient(c);
     dictRelease(c->bpop.keys);
 
     /* UNWATCH all the keys */
diff --git a/src/redis.c b/src/redis.c
index e112f1b5..f2939776 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -871,7 +871,7 @@ long long getOperationsPerSecond(void) {
 
 /* Check for timeouts. Returns non-zero if the client was terminated */
 int clientsCronHandleTimeout(redisClient *c) {
-    time_t now = server.unixtime;
+    mstime_t now = mstime();
 
     if (server.maxidletime &&
         !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
@@ -886,8 +886,8 @@ int clientsCronHandleTimeout(redisClient *c) {
         return 1;
     } else if (c->flags & REDIS_BLOCKED) {
         if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
-            addReply(c,shared.nullmultibulk);
-            unblockClientWaitingData(c);
+            replyToBlockedClientTimedOut(c);
+            unblockClient(c);
         }
     }
     return 0;
@@ -1194,8 +1194,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
  * for ready file descriptors. */
 void beforeSleep(struct aeEventLoop *eventLoop) {
     REDIS_NOTUSED(eventLoop);
-    listNode *ln;
-    redisClient *c;
 
     /* Run a fast expire cycle (the called function will return
      * ASAP if a fast cycle is not needed). */
@@ -1203,20 +1201,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
 
     /* Try to process pending commands for clients that were just unblocked. */
-    while (listLength(server.unblocked_clients)) {
-        ln = listFirst(server.unblocked_clients);
-        redisAssert(ln != NULL);
-        c = ln->value;
-        listDelNode(server.unblocked_clients,ln);
-        c->flags &= ~REDIS_UNBLOCKED;
-
-        /* Process remaining data in the input buffer. */
-        if (c->querybuf && sdslen(c->querybuf) > 0) {
-            server.current_client = c;
-            processInputBuffer(c);
-            server.current_client = NULL;
-        }
-    }
+    processUnblockedClients();
 
     /* Write the AOF buffer on disk */
     flushAppendOnlyFile(0);
diff --git a/src/redis.h b/src/redis.h
index e4413d28..bf968e3c 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -232,6 +232,12 @@
 #define REDIS_FORCE_REPL (1<<15)  /* Force replication of current cmd. */
 #define REDIS_PRE_PSYNC_SLAVE (1<<16) /* Slave don't understand PSYNC. */
 
+/* Client block type (btype field in client structure)
+ * if REDIS_BLOCKED flag is set. */
+#define REDIS_BLOCKED_NONE 0    /* Not blocked, no REDIS_BLOCKED flag set. */
+#define REDIS_BLOCKED_LIST 1    /* BLPOP & co. */
+#define REDIS_BLOCKED_WAIT 2    /* WAIT for synchronous replication. */
+
 /* Client request types */
 #define REDIS_REQ_INLINE 1
 #define REDIS_REQ_MULTIBULK 2
@@ -419,13 +425,22 @@ typedef struct multiState {
     time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
 } multiState;
 
+/* This structure holds the blocking operation state for a client.
+ * The fields used depend on client->btype. */
 typedef struct blockingState {
+    /* Generic fields. */
+    mstime_t timeout;       /* Blocking operation timeout. If UNIX current time
+                             * is > timeout then the operation timed out. */
+
+    /* REDIS_BLOCK_LIST */
     dict *keys;             /* The keys we are waiting to terminate a blocking
                              * operation such as BLPOP. Otherwise NULL. */
-    time_t timeout;         /* Blocking operation timeout. If UNIX current time
-                             * is > timeout then the operation timed out. */
     robj *target;           /* The key that should receive the element,
                              * for BRPOPLPUSH. */
+
+    /* REDIS_BLOCK_WAIT */
+    int numreplicas;        /* Number of replicas we are waiting for ACK. */
+    long long reploffset;   /* Replication offset to reach. */
 } blockingState;
 
 /* The following structure represents a node in the server.ready_keys list,
@@ -479,6 +494,7 @@ typedef struct redisClient {
     char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
     int slave_listening_port; /* As configured with: SLAVECONF listening-port */
     multiState mstate;      /* MULTI/EXEC state */
+    int btype;              /* Type of blocking op if REDIS_BLOCKED. */
     blockingState bpop;   /* blocking state */
     list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
     dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
@@ -1227,6 +1243,13 @@ void sentinelIsRunning(void);
 /* Scripting */
 void scriptingInit(void);
 
+/* Blocked clients */
+void processUnblockedClients(void);
+void blockClient(redisClient *c, int btype);
+void unblockClient(redisClient *c);
+void replyToBlockedClientTimedOut(redisClient *c);
+int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit);
+
 /* Git SHA1 */
 char *redisGitSHA1(void);
 char *redisGitDirty(void);
diff --git a/src/t_list.c b/src/t_list.c
index a8ce9b97..555cb31e 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -778,7 +778,7 @@ void rpoplpushCommand(redisClient *c) {
 
 /* Set a client in blocking mode for the specified key, with the specified
  * timeout */
-void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
+void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
     dictEntry *de;
     list *l;
     int j;
@@ -808,13 +808,11 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
         }
         listAddNodeTail(l,c);
     }
-
-    /* Mark the client as a blocked client */
-    c->flags |= REDIS_BLOCKED;
-    server.bpop_blocked_clients++;
+    blockClient(c,REDIS_BLOCKED_LIST);
 }
 
-/* Unblock a client that's waiting in a blocking operation such as BLPOP */
+/* Unblock a client that's waiting in a blocking operation such as BLPOP.
+ * You should never call this function directly, but unblockClient() instead. */
 void unblockClientWaitingData(redisClient *c) {
     dictEntry *de;
     dictIterator *di;
@@ -842,10 +840,6 @@ void unblockClientWaitingData(redisClient *c) {
         decrRefCount(c->bpop.target);
         c->bpop.target = NULL;
     }
-    c->flags &= ~REDIS_BLOCKED;
-    c->flags |= REDIS_UNBLOCKED;
-    server.bpop_blocked_clients--;
-    listAddNodeTail(server.unblocked_clients,c);
 }
 
 /* If the specified key has clients blocked waiting for list pushes, this
@@ -1000,10 +994,10 @@ void handleClientsBlockedOnLists(void) {
 
                         if (value) {
                             /* Protect receiver->bpop.target, that will be
-                             * freed by the next unblockClientWaitingData()
+                             * freed by the next unblockClient()
                              * call. */
                             if (dstkey) incrRefCount(dstkey);
-                            unblockClientWaitingData(receiver);
+                            unblockClient(receiver);
 
                             if (serveClientBlockedOnList(receiver,
                                 rl->key,dstkey,rl->db,value,
@@ -1036,32 +1030,14 @@ void handleClientsBlockedOnLists(void) {
     }
 }
 
-int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
-    long tval;
-
-    if (getLongFromObjectOrReply(c,object,&tval,
-        "timeout is not an integer or out of range") != REDIS_OK)
-        return REDIS_ERR;
-
-    if (tval < 0) {
-        addReplyError(c,"timeout is negative");
-        return REDIS_ERR;
-    }
-
-    if (tval > 0) tval += server.unixtime;
-    *timeout = tval;
-
-    return REDIS_OK;
-}
-
 /* Blocking RPOP/LPOP */
 void blockingPopGenericCommand(redisClient *c, int where) {
     robj *o;
-    time_t timeout;
+    mstime_t timeout;
     int j;
 
-    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
-        return;
+    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
+        != REDIS_OK) return;
 
     for (j = 1; j < c->argc-1; j++) {
         o = lookupKeyWrite(c->db,c->argv[j]);
@@ -1120,10 +1096,10 @@ void brpopCommand(redisClient *c) {
 }
 
 void brpoplpushCommand(redisClient *c) {
-    time_t timeout;
+    mstime_t timeout;
 
-    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
-        return;
+    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
+        != REDIS_OK) return;
 
     robj *key = lookupKeyWrite(c->db, c->argv[1]);