mirror of
https://github.com/fluencelabs/redis
synced 2025-03-17 16:10:50 +00:00
SPOP with count: initial fixes to the implementation.
Severan problems are addressed but still a few missing. Since replication of this command was more complex than others since it needs to replicate multiple SREM commands, an old API able to do this was reused (it was taken inside the implementation since it was pretty obvious soon or later that would be useful). The API was improved a bit so that now a command may opt-out for the standard command replication when the server.dirty counter is incremented, in order to "manually" replicate what it wants.
This commit is contained in:
parent
585d1a60bf
commit
6b5922dcbb
23
src/redis.c
23
src/redis.c
@ -1529,6 +1529,7 @@ void initServerConfig(void) {
|
||||
server.lpushCommand = lookupCommandByCString("lpush");
|
||||
server.lpopCommand = lookupCommandByCString("lpop");
|
||||
server.rpopCommand = lookupCommandByCString("rpop");
|
||||
server.sremCommand = lookupCommandByCString("srem");
|
||||
|
||||
/* Slow log */
|
||||
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
|
||||
@ -2001,6 +2002,9 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
|
||||
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
|
||||
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
|
||||
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
|
||||
*
|
||||
* This should not be used inside commands implementation. Use instead
|
||||
* alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
|
||||
*/
|
||||
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
|
||||
int flags)
|
||||
@ -2027,6 +2031,13 @@ void forceCommandPropagation(redisClient *c, int flags) {
|
||||
if (flags & REDIS_PROPAGATE_AOF) c->flags |= REDIS_FORCE_AOF;
|
||||
}
|
||||
|
||||
/* Avoid that the executed command is propagated at all. This way we
|
||||
* are free to just propagate what we want using the alsoPropagate()
|
||||
* API. */
|
||||
void preventCommandPropagation(redisClient *c) {
|
||||
c->flags |= REDIS_PREVENT_PROP;
|
||||
}
|
||||
|
||||
/* Call() is the core of Redis execution of a command */
|
||||
void call(redisClient *c, int flags) {
|
||||
long long dirty, start, duration;
|
||||
@ -2080,7 +2091,7 @@ void call(redisClient *c, int flags) {
|
||||
}
|
||||
|
||||
/* Propagate the command into the AOF and replication link */
|
||||
if (flags & REDIS_CALL_PROPAGATE) {
|
||||
if (flags & REDIS_CALL_PROPAGATE && (c->flags & REDIS_PREVENT_PROP) == 0) {
|
||||
int flags = REDIS_PROPAGATE_NONE;
|
||||
|
||||
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
|
||||
@ -2091,13 +2102,15 @@ void call(redisClient *c, int flags) {
|
||||
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
|
||||
}
|
||||
|
||||
/* Restore the old FORCE_AOF/REPL flags, since call can be executed
|
||||
/* Restore the old replication flags, since call can be executed
|
||||
* recursively. */
|
||||
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
|
||||
c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);
|
||||
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP);
|
||||
c->flags |= client_old_flags &
|
||||
(REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP);
|
||||
|
||||
/* Handle the alsoPropagate() API to handle commands that want to propagate
|
||||
* multiple separated commands. */
|
||||
* multiple separated commands. Note that alsoPropagate() is not affected
|
||||
* by REDIS_PREVENT_PROP flag. */
|
||||
if (server.also_propagate.numops) {
|
||||
int j;
|
||||
redisOp *rop;
|
||||
|
@ -257,6 +257,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
|
||||
#define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */
|
||||
#define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
|
||||
#define REDIS_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if REDIS_BLOCKED flag is set. */
|
||||
@ -708,7 +709,7 @@ struct redisServer {
|
||||
off_t loading_process_events_interval_bytes;
|
||||
/* Fast pointers to often looked up command */
|
||||
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
|
||||
*rpopCommand;
|
||||
*rpopCommand, *sremCommand;
|
||||
/* Fields used only for stats */
|
||||
time_t stat_starttime; /* Server start time */
|
||||
long long stat_numcommands; /* Number of processed commands */
|
||||
@ -1252,6 +1253,7 @@ void call(redisClient *c, int flags);
|
||||
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
|
||||
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
|
||||
void forceCommandPropagation(redisClient *c, int flags);
|
||||
void preventCommandPropagation(redisClient *c);
|
||||
int prepareForShutdown();
|
||||
#ifdef __GNUC__
|
||||
void redisLog(int level, const char *fmt, ...)
|
||||
|
69
src/t_set.c
69
src/t_set.c
@ -45,6 +45,11 @@ robj *setTypeCreate(robj *value) {
|
||||
return createSetObject();
|
||||
}
|
||||
|
||||
/* Add the specified value into a set. The function takes care of incrementing
|
||||
* the reference count of the object if needed in order to retain a copy.
|
||||
*
|
||||
* If the value was already member of the set, nothing is done and 0 is
|
||||
* returned, otherwise the new element is added and 1 is returned. */
|
||||
int setTypeAdd(robj *subject, robj *value) {
|
||||
long long llval;
|
||||
if (subject->encoding == REDIS_ENCODING_HT) {
|
||||
@ -487,7 +492,7 @@ void spopWithCountCommand(redisClient *c) {
|
||||
long l;
|
||||
unsigned long count, size;
|
||||
unsigned long elements_returned;
|
||||
robj *set, *aux, *aux_set;
|
||||
robj *set, *aux_set;
|
||||
int64_t llele;
|
||||
|
||||
/* Get the count argument */
|
||||
@ -522,7 +527,6 @@ void spopWithCountCommand(redisClient *c) {
|
||||
* The number of requested elements is greater than or equal to
|
||||
* the number of elements inside the set: simply return the whole set. */
|
||||
if (count >= size) {
|
||||
|
||||
/* We just return the entire set */
|
||||
sunionDiffGenericCommand(c,c->argv+1,1,NULL,REDIS_OP_UNION);
|
||||
|
||||
@ -531,10 +535,9 @@ void spopWithCountCommand(redisClient *c) {
|
||||
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
|
||||
|
||||
/* Replicate/AOF this command as an SREM operation */
|
||||
aux = createStringObject("DEL",3);
|
||||
rewriteClientCommandVector(c,2,aux,c->argv[1]);
|
||||
decrRefCount(aux);
|
||||
|
||||
rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
|
||||
signalModifiedKey(c->db,c->argv[1]);
|
||||
server.dirty++;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -544,9 +547,7 @@ void spopWithCountCommand(redisClient *c) {
|
||||
|
||||
/* We need an auxiliary set. Optimistically, we create a set using an
|
||||
* Intset internally. */
|
||||
aux = createStringObjectFromLongLong(0);
|
||||
aux_set = setTypeCreate(aux);
|
||||
decrRefCount(aux);
|
||||
aux_set = createIntsetObject();
|
||||
|
||||
/* Get the count requested of random elements from the set into our
|
||||
* auxiliary set. */
|
||||
@ -555,47 +556,43 @@ void spopWithCountCommand(redisClient *c) {
|
||||
|
||||
{
|
||||
setTypeIterator *si;
|
||||
robj *objele;
|
||||
robj *objele, **propargv;
|
||||
int element_encoding;
|
||||
|
||||
addReplyMultiBulkLen(c, elements_returned);
|
||||
|
||||
/* Replicate/AOF this command as an SREM operation */
|
||||
aux = createStringObject("SREM",4);
|
||||
|
||||
si = setTypeInitIterator(aux_set);
|
||||
while ((element_encoding = setTypeNext(si, &objele, &llele)) != -1) {
|
||||
if (element_encoding == REDIS_ENCODING_HT) {
|
||||
|
||||
addReplyBulk(c, objele);
|
||||
|
||||
/* Replicate/AOF this command as an SREM commands */
|
||||
rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
|
||||
setTypeRemove(set, objele);
|
||||
}
|
||||
else if (element_encoding == REDIS_ENCODING_INTSET) {
|
||||
/* TODO: setTypeRemove() forces us to convert all of the ints
|
||||
* to string... isn't there a nicer way to do this? */
|
||||
incrRefCount(objele);
|
||||
} else if (element_encoding == REDIS_ENCODING_INTSET) {
|
||||
objele = createStringObjectFromLongLong(llele);
|
||||
addReplyBulk(c, objele);
|
||||
|
||||
/* Replicate/AOF this command as an SREM commands */
|
||||
rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
|
||||
setTypeRemove(set, objele);
|
||||
|
||||
/* We created it, we kill it. */
|
||||
decrRefCount(objele);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
redisPanic("Unknown set encoding");
|
||||
}
|
||||
setTypeRemove(set, objele);
|
||||
addReplyBulk(c, objele);
|
||||
|
||||
/* Replicate/AOF this command as an SREM operation */
|
||||
propargv = zmalloc(sizeof(robj*)*3);
|
||||
propargv[0] = createStringObject("SREM",4);
|
||||
propargv[1] = c->argv[1];
|
||||
incrRefCount(c->argv[1]);
|
||||
propargv[2] = objele;
|
||||
incrRefCount(objele);
|
||||
|
||||
alsoPropagate(server.sremCommand,c->db->id,propargv,3,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
|
||||
decrRefCount(objele);
|
||||
server.dirty++;
|
||||
}
|
||||
setTypeReleaseIterator(si);
|
||||
|
||||
decrRefCount(aux);
|
||||
}
|
||||
|
||||
/* Free the auxiliary set - we need it no more. */
|
||||
/* Don't propagate the command itself even if we incremented the
|
||||
* dirty counter. We don't want to propagate an SPOP command since
|
||||
* we propagated the command as a set of SREMs operations using
|
||||
* the alsoPropagate() API. */
|
||||
preventCommandPropagation(c);
|
||||
decrRefCount(aux_set);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user