diff --git a/src/db.c b/src/db.c
index 958a9f6b..6d287d72 100644
--- a/src/db.c
+++ b/src/db.c
@@ -45,7 +45,7 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
 }
 
 robj *lookupKeyWrite(redisDb *db, robj *key) {
-    deleteIfVolatile(db,key);
+    expireIfNeeded(db,key);
     return lookupKey(db,key);
 }
 
@@ -321,7 +321,6 @@ void renameGenericCommand(redisClient *c, int nx) {
         return;
 
     incrRefCount(o);
-    deleteIfVolatile(c->db,c->argv[2]);
     if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
         if (nx) {
             decrRefCount(o);
@@ -375,7 +374,6 @@ void moveCommand(redisClient *c) {
     }
 
     /* Try to add the element to the target DB */
-    deleteIfVolatile(dst,c->argv[1]);
     if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
         addReply(c,shared.czero);
         return;
@@ -396,23 +394,16 @@ int removeExpire(redisDb *db, robj *key) {
     /* An expire may only be removed if there is a corresponding entry in the
      * main dict. Otherwise, the key will never be freed. */
     redisAssert(dictFind(db->dict,key->ptr) != NULL);
-    if (dictDelete(db->expires,key->ptr) == DICT_OK) {
-        return 1;
-    } else {
-        return 0;
-    }
+    return dictDelete(db->expires,key->ptr) == DICT_OK;
 }
 
-int setExpire(redisDb *db, robj *key, time_t when) {
+void setExpire(redisDb *db, robj *key, time_t when) {
     dictEntry *de;
 
     /* Reuse the sds from the main dict in the expire dict */
-    redisAssert((de = dictFind(db->dict,key->ptr)) != NULL);
-    if (dictAdd(db->expires,dictGetEntryKey(de),(void*)when) == DICT_ERR) {
-        return 0;
-    } else {
-        return 1;
-    }
+    de = dictFind(db->dict,key->ptr);
+    redisAssert(de != NULL);
+    dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
 }
 
 /* Return the expire time of the specified key, or -1 if no expire
@@ -430,8 +421,46 @@ time_t getExpire(redisDb *db, robj *key) {
     return (time_t) dictGetEntryVal(de);
 }
 
+/* Propagate expires into slaves and the AOF file.
+ * When a key expires in the master, a DEL operation for this key is sent
+ * to all the slaves and the AOF file if enabled.
+ *
+ * This way the key expiry is centralized in one place, and since both
+ * AOF and the master->slave link guarantee operation ordering, everything
+ * will be consistent even if we allow write operations against expiring
+ * keys. */
+void propagateExpire(redisDb *db, robj *key) {
+    struct redisCommand *cmd;
+    robj *argv[2];
+
+    cmd = lookupCommand("del");
+    argv[0] = createStringObject("DEL",3);
+    argv[1] = key;
+    incrRefCount(key);
+
+    if (server.appendonly)
+        feedAppendOnlyFile(cmd,db->id,argv,2);
+    if (listLength(server.slaves))
+        replicationFeedSlaves(server.slaves,db->id,argv,2);
+
+    decrRefCount(argv[0]);
+    decrRefCount(argv[1]);
+}
+
 int expireIfNeeded(redisDb *db, robj *key) {
     time_t when = getExpire(db,key);
+
+    /* If we are running in the context of a slave, return ASAP:
+     * the slave key expiration is controlled by the master that will
+     * send us synthesized DEL operations for expired keys.
+     *
+     * Still we try to return the right information to the caller, 
+     * that is, 0 if we think the key should be still valid, 1 if
+     * we think the key is expired at this time. */
+    if (server.masterhost != NULL) {
+        return time(NULL) > when;
+    }
+
     if (when < 0) return 0;
 
     /* Return when this key has not expired */
@@ -440,15 +469,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
     /* Delete the key */
     server.stat_expiredkeys++;
     server.dirty++;
-    return dbDelete(db,key);
-}
-
-int deleteIfVolatile(redisDb *db, robj *key) {
-    if (getExpire(db,key) < 0) return 0;
-
-    /* Delete the key */
-    server.stat_expiredkeys++;
-    server.dirty++;
+    propagateExpire(db,key);
     return dbDelete(db,key);
 }
 
@@ -476,13 +497,10 @@ void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
         return;
     } else {
         time_t when = time(NULL)+seconds;
-        if (setExpire(c->db,key,when)) {
-            addReply(c,shared.cone);
-            touchWatchedKey(c->db,key);
-            server.dirty++;
-        } else {
-            addReply(c,shared.czero);
-        }
+        setExpire(c->db,key,when);
+        addReply(c,shared.cone);
+        touchWatchedKey(c->db,key);
+        server.dirty++;
         return;
     }
 }
@@ -496,13 +514,28 @@ void expireatCommand(redisClient *c) {
 }
 
 void ttlCommand(redisClient *c) {
-    time_t expire;
-    int ttl = -1;
+    time_t expire, ttl = -1;
 
     expire = getExpire(c->db,c->argv[1]);
     if (expire != -1) {
-        ttl = (int) (expire-time(NULL));
+        ttl = (expire-time(NULL));
         if (ttl < 0) ttl = -1;
     }
-    addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
+    addReplyLongLong(c,(long long)ttl);
+}
+
+void persistCommand(redisClient *c) {
+    dictEntry *de;
+
+    de = dictFind(c->db->dict,c->argv[1]->ptr);
+    if (de == NULL) {
+        addReply(c,shared.czero);
+    } else {
+        if (removeExpire(c->db,c->argv[1])) {
+            addReply(c,shared.cone);
+            server.dirty++;
+        } else {
+            addReply(c,shared.czero);
+        }
+    }
 }
diff --git a/src/networking.c b/src/networking.c
index e5a66984..a39be7c4 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -255,7 +255,8 @@ void freeClient(redisClient *c) {
         server.vm_blocked_clients--;
     }
     listRelease(c->io_keys);
-    /* Master/slave cleanup */
+    /* Master/slave cleanup.
+     * Case 1: we lost the connection with a slave. */
     if (c->flags & REDIS_SLAVE) {
         if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
             close(c->repldbfd);
@@ -264,9 +265,20 @@ void freeClient(redisClient *c) {
         redisAssert(ln != NULL);
         listDelNode(l,ln);
     }
+
+    /* Case 2: we lost the connection with the master. */
     if (c->flags & REDIS_MASTER) {
         server.master = NULL;
         server.replstate = REDIS_REPL_CONNECT;
+        /* Since we lost the connection with the master, we should also
+         * close the connection with all our slaves if we have any, so
+         * when we'll resync with the master the other slaves will sync again
+         * with us as well. Note that also when the slave is not connected
+         * to the master it will keep refusing connections by other slaves. */
+        while (listLength(server.slaves)) {
+            ln = listFirst(server.slaves);
+            freeClient((redisClient*)ln->value);
+        }
     }
     /* Release memory */
     zfree(c->argv);
@@ -466,6 +478,7 @@ void closeTimedoutClients(void) {
         if (server.maxidletime &&
             !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
             !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
+            !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
             dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
             listLength(c->pubsub_patterns) == 0 &&
             (now - c->lastinteraction > server.maxidletime))
diff --git a/src/object.c b/src/object.c
index 21268340..429ac0ec 100644
--- a/src/object.c
+++ b/src/object.c
@@ -358,6 +358,8 @@ int getLongLongFromObject(robj *o, long long *target) {
         if (o->encoding == REDIS_ENCODING_RAW) {
             value = strtoll(o->ptr, &eptr, 10);
             if (eptr[0] != '\0') return REDIS_ERR;
+            if (errno == ERANGE && (value == LLONG_MIN || value == LLONG_MAX))
+                return REDIS_ERR;
         } else if (o->encoding == REDIS_ENCODING_INT) {
             value = (long)o->ptr;
         } else {
@@ -375,7 +377,7 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con
         if (msg != NULL) {
             addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
         } else {
-            addReplySds(c, sdsnew("-ERR value is not an integer\r\n"));
+            addReplySds(c, sdsnew("-ERR value is not an integer or out of range\r\n"));
         }
         return REDIS_ERR;
     }
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 43cbc55e..1bd0798b 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -36,6 +36,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <ctype.h>
+#include <errno.h>
 
 #include "anet.h"
 #include "sds.h"
@@ -68,11 +69,14 @@ static struct config {
 static int cliReadReply(int fd);
 static void usage();
 
-static int cliConnect(void) {
+/* Connect to the client. If force is not zero the connection is performed
+ * even if there is already a connected socket. */
+static int cliConnect(int force) {
     char err[ANET_ERR_LEN];
     static int fd = ANET_ERR;
 
-    if (fd == ANET_ERR) {
+    if (fd == ANET_ERR || force) {
+        if (force) close(fd);
         fd = anetTcpConnect(err,config.hostip,config.hostport);
         if (fd == ANET_ERR) {
             fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err);
@@ -170,6 +174,7 @@ static int cliReadBulkReply(int fd) {
 static int cliReadMultiBulkReply(int fd) {
     sds replylen = cliReadLine(fd);
     int elements, c = 1;
+    int retval = 0;
 
     if (replylen == NULL) return 1;
     elements = atoi(replylen);
@@ -183,19 +188,27 @@ static int cliReadMultiBulkReply(int fd) {
     }
     while(elements--) {
         if (config.tty) printf("%d. ", c);
-        if (cliReadReply(fd)) return 1;
+        if (cliReadReply(fd)) retval = 1;
         if (elements) printf("%c",config.mb_sep);
         c++;
     }
-    return 0;
+    return retval;
 }
 
 static int cliReadReply(int fd) {
     char type;
+    int nread;
 
-    if (anetRead(fd,&type,1) <= 0) {
+    if ((nread = anetRead(fd,&type,1)) <= 0) {
         if (config.shutdown) return 0;
-        exit(1);
+        if (config.interactive &&
+            (nread == 0 || (nread == -1 && errno == ECONNRESET)))
+        {
+            return ECONNRESET;
+        } else {
+            printf("I/O error while reading from socket: %s",strerror(errno));
+            exit(1);
+        }
     }
     switch(type) {
     case '-':
@@ -247,7 +260,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
     if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
     if (!strcasecmp(command,"subscribe") ||
         !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;
-    if ((fd = cliConnect()) == -1) return 1;
+    if ((fd = cliConnect(0)) == -1) return 1;
 
     /* Select db number */
     retval = selectDb(fd);
@@ -374,91 +387,15 @@ static char **convertToSds(int count, char** args) {
   return sds;
 }
 
-static char **splitArguments(char *line, int *argc) {
-    char *p = line;
-    char *current = NULL;
-    char **vector = NULL;
-
-    *argc = 0;
-    while(1) {
-        /* skip blanks */
-        while(*p && isspace(*p)) p++;
-        if (*p) {
-            /* get a token */
-            int inq=0; /* set to 1 if we are in "quotes" */
-            int done=0;
-
-            if (current == NULL) current = sdsempty();
-            while(!done) {
-                if (inq) {
-                    if (*p == '\\' && *(p+1)) {
-                        char c;
-
-                        p++;
-                        switch(*p) {
-                        case 'n': c = '\n'; break;
-                        case 'r': c = '\r'; break;
-                        case 't': c = '\t'; break;
-                        case 'b': c = '\b'; break;
-                        case 'a': c = '\a'; break;
-                        default: c = *p; break;
-                        }
-                        current = sdscatlen(current,&c,1);
-                    } else if (*p == '"') {
-                        /* closing quote must be followed by a space */
-                        if (*(p+1) && !isspace(*(p+1))) goto err;
-                        done=1;
-                    } else if (!*p) {
-                        /* unterminated quotes */
-                        goto err;
-                    } else {
-                        current = sdscatlen(current,p,1);
-                    }
-                } else {
-                    switch(*p) {
-                    case ' ':
-                    case '\n':
-                    case '\r':
-                    case '\t':
-                    case '\0':
-                        done=1;
-                        break;
-                    case '"':
-                        inq=1;
-                        break;
-                    default:
-                        current = sdscatlen(current,p,1);
-                        break;
-                    }
-                }
-                if (*p) p++;
-            }
-            /* add the token to the vector */
-            vector = zrealloc(vector,((*argc)+1)*sizeof(char*));
-            vector[*argc] = current;
-            (*argc)++;
-            current = NULL;
-        } else {
-            return vector;
-        }
-    }
-
-err:
-    while(*argc--)
-        sdsfree(vector[*argc]);
-    zfree(vector);
-    if (current) sdsfree(current);
-    return NULL;
-}
-
 #define LINE_BUFLEN 4096
 static void repl() {
     int argc, j;
-    char *line, **argv;
+    char *line;
+    sds *argv;
 
     while((line = linenoise("redis> ")) != NULL) {
         if (line[0] != '\0') {
-            argv = splitArguments(line,&argc);
+            argv = sdssplitargs(line,&argc);
             linenoiseHistoryAdd(line);
             if (config.historyfile) linenoiseHistorySave(config.historyfile);
             if (argv == NULL) {
@@ -467,9 +404,21 @@ static void repl() {
             } else if (argc > 0) {
                 if (strcasecmp(argv[0],"quit") == 0 ||
                     strcasecmp(argv[0],"exit") == 0)
-                        exit(0);
-                else
-                    cliSendCommand(argc, argv, 1);
+                {
+                    exit(0);
+                } else {
+                    int err;
+
+                    if ((err = cliSendCommand(argc, argv, 1)) != 0) {
+                        if (err == ECONNRESET) {
+                            printf("Reconnecting... ");
+                            fflush(stdout);
+                            if (cliConnect(1) == -1) exit(1);
+                            printf("OK\n");
+                            cliSendCommand(argc,argv,1);
+                        }
+                    }
+                }
             }
             /* Free the argument vector */
             for (j = 0; j < argc; j++)
diff --git a/src/redis.c b/src/redis.c
index c8b1c781..0ee7a20b 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -170,6 +170,7 @@ struct redisCommand readonlyCommandTable[] = {
     {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
     {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
     {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
+    {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
     {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
     {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
     {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
@@ -435,6 +436,48 @@ void updateDictResizePolicy(void) {
 
 /* ======================= Cron: called every 100 ms ======================== */
 
+/* Try to expire a few timed out keys. The algorithm used is adaptive and
+ * will use few CPU cycles if there are few expiring keys, otherwise
+ * it will get more aggressive to avoid that too much memory is used by
+ * keys that can be removed from the keyspace. */
+void activeExpireCycle(void) {
+    int j;
+
+    for (j = 0; j < server.dbnum; j++) {
+        int expired;
+        redisDb *db = server.db+j;
+
+        /* Continue to expire if at the end of the cycle more than 25%
+         * of the keys were expired. */
+        do {
+            long num = dictSize(db->expires);
+            time_t now = time(NULL);
+
+            expired = 0;
+            if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
+                num = REDIS_EXPIRELOOKUPS_PER_CRON;
+            while (num--) {
+                dictEntry *de;
+                time_t t;
+
+                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
+                t = (time_t) dictGetEntryVal(de);
+                if (now > t) {
+                    sds key = dictGetEntryKey(de);
+                    robj *keyobj = createStringObject(key,sdslen(key));
+
+                    propagateExpire(db,keyobj);
+                    dbDelete(db,keyobj);
+                    decrRefCount(keyobj);
+                    expired++;
+                    server.stat_expiredkeys++;
+                }
+            }
+        } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
+    }
+}
+
+
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops++;
     REDIS_NOTUSED(eventLoop);
@@ -533,41 +576,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
          }
     }
 
-    /* Try to expire a few timed out keys. The algorithm used is adaptive and
-     * will use few CPU cycles if there are few expiring keys, otherwise
-     * it will get more aggressive to avoid that too much memory is used by
-     * keys that can be removed from the keyspace. */
-    for (j = 0; j < server.dbnum; j++) {
-        int expired;
-        redisDb *db = server.db+j;
-
-        /* Continue to expire if at the end of the cycle more than 25%
-         * of the keys were expired. */
-        do {
-            long num = dictSize(db->expires);
-            time_t now = time(NULL);
-
-            expired = 0;
-            if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
-                num = REDIS_EXPIRELOOKUPS_PER_CRON;
-            while (num--) {
-                dictEntry *de;
-                time_t t;
-
-                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
-                t = (time_t) dictGetEntryVal(de);
-                if (now > t) {
-                    sds key = dictGetEntryKey(de);
-                    robj *keyobj = createStringObject(key,sdslen(key));
-
-                    dbDelete(db,keyobj);
-                    decrRefCount(keyobj);
-                    expired++;
-                    server.stat_expiredkeys++;
-                }
-            }
-        } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
-    }
+    /* Expire a few keys per cycle, only if this is a master.
+     * On slaves we wait for DEL operations synthesized by the master
+     * in order to guarantee a strict consistency. */
+    if (server.masterhost == NULL) activeExpireCycle();
 
     /* Swap a few keys on disk if we are over the memory limit and VM
      * is enbled. Try to free objects from the free list first. */
@@ -900,9 +912,14 @@ int processCommand(redisClient *c) {
                 resetClient(c);
                 return 1;
             } else {
-                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
+                char *eptr;
+                long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10);
+                int perr = eptr[0] != '\0';
+
                 decrRefCount(c->argv[0]);
-                if (bulklen < 0 || bulklen > 1024*1024*1024) {
+                if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
+                    bulklen < 0 || bulklen > 1024*1024*1024)
+                {
                     c->argc--;
                     addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                     resetClient(c);
@@ -972,10 +989,14 @@ int processCommand(redisClient *c) {
         return 1;
     } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
         /* This is a bulk command, we have to read the last argument yet. */
-        int bulklen = atoi(c->argv[c->argc-1]->ptr);
+        char *eptr;
+        long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10);
+        int perr = eptr[0] != '\0';
 
         decrRefCount(c->argv[c->argc-1]);
-        if (bulklen < 0 || bulklen > 1024*1024*1024) {
+        if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN ||
+            bulklen < 0 || bulklen > 1024*1024*1024)
+        {
             c->argc--;
             addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
             resetClient(c);
@@ -1064,11 +1085,7 @@ int prepareForShutdown() {
         if (server.vm_enabled) unlink(server.vm_swap_file);
     } else {
         /* Snapshotting. Perform a SYNC SAVE and exit */
-        if (rdbSave(server.dbfilename) == REDIS_OK) {
-            if (server.daemonize)
-                unlink(server.pidfile);
-            redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
-        } else {
+        if (rdbSave(server.dbfilename) != REDIS_OK) {
             /* Ooops.. error saving! The best we can do is to continue
              * operating. Note that if there was a background saving process,
              * in the next cron() Redis will be notified that the background
@@ -1078,6 +1095,7 @@ int prepareForShutdown() {
             return REDIS_ERR;
         }
     }
+    if (server.daemonize) unlink(server.pidfile);
     redisLog(REDIS_WARNING,"Server exit now, bye bye...");
     return REDIS_OK;
 }
@@ -1350,9 +1368,17 @@ void linuxOvercommitMemoryWarning(void) {
 }
 #endif /* __linux__ */
 
+void createPidFile(void) {
+    /* Try to write the pid file in a best-effort way. */
+    FILE *fp = fopen(server.pidfile,"w");
+    if (fp) {
+        fprintf(fp,"%d\n",getpid());
+        fclose(fp);
+    }
+}
+
 void daemonize(void) {
     int fd;
-    FILE *fp;
 
     if (fork() != 0) exit(0); /* parent exits */
     setsid(); /* create a new session */
@@ -1366,12 +1392,6 @@ void daemonize(void) {
         dup2(fd, STDERR_FILENO);
         if (fd > STDERR_FILENO) close(fd);
     }
-    /* Try to write the pid file */
-    fp = fopen(server.pidfile,"w");
-    if (fp) {
-        fprintf(fp,"%d\n",getpid());
-        fclose(fp);
-    }
 }
 
 void version() {
@@ -1404,6 +1424,7 @@ int main(int argc, char **argv) {
     }
     if (server.daemonize) daemonize();
     initServer();
+    if (server.daemonize) createPidFile();
     redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
 #ifdef __linux__
     linuxOvercommitMemoryWarning();
@@ -1480,6 +1501,7 @@ void segvHandler(int sig, siginfo_t *info, void *secret) {
         redisLog(REDIS_WARNING,"%s", messages[i]);
 
     /* free(messages); Don't call free() with possibly corrupted memory. */
+    if (server.daemonize) unlink(server.pidfile);
     _exit(0);
 }
 
diff --git a/src/redis.h b/src/redis.h
index fb051f8e..c35fe53a 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -283,7 +283,7 @@ typedef struct redisClient {
     sds querybuf;
     robj **argv, **mbargv;
     int argc, mbargc;
-    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
+    long bulklen;            /* bulk read len. -1 if not in bulk read mode */
     int multibulk;          /* multi bulk command format active */
     list *reply;
     int sentlen;
@@ -752,10 +752,10 @@ void resetServerSaveParams();
 
 /* db.c -- Keyspace access API */
 int removeExpire(redisDb *db, robj *key);
+void propagateExpire(redisDb *db, robj *key);
 int expireIfNeeded(redisDb *db, robj *key);
-int deleteIfVolatile(redisDb *db, robj *key);
 time_t getExpire(redisDb *db, robj *key);
-int setExpire(redisDb *db, robj *key, time_t when);
+void setExpire(redisDb *db, robj *key, time_t when);
 robj *lookupKey(redisDb *db, robj *key);
 robj *lookupKeyRead(redisDb *db, robj *key);
 robj *lookupKeyWrite(redisDb *db, robj *key);
@@ -838,6 +838,7 @@ void expireCommand(redisClient *c);
 void expireatCommand(redisClient *c);
 void getsetCommand(redisClient *c);
 void ttlCommand(redisClient *c);
+void persistCommand(redisClient *c);
 void slaveofCommand(redisClient *c);
 void debugCommand(redisClient *c);
 void msetCommand(redisClient *c);
diff --git a/src/replication.c b/src/replication.c
index 5387db91..363ce54a 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -176,6 +176,13 @@ void syncCommand(redisClient *c) {
     /* ignore SYNC if aleady slave or in monitor mode */
     if (c->flags & REDIS_SLAVE) return;
 
+    /* Refuse SYNC requests if we are a slave but the link with our master
+     * is not ok... */
+    if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
+        addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n"));
+        return;
+    }
+
     /* SYNC can't be issued when the server has pending data to send to
      * the client about already issued commands. We need a fresh reply
      * buffer registering the differences between the BGSAVE and the current
@@ -392,7 +399,12 @@ int syncWithMaster(void) {
             strerror(errno));
         return REDIS_ERR;
     }
-    if (buf[0] != '$') {
+    if (buf[0] == '-') {
+        close(fd);
+        redisLog(REDIS_WARNING,"MASTER aborted replication with an error: %s",
+            buf+1);
+        return REDIS_ERR;
+    } else if (buf[0] != '$') {
         close(fd);
         redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
         return REDIS_ERR;
@@ -416,9 +428,9 @@ int syncWithMaster(void) {
         int nread, nwritten;
 
         nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
-        if (nread == -1) {
+        if (nread <= 0) {
             redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
-                strerror(errno));
+                (nread == -1) ? strerror(errno) : "connection lost");
             close(fd);
             close(dfd);
             return REDIS_ERR;
diff --git a/src/sds.c b/src/sds.c
index 5e67f044..d7d23c45 100644
--- a/src/sds.c
+++ b/src/sds.c
@@ -382,3 +382,92 @@ sds sdscatrepr(sds s, char *p, size_t len) {
     }
     return sdscatlen(s,"\"",1);
 }
+
+/* Split a line into arguments, where every argument can be in the
+ * following programming-language REPL-alike form:
+ *
+ * foo bar "newline are supported\n" and "\xff\x00otherstuff"
+ *
+ * The number of arguments is stored into *argc, and an array
+ * of sds is returned. The caller should sdsfree() all the returned
+ * strings and finally zfree() the array itself.
+ *
+ * Note that sdscatrepr() is able to convert back a string into
+ * a quoted string in the same format sdssplitargs() is able to parse.
+ */
+sds *sdssplitargs(char *line, int *argc) {
+    char *p = line;
+    char *current = NULL;
+    char **vector = NULL;
+
+    *argc = 0;
+    while(1) {
+        /* skip blanks */
+        while(*p && isspace(*p)) p++;
+        if (*p) {
+            /* get a token */
+            int inq=0; /* set to 1 if we are in "quotes" */
+            int done=0;
+
+            if (current == NULL) current = sdsempty();
+            while(!done) {
+                if (inq) {
+                    if (*p == '\\' && *(p+1)) {
+                        char c;
+
+                        p++;
+                        switch(*p) {
+                        case 'n': c = '\n'; break;
+                        case 'r': c = '\r'; break;
+                        case 't': c = '\t'; break;
+                        case 'b': c = '\b'; break;
+                        case 'a': c = '\a'; break;
+                        default: c = *p; break;
+                        }
+                        current = sdscatlen(current,&c,1);
+                    } else if (*p == '"') {
+                        /* closing quote must be followed by a space */
+                        if (*(p+1) && !isspace(*(p+1))) goto err;
+                        done=1;
+                    } else if (!*p) {
+                        /* unterminated quotes */
+                        goto err;
+                    } else {
+                        current = sdscatlen(current,p,1);
+                    }
+                } else {
+                    switch(*p) {
+                    case ' ':
+                    case '\n':
+                    case '\r':
+                    case '\t':
+                    case '\0':
+                        done=1;
+                        break;
+                    case '"':
+                        inq=1;
+                        break;
+                    default:
+                        current = sdscatlen(current,p,1);
+                        break;
+                    }
+                }
+                if (*p) p++;
+            }
+            /* add the token to the vector */
+            vector = zrealloc(vector,((*argc)+1)*sizeof(char*));
+            vector[*argc] = current;
+            (*argc)++;
+            current = NULL;
+        } else {
+            return vector;
+        }
+    }
+
+err:
+    while(*argc--)
+        sdsfree(vector[*argc]);
+    zfree(vector);
+    if (current) sdsfree(current);
+    return NULL;
+}
diff --git a/src/sds.h b/src/sds.h
index ef3a418f..a0e224f5 100644
--- a/src/sds.h
+++ b/src/sds.h
@@ -70,5 +70,6 @@ void sdstolower(sds s);
 void sdstoupper(sds s);
 sds sdsfromlonglong(long long value);
 sds sdscatrepr(sds s, char *p, size_t len);
+sds *sdssplitargs(char *line, int *argc);
 
 #endif
diff --git a/src/t_string.c b/src/t_string.c
index f55595c2..3b8a39bb 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -17,7 +17,6 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
         }
     }
 
-    if (nx) deleteIfVolatile(c->db,key);
     retval = dbAdd(c->db,key,val);
     if (retval == REDIS_ERR) {
         if (!nx) {
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 4b258825..6ca5a6dd 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -23,6 +23,24 @@ start_server {tags {"repl"}} {
             }
             assert_equal [r debug digest] [r -1 debug digest]
         }
+
+        test {MASTER and SLAVE consistency with expire} {
+            createComplexDataset r 50000 useexpire
+            after 4000 ;# Make sure everything expired before taking the digest
+            if {[r debug digest] ne [r -1 debug digest]} {
+                set csv1 [csvdump r]
+                set csv2 [csvdump {r -1}]
+                set fd [open /tmp/repldump1.txt w]
+                puts -nonewline $fd $csv1
+                close $fd
+                set fd [open /tmp/repldump2.txt w]
+                puts -nonewline $fd $csv2
+                close $fd
+                puts "Master - Slave inconsistency"
+                puts "Run diff -u against /tmp/repldump*.txt for more info"
+            }
+            assert_equal [r debug digest] [r -1 debug digest]
+        }
     }
 }
 
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index b9c89aa8..95153111 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -140,12 +140,19 @@ proc findKeyWithType {r type} {
     return {}
 }
 
-proc createComplexDataset {r ops} {
+proc createComplexDataset {r ops {opt {}}} {
     for {set j 0} {$j < $ops} {incr j} {
         set k [randomKey]
         set k2 [randomKey]
         set f [randomValue]
         set v [randomValue]
+
+        if {[lsearch -exact $opt useexpire] != -1} {
+            if {rand() < 0.1} {
+                {*}$r expire [randomKey] [randomInt 2]
+            }
+        }
+
         randpath {
             set d [expr {rand()}]
         } {
diff --git a/tests/unit/basic.tcl b/tests/unit/basic.tcl
index f888cabc..a8f7feb0 100644
--- a/tests/unit/basic.tcl
+++ b/tests/unit/basic.tcl
@@ -148,12 +148,11 @@ start_server {tags {"basic"}} {
         r get novar2
     } {foobared}
 
-    test {SETNX will overwrite EXPIREing key} {
+    test {SETNX against volatile key} {
         r set x 10
         r expire x 10000
-        r setnx x 20
-        r get x
-    } {20}
+        list [r setnx x 20] [r get x]
+    } {0 10}
 
     test {EXISTS} {
         set res {}
@@ -362,13 +361,6 @@ start_server {tags {"basic"}} {
         list [r msetnx x1 xxx y2 yyy] [r get x1] [r get y2]
     } {1 xxx yyy}
 
-    test {MSETNX should remove all the volatile keys even on failure} {
-        r mset x 1 y 2 z 3
-        r expire y 10000
-        r expire z 10000
-        list [r msetnx x A y B z C] [r mget x y z]
-    } {0 {1 {} {}}}
-
     test {STRLEN against non existing key} {
         r strlen notakey
     } {0}
diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl
index b80975b6..6f16ed58 100644
--- a/tests/unit/expire.tcl
+++ b/tests/unit/expire.tcl
@@ -1,12 +1,13 @@
 start_server {tags {"expire"}} {
-    test {EXPIRE - don't set timeouts multiple times} {
+    test {EXPIRE - set timeouts multiple times} {
         r set x foobar
         set v1 [r expire x 5]
         set v2 [r ttl x]
         set v3 [r expire x 10]
         set v4 [r ttl x]
+        r expire x 4
         list $v1 $v2 $v3 $v4
-    } {1 5 0 5}
+    } {1 5 1 10}
 
     test {EXPIRE - It should be still possible to read 'x'} {
         r get x
@@ -19,13 +20,13 @@ start_server {tags {"expire"}} {
         } {{} 0}
     }
 
-    test {EXPIRE - Delete on write policy} {
+    test {EXPIRE - write on expire should work} {
         r del x
         r lpush x foo
         r expire x 1000
         r lpush x bar
         r lrange x 0 -1
-    } {bar}
+    } {bar foo}
 
     test {EXPIREAT - Check for EXPIRE alike behavior} {
         r del x
@@ -59,4 +60,15 @@ start_server {tags {"expire"}} {
         catch {r setex z -10 foo} e
         set _ $e
     } {*invalid expire*}
+
+    test {PERSIST can undo an EXPIRE} {
+        r set x foo
+        r expire x 50
+        list [r ttl x] [r persist x] [r ttl x] [r get x]
+    } {50 1 -1 foo}
+
+    test {PERSIST returns 0 against non existing or non volatile keys} {
+        r set x foo
+        list [r persist foo] [r persist nokeyatall]
+    } {0 0}
 }
diff --git a/tests/unit/protocol.tcl b/tests/unit/protocol.tcl
index 8717cd9f..9eebf77f 100644
--- a/tests/unit/protocol.tcl
+++ b/tests/unit/protocol.tcl
@@ -27,6 +27,13 @@ start_server {} {
         gets $fd
     } {*invalid bulk*count*}
 
+    test {bulk payload is not a number} {
+        set fd [r channel]
+        puts -nonewline $fd "SET x blabla\r\n"
+        flush $fd
+        gets $fd
+    } {*invalid bulk*count*}
+
     test {Multi bulk request not followed by bulk args} {
         set fd [r channel]
         puts -nonewline $fd "*1\r\nfoo\r\n"
diff --git a/utils/redis-copy.rb b/utils/redis-copy.rb
index af214b79..d892e377 100644
--- a/utils/redis-copy.rb
+++ b/utils/redis-copy.rb
@@ -1,12 +1,10 @@
-# redis-sha1.rb - Copyright (C) 2009 Salvatore Sanfilippo
+# redis-copy.rb - Copyright (C) 2009-2010 Salvatore Sanfilippo
 # BSD license, See the COPYING file for more information.
 #
-# Performs the SHA1 sum of the whole datset.
-# This is useful to spot bugs in persistence related code and to make sure
-# Slaves and Masters are in SYNC.
+# Copy the whole dataset from one Redis instance to another one
 #
-# If you hack this code make sure to sort keys and set elements as this are
-# unsorted elements. Otherwise the sum may differ with equal dataset.
+# WARNING: currently hashes and sorted sets are not supported! This
+#          program should be updated.
 
 require 'rubygems'
 require 'redis'