Merge master and move argument splitting patch to sds.c

This commit is contained in:
Pieter Noordhuis 2010-08-25 09:54:02 +02:00
commit 4b93e5e267
16 changed files with 361 additions and 206 deletions

View File

@ -45,7 +45,7 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
} }
robj *lookupKeyWrite(redisDb *db, robj *key) { robj *lookupKeyWrite(redisDb *db, robj *key) {
deleteIfVolatile(db,key); expireIfNeeded(db,key);
return lookupKey(db,key); return lookupKey(db,key);
} }
@ -321,7 +321,6 @@ void renameGenericCommand(redisClient *c, int nx) {
return; return;
incrRefCount(o); incrRefCount(o);
deleteIfVolatile(c->db,c->argv[2]);
if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
if (nx) { if (nx) {
decrRefCount(o); decrRefCount(o);
@ -375,7 +374,6 @@ void moveCommand(redisClient *c) {
} }
/* Try to add the element to the target DB */ /* Try to add the element to the target DB */
deleteIfVolatile(dst,c->argv[1]);
if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
addReply(c,shared.czero); addReply(c,shared.czero);
return; return;
@ -396,23 +394,16 @@ int removeExpire(redisDb *db, robj *key) {
/* An expire may only be removed if there is a corresponding entry in the /* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */ * main dict. Otherwise, the key will never be freed. */
redisAssert(dictFind(db->dict,key->ptr) != NULL); redisAssert(dictFind(db->dict,key->ptr) != NULL);
if (dictDelete(db->expires,key->ptr) == DICT_OK) { return dictDelete(db->expires,key->ptr) == DICT_OK;
return 1;
} else {
return 0;
}
} }
int setExpire(redisDb *db, robj *key, time_t when) { void setExpire(redisDb *db, robj *key, time_t when) {
dictEntry *de; dictEntry *de;
/* Reuse the sds from the main dict in the expire dict */ /* Reuse the sds from the main dict in the expire dict */
redisAssert((de = dictFind(db->dict,key->ptr)) != NULL); de = dictFind(db->dict,key->ptr);
if (dictAdd(db->expires,dictGetEntryKey(de),(void*)when) == DICT_ERR) { redisAssert(de != NULL);
return 0; dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
} else {
return 1;
}
} }
/* Return the expire time of the specified key, or -1 if no expire /* Return the expire time of the specified key, or -1 if no expire
@ -430,8 +421,46 @@ time_t getExpire(redisDb *db, robj *key) {
return (time_t) dictGetEntryVal(de); return (time_t) dictGetEntryVal(de);
} }
/* Propagate expires into slaves and the AOF file.
* When a key expires in the master, a DEL operation for this key is sent
* to all the slaves and the AOF file if enabled.
*
* This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring
* keys. */
void propagateExpire(redisDb *db, robj *key) {
struct redisCommand *cmd;
robj *argv[2];
cmd = lookupCommand("del");
argv[0] = createStringObject("DEL",3);
argv[1] = key;
incrRefCount(key);
if (server.appendonly)
feedAppendOnlyFile(cmd,db->id,argv,2);
if (listLength(server.slaves))
replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
int expireIfNeeded(redisDb *db, robj *key) { int expireIfNeeded(redisDb *db, robj *key) {
time_t when = getExpire(db,key); time_t when = getExpire(db,key);
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) {
return time(NULL) > when;
}
if (when < 0) return 0; if (when < 0) return 0;
/* Return when this key has not expired */ /* Return when this key has not expired */
@ -440,15 +469,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
/* Delete the key */ /* Delete the key */
server.stat_expiredkeys++; server.stat_expiredkeys++;
server.dirty++; server.dirty++;
return dbDelete(db,key); propagateExpire(db,key);
}
int deleteIfVolatile(redisDb *db, robj *key) {
if (getExpire(db,key) < 0) return 0;
/* Delete the key */
server.stat_expiredkeys++;
server.dirty++;
return dbDelete(db,key); return dbDelete(db,key);
} }
@ -476,13 +497,10 @@ void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
return; return;
} else { } else {
time_t when = time(NULL)+seconds; time_t when = time(NULL)+seconds;
if (setExpire(c->db,key,when)) { setExpire(c->db,key,when);
addReply(c,shared.cone); addReply(c,shared.cone);
touchWatchedKey(c->db,key); touchWatchedKey(c->db,key);
server.dirty++; server.dirty++;
} else {
addReply(c,shared.czero);
}
return; return;
} }
} }
@ -496,13 +514,28 @@ void expireatCommand(redisClient *c) {
} }
void ttlCommand(redisClient *c) { void ttlCommand(redisClient *c) {
time_t expire; time_t expire, ttl = -1;
int ttl = -1;
expire = getExpire(c->db,c->argv[1]); expire = getExpire(c->db,c->argv[1]);
if (expire != -1) { if (expire != -1) {
ttl = (int) (expire-time(NULL)); ttl = (expire-time(NULL));
if (ttl < 0) ttl = -1; if (ttl < 0) ttl = -1;
} }
addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl)); addReplyLongLong(c,(long long)ttl);
}
void persistCommand(redisClient *c) {
dictEntry *de;
de = dictFind(c->db->dict,c->argv[1]->ptr);
if (de == NULL) {
addReply(c,shared.czero);
} else {
if (removeExpire(c->db,c->argv[1])) {
addReply(c,shared.cone);
server.dirty++;
} else {
addReply(c,shared.czero);
}
}
} }

View File

@ -255,7 +255,8 @@ void freeClient(redisClient *c) {
server.vm_blocked_clients--; server.vm_blocked_clients--;
} }
listRelease(c->io_keys); listRelease(c->io_keys);
/* Master/slave cleanup */ /* Master/slave cleanup.
* Case 1: we lost the connection with a slave. */
if (c->flags & REDIS_SLAVE) { if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
close(c->repldbfd); close(c->repldbfd);
@ -264,9 +265,20 @@ void freeClient(redisClient *c) {
redisAssert(ln != NULL); redisAssert(ln != NULL);
listDelNode(l,ln); listDelNode(l,ln);
} }
/* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) { if (c->flags & REDIS_MASTER) {
server.master = NULL; server.master = NULL;
server.replstate = REDIS_REPL_CONNECT; server.replstate = REDIS_REPL_CONNECT;
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so
* when we'll resync with the master the other slaves will sync again
* with us as well. Note that also when the slave is not connected
* to the master it will keep refusing connections by other slaves. */
while (listLength(server.slaves)) {
ln = listFirst(server.slaves);
freeClient((redisClient*)ln->value);
}
} }
/* Release memory */ /* Release memory */
zfree(c->argv); zfree(c->argv);
@ -466,6 +478,7 @@ void closeTimedoutClients(void) {
if (server.maxidletime && if (server.maxidletime &&
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
!(c->flags & REDIS_MASTER) && /* no timeout for masters */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */
!(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
listLength(c->pubsub_patterns) == 0 && listLength(c->pubsub_patterns) == 0 &&
(now - c->lastinteraction > server.maxidletime)) (now - c->lastinteraction > server.maxidletime))

View File

@ -358,6 +358,8 @@ int getLongLongFromObject(robj *o, long long *target) {
if (o->encoding == REDIS_ENCODING_RAW) { if (o->encoding == REDIS_ENCODING_RAW) {
value = strtoll(o->ptr, &eptr, 10); value = strtoll(o->ptr, &eptr, 10);
if (eptr[0] != '\0') return REDIS_ERR; if (eptr[0] != '\0') return REDIS_ERR;
if (errno == ERANGE && (value == LLONG_MIN || value == LLONG_MAX))
return REDIS_ERR;
} else if (o->encoding == REDIS_ENCODING_INT) { } else if (o->encoding == REDIS_ENCODING_INT) {
value = (long)o->ptr; value = (long)o->ptr;
} else { } else {
@ -375,7 +377,7 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con
if (msg != NULL) { if (msg != NULL) {
addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
} else { } else {
addReplySds(c, sdsnew("-ERR value is not an integer\r\n")); addReplySds(c, sdsnew("-ERR value is not an integer or out of range\r\n"));
} }
return REDIS_ERR; return REDIS_ERR;
} }

View File

@ -36,6 +36,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include <ctype.h> #include <ctype.h>
#include <errno.h>
#include "anet.h" #include "anet.h"
#include "sds.h" #include "sds.h"
@ -68,11 +69,14 @@ static struct config {
static int cliReadReply(int fd); static int cliReadReply(int fd);
static void usage(); static void usage();
static int cliConnect(void) { /* Connect to the client. If force is not zero the connection is performed
* even if there is already a connected socket. */
static int cliConnect(int force) {
char err[ANET_ERR_LEN]; char err[ANET_ERR_LEN];
static int fd = ANET_ERR; static int fd = ANET_ERR;
if (fd == ANET_ERR) { if (fd == ANET_ERR || force) {
if (force) close(fd);
fd = anetTcpConnect(err,config.hostip,config.hostport); fd = anetTcpConnect(err,config.hostip,config.hostport);
if (fd == ANET_ERR) { if (fd == ANET_ERR) {
fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err); fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err);
@ -170,6 +174,7 @@ static int cliReadBulkReply(int fd) {
static int cliReadMultiBulkReply(int fd) { static int cliReadMultiBulkReply(int fd) {
sds replylen = cliReadLine(fd); sds replylen = cliReadLine(fd);
int elements, c = 1; int elements, c = 1;
int retval = 0;
if (replylen == NULL) return 1; if (replylen == NULL) return 1;
elements = atoi(replylen); elements = atoi(replylen);
@ -183,20 +188,28 @@ static int cliReadMultiBulkReply(int fd) {
} }
while(elements--) { while(elements--) {
if (config.tty) printf("%d. ", c); if (config.tty) printf("%d. ", c);
if (cliReadReply(fd)) return 1; if (cliReadReply(fd)) retval = 1;
if (elements) printf("%c",config.mb_sep); if (elements) printf("%c",config.mb_sep);
c++; c++;
} }
return 0; return retval;
} }
static int cliReadReply(int fd) { static int cliReadReply(int fd) {
char type; char type;
int nread;
if (anetRead(fd,&type,1) <= 0) { if ((nread = anetRead(fd,&type,1)) <= 0) {
if (config.shutdown) return 0; if (config.shutdown) return 0;
if (config.interactive &&
(nread == 0 || (nread == -1 && errno == ECONNRESET)))
{
return ECONNRESET;
} else {
printf("I/O error while reading from socket: %s",strerror(errno));
exit(1); exit(1);
} }
}
switch(type) { switch(type) {
case '-': case '-':
if (config.tty) printf("(error) "); if (config.tty) printf("(error) ");
@ -247,7 +260,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1; if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
if (!strcasecmp(command,"subscribe") || if (!strcasecmp(command,"subscribe") ||
!strcasecmp(command,"psubscribe")) config.pubsub_mode = 1; !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;
if ((fd = cliConnect()) == -1) return 1; if ((fd = cliConnect(0)) == -1) return 1;
/* Select db number */ /* Select db number */
retval = selectDb(fd); retval = selectDb(fd);
@ -374,91 +387,15 @@ static char **convertToSds(int count, char** args) {
return sds; return sds;
} }
static char **splitArguments(char *line, int *argc) {
char *p = line;
char *current = NULL;
char **vector = NULL;
*argc = 0;
while(1) {
/* skip blanks */
while(*p && isspace(*p)) p++;
if (*p) {
/* get a token */
int inq=0; /* set to 1 if we are in "quotes" */
int done=0;
if (current == NULL) current = sdsempty();
while(!done) {
if (inq) {
if (*p == '\\' && *(p+1)) {
char c;
p++;
switch(*p) {
case 'n': c = '\n'; break;
case 'r': c = '\r'; break;
case 't': c = '\t'; break;
case 'b': c = '\b'; break;
case 'a': c = '\a'; break;
default: c = *p; break;
}
current = sdscatlen(current,&c,1);
} else if (*p == '"') {
/* closing quote must be followed by a space */
if (*(p+1) && !isspace(*(p+1))) goto err;
done=1;
} else if (!*p) {
/* unterminated quotes */
goto err;
} else {
current = sdscatlen(current,p,1);
}
} else {
switch(*p) {
case ' ':
case '\n':
case '\r':
case '\t':
case '\0':
done=1;
break;
case '"':
inq=1;
break;
default:
current = sdscatlen(current,p,1);
break;
}
}
if (*p) p++;
}
/* add the token to the vector */
vector = zrealloc(vector,((*argc)+1)*sizeof(char*));
vector[*argc] = current;
(*argc)++;
current = NULL;
} else {
return vector;
}
}
err:
while(*argc--)
sdsfree(vector[*argc]);
zfree(vector);
if (current) sdsfree(current);
return NULL;
}
#define LINE_BUFLEN 4096 #define LINE_BUFLEN 4096
static void repl() { static void repl() {
int argc, j; int argc, j;
char *line, **argv; char *line;
sds *argv;
while((line = linenoise("redis> ")) != NULL) { while((line = linenoise("redis> ")) != NULL) {
if (line[0] != '\0') { if (line[0] != '\0') {
argv = splitArguments(line,&argc); argv = sdssplitargs(line,&argc);
linenoiseHistoryAdd(line); linenoiseHistoryAdd(line);
if (config.historyfile) linenoiseHistorySave(config.historyfile); if (config.historyfile) linenoiseHistorySave(config.historyfile);
if (argv == NULL) { if (argv == NULL) {
@ -467,10 +404,22 @@ static void repl() {
} else if (argc > 0) { } else if (argc > 0) {
if (strcasecmp(argv[0],"quit") == 0 || if (strcasecmp(argv[0],"quit") == 0 ||
strcasecmp(argv[0],"exit") == 0) strcasecmp(argv[0],"exit") == 0)
{
exit(0); exit(0);
else } else {
int err;
if ((err = cliSendCommand(argc, argv, 1)) != 0) {
if (err == ECONNRESET) {
printf("Reconnecting... ");
fflush(stdout);
if (cliConnect(1) == -1) exit(1);
printf("OK\n");
cliSendCommand(argc,argv,1); cliSendCommand(argc,argv,1);
} }
}
}
}
/* Free the argument vector */ /* Free the argument vector */
for (j = 0; j < argc; j++) for (j = 0; j < argc; j++)
sdsfree(argv[j]); sdsfree(argv[j]);

View File

@ -170,6 +170,7 @@ struct redisCommand readonlyCommandTable[] = {
{"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
{"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
{"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
{"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
@ -435,6 +436,48 @@ void updateDictResizePolicy(void) {
/* ======================= Cron: called every 100 ms ======================== */ /* ======================= Cron: called every 100 ms ======================== */
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace. */
void activeExpireCycle(void) {
int j;
for (j = 0; j < server.dbnum; j++) {
int expired;
redisDb *db = server.db+j;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
long num = dictSize(db->expires);
time_t now = time(NULL);
expired = 0;
if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
num = REDIS_EXPIRELOOKUPS_PER_CRON;
while (num--) {
dictEntry *de;
time_t t;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
t = (time_t) dictGetEntryVal(de);
if (now > t) {
sds key = dictGetEntryKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj);
dbDelete(db,keyobj);
decrRefCount(keyobj);
expired++;
server.stat_expiredkeys++;
}
}
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++; int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(eventLoop);
@ -533,41 +576,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
} }
} }
/* Try to expire a few timed out keys. The algorithm used is adaptive and /* Expire a few keys per cycle, only if this is a master.
* will use few CPU cycles if there are few expiring keys, otherwise * On slaves we wait for DEL operations synthesized by the master
* it will get more aggressive to avoid that too much memory is used by * in order to guarantee a strict consistency. */
* keys that can be removed from the keyspace. */ if (server.masterhost == NULL) activeExpireCycle();
for (j = 0; j < server.dbnum; j++) {
int expired;
redisDb *db = server.db+j;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
long num = dictSize(db->expires);
time_t now = time(NULL);
expired = 0;
if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
num = REDIS_EXPIRELOOKUPS_PER_CRON;
while (num--) {
dictEntry *de;
time_t t;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
t = (time_t) dictGetEntryVal(de);
if (now > t) {
sds key = dictGetEntryKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
dbDelete(db,keyobj);
decrRefCount(keyobj);
expired++;
server.stat_expiredkeys++;
}
}
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
/* Swap a few keys on disk if we are over the memory limit and VM /* Swap a few keys on disk if we are over the memory limit and VM
* is enbled. Try to free objects from the free list first. */ * is enbled. Try to free objects from the free list first. */
@ -900,9 +912,14 @@ int processCommand(redisClient *c) {
resetClient(c); resetClient(c);
return 1; return 1;
} else { } else {
int bulklen = atoi(((char*)c->argv[0]->ptr)+1); char *eptr;
long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10);
int perr = eptr[0] != '\0';
decrRefCount(c->argv[0]); decrRefCount(c->argv[0]);
if (bulklen < 0 || bulklen > 1024*1024*1024) { if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
bulklen < 0 || bulklen > 1024*1024*1024)
{
c->argc--; c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c); resetClient(c);
@ -972,10 +989,14 @@ int processCommand(redisClient *c) {
return 1; return 1;
} else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
/* This is a bulk command, we have to read the last argument yet. */ /* This is a bulk command, we have to read the last argument yet. */
int bulklen = atoi(c->argv[c->argc-1]->ptr); char *eptr;
long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10);
int perr = eptr[0] != '\0';
decrRefCount(c->argv[c->argc-1]); decrRefCount(c->argv[c->argc-1]);
if (bulklen < 0 || bulklen > 1024*1024*1024) { if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN ||
bulklen < 0 || bulklen > 1024*1024*1024)
{
c->argc--; c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c); resetClient(c);
@ -1064,11 +1085,7 @@ int prepareForShutdown() {
if (server.vm_enabled) unlink(server.vm_swap_file); if (server.vm_enabled) unlink(server.vm_swap_file);
} else { } else {
/* Snapshotting. Perform a SYNC SAVE and exit */ /* Snapshotting. Perform a SYNC SAVE and exit */
if (rdbSave(server.dbfilename) == REDIS_OK) { if (rdbSave(server.dbfilename) != REDIS_OK) {
if (server.daemonize)
unlink(server.pidfile);
redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
} else {
/* Ooops.. error saving! The best we can do is to continue /* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process, * operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background * in the next cron() Redis will be notified that the background
@ -1078,6 +1095,7 @@ int prepareForShutdown() {
return REDIS_ERR; return REDIS_ERR;
} }
} }
if (server.daemonize) unlink(server.pidfile);
redisLog(REDIS_WARNING,"Server exit now, bye bye..."); redisLog(REDIS_WARNING,"Server exit now, bye bye...");
return REDIS_OK; return REDIS_OK;
} }
@ -1350,9 +1368,17 @@ void linuxOvercommitMemoryWarning(void) {
} }
#endif /* __linux__ */ #endif /* __linux__ */
void createPidFile(void) {
/* Try to write the pid file in a best-effort way. */
FILE *fp = fopen(server.pidfile,"w");
if (fp) {
fprintf(fp,"%d\n",getpid());
fclose(fp);
}
}
void daemonize(void) { void daemonize(void) {
int fd; int fd;
FILE *fp;
if (fork() != 0) exit(0); /* parent exits */ if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */ setsid(); /* create a new session */
@ -1366,12 +1392,6 @@ void daemonize(void) {
dup2(fd, STDERR_FILENO); dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd); if (fd > STDERR_FILENO) close(fd);
} }
/* Try to write the pid file */
fp = fopen(server.pidfile,"w");
if (fp) {
fprintf(fp,"%d\n",getpid());
fclose(fp);
}
} }
void version() { void version() {
@ -1404,6 +1424,7 @@ int main(int argc, char **argv) {
} }
if (server.daemonize) daemonize(); if (server.daemonize) daemonize();
initServer(); initServer();
if (server.daemonize) createPidFile();
redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__ #ifdef __linux__
linuxOvercommitMemoryWarning(); linuxOvercommitMemoryWarning();
@ -1480,6 +1501,7 @@ void segvHandler(int sig, siginfo_t *info, void *secret) {
redisLog(REDIS_WARNING,"%s", messages[i]); redisLog(REDIS_WARNING,"%s", messages[i]);
/* free(messages); Don't call free() with possibly corrupted memory. */ /* free(messages); Don't call free() with possibly corrupted memory. */
if (server.daemonize) unlink(server.pidfile);
_exit(0); _exit(0);
} }

View File

@ -283,7 +283,7 @@ typedef struct redisClient {
sds querybuf; sds querybuf;
robj **argv, **mbargv; robj **argv, **mbargv;
int argc, mbargc; int argc, mbargc;
int bulklen; /* bulk read len. -1 if not in bulk read mode */ long bulklen; /* bulk read len. -1 if not in bulk read mode */
int multibulk; /* multi bulk command format active */ int multibulk; /* multi bulk command format active */
list *reply; list *reply;
int sentlen; int sentlen;
@ -752,10 +752,10 @@ void resetServerSaveParams();
/* db.c -- Keyspace access API */ /* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key); int removeExpire(redisDb *db, robj *key);
void propagateExpire(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key);
int deleteIfVolatile(redisDb *db, robj *key);
time_t getExpire(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key);
int setExpire(redisDb *db, robj *key, time_t when); void setExpire(redisDb *db, robj *key, time_t when);
robj *lookupKey(redisDb *db, robj *key); robj *lookupKey(redisDb *db, robj *key);
robj *lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key);
@ -838,6 +838,7 @@ void expireCommand(redisClient *c);
void expireatCommand(redisClient *c); void expireatCommand(redisClient *c);
void getsetCommand(redisClient *c); void getsetCommand(redisClient *c);
void ttlCommand(redisClient *c); void ttlCommand(redisClient *c);
void persistCommand(redisClient *c);
void slaveofCommand(redisClient *c); void slaveofCommand(redisClient *c);
void debugCommand(redisClient *c); void debugCommand(redisClient *c);
void msetCommand(redisClient *c); void msetCommand(redisClient *c);

View File

@ -176,6 +176,13 @@ void syncCommand(redisClient *c) {
/* ignore SYNC if aleady slave or in monitor mode */ /* ignore SYNC if aleady slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return; if (c->flags & REDIS_SLAVE) return;
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n"));
return;
}
/* SYNC can't be issued when the server has pending data to send to /* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands. We need a fresh reply * the client about already issued commands. We need a fresh reply
* buffer registering the differences between the BGSAVE and the current * buffer registering the differences between the BGSAVE and the current
@ -392,7 +399,12 @@ int syncWithMaster(void) {
strerror(errno)); strerror(errno));
return REDIS_ERR; return REDIS_ERR;
} }
if (buf[0] != '$') { if (buf[0] == '-') {
close(fd);
redisLog(REDIS_WARNING,"MASTER aborted replication with an error: %s",
buf+1);
return REDIS_ERR;
} else if (buf[0] != '$') {
close(fd); close(fd);
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?"); redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
return REDIS_ERR; return REDIS_ERR;
@ -416,9 +428,9 @@ int syncWithMaster(void) {
int nread, nwritten; int nread, nwritten;
nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024); nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
if (nread == -1) { if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
strerror(errno)); (nread == -1) ? strerror(errno) : "connection lost");
close(fd); close(fd);
close(dfd); close(dfd);
return REDIS_ERR; return REDIS_ERR;

View File

@ -382,3 +382,92 @@ sds sdscatrepr(sds s, char *p, size_t len) {
} }
return sdscatlen(s,"\"",1); return sdscatlen(s,"\"",1);
} }
/* Split a line into arguments, where every argument can be in the
* following programming-language REPL-alike form:
*
* foo bar "newline are supported\n" and "\xff\x00otherstuff"
*
* The number of arguments is stored into *argc, and an array
* of sds is returned. The caller should sdsfree() all the returned
* strings and finally zfree() the array itself.
*
* Note that sdscatrepr() is able to convert back a string into
* a quoted string in the same format sdssplitargs() is able to parse.
*/
sds *sdssplitargs(char *line, int *argc) {
char *p = line;
char *current = NULL;
char **vector = NULL;
*argc = 0;
while(1) {
/* skip blanks */
while(*p && isspace(*p)) p++;
if (*p) {
/* get a token */
int inq=0; /* set to 1 if we are in "quotes" */
int done=0;
if (current == NULL) current = sdsempty();
while(!done) {
if (inq) {
if (*p == '\\' && *(p+1)) {
char c;
p++;
switch(*p) {
case 'n': c = '\n'; break;
case 'r': c = '\r'; break;
case 't': c = '\t'; break;
case 'b': c = '\b'; break;
case 'a': c = '\a'; break;
default: c = *p; break;
}
current = sdscatlen(current,&c,1);
} else if (*p == '"') {
/* closing quote must be followed by a space */
if (*(p+1) && !isspace(*(p+1))) goto err;
done=1;
} else if (!*p) {
/* unterminated quotes */
goto err;
} else {
current = sdscatlen(current,p,1);
}
} else {
switch(*p) {
case ' ':
case '\n':
case '\r':
case '\t':
case '\0':
done=1;
break;
case '"':
inq=1;
break;
default:
current = sdscatlen(current,p,1);
break;
}
}
if (*p) p++;
}
/* add the token to the vector */
vector = zrealloc(vector,((*argc)+1)*sizeof(char*));
vector[*argc] = current;
(*argc)++;
current = NULL;
} else {
return vector;
}
}
err:
while(*argc--)
sdsfree(vector[*argc]);
zfree(vector);
if (current) sdsfree(current);
return NULL;
}

View File

@ -70,5 +70,6 @@ void sdstolower(sds s);
void sdstoupper(sds s); void sdstoupper(sds s);
sds sdsfromlonglong(long long value); sds sdsfromlonglong(long long value);
sds sdscatrepr(sds s, char *p, size_t len); sds sdscatrepr(sds s, char *p, size_t len);
sds *sdssplitargs(char *line, int *argc);
#endif #endif

View File

@ -17,7 +17,6 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
} }
} }
if (nx) deleteIfVolatile(c->db,key);
retval = dbAdd(c->db,key,val); retval = dbAdd(c->db,key,val);
if (retval == REDIS_ERR) { if (retval == REDIS_ERR) {
if (!nx) { if (!nx) {

View File

@ -23,6 +23,24 @@ start_server {tags {"repl"}} {
} }
assert_equal [r debug digest] [r -1 debug digest] assert_equal [r debug digest] [r -1 debug digest]
} }
test {MASTER and SLAVE consistency with expire} {
createComplexDataset r 50000 useexpire
after 4000 ;# Make sure everything expired before taking the digest
if {[r debug digest] ne [r -1 debug digest]} {
set csv1 [csvdump r]
set csv2 [csvdump {r -1}]
set fd [open /tmp/repldump1.txt w]
puts -nonewline $fd $csv1
close $fd
set fd [open /tmp/repldump2.txt w]
puts -nonewline $fd $csv2
close $fd
puts "Master - Slave inconsistency"
puts "Run diff -u against /tmp/repldump*.txt for more info"
}
assert_equal [r debug digest] [r -1 debug digest]
}
} }
} }

View File

@ -140,12 +140,19 @@ proc findKeyWithType {r type} {
return {} return {}
} }
proc createComplexDataset {r ops} { proc createComplexDataset {r ops {opt {}}} {
for {set j 0} {$j < $ops} {incr j} { for {set j 0} {$j < $ops} {incr j} {
set k [randomKey] set k [randomKey]
set k2 [randomKey] set k2 [randomKey]
set f [randomValue] set f [randomValue]
set v [randomValue] set v [randomValue]
if {[lsearch -exact $opt useexpire] != -1} {
if {rand() < 0.1} {
{*}$r expire [randomKey] [randomInt 2]
}
}
randpath { randpath {
set d [expr {rand()}] set d [expr {rand()}]
} { } {

View File

@ -148,12 +148,11 @@ start_server {tags {"basic"}} {
r get novar2 r get novar2
} {foobared} } {foobared}
test {SETNX will overwrite EXPIREing key} { test {SETNX against volatile key} {
r set x 10 r set x 10
r expire x 10000 r expire x 10000
r setnx x 20 list [r setnx x 20] [r get x]
r get x } {0 10}
} {20}
test {EXISTS} { test {EXISTS} {
set res {} set res {}
@ -362,13 +361,6 @@ start_server {tags {"basic"}} {
list [r msetnx x1 xxx y2 yyy] [r get x1] [r get y2] list [r msetnx x1 xxx y2 yyy] [r get x1] [r get y2]
} {1 xxx yyy} } {1 xxx yyy}
test {MSETNX should remove all the volatile keys even on failure} {
r mset x 1 y 2 z 3
r expire y 10000
r expire z 10000
list [r msetnx x A y B z C] [r mget x y z]
} {0 {1 {} {}}}
test {STRLEN against non existing key} { test {STRLEN against non existing key} {
r strlen notakey r strlen notakey
} {0} } {0}

View File

@ -1,12 +1,13 @@
start_server {tags {"expire"}} { start_server {tags {"expire"}} {
test {EXPIRE - don't set timeouts multiple times} { test {EXPIRE - set timeouts multiple times} {
r set x foobar r set x foobar
set v1 [r expire x 5] set v1 [r expire x 5]
set v2 [r ttl x] set v2 [r ttl x]
set v3 [r expire x 10] set v3 [r expire x 10]
set v4 [r ttl x] set v4 [r ttl x]
r expire x 4
list $v1 $v2 $v3 $v4 list $v1 $v2 $v3 $v4
} {1 5 0 5} } {1 5 1 10}
test {EXPIRE - It should be still possible to read 'x'} { test {EXPIRE - It should be still possible to read 'x'} {
r get x r get x
@ -19,13 +20,13 @@ start_server {tags {"expire"}} {
} {{} 0} } {{} 0}
} }
test {EXPIRE - Delete on write policy} { test {EXPIRE - write on expire should work} {
r del x r del x
r lpush x foo r lpush x foo
r expire x 1000 r expire x 1000
r lpush x bar r lpush x bar
r lrange x 0 -1 r lrange x 0 -1
} {bar} } {bar foo}
test {EXPIREAT - Check for EXPIRE alike behavior} { test {EXPIREAT - Check for EXPIRE alike behavior} {
r del x r del x
@ -59,4 +60,15 @@ start_server {tags {"expire"}} {
catch {r setex z -10 foo} e catch {r setex z -10 foo} e
set _ $e set _ $e
} {*invalid expire*} } {*invalid expire*}
test {PERSIST can undo an EXPIRE} {
r set x foo
r expire x 50
list [r ttl x] [r persist x] [r ttl x] [r get x]
} {50 1 -1 foo}
test {PERSIST returns 0 against non existing or non volatile keys} {
r set x foo
list [r persist foo] [r persist nokeyatall]
} {0 0}
} }

View File

@ -27,6 +27,13 @@ start_server {} {
gets $fd gets $fd
} {*invalid bulk*count*} } {*invalid bulk*count*}
test {bulk payload is not a number} {
set fd [r channel]
puts -nonewline $fd "SET x blabla\r\n"
flush $fd
gets $fd
} {*invalid bulk*count*}
test {Multi bulk request not followed by bulk args} { test {Multi bulk request not followed by bulk args} {
set fd [r channel] set fd [r channel]
puts -nonewline $fd "*1\r\nfoo\r\n" puts -nonewline $fd "*1\r\nfoo\r\n"

View File

@ -1,12 +1,10 @@
# redis-sha1.rb - Copyright (C) 2009 Salvatore Sanfilippo # redis-copy.rb - Copyright (C) 2009-2010 Salvatore Sanfilippo
# BSD license, See the COPYING file for more information. # BSD license, See the COPYING file for more information.
# #
# Performs the SHA1 sum of the whole datset. # Copy the whole dataset from one Redis instance to another one
# This is useful to spot bugs in persistence related code and to make sure
# Slaves and Masters are in SYNC.
# #
# If you hack this code make sure to sort keys and set elements as this are # WARNING: currently hashes and sorted sets are not supported! This
# unsorted elements. Otherwise the sum may differ with equal dataset. # program should be updated.
require 'rubygems' require 'rubygems'
require 'redis' require 'redis'