From 25bb8a4452d9c74ee522c89f682115ab45fe51a4 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 16 Sep 2010 14:35:25 +0200 Subject: [PATCH 1/4] Add ZREVRANGEBYSCORE and refactor Z*RANGEBYSCORE --- src/redis.c | 1 + src/redis.h | 1 + src/t_zset.c | 264 ++++++++++++++++++++++++--------------- tests/unit/type/zset.tcl | 95 +++++++------- 4 files changed, 219 insertions(+), 142 deletions(-) diff --git a/src/redis.c b/src/redis.c index 7b1b3f4f..fca5f1b3 100644 --- a/src/redis.c +++ b/src/redis.c @@ -120,6 +120,7 @@ struct redisCommand readonlyCommandTable[] = { {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, + {"zrevrangebyscore",zrevrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zcount",zcountCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zcard",zcardCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, diff --git a/src/redis.h b/src/redis.h index 3de054f2..d53ccf5b 100644 --- a/src/redis.h +++ b/src/redis.h @@ -895,6 +895,7 @@ void zaddCommand(redisClient *c); void zincrbyCommand(redisClient *c); void zrangeCommand(redisClient *c); void zrangebyscoreCommand(redisClient *c); +void zrevrangebyscoreCommand(redisClient *c); void zcountCommand(redisClient *c); void zrevrangeCommand(redisClient *c); void zcardCommand(redisClient *c); diff --git a/src/t_zset.c b/src/t_zset.c index 93ade5aa..e528205a 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -296,6 +296,44 @@ zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { return NULL; } +typedef struct { + double min, max; + int minex, maxex; /* are min or max exclusive? */ +} zrangespec; + +/* Populate the rangespec according to the objects min and max. */ +int zslParseRange(robj *min, robj *max, zrangespec *spec) { + spec->minex = spec->maxex = 0; + + /* Parse the min-max interval. If one of the values is prefixed + * by the "(" character, it's considered "open". For instance + * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max + * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ + if (min->encoding == REDIS_ENCODING_INT) { + spec->min = (long)min->ptr; + } else { + if (((char*)min->ptr)[0] == '(') { + spec->min = strtod((char*)min->ptr+1,NULL); + spec->minex = 1; + } else { + spec->min = strtod((char*)min->ptr,NULL); + } + } + if (max->encoding == REDIS_ENCODING_INT) { + spec->max = (long)max->ptr; + } else { + if (((char*)max->ptr)[0] == '(') { + spec->max = strtod((char*)max->ptr+1,NULL); + spec->maxex = 1; + } else { + spec->max = strtod((char*)max->ptr,NULL); + } + } + + return REDIS_OK; +} + + /*----------------------------------------------------------------------------- * Sorted set commands *----------------------------------------------------------------------------*/ @@ -781,125 +819,153 @@ void zrevrangeCommand(redisClient *c) { zrangeGenericCommand(c,1); } -/* This command implements both ZRANGEBYSCORE and ZCOUNT. - * If justcount is non-zero, just the count is returned. */ -void genericZrangebyscoreCommand(redisClient *c, int justcount) { - robj *o; - double min, max; - int minex = 0, maxex = 0; /* are min or max exclusive? */ +/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT. + * If "justcount", only the number of elements in the range is returned. */ +void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { + zrangespec range; + robj *o, *emptyreply; + zset *zsetobj; + zskiplist *zsl; + zskiplistNode *ln; int offset = 0, limit = -1; int withscores = 0; - int badsyntax = 0; + unsigned long rangelen = 0; + void *replylen = NULL; - /* Parse the min-max interval. If one of the values is prefixed - * by the "(" character, it's considered "open". For instance - * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max - * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ - if (((char*)c->argv[2]->ptr)[0] == '(') { - min = strtod((char*)c->argv[2]->ptr+1,NULL); - minex = 1; - } else { - min = strtod(c->argv[2]->ptr,NULL); - } - if (((char*)c->argv[3]->ptr)[0] == '(') { - max = strtod((char*)c->argv[3]->ptr+1,NULL); - maxex = 1; - } else { - max = strtod(c->argv[3]->ptr,NULL); - } + /* Parse the range arguments. */ + zslParseRange(c->argv[2],c->argv[3],&range); - /* Parse "WITHSCORES": note that if the command was called with - * the name ZCOUNT then we are sure that c->argc == 4, so we'll never - * enter the following paths to parse WITHSCORES and LIMIT. */ - if (c->argc == 5 || c->argc == 8) { - if (strcasecmp(c->argv[c->argc-1]->ptr,"withscores") == 0) - withscores = 1; - else - badsyntax = 1; - } - if (c->argc != (4 + withscores) && c->argc != (7 + withscores)) - badsyntax = 1; - if (badsyntax) { - addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE"); - return; - } + /* Parse optional extra arguments. Note that ZCOUNT will exactly have + * 4 arguments, so we'll never enter the following code path. */ + if (c->argc > 4) { + int remaining = c->argc - 4; + int pos = 4; - /* Parse "LIMIT" */ - if (c->argc == (7 + withscores) && strcasecmp(c->argv[4]->ptr,"limit")) { - addReply(c,shared.syntaxerr); - return; - } else if (c->argc == (7 + withscores)) { - offset = atoi(c->argv[5]->ptr); - limit = atoi(c->argv[6]->ptr); - if (offset < 0) offset = 0; + while (remaining) { + if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) { + pos++; remaining--; + withscores = 1; + } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { + offset = atoi(c->argv[pos+1]->ptr); + limit = atoi(c->argv[pos+2]->ptr); + pos += 3; remaining -= 3; + } else { + addReply(c,shared.syntaxerr); + return; + } + } } /* Ok, lookup the key and get the range */ - o = lookupKeyRead(c->db,c->argv[1]); - if (o == NULL) { - addReply(c,justcount ? shared.czero : shared.emptymultibulk); - } else { - if (o->type != REDIS_ZSET) { - addReply(c,shared.wrongtypeerr); + emptyreply = justcount ? shared.czero : shared.emptymultibulk; + if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL || + checkType(c,o,REDIS_ZSET)) return; + zsetobj = o->ptr; + zsl = zsetobj->zsl; + + /* If reversed, assume the elements are sorted from high to low score. */ + ln = zslFirstWithScore(zsl,range.min); + if (reverse) { + /* If range.min is out of range, ln will be NULL and we need to use + * the tail of the skiplist as first node of the range. */ + if (ln == NULL) ln = zsl->tail; + + /* zslFirstWithScore returns the first element with where with + * score >= range.min, so backtrack to make sure the element we use + * here has score <= range.min. */ + while (ln && ln->score > range.min) ln = ln->backward; + + /* Move to the right element according to the range spec. */ + if (range.minex) { + /* Find last element with score < range.min */ + while (ln && ln->score == range.min) ln = ln->backward; } else { - zset *zsetobj = o->ptr; - zskiplist *zsl = zsetobj->zsl; - zskiplistNode *ln; - robj *ele; - void *replylen = NULL; - unsigned long rangelen = 0; - - /* Get the first node with the score >= min, or with - * score > min if 'minex' is true. */ - ln = zslFirstWithScore(zsl,min); - while (minex && ln && ln->score == min) ln = ln->level[0].forward; - - if (ln == NULL) { - /* No element matching the speciifed interval */ - addReply(c,justcount ? shared.czero : shared.emptymultibulk); - return; - } - - /* We don't know in advance how many matching elements there - * are in the list, so we push this object that will represent - * the multi-bulk length in the output buffer, and will "fix" - * it later */ - if (!justcount) - replylen = addDeferredMultiBulkLength(c); - - while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) { - if (offset) { - offset--; - ln = ln->level[0].forward; - continue; - } - if (limit == 0) break; - if (!justcount) { - ele = ln->obj; - addReplyBulk(c,ele); - if (withscores) - addReplyDouble(c,ln->score); - } + /* Find last element with score <= range.min */ + while (ln && ln->level[0].forward && + ln->level[0].forward->score == range.min) ln = ln->level[0].forward; - rangelen++; - if (limit > 0) limit--; - } - if (justcount) { - addReplyLongLong(c,(long)rangelen); + } + } else { + if (range.minex) { + /* Find first element with score > range.min */ + while (ln && ln->score == range.min) ln = ln->level[0].forward; + } + } + + /* No "first" element in the specified interval. */ + if (ln == NULL) { + addReply(c,emptyreply); + return; + } + + /* We don't know in advance how many matching elements there + * are in the list, so we push this object that will represent + * the multi-bulk length in the output buffer, and will "fix" + * it later */ + if (!justcount) + replylen = addDeferredMultiBulkLength(c); + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while(ln && offset--) { + if (reverse) + ln = ln->backward; + else + ln = ln->level[0].forward; + } + + while (ln && limit--) { + /* Check if this this element is in range. */ + if (reverse) { + if (range.maxex) { + /* Element should have score > range.max */ + if (ln->score <= range.max) break; } else { - setDeferredMultiBulkLength(c,replylen, - withscores ? (rangelen*2) : rangelen); + /* Element should have score >= range.max */ + if (ln->score < range.max) break; + } + } else { + if (range.maxex) { + /* Element should have score < range.max */ + if (ln->score >= range.max) break; + } else { + /* Element should have score <= range.max */ + if (ln->score > range.max) break; } } + + /* Do our magic */ + rangelen++; + if (!justcount) { + addReplyBulk(c,ln->obj); + if (withscores) + addReplyDouble(c,ln->score); + } + + if (reverse) + ln = ln->backward; + else + ln = ln->level[0].forward; + } + + if (justcount) { + addReplyLongLong(c,(long)rangelen); + } else { + setDeferredMultiBulkLength(c,replylen, + withscores ? (rangelen*2) : rangelen); } } void zrangebyscoreCommand(redisClient *c) { - genericZrangebyscoreCommand(c,0); + genericZrangebyscoreCommand(c,0,0); +} + +void zrevrangebyscoreCommand(redisClient *c) { + genericZrangebyscoreCommand(c,1,0); } void zcountCommand(redisClient *c) { - genericZrangebyscoreCommand(c,1); + genericZrangebyscoreCommand(c,0,1); } void zcardCommand(redisClient *c) { diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 642922e9..949681eb 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -199,26 +199,59 @@ start_server {tags {"zset"}} { list $v1 $v2 [r zscore zset foo] [r zscore zset bar] } {{bar foo} {foo bar} -2 6} - test {ZRANGEBYSCORE and ZCOUNT basics} { - r del zset - r zadd zset 1 a - r zadd zset 2 b - r zadd zset 3 c - r zadd zset 4 d - r zadd zset 5 e - list [r zrangebyscore zset 2 4] [r zrangebyscore zset (2 (4] \ - [r zcount zset 2 4] [r zcount zset (2 (4] - } {{b c d} c 3 1} + proc create_default_zset {} { + create_zset zset {-inf a 1 b 2 c 3 d 4 e 5 f +inf g} + } - test {ZRANGEBYSCORE withscores} { - r del zset - r zadd zset 1 a - r zadd zset 2 b - r zadd zset 3 c - r zadd zset 4 d - r zadd zset 5 e - r zrangebyscore zset 2 4 withscores - } {b 2 c 3 d 4} + test "ZRANGEBYSCORE/ZREVRANGEBYSCORE/ZCOUNT basics" { + create_default_zset + + # inclusive range + assert_equal {a b c} [r zrangebyscore zset -inf 2] + assert_equal {b c d} [r zrangebyscore zset 0 3] + assert_equal {d e f} [r zrangebyscore zset 3 6] + assert_equal {e f g} [r zrangebyscore zset 4 +inf] + assert_equal {c b a} [r zrevrangebyscore zset 2 -inf] + assert_equal {d c b} [r zrevrangebyscore zset 3 0] + assert_equal {f e d} [r zrevrangebyscore zset 6 3] + assert_equal {g f e} [r zrevrangebyscore zset +inf 4] + assert_equal 3 [r zcount zset 0 3] + + # exclusive range + assert_equal {b} [r zrangebyscore zset (-inf (2] + assert_equal {b c} [r zrangebyscore zset (0 (3] + assert_equal {e f} [r zrangebyscore zset (3 (6] + assert_equal {f} [r zrangebyscore zset (4 (+inf] + assert_equal {b} [r zrevrangebyscore zset (2 (-inf] + assert_equal {c b} [r zrevrangebyscore zset (3 (0] + assert_equal {f e} [r zrevrangebyscore zset (6 (3] + assert_equal {f} [r zrevrangebyscore zset (+inf (4] + assert_equal 2 [r zcount zset (0 (3] + } + + test "ZRANGEBYSCORE with WITHSCORES" { + create_default_zset + assert_equal {b 1 c 2 d 3} [r zrangebyscore zset 0 3 withscores] + assert_equal {d 3 c 2 b 1} [r zrevrangebyscore zset 3 0 withscores] + } + + test "ZRANGEBYSCORE with LIMIT" { + create_default_zset + assert_equal {b c} [r zrangebyscore zset 0 10 LIMIT 0 2] + assert_equal {d e f} [r zrangebyscore zset 0 10 LIMIT 2 3] + assert_equal {d e f} [r zrangebyscore zset 0 10 LIMIT 2 10] + assert_equal {} [r zrangebyscore zset 0 10 LIMIT 20 10] + assert_equal {f e} [r zrevrangebyscore zset 10 0 LIMIT 0 2] + assert_equal {d c b} [r zrevrangebyscore zset 10 0 LIMIT 2 3] + assert_equal {d c b} [r zrevrangebyscore zset 10 0 LIMIT 2 10] + assert_equal {} [r zrevrangebyscore zset 10 0 LIMIT 20 10] + } + + test "ZRANGEBYSCORE with LIMIT and WITHSCORES" { + create_default_zset + assert_equal {e 4 f 5} [r zrangebyscore zset 2 5 LIMIT 2 3 WITHSCORES] + assert_equal {d 3 c 2} [r zrevrangebyscore zset 5 2 LIMIT 2 3 WITHSCORES] + } tags {"slow"} { test {ZRANGEBYSCORE fuzzy test, 100 ranges in 1000 elements sorted set} { @@ -302,30 +335,6 @@ start_server {tags {"zset"}} { } {} } - test {ZRANGEBYSCORE with LIMIT} { - r del zset - r zadd zset 1 a - r zadd zset 2 b - r zadd zset 3 c - r zadd zset 4 d - r zadd zset 5 e - list \ - [r zrangebyscore zset 0 10 LIMIT 0 2] \ - [r zrangebyscore zset 0 10 LIMIT 2 3] \ - [r zrangebyscore zset 0 10 LIMIT 2 10] \ - [r zrangebyscore zset 0 10 LIMIT 20 10] - } {{a b} {c d e} {c d e} {}} - - test {ZRANGEBYSCORE with LIMIT and withscores} { - r del zset - r zadd zset 10 a - r zadd zset 20 b - r zadd zset 30 c - r zadd zset 40 d - r zadd zset 50 e - r zrangebyscore zset 20 50 LIMIT 2 3 withscores - } {d 40 e 50} - test {ZREMRANGEBYSCORE basics} { r del zset r zadd zset 1 a From d433ebc6810b15c21120e502dea3a27fc2a5b348 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 16 Sep 2010 15:36:36 +0200 Subject: [PATCH 2/4] Finished code for sorted set memory efficiency --- src/rdb.c | 9 +++++---- src/t_zset.c | 29 +++++++++++++++-------------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index c15fc6f2..a401a5b9 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -730,13 +730,14 @@ robj *rdbLoadObject(int type, FILE *fp) { /* Load every single element of the list/set */ while(zsetlen--) { robj *ele; - double *score = zmalloc(sizeof(double)); + double score; + zskiplistNode *znode; if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); - if (rdbLoadDoubleValue(fp,score) == -1) return NULL; - dictAdd(zs->dict,ele,score); - zslInsert(zs->zsl,*score,ele); + if (rdbLoadDoubleValue(fp,&score) == -1) return NULL; + znode = zslInsert(zs->zsl,score,ele); + dictAdd(zs->dict,ele,&znode->score); incrRefCount(ele); /* added to skiplist */ } } else if (type == REDIS_HASH) { diff --git a/src/t_zset.c b/src/t_zset.c index e528205a..5df8f288 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -663,25 +663,23 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { * from small to large, all src[i > 0].dict are non-empty too */ di = dictGetIterator(src[0].dict); while((de = dictNext(di)) != NULL) { - double *score = zmalloc(sizeof(double)), value; - *score = src[0].weight * zunionInterDictValue(de); + double score, value; + score = src[0].weight * zunionInterDictValue(de); for (j = 1; j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { value = src[j].weight * zunionInterDictValue(other); - zunionInterAggregate(score, value, aggregate); + zunionInterAggregate(&score,value,aggregate); } else { break; } } - /* skip entry when not present in every source dict */ - if (j != setnum) { - zfree(score); - } else { + /* Only continue when present in every source dict. */ + if (j == setnum) { robj *o = dictGetEntryKey(de); - znode = zslInsert(dstzset->zsl,*score,o); + znode = zslInsert(dstzset->zsl,score,o); incrRefCount(o); /* added to skiplist */ dictAdd(dstzset->dict,o,&znode->score); incrRefCount(o); /* added to dictionary */ @@ -695,11 +693,14 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { di = dictGetIterator(src[i].dict); while((de = dictNext(di)) != NULL) { - /* skip key when already processed */ - if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; + double score, value; - double *score = zmalloc(sizeof(double)), value; - *score = src[i].weight * zunionInterDictValue(de); + /* skip key when already processed */ + if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) + continue; + + /* initialize score */ + score = src[i].weight * zunionInterDictValue(de); /* because the zsets are sorted by size, its only possible * for sets at larger indices to hold this entry */ @@ -707,12 +708,12 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { value = src[j].weight * zunionInterDictValue(other); - zunionInterAggregate(score, value, aggregate); + zunionInterAggregate(&score,value,aggregate); } } robj *o = dictGetEntryKey(de); - znode = zslInsert(dstzset->zsl,*score,o); + znode = zslInsert(dstzset->zsl,score,o); incrRefCount(o); /* added to skiplist */ dictAdd(dstzset->dict,o,&znode->score); incrRefCount(o); /* added to dictionary */ From 91504b6cbec2f555a3aa81113372c173adadad66 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 13 Oct 2010 21:43:58 +0200 Subject: [PATCH 3/4] Make ZREMRANGEBYSCORE accept the same range spec as ZRANGEBYSCORE This allows to use inclusive/exclusive bounds for min and max when deleting a range of scores from a sorted set. --- src/t_zset.c | 46 ++++++++++++++------------- tests/unit/type/zset.tcl | 67 +++++++++++++++++++++++++++++----------- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/src/t_zset.c b/src/t_zset.c index 1a67febc..f1a10378 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -174,25 +174,35 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) { return 0; /* not found */ } +/* Struct to hold a inclusive/exclusive range spec. */ +typedef struct { + double min, max; + int minex, maxex; /* are min or max exclusive? */ +} zrangespec; + /* Delete all the elements with score between min and max from the skiplist. * Min and mx are inclusive, so a score >= min || score <= max is deleted. * Note that this function takes the reference to the hash table view of the * sorted set, in order to remove the elements from the hash table too. */ -unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict *dict) { +unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned long removed = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->level[i].forward && x->level[i].forward->score < min) - x = x->level[i].forward; + while (x->level[i].forward && (range.minex ? + x->level[i].forward->score <= range.min : + x->level[i].forward->score < range.min)) + x = x->level[i].forward; update[i] = x; } - /* We may have multiple elements with the same score, what we need - * is to find the element with both the right score and object. */ + + /* Current node is the last with score < or <= min. */ x = x->level[0].forward; - while (x && x->score <= max) { + + /* Delete nodes while in range. */ + while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) { zskiplistNode *next = x->level[0].forward; zslDeleteNode(zsl,x,update); dictDelete(dict,x->obj); @@ -200,7 +210,7 @@ unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict removed++; x = next; } - return removed; /* not found */ + return removed; } /* Delete all the elements with rank between start and end from the skiplist. @@ -296,11 +306,6 @@ zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { return NULL; } -typedef struct { - double min, max; - int minex, maxex; /* are min or max exclusive? */ -} zrangespec; - /* Populate the rangespec according to the objects min and max. */ int zslParseRange(robj *min, robj *max, zrangespec *spec) { spec->minex = spec->maxex = 0; @@ -470,20 +475,19 @@ void zremCommand(redisClient *c) { } void zremrangebyscoreCommand(redisClient *c) { - double min; - double max; + zrangespec range; long deleted; - robj *zsetobj; + robj *o; zset *zs; - if ((getDoubleFromObjectOrReply(c, c->argv[2], &min, NULL) != REDIS_OK) || - (getDoubleFromObjectOrReply(c, c->argv[3], &max, NULL) != REDIS_OK)) return; + /* Parse the range arguments. */ + zslParseRange(c->argv[2],c->argv[3],&range); - if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,zsetobj,REDIS_ZSET)) return; + if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,REDIS_ZSET)) return; - zs = zsetobj->ptr; - deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict); + zs = o->ptr; + deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); if (deleted) touchWatchedKey(c->db,c->argv[1]); diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 949681eb..47c05607 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -335,25 +335,56 @@ start_server {tags {"zset"}} { } {} } - test {ZREMRANGEBYSCORE basics} { - r del zset - r zadd zset 1 a - r zadd zset 2 b - r zadd zset 3 c - r zadd zset 4 d - r zadd zset 5 e - list [r zremrangebyscore zset 2 4] [r zrange zset 0 -1] - } {3 {a e}} + test "ZREMRANGEBYSCORE basics" { + proc remrangebyscore {min max} { + create_zset zset {1 a 2 b 3 c 4 d 5 e} + r zremrangebyscore zset $min $max + } - test {ZREMRANGEBYSCORE from -inf to +inf} { - r del zset - r zadd zset 1 a - r zadd zset 2 b - r zadd zset 3 c - r zadd zset 4 d - r zadd zset 5 e - list [r zremrangebyscore zset -inf +inf] [r zrange zset 0 -1] - } {5 {}} + # inner range + assert_equal 3 [remrangebyscore 2 4] + assert_equal {a e} [r zrange zset 0 -1] + + # start underflow + assert_equal 1 [remrangebyscore -10 1] + assert_equal {b c d e} [r zrange zset 0 -1] + + # end overflow + assert_equal 1 [remrangebyscore 5 10] + assert_equal {a b c d} [r zrange zset 0 -1] + + # switch min and max + assert_equal 0 [remrangebyscore 4 2] + assert_equal {a b c d e} [r zrange zset 0 -1] + + # -inf to mid + assert_equal 3 [remrangebyscore -inf 3] + assert_equal {d e} [r zrange zset 0 -1] + + # mid to +inf + assert_equal 3 [remrangebyscore 3 +inf] + assert_equal {a b} [r zrange zset 0 -1] + + # -inf to +inf + assert_equal 5 [remrangebyscore -inf +inf] + assert_equal {} [r zrange zset 0 -1] + + # exclusive min + assert_equal 4 [remrangebyscore (1 5] + assert_equal {a} [r zrange zset 0 -1] + assert_equal 3 [remrangebyscore (2 5] + assert_equal {a b} [r zrange zset 0 -1] + + # exclusive max + assert_equal 4 [remrangebyscore 1 (5] + assert_equal {e} [r zrange zset 0 -1] + assert_equal 3 [remrangebyscore 1 (4] + assert_equal {d e} [r zrange zset 0 -1] + + # exclusive min and max + assert_equal 3 [remrangebyscore (1 (5] + assert_equal {a e} [r zrange zset 0 -1] + } test "ZREMRANGEBYRANK basics" { proc remrangebyrank {min max} { From 7236fdb22f7fdb60833c32121b0828f11a13883c Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 13 Oct 2010 21:58:21 +0200 Subject: [PATCH 4/4] Return error when min and/or max in the sorted set range spec is not a double --- src/t_zset.c | 25 ++++++++++++++++++------- tests/unit/type/zset.tcl | 12 ++++++++++++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/t_zset.c b/src/t_zset.c index f1a10378..d45e9369 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -307,7 +307,8 @@ zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { } /* Populate the rangespec according to the objects min and max. */ -int zslParseRange(robj *min, robj *max, zrangespec *spec) { +static int zslParseRange(robj *min, robj *max, zrangespec *spec) { + char *eptr; spec->minex = spec->maxex = 0; /* Parse the min-max interval. If one of the values is prefixed @@ -318,20 +319,24 @@ int zslParseRange(robj *min, robj *max, zrangespec *spec) { spec->min = (long)min->ptr; } else { if (((char*)min->ptr)[0] == '(') { - spec->min = strtod((char*)min->ptr+1,NULL); + spec->min = strtod((char*)min->ptr+1,&eptr); + if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR; spec->minex = 1; } else { - spec->min = strtod((char*)min->ptr,NULL); + spec->min = strtod((char*)min->ptr,&eptr); + if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR; } } if (max->encoding == REDIS_ENCODING_INT) { spec->max = (long)max->ptr; } else { if (((char*)max->ptr)[0] == '(') { - spec->max = strtod((char*)max->ptr+1,NULL); + spec->max = strtod((char*)max->ptr+1,&eptr); + if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR; spec->maxex = 1; } else { - spec->max = strtod((char*)max->ptr,NULL); + spec->max = strtod((char*)max->ptr,&eptr); + if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR; } } @@ -481,7 +486,10 @@ void zremrangebyscoreCommand(redisClient *c) { zset *zs; /* Parse the range arguments. */ - zslParseRange(c->argv[2],c->argv[3],&range); + if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { + addReplyError(c,"min or max is not a double"); + return; + } if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_ZSET)) return; @@ -838,7 +846,10 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { void *replylen = NULL; /* Parse the range arguments. */ - zslParseRange(c->argv[2],c->argv[3],&range); + if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { + addReplyError(c,"min or max is not a double"); + return; + } /* Parse optional extra arguments. Note that ZCOUNT will exactly have * 4 arguments, so we'll never enter the following code path. */ diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 47c05607..6b8fc54a 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -253,6 +253,12 @@ start_server {tags {"zset"}} { assert_equal {d 3 c 2} [r zrevrangebyscore zset 5 2 LIMIT 2 3 WITHSCORES] } + test "ZRANGEBYSCORE with non-value min or max" { + assert_error "*not a double*" {r zrangebyscore fooz str 1} + assert_error "*not a double*" {r zrangebyscore fooz 1 str} + assert_error "*not a double*" {r zrangebyscore fooz 1 NaN} + } + tags {"slow"} { test {ZRANGEBYSCORE fuzzy test, 100 ranges in 1000 elements sorted set} { set err {} @@ -386,6 +392,12 @@ start_server {tags {"zset"}} { assert_equal {a e} [r zrange zset 0 -1] } + test "ZREMRANGEBYSCORE with non-value min or max" { + assert_error "*not a double*" {r zremrangebyscore fooz str 1} + assert_error "*not a double*" {r zremrangebyscore fooz 1 str} + assert_error "*not a double*" {r zremrangebyscore fooz 1 NaN} + } + test "ZREMRANGEBYRANK basics" { proc remrangebyrank {min max} { create_zset zset {1 a 2 b 3 c 4 d 5 e}