From 6efb6c1e069b414305a92cf57cee31d13a84a44a Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 11 May 2018 17:31:46 +0200 Subject: [PATCH] ZPOP: renaming to have explicit MIN/MAX score idea. This commit also adds a top comment about a subtle behavior of mixing blocking operations of different types in the same key. --- src/blocked.c | 29 +++++++++----- src/server.c | 16 ++++---- src/server.h | 19 +++++---- src/t_zset.c | 63 ++++++++++++++---------------- tests/unit/type/zset.tcl | 84 ++++++++++++++++++++-------------------- 5 files changed, 111 insertions(+), 100 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 61fd9fa8..f9fd94ea 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -204,14 +204,25 @@ void disconnectAllBlockedClients(void) { /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after - * being called by a client. + * being called by a client. It handles serving clients blocked in + * lists, streams, and sorted sets, via a blocking commands. * * All the keys with at least one client blocked that received at least - * one new element via some PUSH/XADD operation are accumulated into + * one new element via some write operation are accumulated into * the server.ready_keys list. This function will run the list and will * serve clients accordingly. Note that the function will iterate again and * again as a result of serving BRPOPLPUSH we can have new blocking clients - * to serve because of the PUSH side of BRPOPLPUSH. */ + * to serve because of the PUSH side of BRPOPLPUSH. + * + * This function is normally "fair", that is, it will server clients + * using a FIFO behavior. However this fairness is violated in certain + * edge cases, that is, when we have clients blocked at the same time + * in a sorted set and in a list, for the same key (a very odd thing to + * do client side, indeed!). Because mismatching clients (blocking for + * a different type compared to the current key type) are moved in the + * other side of the linked list. However as long as the key starts to + * be used only for a single type, like virtually any Redis application will + * do, the function is already fair. */ void handleClientsBlockedOnKeys(void) { while(listLength(server.ready_keys) != 0) { list *l; @@ -316,14 +327,14 @@ void handleClientsBlockedOnKeys(void) { continue; } - int reverse = (receiver->lastcmd && - receiver->lastcmd->proc == bzpopCommand) ? - 0 : 1; + int where = (receiver->lastcmd && + receiver->lastcmd->proc == bzpopminCommand) + ? ZSET_MIN : ZSET_MAX; unblockClient(receiver); - genericZpopCommand(receiver,&rl->key,1,reverse); + genericZpopCommand(receiver,&rl->key,1,where); - propagate(reverse ? - server.zrevpopCommand : server.zpopCommand, + propagate(where == ZSET_MIN ? + server.zpopminCommand : server.zpopmaxCommand, receiver->db->id,receiver->argv,receiver->argc, PROPAGATE_AOF|PROPAGATE_REPL); } diff --git a/src/server.c b/src/server.c index 22317be7..2ff93301 100644 --- a/src/server.c +++ b/src/server.c @@ -198,10 +198,10 @@ struct redisCommand redisCommandTable[] = { {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0}, {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0}, {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, - {"zpop",zpopCommand,-2,"wF",0,NULL,1,-1,1,0,0}, - {"zrevpop",zrevpopCommand,-2,"wF",0,NULL,1,-1,1,0,0}, - {"bzpop",bzpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0}, - {"bzrevpop",bzrevpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0}, + {"zpopmin",zpopminCommand,-2,"wF",0,NULL,1,-1,1,0,0}, + {"zpopmax",zpopmaxCommand,-2,"wF",0,NULL,1,-1,1,0,0}, + {"bzpopmin",bzpopminCommand,-2,"wsF",0,NULL,1,-2,1,0,0}, + {"bzpopmax",bzpopmaxCommand,-2,"wsF",0,NULL,1,-2,1,0,0}, {"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0}, {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0}, {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0}, @@ -1373,8 +1373,8 @@ void createSharedObjects(void) { shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); shared.lpush = createStringObject("LPUSH",5); - shared.zpop = createStringObject("ZPOP",4); - shared.zrevpop = createStringObject("ZREVPOP",7); + shared.zpopmin = createStringObject("ZPOPMIN",7); + shared.zpopmax = createStringObject("ZPOPMAX",7); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -1568,8 +1568,8 @@ void initServerConfig(void) { server.lpushCommand = lookupCommandByCString("lpush"); server.lpopCommand = lookupCommandByCString("lpop"); server.rpopCommand = lookupCommandByCString("rpop"); - server.zpopCommand = lookupCommandByCString("zpop"); - server.zrevpopCommand = lookupCommandByCString("zrevpop"); + server.zpopminCommand = lookupCommandByCString("zpopmin"); + server.zpopmaxCommand = lookupCommandByCString("zpopmax"); server.sremCommand = lookupCommandByCString("srem"); server.execCommand = lookupCommandByCString("exec"); server.expireCommand = lookupCommandByCString("expire"); diff --git a/src/server.h b/src/server.h index a8039dde..b5675b47 100644 --- a/src/server.h +++ b/src/server.h @@ -316,6 +316,8 @@ typedef long long mstime_t; /* millisecond time type. */ /* List related stuff */ #define LIST_HEAD 0 #define LIST_TAIL 1 +#define ZSET_MIN 0 +#define ZSET_MAX 1 /* Sort operations */ #define SORT_OP_GET 0 @@ -763,7 +765,7 @@ struct sharedObjectsStruct { *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, - *rpop, *lpop, *lpush, *zpop, *zrevpop, *emptyscan, + *rpop, *lpop, *lpush, *zpopmin, *zpopmax, *emptyscan, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -960,9 +962,10 @@ struct redisServer { time_t loading_start_time; off_t loading_process_events_interval_bytes; /* Fast pointers to often looked up command */ - struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, - *rpopCommand, *zpopCommand, *zrevpopCommand, *sremCommand, - *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand; + struct redisCommand *delCommand, *multiCommand, *lpushCommand, + *lpopCommand, *rpopCommand, *zpopminCommand, + *zpopmaxCommand, *sremCommand, *execCommand, + *expireCommand, *pexpireCommand, *xclaimCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -1970,10 +1973,10 @@ void zremCommand(client *c); void zscoreCommand(client *c); void zremrangebyscoreCommand(client *c); void zremrangebylexCommand(client *c); -void zpopCommand(client *c); -void zrevpopCommand(client *c); -void bzpopCommand(client *c); -void bzrevpopCommand(client *c); +void zpopminCommand(client *c); +void zpopmaxCommand(client *c); +void bzpopminCommand(client *c); +void bzpopmaxCommand(client *c); void multiCommand(client *c); void execCommand(client *c); void discardCommand(client *c); diff --git a/src/t_zset.c b/src/t_zset.c index 2b11c7d3..b58e58bc 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3070,16 +3070,15 @@ void zscanCommand(client *c) { } /* This command implements the generic zpop operation, used by: - * ZPOP, ZREVPOP, BZPOP and BZREVPOP */ -void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { + * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX */ +void genericZpopCommand(client *c, robj **keyv, int keyc, int where) { int idx; robj *key; robj *zobj; sds ele; double score; - char *events[2] = {"zpop","zrevpop"}; - // Check type and break on the first error, otherwise identify candidate + /* Check type and break on the first error, otherwise identify candidate. */ idx = 0; while (idx < keyc) { key = keyv[idx++]; @@ -3089,7 +3088,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { break; } - // No candidate for zpopping, return empty + /* No candidate for zpopping, return empty. */ if (!zobj) { addReply(c,shared.emptymultibulk); return; @@ -3102,11 +3101,8 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { unsigned int vlen; long long vlong; - // Get the first or last element in the sorted set - eptr = ziplistIndex(zl,reverse ? -2 : 0); - serverAssertWithInfo(c,zobj,eptr != NULL); - - // There must be an element in the sorted set + /* Get the first or last element in the sorted set. */ + eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0); serverAssertWithInfo(c,zobj,eptr != NULL); serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) @@ -3114,22 +3110,22 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { else ele = sdsnewlen(vstr,vlen); - // Get the score + /* Get the score. */ sptr = ziplistNext(zl,eptr); serverAssertWithInfo(c,zobj,sptr != NULL); score = zzlGetScore(sptr); } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; - zskiplistNode *ln; + zskiplistNode *zln; // Get the first or last element in the sorted set - ln = (reverse ? zsl->tail : zsl->header->level[0].forward); + zln = (where == ZSET_MAX ? zsl->tail : zsl->header->level[0].forward); // There must be an element in the sorted set - serverAssertWithInfo(c,zobj,ln != NULL); - ele = sdsdup(ln->ele); - score = ln->score; + serverAssertWithInfo(c,zobj,zln != NULL); + ele = sdsdup(zln->ele); + score = zln->score; } else { serverPanic("Unknown sorted set encoding"); } @@ -3138,7 +3134,8 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); server.dirty++; signalModifiedKey(c->db,key); - notifyKeyspaceEvent(NOTIFY_ZSET,events[reverse],key,c->db->id); + char *events[2] = {"zpopmin","zpopmax"}; + notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); // Remove the key, if indeed needed if (zsetLength(zobj) == 0) { @@ -3153,18 +3150,18 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { sdsfree(ele); } -// ZPOP key [key ...] -void zpopCommand(client *c) { - genericZpopCommand(c,&c->argv[1],c->argc-1,0); +// ZPOPMIN key [key ...] +void zpopminCommand(client *c) { + genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN); } -// ZREVPOP key [key ...] -void zrevpopCommand(client *c) { - genericZpopCommand(c,&c->argv[1],c->argc-1,1); +// ZMAXPOP key [key ...] +void zpopmaxCommand(client *c) { + genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX); } -/* Blocking Z[REV]POP */ -void blockingGenericZpopCommand(client *c, int reverse) { +/* BZPOPMIN / BZPOPMAX actual implementation. */ +void blockingGenericZpopCommand(client *c, int where) { robj *o; mstime_t timeout; int j; @@ -3181,10 +3178,10 @@ void blockingGenericZpopCommand(client *c, int reverse) { } else { if (zsetLength(o) != 0) { /* Non empty zset, this is like a normal Z[REV]POP. */ - genericZpopCommand(c,&c->argv[j],1,reverse); + genericZpopCommand(c,&c->argv[j],1,where); /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */ rewriteClientCommandVector(c,2, - reverse ? shared.zrevpop : shared.zpop, + where == ZSET_MAX ? shared.zpopmax : shared.zpopmin, c->argv[j]); return; } @@ -3203,12 +3200,12 @@ void blockingGenericZpopCommand(client *c, int reverse) { blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL); } -// BZPOP key [key ...] timeout -void bzpopCommand(client *c) { - blockingGenericZpopCommand(c,0); +// BZPOPMIN key [key ...] timeout +void bzpopminCommand(client *c) { + blockingGenericZpopCommand(c,ZSET_MIN); } -// BZREVPOP key [key ...] timeout -void bzrevpopCommand(client *c) { - blockingGenericZpopCommand(c,1); +// BZPOPMAX key [key ...] timeout +void bzpopmaxCommand(client *c) { + blockingGenericZpopCommand(c,ZSET_MAX); } diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 79704579..47f1e728 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -649,74 +649,74 @@ start_server {tags {"zset"}} { } } - test "Basic Z\[REV\]POP with a single key - $encoding" { + test "Basic ZPOP with a single key - $encoding" { r del zset - assert_equal {} [r zpop zset] + assert_equal {} [r zpopmin zset] create_zset zset {-1 a 1 b 2 c 3 d 4 e} - assert_equal {zset -1 a} [r zpop zset] - assert_equal {zset 1 b} [r zpop zset] - assert_equal {zset 4 e} [r zrevpop zset] - assert_equal {zset 3 d} [r zrevpop zset] - assert_equal {zset 2 c} [r zpop zset] + assert_equal {zset -1 a} [r zpopmin zset] + assert_equal {zset 1 b} [r zpopmin zset] + assert_equal {zset 4 e} [r zpopmax zset] + assert_equal {zset 3 d} [r zpopmax zset] + assert_equal {zset 2 c} [r zpopmin zset] assert_equal 0 [r exists zset] r set foo bar - assert_error "*WRONGTYPE*" {r zpop foo} + assert_error "*WRONGTYPE*" {r zpopmin foo} } - test "Z\[REV\]POP with multiple keys - $encoding" { + test "ZPOP with multiple keys - $encoding" { r del z1 z2 z3 foo r set foo bar - assert_equal {} [r zpop z1 z2 z3] - assert_error "*WRONGTYPE*" {r zpop z1 foo} + assert_equal {} [r zpopmin z1 z2 z3] + assert_error "*WRONGTYPE*" {r zpopmin z1 foo} create_zset z1 {0 a 1 b 2 c} - assert_equal {z1 0 a} [r zpop z1 z2 z3] - assert_equal {z1 1 b} [r zpop z3 z2 z1] + assert_equal {z1 0 a} [r zpopmin z1 z2 z3] + assert_equal {z1 1 b} [r zpopmin z3 z2 z1] create_zset z3 {0 a 1 b 2 c} - assert_equal {z3 2 c} [r zrevpop z3 z2 z1] + assert_equal {z3 2 c} [r zpopmax z3 z2 z1] assert_equal 1 [r exists z1] assert_equal 1 [r exists z3] } - test "BZ\[REV\]POP with a single existing sorted set - $encoding" { + test "BZPOP with a single existing sorted set - $encoding" { set rd [redis_deferring_client] create_zset zset {0 a 1 b 2 c} - $rd bzpop zset 5 + $rd bzpopmin zset 5 assert_equal {zset 0 a} [$rd read] - $rd bzpop zset 5 + $rd bzpopmin zset 5 assert_equal {zset 1 b} [$rd read] - $rd bzrevpop zset 5 + $rd bzpopmax zset 5 assert_equal {zset 2 c} [$rd read] assert_equal 0 [r exists zset] } - test "BZ\[REV\]POP with multiple existing sorted sets - $encoding" { + test "BZPOP with multiple existing sorted sets - $encoding" { set rd [redis_deferring_client] create_zset z1 {0 a 1 b 2 c} create_zset z2 {3 d 4 e 5 f} - $rd bzpop z1 z2 5 + $rd bzpopmin z1 z2 5 assert_equal {z1 0 a} [$rd read] - $rd bzrevpop z1 z2 5 + $rd bzpopmax z1 z2 5 assert_equal {z1 2 c} [$rd read] assert_equal 1 [r zcard z1] assert_equal 3 [r zcard z2] - $rd bzrevpop z2 z1 5 + $rd bzpopmax z2 z1 5 assert_equal {z2 5 f} [$rd read] - $rd bzpop z2 z1 5 + $rd bzpopmin z2 z1 5 assert_equal {z2 3 d} [$rd read] assert_equal 1 [r zcard z1] assert_equal 1 [r zcard z2] } - test "BZ\[REV\]POP second sorted set has members - $encoding" { + test "BZPOP second sorted set has members - $encoding" { set rd [redis_deferring_client] r del z1 create_zset z2 {3 d 4 e 5 f} - $rd bzrevpop z1 z2 5 + $rd bzpopmax z1 z2 5 assert_equal {z2 5 f} [$rd read] - $rd bzpop z2 z1 5 + $rd bzpopmin z2 z1 5 assert_equal {z2 3 d} [$rd read] assert_equal 0 [r zcard z1] assert_equal 1 [r zcard z2] @@ -1099,11 +1099,11 @@ start_server {tags {"zset"}} { assert_equal {} $err } - test "BZPOP, ZADD + DEL should not awake blocked client" { + test "BZPOPMIN, ZADD + DEL should not awake blocked client" { set rd [redis_deferring_client] r del zset - $rd bzpop zset 0 + $rd bzpopmin zset 0 r multi r zadd zset 0 foo r del zset @@ -1113,13 +1113,13 @@ start_server {tags {"zset"}} { $rd read } {zset 1 bar} - test "BZPOP, ZADD + DEL + SET should not awake blocked client" { + test "BZPOPMIN, ZADD + DEL + SET should not awake blocked client" { set rd [redis_deferring_client] r del list r del zset - $rd bzpop zset 0 + $rd bzpopmin zset 0 r multi r zadd zset 0 foo r del zset @@ -1130,31 +1130,31 @@ start_server {tags {"zset"}} { $rd read } {zset 1 bar} - test "BZPOP with same key multiple times should work" { + test "BZPOPMIN with same key multiple times should work" { set rd [redis_deferring_client] r del z1 z2 - # Data arriving after the BZPOP. - $rd bzpop z1 z2 z2 z1 0 + # Data arriving after the BZPOPMIN. + $rd bzpopmin z1 z2 z2 z1 0 r zadd z1 0 a assert_equal [$rd read] {z1 0 a} - $rd bzpop z1 z2 z2 z1 0 + $rd bzpopmin z1 z2 z2 z1 0 r zadd z2 1 b assert_equal [$rd read] {z2 1 b} # Data already there. r zadd z1 0 a r zadd z2 1 b - $rd bzpop z1 z2 z2 z1 0 + $rd bzpopmin z1 z2 z2 z1 0 assert_equal [$rd read] {z1 0 a} - $rd bzpop z1 z2 z2 z1 0 + $rd bzpopmin z1 z2 z2 z1 0 assert_equal [$rd read] {z2 1 b} } - test "MULTI/EXEC is isolated from the point of view of BZPOP" { + test "MULTI/EXEC is isolated from the point of view of BZPOPMIN" { set rd [redis_deferring_client] r del zset - $rd bzpop zset 0 + $rd bzpopmin zset 0 r multi r zadd zset 0 a r zadd zset 1 b @@ -1163,11 +1163,11 @@ start_server {tags {"zset"}} { $rd read } {zset 0 a} - test "BZPOP with variadic ZADD" { + test "BZPOPMIN with variadic ZADD" { set rd [redis_deferring_client] r del zset if {$::valgrind} {after 100} - $rd bzpop zset 0 + $rd bzpopmin zset 0 if {$::valgrind} {after 100} assert_equal 2 [r zadd zset -1 foo 1 bar] if {$::valgrind} {after 100} @@ -1175,10 +1175,10 @@ start_server {tags {"zset"}} { assert_equal {bar} [r zrange zset 0 -1] } - test "BZPOP with zero timeout should block indefinitely" { + test "BZPOPMIN with zero timeout should block indefinitely" { set rd [redis_deferring_client] r del zset - $rd bzpop zset 0 + $rd bzpopmin zset 0 after 1000 r zadd zset 0 foo assert_equal {zset 0 foo} [$rd read]