Added command propagation API.

This commit is contained in:
antirez 2012-02-28 16:17:00 +01:00
parent 64ef44d568
commit ad08d059d0
2 changed files with 40 additions and 7 deletions

View File

@ -1175,10 +1175,33 @@ struct redisCommand *lookupCommandByCString(char *s) {
return cmd;
}
/* Propagate the specified command (in the context of the specified database id)
* to AOF, Slaves and Monitors.
*
* flags are an xor between:
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
/* Sent the command to clients in MONITOR mode, only if the commands are
* not geneated from reading an AOF. */
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
/* Call the command. */
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
@ -1189,20 +1212,25 @@ void call(redisClient *c, int flags) {
if (server.loading && c->flags & REDIS_LUA_CLIENT)
flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
/* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */
if (flags & REDIS_CALL_SLOWLOG)
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
if (flags & REDIS_CALL_STATS) {
c->cmd->microseconds += duration;
c->cmd->calls++;
}
/* Propagate the command into the AOF and replication link */
if (flags & REDIS_CALL_PROPAGATE) {
if (server.aof_state != REDIS_AOF_OFF && dirty > 0)
feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
if (listLength(server.monitors))
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
int flags = REDIS_PROPAGATE_NONE;
if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
flags |= REDIS_PROPAGATE_REPL;
if (dirty)
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
server.stat_numcommands++;
}

View File

@ -245,6 +245,11 @@
#define REDIS_CALL_PROPAGATE 4
#define REDIS_CALL_FULL (REDIS_CALL_SLOWLOG | REDIS_CALL_STATS | REDIS_CALL_PROPAGATE)
/* Command propagation flags, see propagate() function */
#define REDIS_PROPAGATE_NONE 0
#define REDIS_PROPAGATE_AOF 1
#define REDIS_PROPAGATE_REPL 2
/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))