From a3309139992de03f3e68ec648db188c1c43057a0 Mon Sep 17 00:00:00 2001
From: Pieter Noordhuis <pcnoordhuis@gmail.com>
Date: Thu, 19 May 2011 18:53:06 +0200
Subject: [PATCH 01/16] Non-blocking connect with master

---
 src/redis.h       | 10 +++---
 src/replication.c | 82 ++++++++++++++++++++++++++++++-----------------
 2 files changed, 58 insertions(+), 34 deletions(-)

diff --git a/src/redis.h b/src/redis.h
index 5934b6a6..d6b99cea 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -153,10 +153,11 @@
 #define REDIS_REQ_MULTIBULK 2
 
 /* Slave replication state - slave side */
-#define REDIS_REPL_NONE 0   /* No active replication */
-#define REDIS_REPL_CONNECT 1    /* Must connect to master */
-#define REDIS_REPL_TRANSFER 2    /* Receiving .rdb from master */
-#define REDIS_REPL_CONNECTED 3  /* Connected to master */
+#define REDIS_REPL_NONE 0 /* No active replication */
+#define REDIS_REPL_CONNECT 1 /* Must connect to master */
+#define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */
+#define REDIS_REPL_CONNECTED 3 /* Connected to master */
+#define REDIS_REPL_CONNECTING 4 /* Connecting to master */
 
 /* Slave replication state - from the point of view of master
  * Note that in SEND_BULK and ONLINE state the slave receives new updates
@@ -888,7 +889,6 @@ int fwriteBulkCount(FILE *fp, char prefix, int count);
 /* Replication */
 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
 void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
-int syncWithMaster(void);
 void updateSlavesWaitingBgsave(int bgsaveerr);
 void replicationCron(void);
 
diff --git a/src/replication.c b/src/replication.c
index b0fa7055..c3ddd2d3 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -393,46 +393,44 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     }
 }
 
-int syncWithMaster(void) {
-    char buf[1024], tmpfile[256], authcmd[1024];
-    int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
+void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
+    char buf[1024], tmpfile[256];
     int dfd, maxtries = 5;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(privdata);
+    REDIS_NOTUSED(mask);
 
-    if (fd == -1) {
-        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
-            strerror(errno));
-        return REDIS_ERR;
-    }
+    /* Remove this event before anything else. */
+    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
 
     /* AUTH with the master if required. */
     if(server.masterauth) {
-    	snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
-    	if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
-            close(fd);
+        char authcmd[1024];
+        size_t authlen;
+
+        authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
+        if (syncWrite(fd,authcmd,authlen,5) == -1) {
             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
                 strerror(errno));
-            return REDIS_ERR;
-    	}
+            goto error;
+        }
         /* Read the AUTH result.  */
         if (syncReadLine(fd,buf,1024,3600) == -1) {
-            close(fd);
             redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
                 strerror(errno));
-            return REDIS_ERR;
+            goto error;
         }
         if (buf[0] != '+') {
-            close(fd);
             redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
-            return REDIS_ERR;
+            goto error;
         }
     }
 
     /* Issue the SYNC command */
     if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
-        close(fd);
         redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
             strerror(errno));
-        return REDIS_ERR;
+        goto error;
     }
 
     /* Prepare a suitable temp file for bulk transfer */
@@ -444,25 +442,51 @@ int syncWithMaster(void) {
         sleep(1);
     }
     if (dfd == -1) {
-        close(fd);
         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
-        return REDIS_ERR;
+        goto error;
     }
 
     /* Setup the non blocking download of the bulk file. */
-    if (aeCreateFileEvent(server.el, fd, AE_READABLE, readSyncBulkPayload, NULL)
+    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
             == AE_ERR)
+    {
+        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
+        goto error;
+    }
+
+    server.replstate = REDIS_REPL_TRANSFER;
+    server.repl_transfer_left = -1;
+    server.repl_transfer_fd = dfd;
+    server.repl_transfer_lastio = time(NULL);
+    server.repl_transfer_tmpfile = zstrdup(tmpfile);
+    return;
+
+error:
+    server.replstate = REDIS_REPL_CONNECT;
+    close(fd);
+    return;
+}
+
+int connectWithMaster(void) {
+    int fd;
+
+    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
+    if (fd == -1) {
+        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
+            strerror(errno));
+        return REDIS_ERR;
+    }
+
+    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,
+            syncWithMaster,NULL) == AE_ERR)
     {
         close(fd);
         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
         return REDIS_ERR;
     }
-    server.replstate = REDIS_REPL_TRANSFER;
-    server.repl_transfer_left = -1;
+
     server.repl_transfer_s = fd;
-    server.repl_transfer_fd = dfd;
-    server.repl_transfer_lastio = time(NULL);
-    server.repl_transfer_tmpfile = zstrdup(tmpfile);
+    server.replstate = REDIS_REPL_CONNECTING;
     return REDIS_OK;
 }
 
@@ -517,8 +541,8 @@ void replicationCron(void) {
     /* Check if we should connect to a MASTER */
     if (server.replstate == REDIS_REPL_CONNECT) {
         redisLog(REDIS_NOTICE,"Connecting to MASTER...");
-        if (syncWithMaster() == REDIS_OK) {
-            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started: SYNC sent");
+        if (connectWithMaster() == REDIS_OK) {
+            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
             if (server.appendonly) rewriteAppendOnlyFileBackground();
         }
     }

From b075621fb7d8b44b10d849e8db6db9b2498056ad Mon Sep 17 00:00:00 2001
From: Pieter Noordhuis <pcnoordhuis@gmail.com>
Date: Sun, 22 May 2011 12:41:24 +0200
Subject: [PATCH 02/16] Minor changes in non-blocking repl. connect

---
 src/redis.h       |  6 +++---
 src/replication.c | 29 +++++++++++++++++------------
 2 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/src/redis.h b/src/redis.h
index d6b99cea..19f0b516 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -155,9 +155,9 @@
 /* Slave replication state - slave side */
 #define REDIS_REPL_NONE 0 /* No active replication */
 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
-#define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */
-#define REDIS_REPL_CONNECTED 3 /* Connected to master */
-#define REDIS_REPL_CONNECTING 4 /* Connecting to master */
+#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
+#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
+#define REDIS_REPL_CONNECTED 4 /* Connected to master */
 
 /* Slave replication state - from the point of view of master
  * Note that in SEND_BULK and ONLINE state the slave receives new updates
diff --git a/src/replication.c b/src/replication.c
index c3ddd2d3..851a40f8 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -319,15 +319,14 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
             redisLog(REDIS_WARNING,
                 "I/O error reading bulk count from MASTER: %s",
                 strerror(errno));
-            replicationAbortSyncTransfer();
-            return;
+            goto error;
         }
+
         if (buf[0] == '-') {
             redisLog(REDIS_WARNING,
                 "MASTER aborted replication with an error: %s",
                 buf+1);
-            replicationAbortSyncTransfer();
-            return;
+            goto error;
         } else if (buf[0] == '\0') {
             /* At this stage just a newline works as a PING in order to take
              * the connection live. So we refresh our last interaction
@@ -336,8 +335,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
             return;
         } else if (buf[0] != '$') {
             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
-            replicationAbortSyncTransfer();
-            return;
+            goto error;
         }
         server.repl_transfer_left = strtol(buf+1,NULL,10);
         redisLog(REDIS_NOTICE,
@@ -359,8 +357,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     server.repl_transfer_lastio = time(NULL);
     if (write(server.repl_transfer_fd,buf,nread) != nread) {
         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
-        replicationAbortSyncTransfer();
-        return;
+        goto error;
     }
     server.repl_transfer_left -= nread;
     /* Check if the transfer is now complete */
@@ -391,6 +388,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
         server.replstate = REDIS_REPL_CONNECTED;
         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
     }
+
+    return;
+
+error:
+    replicationAbortSyncTransfer();
+    return;
 }
 
 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -400,8 +403,10 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
     REDIS_NOTUSED(privdata);
     REDIS_NOTUSED(mask);
 
-    /* Remove this event before anything else. */
-    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
+    /* This event should only be triggered once since it is used to have a
+     * non-blocking connect(2) to the master. It has been triggered when this
+     * function is called, so we can delete it. */
+    aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
 
     /* AUTH with the master if required. */
     if(server.masterauth) {
@@ -477,8 +482,8 @@ int connectWithMaster(void) {
         return REDIS_ERR;
     }
 
-    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,
-            syncWithMaster,NULL) == AE_ERR)
+    if (aeCreateFileEvent(server.el,fd,AE_WRITABLE,syncWithMaster,NULL) ==
+            AE_ERR)
     {
         close(fd);
         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");

From 890a2ed989274cb09b5cde1def3935e110ec3cb9 Mon Sep 17 00:00:00 2001
From: Pieter Noordhuis <pcnoordhuis@gmail.com>
Date: Sun, 22 May 2011 12:57:30 +0200
Subject: [PATCH 03/16] Configurable synchronous I/O timeout

---
 src/redis.c       | 1 +
 src/redis.h       | 4 ++++
 src/replication.c | 8 ++++----
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/src/redis.c b/src/redis.c
index 63b41ba8..f64a2d34 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -871,6 +871,7 @@ void initServerConfig() {
     server.masterport = 6379;
     server.master = NULL;
     server.replstate = REDIS_REPL_NONE;
+    server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
     server.repl_serve_stale_data = 1;
 
     /* Double constants initialization */
diff --git a/src/redis.h b/src/redis.h
index 19f0b516..f249d237 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -159,6 +159,9 @@
 #define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
 #define REDIS_REPL_CONNECTED 4 /* Connected to master */
 
+/* Synchronous read timeout - slave side */
+#define REDIS_REPL_SYNCIO_TIMEOUT 5
+
 /* Slave replication state - from the point of view of master
  * Note that in SEND_BULK and ONLINE state the slave receives new updates
  * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
@@ -586,6 +589,7 @@ struct redisServer {
     char *masterhost;
     int masterport;
     redisClient *master;    /* client that is master for this slave */
+    int repl_syncio_timeout; /* timeout for synchronous I/O calls */
     int replstate;          /* replication status if the instance is a slave */
     off_t repl_transfer_left;  /* bytes left reading .rdb  */
     int repl_transfer_s;    /* slave -> master SYNC socket */
diff --git a/src/replication.c b/src/replication.c
index 851a40f8..87d575ca 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -315,7 +315,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     /* If repl_transfer_left == -1 we still have to read the bulk length
      * from the master reply. */
     if (server.repl_transfer_left == -1) {
-        if (syncReadLine(fd,buf,1024,3600) == -1) {
+        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
             redisLog(REDIS_WARNING,
                 "I/O error reading bulk count from MASTER: %s",
                 strerror(errno));
@@ -414,13 +414,13 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
         size_t authlen;
 
         authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
-        if (syncWrite(fd,authcmd,authlen,5) == -1) {
+        if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout) == -1) {
             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
                 strerror(errno));
             goto error;
         }
         /* Read the AUTH result.  */
-        if (syncReadLine(fd,buf,1024,3600) == -1) {
+        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
             redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
                 strerror(errno));
             goto error;
@@ -432,7 +432,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
     }
 
     /* Issue the SYNC command */
-    if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
+    if (syncWrite(fd,"SYNC \r\n",7,server.repl_syncio_timeout) == -1) {
         redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
             strerror(errno));
         goto error;

From a45f9a1a1d4c3bc419beeba09d9aae9b2d5b1433 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Sat, 28 May 2011 15:04:12 +0200
Subject: [PATCH 04/16] redis-cli no longer aborts in repl-mode on error, and
 retries to reconncet with the server at every command issued if the state is
 not connected. Also the prompt shows the server we are connected to.

---
 src/redis-cli.c | 31 +++++++++++++++++--------------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index b53a4c82..c2d8d9a3 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -87,9 +87,11 @@ static long long mstime(void) {
 
 static void cliRefreshPrompt(void) {
     if (config.dbnum == 0)
-        snprintf(config.prompt,sizeof(config.prompt),"redis> ");
+        snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d> ",
+            config.hostip, config.hostport);
     else
-        snprintf(config.prompt,sizeof(config.prompt),"redis:%d> ",config.dbnum);
+        snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d[%d]> ",
+            config.hostip, config.hostport, config.dbnum);
 }
 
 /*------------------------------------------------------------------------------
@@ -314,10 +316,9 @@ static int cliConnect(int force) {
     return REDIS_OK;
 }
 
-static void cliPrintContextErrorAndExit() {
+static void cliPrintContextError() {
     if (context == NULL) return;
     fprintf(stderr,"Error: %s\n",context->errstr);
-    exit(1);
 }
 
 static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
@@ -436,7 +437,8 @@ static int cliReadReply(int output_raw_strings) {
             if (context->err == REDIS_ERR_EOF)
                 return REDIS_ERR;
         }
-        cliPrintContextErrorAndExit();
+        cliPrintContextError();
+        exit(1);
         return REDIS_ERR; /* avoid compiler warning */
     }
 
@@ -462,10 +464,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
     size_t *argvlen;
     int j, output_raw;
 
-    if (context == NULL) {
-        printf("Not connected, please use: connect <host> <port>\n");
-        return REDIS_OK;
-    }
+    if (context == NULL) return REDIS_ERR;
 
     output_raw = 0;
     if (!strcasecmp(command,"info") ||
@@ -688,7 +687,7 @@ static void repl() {
                         /* If we still cannot send the command,
                          * print error and abort. */
                         if (cliSendCommand(argc,argv,1) != REDIS_OK)
-                            cliPrintContextErrorAndExit();
+                            cliPrintContextError();
                     }
                     elapsed = mstime()-start_time;
                     if (elapsed >= 500) {
@@ -741,11 +740,15 @@ int main(int argc, char **argv) {
     argc -= firstarg;
     argv += firstarg;
 
-    /* Try to connect */
-    if (cliConnect(0) != REDIS_OK) exit(1);
-
     /* Start interactive mode when no command is provided */
-    if (argc == 0) repl();
+    if (argc == 0) {
+        /* Note that in repl mode we don't abort on connection error.
+         * A new attempt will be performed for every command send. */
+        cliConnect(0);
+        repl();
+    }
+
     /* Otherwise, we have some arguments to execute */
+    if (cliConnect(0) != REDIS_OK) exit(1);
     return noninteractive(argc,convertToSds(argc,argv));
 }

From 4d19e3443cc3a36bd51684099152d89e18c45afa Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Sat, 28 May 2011 15:13:55 +0200
Subject: [PATCH 05/16] redis-cli in REPL mode is now able to send the same
 command multiple times, prefixing the command with a number as in "10 ping"

---
 src/redis-cli.c | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index c2d8d9a3..650eafe3 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -680,12 +680,22 @@ static void repl() {
                     linenoiseClearScreen();
                 } else {
                     long long start_time = mstime(), elapsed;
+                    int repeat, skipargs = 0;
 
-                    if (cliSendCommand(argc,argv,1) != REDIS_OK) {
+                    repeat = atoi(argv[0]);
+                    if (repeat) {
+                        skipargs = 1;
+                    } else {
+                        repeat = 1;
+                    }
+
+                    if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
+                        != REDIS_OK)
+                    {
                         cliConnect(1);
 
-                        /* If we still cannot send the command,
-                         * print error and abort. */
+                        /* If we still cannot send the command print error.
+                         * We'll try to reconnect the next time. */
                         if (cliSendCommand(argc,argv,1) != REDIS_OK)
                             cliPrintContextError();
                     }

From 442c748d841bb3603ba1194527ca9da6a1b3cd21 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Sat, 28 May 2011 15:25:48 +0200
Subject: [PATCH 06/16] redis-cli: Use the repetiton prefix after a
 reconnection.

---
 src/redis-cli.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index 650eafe3..42f74af8 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -696,7 +696,8 @@ static void repl() {
 
                         /* If we still cannot send the command print error.
                          * We'll try to reconnect the next time. */
-                        if (cliSendCommand(argc,argv,1) != REDIS_OK)
+                        if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
+                            != REDIS_OK)
                             cliPrintContextError();
                     }
                     elapsed = mstime()-start_time;

From 18f63d8d5197677a4049a372bf657e9fdaf0e890 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Sat, 28 May 2011 15:41:08 +0200
Subject: [PATCH 07/16] redis-cli: -i (interval) implemented, to wait the
 specified number of seconds (decimal digits are allowed) between commands.

---
 src/redis-cli.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index 42f74af8..d0c9d979 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -55,6 +55,7 @@ static struct config {
     int hostport;
     char *hostsocket;
     long repeat;
+    long interval;
     int dbnum;
     int interactive;
     int shutdown;
@@ -517,6 +518,8 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
                 cliRefreshPrompt();
             }
         }
+        if (config.interval) usleep(config.interval);
+        fflush(stdout); /* Make it grep friendly */
     }
 
     free(argvlen);
@@ -552,6 +555,10 @@ static int parseOptions(int argc, char **argv) {
         } else if (!strcmp(argv[i],"-r") && !lastarg) {
             config.repeat = strtoll(argv[i+1],NULL,10);
             i++;
+        } else if (!strcmp(argv[i],"-i") && !lastarg) {
+            double seconds = atof(argv[i+1]);
+            config.interval = seconds*1000000;
+            i++;
         } else if (!strcmp(argv[i],"-n") && !lastarg) {
             config.dbnum = atoi(argv[i+1]);
             i++;
@@ -604,6 +611,8 @@ static void usage() {
 "  -s <socket>      Server socket (overrides hostname and port)\n"
 "  -a <password>    Password to use when connecting to the server\n"
 "  -r <repeat>      Execute specified command N times\n"
+"  -i <interval>    When -r is used, waits <interval> seconds per command.\n"
+"                   It is possible to specify sub-second times like -i 0.1.\n"
 "  -n <db>          Database number\n"
 "  -x               Read last argument from STDIN\n"
 "  -d <delimiter>   Multi-bulk delimiter in for raw formatting (default: \\n)\n"
@@ -615,6 +624,7 @@ static void usage() {
 "  cat /etc/passwd | redis-cli -x set mypasswd\n"
 "  redis-cli get mypasswd\n"
 "  redis-cli -r 100 lpush mylist x\n"
+"  redis-cli -r 100 -i 1 info | grep used_memory_human:\n"
 "\n"
 "When no command is given, redis-cli starts in interactive mode.\n"
 "Type \"help\" in interactive mode for information on available commands.\n"
@@ -736,6 +746,7 @@ int main(int argc, char **argv) {
     config.hostport = 6379;
     config.hostsocket = NULL;
     config.repeat = 1;
+    config.interval = 0;
     config.dbnum = 0;
     config.interactive = 0;
     config.shutdown = 0;

From 615e414c5d9add3c0445d28b6d7ca62905dc8cd8 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Sun, 29 May 2011 15:17:29 +0200
Subject: [PATCH 08/16] INFO now contains the time (in usecs) needed to fork()
 in order to persist. The info is available in the stats section of INFO.

---
 src/aof.c   | 5 ++++-
 src/rdb.c   | 3 +++
 src/redis.c | 7 +++++--
 src/redis.h | 1 +
 4 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/aof.c b/src/aof.c
index ef72a2b1..cd409a0b 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -565,16 +565,18 @@ werr:
  */
 int rewriteAppendOnlyFileBackground(void) {
     pid_t childpid;
+    long long start;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
     if (server.ds_enabled != 0) {
         redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
         return REDIS_ERR;
     }
+    start = ustime();
     if ((childpid = fork()) == 0) {
-        /* Child */
         char tmpfile[256];
 
+        /* Child */
         if (server.ipfd > 0) close(server.ipfd);
         if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
@@ -585,6 +587,7 @@ int rewriteAppendOnlyFileBackground(void) {
         }
     } else {
         /* Parent */
+        server.stat_fork_time = ustime()-start;
         if (childpid == -1) {
             redisLog(REDIS_WARNING,
                 "Can't rewrite append only file in background: fork: %s",
diff --git a/src/rdb.c b/src/rdb.c
index eeafc053..0d4940d2 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -482,6 +482,7 @@ werr:
 
 int rdbSaveBackground(char *filename) {
     pid_t childpid;
+    long long start;
 
     if (server.bgsavechildpid != -1 ||
         server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
@@ -493,6 +494,7 @@ int rdbSaveBackground(char *filename) {
         return dsRdbSaveBackground(filename);
     }
 
+    start = ustime();
     if ((childpid = fork()) == 0) {
         int retval;
 
@@ -503,6 +505,7 @@ int rdbSaveBackground(char *filename) {
         _exit((retval == REDIS_OK) ? 0 : 1);
     } else {
         /* Parent */
+        server.stat_fork_time = ustime()-start;
         if (childpid == -1) {
             redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                 strerror(errno));
diff --git a/src/redis.c b/src/redis.c
index f64a2d34..a494e1f7 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -964,6 +964,7 @@ void initServer() {
     server.stat_keyspace_misses = 0;
     server.stat_keyspace_hits = 0;
     server.stat_peak_memory = 0;
+    server.stat_fork_time = 0;
     server.unixtime = time(NULL);
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
@@ -1439,7 +1440,8 @@ sds genRedisInfoString(char *section) {
             "keyspace_hits:%lld\r\n"
             "keyspace_misses:%lld\r\n"
             "pubsub_channels:%ld\r\n"
-            "pubsub_patterns:%u\r\n",
+            "pubsub_patterns:%u\r\n"
+            "latest_fork_usec:%lld\r\n",
             server.stat_numconnections,
             server.stat_numcommands,
             server.stat_expiredkeys,
@@ -1447,7 +1449,8 @@ sds genRedisInfoString(char *section) {
             server.stat_keyspace_hits,
             server.stat_keyspace_misses,
             dictSize(server.pubsub_channels),
-            listLength(server.pubsub_patterns));
+            listLength(server.pubsub_patterns),
+            server.stat_fork_time);
     }
 
     /* Replication */
diff --git a/src/redis.h b/src/redis.h
index f249d237..089a3042 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -547,6 +547,7 @@ struct redisServer {
     long long stat_keyspace_hits;   /* number of successful lookups of keys */
     long long stat_keyspace_misses; /* number of failed lookups of keys */
     size_t stat_peak_memory;        /* max used memory record */
+    long long stat_fork_time;       /* time needed to perform latets fork() */
     /* Configuration */
     int verbosity;
     int maxidletime;

From 632e4c09acad87b999dd944413ac5b75207de571 Mon Sep 17 00:00:00 2001
From: Pieter Noordhuis <pcnoordhuis@gmail.com>
Date: Sun, 29 May 2011 17:55:13 -0700
Subject: [PATCH 09/16] Make replication faster (biggest gain for small number
 of slaves)

---
 src/redis.h       |  1 -
 src/replication.c | 41 ++++++-----------------------------------
 2 files changed, 6 insertions(+), 36 deletions(-)

diff --git a/src/redis.h b/src/redis.h
index 089a3042..4249985f 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -41,7 +41,6 @@
 #define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
 #define REDIS_IOBUF_LEN         1024
 #define REDIS_LOADBUF_LEN       1024
-#define REDIS_STATIC_ARGS       8
 #define REDIS_DEFAULT_DBNUM     16
 #define REDIS_CONFIGLINE_MAX    1024
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
diff --git a/src/replication.c b/src/replication.c
index 87d575ca..c8ebb6e7 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -10,38 +10,8 @@
 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     listNode *ln;
     listIter li;
-    int outc = 0, j;
-    robj **outv;
-    /* We need 1+(ARGS*3) objects since commands are using the new protocol
-     * and we one 1 object for the first "*<count>\r\n" multibulk count, then
-     * for every additional object we have "$<count>\r\n" + object + "\r\n". */
-    robj *static_outv[REDIS_STATIC_ARGS*3+1];
-    robj *lenobj;
+    int j;
 
-    if (argc <= REDIS_STATIC_ARGS) {
-        outv = static_outv;
-    } else {
-        outv = zmalloc(sizeof(robj*)*(argc*3+1));
-    }
-
-    lenobj = createObject(REDIS_STRING,
-            sdscatprintf(sdsempty(), "*%d\r\n", argc));
-    lenobj->refcount = 0;
-    outv[outc++] = lenobj;
-    for (j = 0; j < argc; j++) {
-        lenobj = createObject(REDIS_STRING,
-            sdscatprintf(sdsempty(),"$%lu\r\n",
-                (unsigned long) stringObjectLen(argv[j])));
-        lenobj->refcount = 0;
-        outv[outc++] = lenobj;
-        outv[outc++] = argv[j];
-        outv[outc++] = shared.crlf;
-    }
-
-    /* Increment all the refcounts at start and decrement at end in order to
-     * be sure to free objects if there is no slave in a replication state
-     * able to be feed with commands */
-    for (j = 0; j < outc; j++) incrRefCount(outv[j]);
     listRewind(slaves,&li);
     while((ln = listNext(&li))) {
         redisClient *slave = ln->value;
@@ -49,7 +19,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
         /* Don't feed slaves that are still waiting for BGSAVE to start */
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
 
-        /* Feed all the other slaves, MONITORs and so on */
+        /* Feed slaves that are waiting for the initial SYNC (so these commands
+         * are queued in the output buffer until the intial SYNC completes),
+         * or are already in sync with the master. */
         if (slave->slaveseldb != dictid) {
             robj *selectcmd;
 
@@ -73,10 +45,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
             addReply(slave,selectcmd);
             slave->slaveseldb = dictid;
         }
-        for (j = 0; j < outc; j++) addReply(slave,outv[j]);
+        addReplyMultiBulkLen(slave,argc);
+        for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
     }
-    for (j = 0; j < outc; j++) decrRefCount(outv[j]);
-    if (outv != static_outv) zfree(outv);
 }
 
 void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) {

From ef231a7c56cafc42ff486b91d88ae7ec642a2117 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 31 May 2011 17:47:34 +0200
Subject: [PATCH 10/16] Variadic ZADD

---
 src/redis.c  |   2 +-
 src/t_zset.c | 180 +++++++++++++++++++++++++++------------------------
 2 files changed, 97 insertions(+), 85 deletions(-)

diff --git a/src/redis.c b/src/redis.c
index a494e1f7..35f4395b 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -116,7 +116,7 @@ struct redisCommand redisCommandTable[] = {
     {"sdiff",sdiffCommand,-2,REDIS_CMD_DENYOOM,NULL,1,-1,1,0,0},
     {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_DENYOOM,NULL,2,-1,1,0,0},
     {"smembers",sinterCommand,2,0,NULL,1,1,1,0,0},
-    {"zadd",zaddCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
+    {"zadd",zaddCommand,-4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
     {"zincrby",zincrbyCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
     {"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
     {"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
diff --git a/src/t_zset.c b/src/t_zset.c
index a5dc27c7..a0c1b1e8 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -810,11 +810,29 @@ void zaddGenericCommand(redisClient *c, int incr) {
     robj *ele;
     robj *zobj;
     robj *curobj;
-    double score, curscore = 0.0;
+    double score = 0, *scores, curscore = 0.0;
+    int j, elements = (c->argc-2)/2;
+    int added = 0;
 
-    if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
+    if (c->argc % 2) {
+        addReply(c,shared.syntaxerr);
         return;
+    }
 
+    /* Start parsing all the scores, we need to emit any syntax error
+     * before executing additions to the sorted set, as the command should
+     * either execute fully or nothing at all. */
+    scores = zmalloc(sizeof(double)*elements);
+    for (j = 0; j < elements; j++) {
+        if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
+            != REDIS_OK)
+        {
+            zfree(scores);
+            return;
+        }
+    }
+
+    /* Lookup the key and create the sorted set if does not exist. */
     zobj = lookupKeyWrite(c->db,key);
     if (zobj == NULL) {
         if (server.zset_max_ziplist_entries == 0 ||
@@ -828,111 +846,105 @@ void zaddGenericCommand(redisClient *c, int incr) {
     } else {
         if (zobj->type != REDIS_ZSET) {
             addReply(c,shared.wrongtypeerr);
+            zfree(scores);
             return;
         }
     }
 
-    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
-        unsigned char *eptr;
+    for (j = 0; j < elements; j++) {
+        score = scores[j];
 
-        /* Prefer non-encoded element when dealing with ziplists. */
-        ele = c->argv[3];
-        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
-            if (incr) {
-                score += curscore;
-                if (isnan(score)) {
-                    addReplyError(c,nanerr);
-                    /* Don't need to check if the sorted set is empty, because
-                     * we know it has at least one element. */
-                    return;
+        if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+            unsigned char *eptr;
+
+            /* Prefer non-encoded element when dealing with ziplists. */
+            ele = c->argv[3+j*2];
+            if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
+                if (incr) {
+                    score += curscore;
+                    if (isnan(score)) {
+                        addReplyError(c,nanerr);
+                        /* Don't need to check if the sorted set is empty
+                         * because we know it has at least one element. */
+                        zfree(scores);
+                        return;
+                    }
                 }
-            }
 
-            /* Remove and re-insert when score changed. */
-            if (score != curscore) {
-                zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                /* Remove and re-insert when score changed. */
+                if (score != curscore) {
+                    zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                    zobj->ptr = zzlInsert(zobj->ptr,ele,score);
+
+                    signalModifiedKey(c->db,key);
+                    server.dirty++;
+                }
+            } else {
+                /* Optimize: check if the element is too large or the list
+                 * becomes too long *before* executing zzlInsert. */
                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
+                if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
+                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
+                if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
 
                 signalModifiedKey(c->db,key);
                 server.dirty++;
+                if (!incr) added++;
             }
+        } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
+            zset *zs = zobj->ptr;
+            zskiplistNode *znode;
+            dictEntry *de;
 
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.czero);
-        } else {
-            /* Optimize: check if the element is too large or the list becomes
-             * too long *before* executing zzlInsert. */
-            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
-            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
-                zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
-            if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
-                zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
+            ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
+            de = dictFind(zs->dict,ele);
+            if (de != NULL) {
+                curobj = dictGetEntryKey(de);
+                curscore = *(double*)dictGetEntryVal(de);
 
-            signalModifiedKey(c->db,key);
-            server.dirty++;
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.cone);
-        }
-    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
-        zset *zs = zobj->ptr;
-        zskiplistNode *znode;
-        dictEntry *de;
-
-        ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
-        de = dictFind(zs->dict,ele);
-        if (de != NULL) {
-            curobj = dictGetEntryKey(de);
-            curscore = *(double*)dictGetEntryVal(de);
-
-            if (incr) {
-                score += curscore;
-                if (isnan(score)) {
-                    addReplyError(c,nanerr);
-                    /* Don't need to check if the sorted set is empty, because
-                     * we know it has at least one element. */
-                    return;
+                if (incr) {
+                    score += curscore;
+                    if (isnan(score)) {
+                        addReplyError(c,nanerr);
+                        /* Don't need to check if the sorted set is empty
+                         * because we know it has at least one element. */
+                        zfree(scores);
+                        return;
+                    }
                 }
-            }
 
-            /* Remove and re-insert when score changed. We can safely delete
-             * the key object from the skiplist, since the dictionary still has
-             * a reference to it. */
-            if (score != curscore) {
-                redisAssert(zslDelete(zs->zsl,curscore,curobj));
-                znode = zslInsert(zs->zsl,score,curobj);
-                incrRefCount(curobj); /* Re-inserted in skiplist. */
-                dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
+                /* Remove and re-insert when score changed. We can safely
+                 * delete the key object from the skiplist, since the
+                 * dictionary still has a reference to it. */
+                if (score != curscore) {
+                    redisAssert(zslDelete(zs->zsl,curscore,curobj));
+                    znode = zslInsert(zs->zsl,score,curobj);
+                    incrRefCount(curobj); /* Re-inserted in skiplist. */
+                    dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
+
+                    signalModifiedKey(c->db,key);
+                    server.dirty++;
+                }
+            } else {
+                znode = zslInsert(zs->zsl,score,ele);
+                incrRefCount(ele); /* Inserted in skiplist. */
+                redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
+                incrRefCount(ele); /* Added to dictionary. */
 
                 signalModifiedKey(c->db,key);
                 server.dirty++;
+                if (!incr) added++;
             }
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.czero);
         } else {
-            znode = zslInsert(zs->zsl,score,ele);
-            incrRefCount(ele); /* Inserted in skiplist. */
-            redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
-            incrRefCount(ele); /* Added to dictionary. */
-
-            signalModifiedKey(c->db,key);
-            server.dirty++;
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.cone);
+            redisPanic("Unknown sorted set encoding");
         }
-    } else {
-        redisPanic("Unknown sorted set encoding");
     }
+    zfree(scores);
+    if (incr) /* ZINCRBY */
+        addReplyDouble(c,score);
+    else /* ZADD */
+        addReplyLongLong(c,added);
 }
 
 void zaddCommand(redisClient *c) {

From 3fe40d6e3ce724de842bf690b3f5eebaf69fafa2 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 31 May 2011 18:35:09 +0200
Subject: [PATCH 11/16] Variadic ZADD tests

---
 tests/test_helper.tcl | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 6dc85eff..bce8ded5 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -110,6 +110,13 @@ proc cleanup {} {
 }
 
 proc execute_everything {} {
+    if 1 {
+        # Use this when hacking on new tests.
+        set ::verbose 1
+        execute_tests "unit/first"
+        return
+    }
+
     execute_tests "unit/printver"
     execute_tests "unit/auth"
     execute_tests "unit/protocol"

From faa2a80f89b5a3cd35812e71893d812e2877ed2e Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 31 May 2011 18:49:12 +0200
Subject: [PATCH 12/16] disabled development test entry, tests moved in the
 right place

---
 tests/test_helper.tcl    |  2 +-
 tests/unit/type/zset.tcl | 28 ++++++++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index bce8ded5..e2a9e525 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -110,7 +110,7 @@ proc cleanup {} {
 }
 
 proc execute_everything {} {
-    if 1 {
+    if 0 {
         # Use this when hacking on new tests.
         set ::verbose 1
         execute_tests "unit/first"
diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl
index 761cac49..7b757280 100644
--- a/tests/unit/type/zset.tcl
+++ b/tests/unit/type/zset.tcl
@@ -48,6 +48,34 @@ start_server {tags {"zset"}} {
             assert_error "*NaN*" {r zincrby myzset -inf abc}
         }
 
+        test {ZADD - Variadic version base case} {
+            r del myzset
+            list [r zadd myzset 10 a 20 b 30 c] [r zrange myzset 0 -1 withscores]
+        } {3 {a 10 b 20 c 30}}
+
+        test {ZADD - Return value is the number of actually added items} {
+            list [r zadd myzset 5 x 20 b 30 c] [r zrange myzset 0 -1 withscores]
+        } {1 {x 5 a 10 b 20 c 30}}
+
+        test {ZADD - Variadic version does not add nothing on single parsing err} {
+            r del myzset
+            catch {r zadd myzset 10 a 20 b 30.badscore c} e
+            assert_match {*ERR*not*double*} $e
+            r exists myzset
+        } {0}
+
+        test {ZADD - Variadic version will raise error on missing arg} {
+            r del myzset
+            catch {r zadd myzset 10 a 20 b 30 c 40} e
+            assert_match {*ERR*syntax*} $e
+        }
+
+        test {ZINCRBY does not work variadic even if shares ZADD implementation} {
+            r del myzset
+            catch {r zincrby myzset 10 a 20 b 30 c} e
+            assert_match {*ERR*wrong*number*arg*} $e
+        }
+
         test "ZCARD basics - $encoding" {
             assert_equal 3 [r zcard ztmp]
             assert_equal 0 [r zcard zdoesntexist]

From 3738ff5f32aaadd3074a691544caf2f2daa77928 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 31 May 2011 20:14:29 +0200
Subject: [PATCH 13/16] Fix for the variadic version of SREM. Regression test
 added.

---
 src/t_set.c             | 5 ++++-
 tests/unit/type/set.tcl | 6 ++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/t_set.c b/src/t_set.c
index be083c8b..c7d05c2f 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -249,8 +249,11 @@ void sremCommand(redisClient *c) {
 
     for (j = 2; j < c->argc; j++) {
         if (setTypeRemove(set,c->argv[j])) {
-            if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
             deleted++;
+            if (setTypeSize(set) == 0) {
+                dbDelete(c->db,c->argv[1]);
+                break;
+            }
         }
     }
     if (deleted) {
diff --git a/tests/unit/type/set.tcl b/tests/unit/type/set.tcl
index 1a37ed61..bdd1f9bf 100644
--- a/tests/unit/type/set.tcl
+++ b/tests/unit/type/set.tcl
@@ -105,6 +105,12 @@ start_server {
         lsort [r smembers myset]
     } {a c}
 
+    test {SREM variadic version with more args needed to destroy the key} {
+        r del myset
+        r sadd myset 1 2 3
+        r srem myset 1 2 3 4 5 6 7 8
+    } {3}
+
     foreach {type} {hashtable intset} {
         for {set i 1} {$i <= 5} {incr i} {
             r del [format "set%d" $i]

From 3f7b2b1f300e6d0441f420fcb51c253b10631cfa Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 31 May 2011 20:15:18 +0200
Subject: [PATCH 14/16] Variadic ZREM

---
 src/redis.c  |  2 +-
 src/t_zset.c | 53 ++++++++++++++++++++++++++++++----------------------
 2 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/src/redis.c b/src/redis.c
index 35f4395b..dc8a0032 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -118,7 +118,7 @@ struct redisCommand redisCommandTable[] = {
     {"smembers",sinterCommand,2,0,NULL,1,1,1,0,0},
     {"zadd",zaddCommand,-4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
     {"zincrby",zincrbyCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
-    {"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
+    {"zrem",zremCommand,-3,0,NULL,1,1,1,0,0},
     {"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
     {"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0},
     {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
diff --git a/src/t_zset.c b/src/t_zset.c
index a0c1b1e8..6a56c3b4 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -957,8 +957,8 @@ void zincrbyCommand(redisClient *c) {
 
 void zremCommand(redisClient *c) {
     robj *key = c->argv[1];
-    robj *ele = c->argv[2];
     robj *zobj;
+    int deleted = 0, j;
 
     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
         checkType(c,zobj,REDIS_ZSET)) return;
@@ -966,39 +966,48 @@ void zremCommand(redisClient *c) {
     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *eptr;
 
-        if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
-            zobj->ptr = zzlDelete(zobj->ptr,eptr);
-            if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
-        } else {
-            addReply(c,shared.czero);
-            return;
+        for (j = 2; j < c->argc; j++) {
+            if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
+                deleted++;
+                zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                if (zzlLength(zobj->ptr) == 0) {
+                    dbDelete(c->db,key);
+                    break;
+                }
+            }
         }
     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         dictEntry *de;
         double score;
 
-        de = dictFind(zs->dict,ele);
-        if (de != NULL) {
-            /* Delete from the skiplist */
-            score = *(double*)dictGetEntryVal(de);
-            redisAssert(zslDelete(zs->zsl,score,ele));
+        for (j = 2; j < c->argc; j++) {
+            de = dictFind(zs->dict,c->argv[j]);
+            if (de != NULL) {
+                deleted++;
 
-            /* Delete from the hash table */
-            dictDelete(zs->dict,ele);
-            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
-            if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
-        } else {
-            addReply(c,shared.czero);
-            return;
+                /* Delete from the skiplist */
+                score = *(double*)dictGetEntryVal(de);
+                redisAssert(zslDelete(zs->zsl,score,c->argv[j]));
+
+                /* Delete from the hash table */
+                dictDelete(zs->dict,c->argv[j]);
+                if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+                if (dictSize(zs->dict) == 0) {
+                    dbDelete(c->db,key);
+                    break;
+                }
+            }
         }
     } else {
         redisPanic("Unknown sorted set encoding");
     }
 
-    signalModifiedKey(c->db,key);
-    server.dirty++;
-    addReply(c,shared.cone);
+    if (deleted) {
+        signalModifiedKey(c->db,key);
+        server.dirty += deleted;
+    }
+    addReplyLongLong(c,deleted);
 }
 
 void zremrangebyscoreCommand(redisClient *c) {

From b002546bb4f543dd4d82e06037ada1bd7c036aec Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 31 May 2011 20:30:54 +0200
Subject: [PATCH 15/16] ZREM tests

---
 tests/unit/type/zset.tcl | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl
index 7b757280..46d40f6f 100644
--- a/tests/unit/type/zset.tcl
+++ b/tests/unit/type/zset.tcl
@@ -93,6 +93,21 @@ start_server {tags {"zset"}} {
             assert_equal 0 [r exists ztmp]
         }
 
+        test "ZREM variadic version" {
+            r del ztmp
+            r zadd ztmp 10 a 20 b 30 c
+            assert_equal 2 [r zrem ztmp x y a b k]
+            assert_equal 0 [r zrem ztmp foo bar]
+            assert_equal 1 [r zrem ztmp c]
+            r exists ztmp
+        } {0}
+
+        test "ZREM variadic version -- remove elements after key deletion" {
+            r del ztmp
+            r zadd ztmp 10 a 20 b 30 c
+            r zrem ztmp a b c d e f g
+        } {3}
+
         test "ZRANGE basics - $encoding" {
             r del ztmp
             r zadd ztmp 1 a

From 936c4ab64bde3c781e965a7e68b154bbd8891e6e Mon Sep 17 00:00:00 2001
From: antirez <antirez@metal.(none)>
Date: Thu, 2 Jun 2011 17:41:42 +0200
Subject: [PATCH 16/16] touch less pages in decrRefCount

---
 src/object.c | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/object.c b/src/object.c
index 22f53837..20e7f57a 100644
--- a/src/object.c
+++ b/src/object.c
@@ -180,7 +180,7 @@ void decrRefCount(void *obj) {
     robj *o = obj;
 
     if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
-    if (--(o->refcount) == 0) {
+    if (o->refcount == 1) {
         switch(o->type) {
         case REDIS_STRING: freeStringObject(o); break;
         case REDIS_LIST: freeListObject(o); break;
@@ -189,8 +189,9 @@ void decrRefCount(void *obj) {
         case REDIS_HASH: freeHashObject(o); break;
         default: redisPanic("Unknown object type"); break;
         }
-        o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */
         zfree(o);
+    } else {
+        o->refcount--;
     }
 }