From bcf2995c987acea7f5485ec0e3717a29a7e98457 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 2 Aug 2010 18:13:39 +0200 Subject: [PATCH 01/18] support for write operations against expiring keys, by master-controlled expiring in replication and AOF synthesizing DEL operations --- src/db.c | 51 +++++++++++++++++++++++-------- src/redis.c | 81 ++++++++++++++++++++++++++++---------------------- src/redis.h | 2 +- src/t_string.c | 1 - 4 files changed, 86 insertions(+), 49 deletions(-) diff --git a/src/db.c b/src/db.c index 958a9f6b..d8a5d0b2 100644 --- a/src/db.c +++ b/src/db.c @@ -45,7 +45,7 @@ robj *lookupKeyRead(redisDb *db, robj *key) { } robj *lookupKeyWrite(redisDb *db, robj *key) { - deleteIfVolatile(db,key); + expireIfNeeded(db,key); return lookupKey(db,key); } @@ -321,7 +321,6 @@ void renameGenericCommand(redisClient *c, int nx) { return; incrRefCount(o); - deleteIfVolatile(c->db,c->argv[2]); if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (nx) { decrRefCount(o); @@ -375,7 +374,6 @@ void moveCommand(redisClient *c) { } /* Try to add the element to the target DB */ - deleteIfVolatile(dst,c->argv[1]); if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { addReply(c,shared.czero); return; @@ -430,8 +428,45 @@ time_t getExpire(redisDb *db, robj *key) { return (time_t) dictGetEntryVal(de); } +/* Propagate expires into slaves and the AOF file. + * When a key expires in the master, a DEL operation for this key is sent + * to all the slaves and the AOF file if enabled. + * + * This way the key expiry is centralized in one place, and since both + * AOF and the master->slave link guarantee operation ordering, everything + * will be consistent even if we allow write operations against expiring + * keys. */ +void propagateExpire(redisDb *db, robj *key) { + struct redisCommand *cmd; + robj *argv[2]; + + cmd = lookupCommand("del"); + argv[0] = createStringObject("DEL",3); + argv[1] = key; + incrRefCount(key); + + if (server.appendonly) + feedAppendOnlyFile(cmd,db->id,argv,2); + if (listLength(server.slaves)) + replicationFeedSlaves(server.slaves,db->id,argv,2); + + decrRefCount(key); +} + int expireIfNeeded(redisDb *db, robj *key) { time_t when = getExpire(db,key); + + /* If we are running in the context of a slave, return ASAP: + * the slave key expiration is controlled by the master that will + * send us synthesized DEL operations for expired keys. + * + * Still we try to return the right information to the caller, + * that is, 0 if we think the key should be still valid, 1 if + * we think the key is expired at this time. */ + if (server.masterhost != NULL) { + return time(NULL) > when; + } + if (when < 0) return 0; /* Return when this key has not expired */ @@ -440,15 +475,7 @@ int expireIfNeeded(redisDb *db, robj *key) { /* Delete the key */ server.stat_expiredkeys++; server.dirty++; - return dbDelete(db,key); -} - -int deleteIfVolatile(redisDb *db, robj *key) { - if (getExpire(db,key) < 0) return 0; - - /* Delete the key */ - server.stat_expiredkeys++; - server.dirty++; + propagateExpire(db,key); return dbDelete(db,key); } diff --git a/src/redis.c b/src/redis.c index c8b1c781..27ade8b1 100644 --- a/src/redis.c +++ b/src/redis.c @@ -435,6 +435,48 @@ void updateDictResizePolicy(void) { /* ======================= Cron: called every 100 ms ======================== */ +/* Try to expire a few timed out keys. The algorithm used is adaptive and + * will use few CPU cycles if there are few expiring keys, otherwise + * it will get more aggressive to avoid that too much memory is used by + * keys that can be removed from the keyspace. */ +void activeExpireCycle(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + int expired; + redisDb *db = server.db+j; + + /* Continue to expire if at the end of the cycle more than 25% + * of the keys were expired. */ + do { + long num = dictSize(db->expires); + time_t now = time(NULL); + + expired = 0; + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj); + dbDelete(db,keyobj); + decrRefCount(keyobj); + expired++; + server.stat_expiredkeys++; + } + } + } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); + } +} + + int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); @@ -533,41 +575,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - /* Try to expire a few timed out keys. The algorithm used is adaptive and - * will use few CPU cycles if there are few expiring keys, otherwise - * it will get more aggressive to avoid that too much memory is used by - * keys that can be removed from the keyspace. */ - for (j = 0; j < server.dbnum; j++) { - int expired; - redisDb *db = server.db+j; - - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - long num = dictSize(db->expires); - time_t now = time(NULL); - - expired = 0; - if (num > REDIS_EXPIRELOOKUPS_PER_CRON) - num = REDIS_EXPIRELOOKUPS_PER_CRON; - while (num--) { - dictEntry *de; - time_t t; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - t = (time_t) dictGetEntryVal(de); - if (now > t) { - sds key = dictGetEntryKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); - - dbDelete(db,keyobj); - decrRefCount(keyobj); - expired++; - server.stat_expiredkeys++; - } - } - } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); - } + /* Expire a few keys per cycle, only if this is a master. + * On slaves we wait for DEL operations synthesized by the master + * in order to guarantee a strict consistency. */ + if (server.masterhost == NULL) activeExpireCycle(); /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ diff --git a/src/redis.h b/src/redis.h index fb051f8e..27520c19 100644 --- a/src/redis.h +++ b/src/redis.h @@ -752,8 +752,8 @@ void resetServerSaveParams(); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); +void propagateExpire(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key); -int deleteIfVolatile(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key); int setExpire(redisDb *db, robj *key, time_t when); robj *lookupKey(redisDb *db, robj *key); diff --git a/src/t_string.c b/src/t_string.c index f55595c2..3b8a39bb 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -17,7 +17,6 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir } } - if (nx) deleteIfVolatile(c->db,key); retval = dbAdd(c->db,key,val); if (retval == REDIS_ERR) { if (!nx) { From c25a5d3b1062f3398a96a76ecd27c6f3a77a446e Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 2 Aug 2010 21:37:39 +0200 Subject: [PATCH 02/18] memory leak removed from expire propagation code --- src/db.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index d8a5d0b2..6ac2b0d7 100644 --- a/src/db.c +++ b/src/db.c @@ -450,7 +450,8 @@ void propagateExpire(redisDb *db, robj *key) { if (listLength(server.slaves)) replicationFeedSlaves(server.slaves,db->id,argv,2); - decrRefCount(key); + decrRefCount(argv[0]); + decrRefCount(argv[1]); } int expireIfNeeded(redisDb *db, robj *key) { From 0cf5b7b57cde8b699198a866b04feca9f5394d03 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 3 Aug 2010 12:26:30 +0200 Subject: [PATCH 03/18] allow to set a new EXPIRE of an existing volatile key --- src/db.c | 22 ++++++++-------------- src/redis.h | 2 +- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/db.c b/src/db.c index 6ac2b0d7..5acda3d5 100644 --- a/src/db.c +++ b/src/db.c @@ -401,16 +401,13 @@ int removeExpire(redisDb *db, robj *key) { } } -int setExpire(redisDb *db, robj *key, time_t when) { +void setExpire(redisDb *db, robj *key, time_t when) { dictEntry *de; /* Reuse the sds from the main dict in the expire dict */ - redisAssert((de = dictFind(db->dict,key->ptr)) != NULL); - if (dictAdd(db->expires,dictGetEntryKey(de),(void*)when) == DICT_ERR) { - return 0; - } else { - return 1; - } + de = dictFind(db->dict,key->ptr); + redisAssert(de != NULL); + dictReplace(db->expires,dictGetEntryKey(de),(void*)when); } /* Return the expire time of the specified key, or -1 if no expire @@ -504,13 +501,10 @@ void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) { return; } else { time_t when = time(NULL)+seconds; - if (setExpire(c->db,key,when)) { - addReply(c,shared.cone); - touchWatchedKey(c->db,key); - server.dirty++; - } else { - addReply(c,shared.czero); - } + setExpire(c->db,key,when); + addReply(c,shared.cone); + touchWatchedKey(c->db,key); + server.dirty++; return; } } diff --git a/src/redis.h b/src/redis.h index 27520c19..c211cfb5 100644 --- a/src/redis.h +++ b/src/redis.h @@ -755,7 +755,7 @@ int removeExpire(redisDb *db, robj *key); void propagateExpire(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key); -int setExpire(redisDb *db, robj *key, time_t when); +void setExpire(redisDb *db, robj *key, time_t when); robj *lookupKey(redisDb *db, robj *key); robj *lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); From 2c572622fb99f32328de58f815953f17d4ad0e4d Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 3 Aug 2010 13:08:32 +0200 Subject: [PATCH 04/18] no longer passing tests due to the new write-on-volatile semantics modified/removed --- tests/unit/basic.tcl | 14 +++----------- tests/unit/expire.tcl | 9 +++++---- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/tests/unit/basic.tcl b/tests/unit/basic.tcl index f888cabc..a8f7feb0 100644 --- a/tests/unit/basic.tcl +++ b/tests/unit/basic.tcl @@ -148,12 +148,11 @@ start_server {tags {"basic"}} { r get novar2 } {foobared} - test {SETNX will overwrite EXPIREing key} { + test {SETNX against volatile key} { r set x 10 r expire x 10000 - r setnx x 20 - r get x - } {20} + list [r setnx x 20] [r get x] + } {0 10} test {EXISTS} { set res {} @@ -362,13 +361,6 @@ start_server {tags {"basic"}} { list [r msetnx x1 xxx y2 yyy] [r get x1] [r get y2] } {1 xxx yyy} - test {MSETNX should remove all the volatile keys even on failure} { - r mset x 1 y 2 z 3 - r expire y 10000 - r expire z 10000 - list [r msetnx x A y B z C] [r mget x y z] - } {0 {1 {} {}}} - test {STRLEN against non existing key} { r strlen notakey } {0} diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index b80975b6..5de907ab 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -1,12 +1,13 @@ start_server {tags {"expire"}} { - test {EXPIRE - don't set timeouts multiple times} { + test {EXPIRE - set timeouts multiple times} { r set x foobar set v1 [r expire x 5] set v2 [r ttl x] set v3 [r expire x 10] set v4 [r ttl x] + r expire x 4 list $v1 $v2 $v3 $v4 - } {1 5 0 5} + } {1 5 1 10} test {EXPIRE - It should be still possible to read 'x'} { r get x @@ -19,13 +20,13 @@ start_server {tags {"expire"}} { } {{} 0} } - test {EXPIRE - Delete on write policy} { + test {EXPIRE - write on expire should work} { r del x r lpush x foo r expire x 1000 r lpush x bar r lrange x 0 -1 - } {bar} + } {bar foo} test {EXPIREAT - Check for EXPIRE alike behavior} { r del x From 6146329f1f3381e8daef47463a6588b161f10596 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 3 Aug 2010 13:38:39 +0200 Subject: [PATCH 05/18] replication test with expires --- tests/integration/replication.tcl | 18 ++++++++++++++++++ tests/support/util.tcl | 9 ++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 4b258825..6ca5a6dd 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -23,6 +23,24 @@ start_server {tags {"repl"}} { } assert_equal [r debug digest] [r -1 debug digest] } + + test {MASTER and SLAVE consistency with expire} { + createComplexDataset r 50000 useexpire + after 4000 ;# Make sure everything expired before taking the digest + if {[r debug digest] ne [r -1 debug digest]} { + set csv1 [csvdump r] + set csv2 [csvdump {r -1}] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + puts "Master - Slave inconsistency" + puts "Run diff -u against /tmp/repldump*.txt for more info" + } + assert_equal [r debug digest] [r -1 debug digest] + } } } diff --git a/tests/support/util.tcl b/tests/support/util.tcl index b9c89aa8..95153111 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -140,12 +140,19 @@ proc findKeyWithType {r type} { return {} } -proc createComplexDataset {r ops} { +proc createComplexDataset {r ops {opt {}}} { for {set j 0} {$j < $ops} {incr j} { set k [randomKey] set k2 [randomKey] set f [randomValue] set v [randomValue] + + if {[lsearch -exact $opt useexpire] != -1} { + if {rand() < 0.1} { + {*}$r expire [randomKey] [randomInt 2] + } + } + randpath { set d [expr {rand()}] } { From a539d29ac559ffb80bfe6b3f045eddbd772fa1ba Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 3 Aug 2010 14:19:20 +0200 Subject: [PATCH 06/18] PERSIST command implemented --- src/db.c | 20 +++++++++++++++----- src/redis.c | 1 + src/redis.h | 1 + 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/db.c b/src/db.c index 5acda3d5..81e41430 100644 --- a/src/db.c +++ b/src/db.c @@ -394,11 +394,7 @@ int removeExpire(redisDb *db, robj *key) { /* An expire may only be removed if there is a corresponding entry in the * main dict. Otherwise, the key will never be freed. */ redisAssert(dictFind(db->dict,key->ptr) != NULL); - if (dictDelete(db->expires,key->ptr) == DICT_OK) { - return 1; - } else { - return 0; - } + return dictDelete(db->expires,key->ptr) == DICT_OK; } void setExpire(redisDb *db, robj *key, time_t when) { @@ -528,3 +524,17 @@ void ttlCommand(redisClient *c) { } addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl)); } + +void persistCommand(redisClient *c) { + dictEntry *de; + + de = dictFind(c->db->dict,c->argv[1]->ptr); + if (de == NULL) { + addReply(c,shared.czero); + } else { + if (removeExpire(c->db,c->argv[1])) + addReply(c,shared.cone); + else + addReply(c,shared.czero); + } +} diff --git a/src/redis.c b/src/redis.c index 27ade8b1..1a581a92 100644 --- a/src/redis.c +++ b/src/redis.c @@ -170,6 +170,7 @@ struct redisCommand readonlyCommandTable[] = { {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, + {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, diff --git a/src/redis.h b/src/redis.h index c211cfb5..781fb209 100644 --- a/src/redis.h +++ b/src/redis.h @@ -838,6 +838,7 @@ void expireCommand(redisClient *c); void expireatCommand(redisClient *c); void getsetCommand(redisClient *c); void ttlCommand(redisClient *c); +void persistCommand(redisClient *c); void slaveofCommand(redisClient *c); void debugCommand(redisClient *c); void msetCommand(redisClient *c); From 1fb4e8def723ac836ba96e5369f22a0bf463578d Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 3 Aug 2010 14:25:22 +0200 Subject: [PATCH 07/18] PERSIST: a fix and some basic test --- src/db.c | 6 ++++-- tests/unit/expire.tcl | 11 +++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/db.c b/src/db.c index 81e41430..0dec95b1 100644 --- a/src/db.c +++ b/src/db.c @@ -532,9 +532,11 @@ void persistCommand(redisClient *c) { if (de == NULL) { addReply(c,shared.czero); } else { - if (removeExpire(c->db,c->argv[1])) + if (removeExpire(c->db,c->argv[1])) { addReply(c,shared.cone); - else + server.dirty++; + } else { addReply(c,shared.czero); + } } } diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index 5de907ab..6f16ed58 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -60,4 +60,15 @@ start_server {tags {"expire"}} { catch {r setex z -10 foo} e set _ $e } {*invalid expire*} + + test {PERSIST can undo an EXPIRE} { + r set x foo + r expire x 50 + list [r ttl x] [r persist x] [r ttl x] [r get x] + } {50 1 -1 foo} + + test {PERSIST returns 0 against non existing or non volatile keys} { + r set x foo + list [r persist foo] [r persist nokeyatall] + } {0 0} } From cbce5171451eb53f1370aacc30decd74512347ac Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 5 Aug 2010 11:36:39 +0200 Subject: [PATCH 08/18] redis cli argument splitting is general and is now moved into the sds.c lib --- src/redis-cli.c | 70 ++------------------------------------------ src/sds.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++ src/sds.h | 1 + 3 files changed, 81 insertions(+), 67 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index dac82862..b4a10890 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -366,79 +366,15 @@ static char **convertToSds(int count, char** args) { return sds; } -static char **splitArguments(char *line, int *argc) { - char *p = line; - char *current = NULL; - char **vector = NULL; - - *argc = 0; - while(1) { - /* skip blanks */ - while(*p && isspace(*p)) p++; - if (*p) { - /* get a token */ - int inq=0; /* set to 1 if we are in "quotes" */ - int done = 0; - - if (current == NULL) current = sdsempty(); - while(!done) { - if (inq) { - if (*p == '\\' && *(p+1)) { - char c; - - p++; - switch(*p) { - case 'n': c = '\n'; break; - case 'r': c = '\r'; break; - case 't': c = '\t'; break; - case 'b': c = '\b'; break; - case 'a': c = '\a'; break; - default: c = *p; break; - } - current = sdscatlen(current,&c,1); - } else if (*p == '"') { - done = 1; - } else { - current = sdscatlen(current,p,1); - } - } else { - switch(*p) { - case ' ': - case '\n': - case '\r': - case '\t': - case '\0': - done=1; - break; - case '"': - inq=1; - break; - default: - current = sdscatlen(current,p,1); - break; - } - } - if (*p) p++; - } - /* add the token to the vector */ - vector = zrealloc(vector,((*argc)+1)*sizeof(char*)); - vector[*argc] = current; - (*argc)++; - current = NULL; - } else { - return vector; - } - } -} - #define LINE_BUFLEN 4096 static void repl() { int argc, j; - char *line, **argv; + char *line; + sds *argv; while((line = linenoise("redis> ")) != NULL) { if (line[0] != '\0') { - argv = splitArguments(line,&argc); + argv = sdssplitargs(line,&argc); linenoiseHistoryAdd(line); if (config.historyfile) linenoiseHistorySave(config.historyfile); if (argc > 0) { diff --git a/src/sds.c b/src/sds.c index 5e67f044..4878f8a6 100644 --- a/src/sds.c +++ b/src/sds.c @@ -382,3 +382,80 @@ sds sdscatrepr(sds s, char *p, size_t len) { } return sdscatlen(s,"\"",1); } + +/* Split a line into arguments, where every argument can be in the + * following programming-language REPL-alike form: + * + * foo bar "newline are supported\n" and "\xff\x00otherstuff" + * + * The number of arguments is stored into *argc, and an array + * of sds is returned. The caller should sdsfree() all the returned + * strings and finally zfree() the array itself. + * + * Note that sdscatrepr() is able to convert back a string into + * a quoted string in the same format sdssplitargs() is able to parse. + */ +sds *sdssplitargs(char *line, int *argc) { + char *p = line; + char *current = NULL; + char **vector = NULL; + + *argc = 0; + while(1) { + /* skip blanks */ + while(*p && isspace(*p)) p++; + if (*p) { + /* get a token */ + int inq=0; /* set to 1 if we are in "quotes" */ + int done = 0; + + if (current == NULL) current = sdsempty(); + while(!done) { + if (inq) { + if (*p == '\\' && *(p+1)) { + char c; + + p++; + switch(*p) { + case 'n': c = '\n'; break; + case 'r': c = '\r'; break; + case 't': c = '\t'; break; + case 'b': c = '\b'; break; + case 'a': c = '\a'; break; + default: c = *p; break; + } + current = sdscatlen(current,&c,1); + } else if (*p == '"') { + done = 1; + } else { + current = sdscatlen(current,p,1); + } + } else { + switch(*p) { + case ' ': + case '\n': + case '\r': + case '\t': + case '\0': + done=1; + break; + case '"': + inq=1; + break; + default: + current = sdscatlen(current,p,1); + break; + } + } + if (*p) p++; + } + /* add the token to the vector */ + vector = zrealloc(vector,((*argc)+1)*sizeof(char*)); + vector[*argc] = current; + (*argc)++; + current = NULL; + } else { + return vector; + } + } +} diff --git a/src/sds.h b/src/sds.h index ef3a418f..a0e224f5 100644 --- a/src/sds.h +++ b/src/sds.h @@ -70,5 +70,6 @@ void sdstolower(sds s); void sdstoupper(sds s); sds sdsfromlonglong(long long value); sds sdscatrepr(sds s, char *p, size_t len); +sds *sdssplitargs(char *line, int *argc); #endif From c91abdcd077f868a59290bc9d68fba3130a3121d Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Aug 2010 17:06:38 +0200 Subject: [PATCH 09/18] Fixed overflow detection in argument to long convertion function in general, and in expire/ttl pairs specifically, addressing issue 54 --- src/db.c | 7 +++---- src/object.c | 4 +++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/db.c b/src/db.c index 0dec95b1..6d287d72 100644 --- a/src/db.c +++ b/src/db.c @@ -514,15 +514,14 @@ void expireatCommand(redisClient *c) { } void ttlCommand(redisClient *c) { - time_t expire; - int ttl = -1; + time_t expire, ttl = -1; expire = getExpire(c->db,c->argv[1]); if (expire != -1) { - ttl = (int) (expire-time(NULL)); + ttl = (expire-time(NULL)); if (ttl < 0) ttl = -1; } - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl)); + addReplyLongLong(c,(long long)ttl); } void persistCommand(redisClient *c) { diff --git a/src/object.c b/src/object.c index 21268340..429ac0ec 100644 --- a/src/object.c +++ b/src/object.c @@ -358,6 +358,8 @@ int getLongLongFromObject(robj *o, long long *target) { if (o->encoding == REDIS_ENCODING_RAW) { value = strtoll(o->ptr, &eptr, 10); if (eptr[0] != '\0') return REDIS_ERR; + if (errno == ERANGE && (value == LLONG_MIN || value == LLONG_MAX)) + return REDIS_ERR; } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { @@ -375,7 +377,7 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con if (msg != NULL) { addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); } else { - addReplySds(c, sdsnew("-ERR value is not an integer\r\n")); + addReplySds(c, sdsnew("-ERR value is not an integer or out of range\r\n")); } return REDIS_ERR; } From e19387302522a81d987bedef98d8961dd7ff06a9 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 10:10:01 +0200 Subject: [PATCH 10/18] changed the comments on top of redis-copy.rb to reflect what the program really does --- utils/redis-copy.rb | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/utils/redis-copy.rb b/utils/redis-copy.rb index af214b79..d892e377 100644 --- a/utils/redis-copy.rb +++ b/utils/redis-copy.rb @@ -1,12 +1,10 @@ -# redis-sha1.rb - Copyright (C) 2009 Salvatore Sanfilippo +# redis-copy.rb - Copyright (C) 2009-2010 Salvatore Sanfilippo # BSD license, See the COPYING file for more information. # -# Performs the SHA1 sum of the whole datset. -# This is useful to spot bugs in persistence related code and to make sure -# Slaves and Masters are in SYNC. +# Copy the whole dataset from one Redis instance to another one # -# If you hack this code make sure to sort keys and set elements as this are -# unsorted elements. Otherwise the sum may differ with equal dataset. +# WARNING: currently hashes and sorted sets are not supported! This +# program should be updated. require 'rubygems' require 'redis' From a679185aa515e2f52d8a0f66c3972eb8f43d7fae Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 11:45:05 +0200 Subject: [PATCH 11/18] sanity check for the bulk argument in protocol parsing code, fixing issue 146 --- src/redis.c | 17 +++++++++++++---- src/redis.h | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/redis.c b/src/redis.c index 1a581a92..eade7868 100644 --- a/src/redis.c +++ b/src/redis.c @@ -912,9 +912,14 @@ int processCommand(redisClient *c) { resetClient(c); return 1; } else { - int bulklen = atoi(((char*)c->argv[0]->ptr)+1); + char *eptr; + long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10); + int perr = eptr[0] != '\0'; + decrRefCount(c->argv[0]); - if (bulklen < 0 || bulklen > 1024*1024*1024) { + if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX || + bulklen < 0 || bulklen > 1024*1024*1024) + { c->argc--; addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); resetClient(c); @@ -984,10 +989,14 @@ int processCommand(redisClient *c) { return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { /* This is a bulk command, we have to read the last argument yet. */ - int bulklen = atoi(c->argv[c->argc-1]->ptr); + char *eptr; + long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10); + int perr = eptr[0] != '\0'; decrRefCount(c->argv[c->argc-1]); - if (bulklen < 0 || bulklen > 1024*1024*1024) { + if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN || + bulklen < 0 || bulklen > 1024*1024*1024) + { c->argc--; addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); resetClient(c); diff --git a/src/redis.h b/src/redis.h index 781fb209..c35fe53a 100644 --- a/src/redis.h +++ b/src/redis.h @@ -283,7 +283,7 @@ typedef struct redisClient { sds querybuf; robj **argv, **mbargv; int argc, mbargc; - int bulklen; /* bulk read len. -1 if not in bulk read mode */ + long bulklen; /* bulk read len. -1 if not in bulk read mode */ int multibulk; /* multi bulk command format active */ list *reply; int sentlen; From 01daeecee7a93b92e10347fc2613b8ee22de751e Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 11:49:05 +0200 Subject: [PATCH 12/18] added tests for invalid bulk argument --- tests/unit/protocol.tcl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/unit/protocol.tcl b/tests/unit/protocol.tcl index 8717cd9f..9eebf77f 100644 --- a/tests/unit/protocol.tcl +++ b/tests/unit/protocol.tcl @@ -27,6 +27,13 @@ start_server {} { gets $fd } {*invalid bulk*count*} + test {bulk payload is not a number} { + set fd [r channel] + puts -nonewline $fd "SET x blabla\r\n" + flush $fd + gets $fd + } {*invalid bulk*count*} + test {Multi bulk request not followed by bulk args} { set fd [r channel] puts -nonewline $fd "*1\r\nfoo\r\n" From e452436a07224022df17c59d6dbfbd47dcfc7fd6 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 12:10:59 +0200 Subject: [PATCH 13/18] BLPOPping clients are no longer subject to connection timeouts, fixing issues 155 --- src/networking.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/networking.c b/src/networking.c index e5a66984..10b9580e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -466,6 +466,7 @@ void closeTimedoutClients(void) { if (server.maxidletime && !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */ + !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */ dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ listLength(c->pubsub_patterns) == 0 && (now - c->lastinteraction > server.maxidletime)) From 778b2210a939083070abaea4b7fc62ebf2ad9bfb Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 16:04:13 +0200 Subject: [PATCH 14/18] slave with attached slaves now close the conection to all the slaves when the connection to the master is lost. Now a slave without a connected link to the master will refuse SYNC from other slaves. Enhanced the replication error reporting. All this will fix Issue 156 --- src/networking.c | 14 +++++++++++++- src/replication.c | 14 +++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 10b9580e..a39be7c4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -255,7 +255,8 @@ void freeClient(redisClient *c) { server.vm_blocked_clients--; } listRelease(c->io_keys); - /* Master/slave cleanup */ + /* Master/slave cleanup. + * Case 1: we lost the connection with a slave. */ if (c->flags & REDIS_SLAVE) { if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) close(c->repldbfd); @@ -264,9 +265,20 @@ void freeClient(redisClient *c) { redisAssert(ln != NULL); listDelNode(l,ln); } + + /* Case 2: we lost the connection with the master. */ if (c->flags & REDIS_MASTER) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; + /* Since we lost the connection with the master, we should also + * close the connection with all our slaves if we have any, so + * when we'll resync with the master the other slaves will sync again + * with us as well. Note that also when the slave is not connected + * to the master it will keep refusing connections by other slaves. */ + while (listLength(server.slaves)) { + ln = listFirst(server.slaves); + freeClient((redisClient*)ln->value); + } } /* Release memory */ zfree(c->argv); diff --git a/src/replication.c b/src/replication.c index 5387db91..89375820 100644 --- a/src/replication.c +++ b/src/replication.c @@ -176,6 +176,13 @@ void syncCommand(redisClient *c) { /* ignore SYNC if aleady slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; + /* Refuse SYNC requests if we are a slave but the link with our master + * is not ok... */ + if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) { + addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n")); + return; + } + /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current @@ -392,7 +399,12 @@ int syncWithMaster(void) { strerror(errno)); return REDIS_ERR; } - if (buf[0] != '$') { + if (buf[0] == '-') { + close(fd); + redisLog(REDIS_WARNING,"MASTER aborted replication with an error: %s", + buf+1); + return REDIS_ERR; + } else if (buf[0] != '$') { close(fd); redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?"); return REDIS_ERR; From b91d605a35c294573f0213c89c421d09b538c2b6 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 16:25:00 +0200 Subject: [PATCH 15/18] slave now detect lost connection during SYNC, fixing Issue 173 --- src/replication.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index 89375820..363ce54a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -428,9 +428,9 @@ int syncWithMaster(void) { int nread, nwritten; nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024); - if (nread == -1) { + if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", - strerror(errno)); + (nread == -1) ? strerror(errno) : "connection lost"); close(fd); close(dfd); return REDIS_ERR; From 695fe87456ac4e5ed14e4a853b9cce61fb3e5975 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 17:09:25 +0200 Subject: [PATCH 16/18] The pid file is now created only after the server is correctly initialied. It is also removed on sigterm and when the stack trace is produced after a sigbus or a sigsegv. This two changes should fix the Issue 175 --- src/redis.c | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/redis.c b/src/redis.c index eade7868..0ee7a20b 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1085,11 +1085,7 @@ int prepareForShutdown() { if (server.vm_enabled) unlink(server.vm_swap_file); } else { /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - } else { + if (rdbSave(server.dbfilename) != REDIS_OK) { /* Ooops.. error saving! The best we can do is to continue * operating. Note that if there was a background saving process, * in the next cron() Redis will be notified that the background @@ -1099,6 +1095,7 @@ int prepareForShutdown() { return REDIS_ERR; } } + if (server.daemonize) unlink(server.pidfile); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); return REDIS_OK; } @@ -1371,9 +1368,17 @@ void linuxOvercommitMemoryWarning(void) { } #endif /* __linux__ */ +void createPidFile(void) { + /* Try to write the pid file in a best-effort way. */ + FILE *fp = fopen(server.pidfile,"w"); + if (fp) { + fprintf(fp,"%d\n",getpid()); + fclose(fp); + } +} + void daemonize(void) { int fd; - FILE *fp; if (fork() != 0) exit(0); /* parent exits */ setsid(); /* create a new session */ @@ -1387,12 +1392,6 @@ void daemonize(void) { dup2(fd, STDERR_FILENO); if (fd > STDERR_FILENO) close(fd); } - /* Try to write the pid file */ - fp = fopen(server.pidfile,"w"); - if (fp) { - fprintf(fp,"%d\n",getpid()); - fclose(fp); - } } void version() { @@ -1425,6 +1424,7 @@ int main(int argc, char **argv) { } if (server.daemonize) daemonize(); initServer(); + if (server.daemonize) createPidFile(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); #ifdef __linux__ linuxOvercommitMemoryWarning(); @@ -1501,6 +1501,7 @@ void segvHandler(int sig, siginfo_t *info, void *secret) { redisLog(REDIS_WARNING,"%s", messages[i]); /* free(messages); Don't call free() with possibly corrupted memory. */ + if (server.daemonize) unlink(server.pidfile); _exit(0); } From b37ca6edb10faa0ebcf54a7d23cee31d895fe5b1 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 18:08:09 +0200 Subject: [PATCH 17/18] Issue 179 fixed, now redis-cli is able to parse correctly multi bulk replies with elements that are errors --- src/redis-cli.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index b4a10890..007ebcde 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -169,6 +169,7 @@ static int cliReadBulkReply(int fd) { static int cliReadMultiBulkReply(int fd) { sds replylen = cliReadLine(fd); int elements, c = 1; + int retval = 0; if (replylen == NULL) return 1; elements = atoi(replylen); @@ -182,10 +183,10 @@ static int cliReadMultiBulkReply(int fd) { } while(elements--) { printf("%d. ", c); - if (cliReadReply(fd)) return 1; + if (cliReadReply(fd)) retval = 1; c++; } - return 0; + return retval; } static int cliReadReply(int fd) { From c0b3d42372dbe67c6ef096372869e2b60d4a1cdc Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Aug 2010 18:39:34 +0200 Subject: [PATCH 18/18] redis-cli now supports automatically reconnection in interactive mode --- src/redis-cli.c | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 007ebcde..a2a909ba 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -36,6 +36,7 @@ #include #include #include +#include #include "anet.h" #include "sds.h" @@ -67,11 +68,14 @@ static struct config { static int cliReadReply(int fd); static void usage(); -static int cliConnect(void) { +/* Connect to the client. If force is not zero the connection is performed + * even if there is already a connected socket. */ +static int cliConnect(int force) { char err[ANET_ERR_LEN]; static int fd = ANET_ERR; - if (fd == ANET_ERR) { + if (fd == ANET_ERR || force) { + if (force) close(fd); fd = anetTcpConnect(err,config.hostip,config.hostport); if (fd == ANET_ERR) { fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err); @@ -191,10 +195,18 @@ static int cliReadMultiBulkReply(int fd) { static int cliReadReply(int fd) { char type; + int nread; - if (anetRead(fd,&type,1) <= 0) { + if ((nread = anetRead(fd,&type,1)) <= 0) { if (config.shutdown) return 0; - exit(1); + if (config.interactive && + (nread == 0 || (nread == -1 && errno == ECONNRESET))) + { + return ECONNRESET; + } else { + printf("I/O error while reading from socket: %s",strerror(errno)); + exit(1); + } } switch(type) { case '-': @@ -246,7 +258,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) { if (!strcasecmp(command,"monitor")) config.monitor_mode = 1; if (!strcasecmp(command,"subscribe") || !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1; - if ((fd = cliConnect()) == -1) return 1; + if ((fd = cliConnect(0)) == -1) return 1; /* Select db number */ retval = selectDb(fd); @@ -381,9 +393,21 @@ static void repl() { if (argc > 0) { if (strcasecmp(argv[0],"quit") == 0 || strcasecmp(argv[0],"exit") == 0) - exit(0); - else - cliSendCommand(argc, argv, 1); + { + exit(0); + } else { + int err; + + if ((err = cliSendCommand(argc, argv, 1)) != 0) { + if (err == ECONNRESET) { + printf("Reconnecting... "); + fflush(stdout); + if (cliConnect(1) == -1) exit(1); + printf("OK\n"); + cliSendCommand(argc,argv,1); + } + } + } } /* Free the argument vector */ for (j = 0; j < argc; j++) @@ -431,7 +455,8 @@ int main(int argc, char **argv) { cliSendCommand(2, convertToSds(2, authargv), 1); } - if (argc == 0 || config.interactive == 1) repl(); + if (argc == 0) config.interactive = 1; + if (config.interactive) repl(); argvcopy = convertToSds(argc+1, argv); if (config.argn_from_stdin) {