Added a new API to replicate an additional command after the replication of the currently executed command, in order to propagte the LPUSH originating from RPOPLPUSH and indirectly by BRPOPLPUSH.

This commit is contained in:
antirez 2012-02-28 18:03:08 +01:00
parent d8b1228bf6
commit eeb34eff52
3 changed files with 71 additions and 22 deletions

View File

@ -962,6 +962,7 @@ void initServerConfig() {
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@ -1192,6 +1193,20 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
/* Used inside commands to propatate an additional command if needed. */
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int target)
{
propagatedItem *pi = &server.also_propagate;
redisAssert(pi->target == REDIS_PROPAGATE_NONE);
pi->cmd = cmd;
pi->dbid = dbid;
pi->argv = argv;
pi->argc = argc;
pi->target = target;
}
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
@ -1202,6 +1217,7 @@ void call(redisClient *c, int flags) {
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
/* Call the command. */
server.also_propagate.target = REDIS_PROPAGATE_NONE;
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
@ -1232,6 +1248,16 @@ void call(redisClient *c, int flags) {
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
/* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
* PUSH command. */
if (server.also_propagate.target != REDIS_PROPAGATE_NONE) {
int j;
propagatedItem *pi = &server.also_propagate;
propagate(pi->cmd, pi->dbid, pi->argv, pi->argc, pi->target);
for (j = 0; j < pi->argc; j++) decrRefCount(pi->argv[j]);
zfree(pi->argv);
}
server.stat_numcommands++;
}

View File

@ -399,6 +399,17 @@ typedef struct clientBufferLimitsConfig {
time_t soft_limit_seconds;
} clientBufferLimitsConfig;
/* Currently only used to additionally propagate more commands to AOF/Replication
* after the propagation of the executed command.
* The structure contains everything needed to propagate a command:
* argv and argc, the ID of the database, pointer to the command table entry,
* and finally the target, that is an xor between REDIS_PROPAGATE_* flags. */
typedef struct propagatedItem {
robj **argv;
int argc, dbid, target;
struct redisCommand *cmd;
} propagatedItem;
/*-----------------------------------------------------------------------------
* Redis cluster data structures
*----------------------------------------------------------------------------*/
@ -562,7 +573,7 @@ struct redisServer {
off_t loading_loaded_bytes;
time_t loading_start_time;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand;
struct redisCommand *delCommand, *multiCommand, *lpushCommand;
int cronloops; /* Number of times the cron function run */
time_t lastsave; /* Unix time of last save succeeede */
/* Fields used only for stats */
@ -612,6 +623,8 @@ struct redisServer {
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
/* Propagation of commands in AOF / replication */
propagatedItem also_propagate; /* Additional command to propagate. */
/* Logging */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
@ -956,6 +969,7 @@ struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(char *s);
void call(redisClient *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
int prepareForShutdown();
void redisLog(int level, const char *fmt, ...);
void redisLogRaw(int level, const char *msg);

View File

@ -288,6 +288,18 @@ void pushGenericCommand(redisClient *c, int where) {
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
/* Alter the replication of the command accordingly to the number of
* list elements delivered to clients waiting into a blocking operation.
* We do that only if there were waiting clients, and only if still some
* element was pushed into the list (othewise dirty is 0 and nothign will
* be propagated). */
if (waiting && pushed) {
/* CMD KEY a b C D E */
for (j = 2; j < pushed+2; j++)
rewriteClientCommandArgument(c,j,c->argv[j+waiting]);
c->argc -= waiting;
}
}
void lpushCommand(redisClient *c) {
@ -655,8 +667,6 @@ void lremCommand(redisClient *c) {
*/
void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
robj *aux;
if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
@ -666,27 +676,19 @@ void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey,
signalModifiedKey(c->db,dstkey);
}
listTypePush(dstobj,value,REDIS_HEAD);
/* If we are pushing as a result of LPUSH against a key
* watched by BRPOPLPUSH, we need to rewrite the command vector
* as an LPUSH.
*
* If this is called directly by RPOPLPUSH (either directly
* or via a BRPOPLPUSH where the popped list exists)
* we should replicate the RPOPLPUSH command itself. */
if (c != origclient) {
aux = createStringObject("LPUSH",5);
rewriteClientCommandVector(origclient,3,aux,dstkey,value);
decrRefCount(aux);
} else {
/* Make sure to always use RPOPLPUSH in the replication / AOF,
* even if the original command was BRPOPLPUSH. */
aux = createStringObject("RPOPLPUSH",9);
rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
decrRefCount(aux);
/* Additionally propagate this PUSH operation together with
* the operation performed by the command. */
{
robj **argv = zmalloc(sizeof(robj*)*3);
argv[0] = createStringObject("LPUSH",5);
argv[1] = dstkey;
argv[2] = value;
incrRefCount(argv[1]);
incrRefCount(argv[2]);
alsoPropagate(server.lpushCommand,c->db->id,argv,3,
REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
}
server.dirty++;
}
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
@ -717,6 +719,13 @@ void rpoplpushCommand(redisClient *c) {
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
/* Replicate this as a simple RPOP since the LPUSH side is replicated
* by rpoplpushHandlePush() call if needed (it may not be needed
* if a client is blocking wait a push against the list). */
rewriteClientCommandVector(c,2,
resetRefCount(createStringObject("RPOP",4)),
c->argv[1]);
}
}