high resolution expires API modified to use separated commands. AOF transation to PEXPIREAT of all the expire-style commands fixed.

This commit is contained in:
antirez 2011-11-10 17:52:02 +01:00
parent dab5332f95
commit 12d293ca6e
5 changed files with 86 additions and 60 deletions

View File

@ -181,21 +181,38 @@ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
return dst; return dst;
} }
sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) { /* Create the sds representation of an PEXPIREAT command, using
int argc = 3; * 'seconds' as time to live and 'cmd' to understand what command
long when; * we are translating into a PEXPIREAT.
*
* This command is used in order to translate EXPIRE and PEXPIRE commands
* into PEXPIREAT command so that we retain precision in the append only
* file, and the time is always absolute and not relative. */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
long long when;
robj *argv[3]; robj *argv[3];
/* Make sure we can use strtol */ /* Make sure we can use strtol */
seconds = getDecodedObject(seconds); seconds = getDecodedObject(seconds);
when = time(NULL)+strtol(seconds->ptr,NULL,10); when = strtoll(seconds->ptr,NULL,10);
/* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
cmd->proc == expireatCommand)
{
when *= 1000;
}
/* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == setexCommand || cmd->proc == psetexCommand)
{
when += mstime();
}
decrRefCount(seconds); decrRefCount(seconds);
argv[0] = createStringObject("EXPIREAT",8); argv[0] = createStringObject("PEXPIREAT",9);
argv[1] = key; argv[1] = key;
argv[2] = createObject(REDIS_STRING, argv[2] = createStringObjectFromLongLong(when);
sdscatprintf(sdsempty(),"%ld",when)); buf = catAppendOnlyGenericCommand(buf, 3, argv);
buf = catAppendOnlyGenericCommand(buf, argc, argv);
decrRefCount(argv[0]); decrRefCount(argv[0]);
decrRefCount(argv[2]); decrRefCount(argv[2]);
return buf; return buf;
@ -216,18 +233,22 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
server.appendseldb = dictid; server.appendseldb = dictid;
} }
if (cmd->proc == expireCommand) { if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
/* Translate EXPIRE into EXPIREAT */ cmd->proc == expireatCommand) {
buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
} else if (cmd->proc == setexCommand) { buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
/* Translate SETEX to SET and EXPIREAT */ } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3); tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1]; tmpargv[1] = argv[1];
tmpargv[2] = argv[3]; tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv); buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]); decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else { } else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv); buf = catAppendOnlyGenericCommand(buf,argc,argv);
} }
@ -608,13 +629,12 @@ int rewriteAppendOnlyFile(char *filename) {
} }
/* Save the expire time */ /* Save the expire time */
if (expiretime != -1) { if (expiretime != -1) {
char cmd[]="*4\r\n$8\r\nEXPIREAT\r\n"; char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
/* If this key is already expired skip it */ /* If this key is already expired skip it */
if (expiretime < now) continue; if (expiretime < now) continue;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
if (rioWriteBulkString(&aof,"ms",2) == 0) goto werr;
} }
} }
dictReleaseIterator(di); dictReleaseIterator(di);

View File

@ -522,25 +522,15 @@ int stringObjectEqualsMs(robj *a) {
return tolower(arg[0]) == 'm' && tolower(arg[1]) == 's' && arg[2] == '\0'; return tolower(arg[0]) == 'm' && tolower(arg[1]) == 's' && arg[2] == '\0';
} }
void expireGenericCommand(redisClient *c, long long offset) { void expireGenericCommand(redisClient *c, long long offset, int unit) {
dictEntry *de; dictEntry *de;
robj *key = c->argv[1], *param = c->argv[2]; robj *key = c->argv[1], *param = c->argv[2];
long long milliseconds; long long milliseconds;
int time_in_seconds = 1;
if (getLongLongFromObjectOrReply(c, param, &milliseconds, NULL) != REDIS_OK) if (getLongLongFromObjectOrReply(c, param, &milliseconds, NULL) != REDIS_OK)
return; return;
/* If no "ms" argument was passed the time is in second, so we need if (unit == UNIT_SECONDS) milliseconds *= 1000;
* to multilpy it by 1000 */
if (c->argc == 4) {
if (!stringObjectEqualsMs(c->argv[3])) {
addReply(c,shared.syntaxerr);
return;
}
time_in_seconds = 0; /* "ms" argument passed. */
}
if (time_in_seconds) milliseconds *= 1000;
milliseconds -= offset; milliseconds -= offset;
de = dictFind(c->db->dict,key->ptr); de = dictFind(c->db->dict,key->ptr);
@ -578,33 +568,23 @@ void expireGenericCommand(redisClient *c, long long offset) {
} }
void expireCommand(redisClient *c) { void expireCommand(redisClient *c) {
if (c->argc > 4) { expireGenericCommand(c,0,UNIT_SECONDS);
addReply(c,shared.syntaxerr);
return;
}
expireGenericCommand(c,0);
} }
void expireatCommand(redisClient *c) { void expireatCommand(redisClient *c) {
if (c->argc > 4) { expireGenericCommand(c,mstime(),UNIT_SECONDS);
addReply(c,shared.syntaxerr);
return;
}
expireGenericCommand(c,mstime());
} }
void ttlCommand(redisClient *c) { void pexpireCommand(redisClient *c) {
long long expire, ttl = -1; expireGenericCommand(c,0,UNIT_MILLISECONDS);
int output_ms = 0; }
if (c->argc == 3) { void pexpireatCommand(redisClient *c) {
if (stringObjectEqualsMs(c->argv[2])) { expireGenericCommand(c,mstime(),UNIT_MILLISECONDS);
output_ms = 1; }
} else {
addReply(c,shared.syntaxerr); void ttlGenericCommand(redisClient *c, int output_ms) {
return; long long expire, ttl = -1;
}
}
expire = getExpire(c->db,c->argv[1]); expire = getExpire(c->db,c->argv[1]);
if (expire != -1) { if (expire != -1) {
@ -618,6 +598,14 @@ void ttlCommand(redisClient *c) {
} }
} }
void ttlCommand(redisClient *c) {
ttlGenericCommand(c, 0);
}
void pttlCommand(redisClient *c) {
ttlGenericCommand(c, 1);
}
void persistCommand(redisClient *c) { void persistCommand(redisClient *c) {
dictEntry *de; dictEntry *de;

View File

@ -91,6 +91,7 @@ struct redisCommand redisCommandTable[] = {
{"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"setnx",setnxCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"setnx",setnxCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,noPreloadGetKeys,2,2,1,0,0}, {"setex",setexCommand,4,"wm",0,noPreloadGetKeys,2,2,1,0,0},
{"psetex",psetexCommand,4,"wm",0,noPreloadGetKeys,2,2,1,0,0},
{"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,"r",0,NULL,1,1,1,0,0}, {"strlen",strlenCommand,2,"r",0,NULL,1,1,1,0,0},
{"del",delCommand,-2,"w",0,noPreloadGetKeys,1,-1,1,0,0}, {"del",delCommand,-2,"w",0,noPreloadGetKeys,1,-1,1,0,0},
@ -172,8 +173,10 @@ struct redisCommand redisCommandTable[] = {
{"move",moveCommand,3,"w",0,NULL,1,1,1,0,0}, {"move",moveCommand,3,"w",0,NULL,1,1,1,0,0},
{"rename",renameCommand,3,"w",0,renameGetKeys,1,2,1,0,0}, {"rename",renameCommand,3,"w",0,renameGetKeys,1,2,1,0,0},
{"renamenx",renamenxCommand,3,"w",0,renameGetKeys,1,2,1,0,0}, {"renamenx",renamenxCommand,3,"w",0,renameGetKeys,1,2,1,0,0},
{"expire",expireCommand,-3,"w",0,NULL,1,1,1,0,0}, {"expire",expireCommand,3,"w",0,NULL,1,1,1,0,0},
{"expireat",expireatCommand,-3,"w",0,NULL,1,1,1,0,0}, {"expireat",expireatCommand,3,"w",0,NULL,1,1,1,0,0},
{"pexpire",pexpireCommand,3,"w",0,NULL,1,1,1,0,0},
{"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0},
{"keys",keysCommand,2,"r",0,NULL,0,0,0,0,0}, {"keys",keysCommand,2,"r",0,NULL,0,0,0,0,0},
{"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0}, {"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0},
{"auth",authCommand,2,"r",0,NULL,0,0,0,0,0}, {"auth",authCommand,2,"r",0,NULL,0,0,0,0,0},
@ -194,7 +197,8 @@ struct redisCommand redisCommandTable[] = {
{"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0}, {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
{"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0}, {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
{"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0}, {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
{"ttl",ttlCommand,-2,"r",0,NULL,1,1,1,0,0}, {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
{"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0},
{"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0}, {"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0},
{"slaveof",slaveofCommand,3,"aws",0,NULL,0,0,0,0,0}, {"slaveof",slaveofCommand,3,"aws",0,NULL,0,0,0,0,0},
{"debug",debugCommand,-2,"aw",0,NULL,0,0,0,0,0}, {"debug",debugCommand,-2,"aw",0,NULL,0,0,0,0,0},

View File

@ -211,6 +211,10 @@
/* Scripting */ /* Scripting */
#define REDIS_LUA_TIME_LIMIT 5000 /* milliseconds */ #define REDIS_LUA_TIME_LIMIT 5000 /* milliseconds */
/* Units */
#define UNIT_SECONDS 0
#define UNIT_MILLISECONDS 1
/* We can print the stacktrace, so our assert is defined this way: */ /* We can print the stacktrace, so our assert is defined this way: */
#define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1))) #define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
@ -988,6 +992,7 @@ void echoCommand(redisClient *c);
void setCommand(redisClient *c); void setCommand(redisClient *c);
void setnxCommand(redisClient *c); void setnxCommand(redisClient *c);
void setexCommand(redisClient *c); void setexCommand(redisClient *c);
void psetexCommand(redisClient *c);
void getCommand(redisClient *c); void getCommand(redisClient *c);
void delCommand(redisClient *c); void delCommand(redisClient *c);
void existsCommand(redisClient *c); void existsCommand(redisClient *c);
@ -1048,8 +1053,11 @@ void mgetCommand(redisClient *c);
void monitorCommand(redisClient *c); void monitorCommand(redisClient *c);
void expireCommand(redisClient *c); void expireCommand(redisClient *c);
void expireatCommand(redisClient *c); void expireatCommand(redisClient *c);
void pexpireCommand(redisClient *c);
void pexpireatCommand(redisClient *c);
void getsetCommand(redisClient *c); void getsetCommand(redisClient *c);
void ttlCommand(redisClient *c); void ttlCommand(redisClient *c);
void pttlCommand(redisClient *c);
void persistCommand(redisClient *c); void persistCommand(redisClient *c);
void slaveofCommand(redisClient *c); void slaveofCommand(redisClient *c);
void debugCommand(redisClient *c); void debugCommand(redisClient *c);

View File

@ -12,16 +12,17 @@ static int checkStringLength(redisClient *c, long long size) {
return REDIS_OK; return REDIS_OK;
} }
void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) { void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire, int unit) {
long seconds = 0; /* initialized to avoid an harmness warning */ long long milliseconds = 0; /* initialized to avoid an harmness warning */
if (expire) { if (expire) {
if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK) if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
return; return;
if (seconds <= 0) { if (milliseconds <= 0) {
addReplyError(c,"invalid expire time in SETEX"); addReplyError(c,"invalid expire time in SETEX");
return; return;
} }
if (unit == UNIT_SECONDS) milliseconds *= 1000;
} }
if (lookupKeyWrite(c->db,key) != NULL && nx) { if (lookupKeyWrite(c->db,key) != NULL && nx) {
@ -30,23 +31,28 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
} }
setKey(c->db,key,val); setKey(c->db,key,val);
server.dirty++; server.dirty++;
if (expire) setExpire(c->db,key,(time(NULL)+seconds)*1000); if (expire) setExpire(c->db,key,mstime()+milliseconds);
addReply(c, nx ? shared.cone : shared.ok); addReply(c, nx ? shared.cone : shared.ok);
} }
void setCommand(redisClient *c) { void setCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]); c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,0,c->argv[1],c->argv[2],NULL); setGenericCommand(c,0,c->argv[1],c->argv[2],NULL,0);
} }
void setnxCommand(redisClient *c) { void setnxCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]); c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,1,c->argv[1],c->argv[2],NULL); setGenericCommand(c,1,c->argv[1],c->argv[2],NULL,0);
} }
void setexCommand(redisClient *c) { void setexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]); c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2]); setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS);
}
void psetexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS);
} }
int getGenericCommand(redisClient *c) { int getGenericCommand(redisClient *c) {