diff --git a/src/redis.c b/src/redis.c index 6f5c4020..277d2e4b 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1529,6 +1529,7 @@ void initServerConfig(void) { server.lpushCommand = lookupCommandByCString("lpush"); server.lpopCommand = lookupCommandByCString("lpop"); server.rpopCommand = lookupCommandByCString("rpop"); + server.sremCommand = lookupCommandByCString("srem"); /* Slow log */ server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; @@ -2001,6 +2002,9 @@ struct redisCommand *lookupCommandOrOriginal(sds name) { * + REDIS_PROPAGATE_NONE (no propagation of command at all) * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled) * + REDIS_PROPAGATE_REPL (propagate into the replication link) + * + * This should not be used inside commands implementation. Use instead + * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). */ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) @@ -2027,6 +2031,13 @@ void forceCommandPropagation(redisClient *c, int flags) { if (flags & REDIS_PROPAGATE_AOF) c->flags |= REDIS_FORCE_AOF; } +/* Avoid that the executed command is propagated at all. This way we + * are free to just propagate what we want using the alsoPropagate() + * API. */ +void preventCommandPropagation(redisClient *c) { + c->flags |= REDIS_PREVENT_PROP; +} + /* Call() is the core of Redis execution of a command */ void call(redisClient *c, int flags) { long long dirty, start, duration; @@ -2080,7 +2091,7 @@ void call(redisClient *c, int flags) { } /* Propagate the command into the AOF and replication link */ - if (flags & REDIS_CALL_PROPAGATE) { + if (flags & REDIS_CALL_PROPAGATE && (c->flags & REDIS_PREVENT_PROP) == 0) { int flags = REDIS_PROPAGATE_NONE; if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL; @@ -2091,13 +2102,15 @@ void call(redisClient *c, int flags) { propagate(c->cmd,c->db->id,c->argv,c->argc,flags); } - /* Restore the old FORCE_AOF/REPL flags, since call can be executed + /* Restore the old replication flags, since call can be executed * recursively. */ - c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); - c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL); + c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP); + c->flags |= client_old_flags & + (REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP); /* Handle the alsoPropagate() API to handle commands that want to propagate - * multiple separated commands. */ + * multiple separated commands. Note that alsoPropagate() is not affected + * by REDIS_PREVENT_PROP flag. */ if (server.also_propagate.numops) { int j; redisOp *rop; diff --git a/src/redis.h b/src/redis.h index 2170c5d2..a675d4f1 100644 --- a/src/redis.h +++ b/src/redis.h @@ -257,6 +257,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */ #define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */ #define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */ +#define REDIS_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */ /* Client block type (btype field in client structure) * if REDIS_BLOCKED flag is set. */ @@ -708,7 +709,7 @@ struct redisServer { off_t loading_process_events_interval_bytes; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, - *rpopCommand; + *rpopCommand, *sremCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -1252,6 +1253,7 @@ void call(redisClient *c, int flags); void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags); void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target); void forceCommandPropagation(redisClient *c, int flags); +void preventCommandPropagation(redisClient *c); int prepareForShutdown(); #ifdef __GNUC__ void redisLog(int level, const char *fmt, ...) diff --git a/src/t_set.c b/src/t_set.c index f3f8bbac..619b0f8a 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -45,6 +45,11 @@ robj *setTypeCreate(robj *value) { return createSetObject(); } +/* Add the specified value into a set. The function takes care of incrementing + * the reference count of the object if needed in order to retain a copy. + * + * If the value was already member of the set, nothing is done and 0 is + * returned, otherwise the new element is added and 1 is returned. */ int setTypeAdd(robj *subject, robj *value) { long long llval; if (subject->encoding == REDIS_ENCODING_HT) { @@ -487,7 +492,7 @@ void spopWithCountCommand(redisClient *c) { long l; unsigned long count, size; unsigned long elements_returned; - robj *set, *aux, *aux_set; + robj *set, *aux_set; int64_t llele; /* Get the count argument */ @@ -522,7 +527,6 @@ void spopWithCountCommand(redisClient *c) { * The number of requested elements is greater than or equal to * the number of elements inside the set: simply return the whole set. */ if (count >= size) { - /* We just return the entire set */ sunionDiffGenericCommand(c,c->argv+1,1,NULL,REDIS_OP_UNION); @@ -531,10 +535,9 @@ void spopWithCountCommand(redisClient *c) { notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id); /* Replicate/AOF this command as an SREM operation */ - aux = createStringObject("DEL",3); - rewriteClientCommandVector(c,2,aux,c->argv[1]); - decrRefCount(aux); - + rewriteClientCommandVector(c,2,shared.del,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); + server.dirty++; return; } @@ -544,9 +547,7 @@ void spopWithCountCommand(redisClient *c) { /* We need an auxiliary set. Optimistically, we create a set using an * Intset internally. */ - aux = createStringObjectFromLongLong(0); - aux_set = setTypeCreate(aux); - decrRefCount(aux); + aux_set = createIntsetObject(); /* Get the count requested of random elements from the set into our * auxiliary set. */ @@ -555,47 +556,43 @@ void spopWithCountCommand(redisClient *c) { { setTypeIterator *si; - robj *objele; + robj *objele, **propargv; int element_encoding; addReplyMultiBulkLen(c, elements_returned); - /* Replicate/AOF this command as an SREM operation */ - aux = createStringObject("SREM",4); - si = setTypeInitIterator(aux_set); while ((element_encoding = setTypeNext(si, &objele, &llele)) != -1) { if (element_encoding == REDIS_ENCODING_HT) { - - addReplyBulk(c, objele); - - /* Replicate/AOF this command as an SREM commands */ - rewriteClientCommandVector(c, 3, aux, c->argv[1], objele); - setTypeRemove(set, objele); - } - else if (element_encoding == REDIS_ENCODING_INTSET) { - /* TODO: setTypeRemove() forces us to convert all of the ints - * to string... isn't there a nicer way to do this? */ + incrRefCount(objele); + } else if (element_encoding == REDIS_ENCODING_INTSET) { objele = createStringObjectFromLongLong(llele); - addReplyBulk(c, objele); - - /* Replicate/AOF this command as an SREM commands */ - rewriteClientCommandVector(c, 3, aux, c->argv[1], objele); - setTypeRemove(set, objele); - - /* We created it, we kill it. */ - decrRefCount(objele); - } - else { + } else { redisPanic("Unknown set encoding"); } + setTypeRemove(set, objele); + addReplyBulk(c, objele); + + /* Replicate/AOF this command as an SREM operation */ + propargv = zmalloc(sizeof(robj*)*3); + propargv[0] = createStringObject("SREM",4); + propargv[1] = c->argv[1]; + incrRefCount(c->argv[1]); + propargv[2] = objele; + incrRefCount(objele); + + alsoPropagate(server.sremCommand,c->db->id,propargv,3,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); + decrRefCount(objele); + server.dirty++; } setTypeReleaseIterator(si); - - decrRefCount(aux); } - /* Free the auxiliary set - we need it no more. */ + /* Don't propagate the command itself even if we incremented the + * dirty counter. We don't want to propagate an SPOP command since + * we propagated the command as a set of SREMs operations using + * the alsoPropagate() API. */ + preventCommandPropagation(c); decrRefCount(aux_set); }