diff --git a/src/aof.c b/src/aof.c index 7a80a935..3ab6c85b 100644 --- a/src/aof.c +++ b/src/aof.c @@ -826,7 +826,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { dictEntry *de; while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetKey(de); + sds ele = dictGetKey(de); if (count == 0) { int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : items; @@ -835,7 +835,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkString(r,"SADD",4) == 0) return 0; if (rioWriteBulkObject(r,key) == 0) return 0; } - if (rioWriteBulkObject(r,eleobj) == 0) return 0; + if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0; if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; items--; } @@ -892,7 +892,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { dictEntry *de; while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetKey(de); + sds ele = dictGetKey(de); double *score = dictGetVal(de); if (count == 0) { @@ -904,7 +904,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkObject(r,key) == 0) return 0; } if (rioWriteBulkDouble(r,*score) == 0) return 0; - if (rioWriteBulkObject(r,eleobj) == 0) return 0; + if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0; if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; items--; } diff --git a/src/cluster.c b/src/cluster.c index 4c87a456..6fdc2217 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4087,7 +4087,10 @@ void clusterCommand(client *c) { keys = zmalloc(sizeof(robj*)*maxkeys); numkeys = getKeysInSlot(slot, keys, maxkeys); addReplyMultiBulkLen(c,numkeys); - for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]); + for (j = 0; j < numkeys; j++) { + addReplyBulk(c,keys[j]); + decrRefCount(keys[j]); + } zfree(keys); } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) { /* CLUSTER FORGET */ diff --git a/src/db.c b/src/db.c index 92a59082..aa7be75e 100644 --- a/src/db.c +++ b/src/db.c @@ -423,8 +423,8 @@ void scanCallback(void *privdata, const dictEntry *de) { val = dictGetVal(de); incrRefCount(val); } else if (o->type == OBJ_ZSET) { - key = dictGetKey(de); - incrRefCount(key); + sds keysds = dictGetKey(de); + key = createStringObject(keysds,sdslen(keysds)); val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0); } else { serverPanic("Type not handled in SCAN callback."); @@ -1181,14 +1181,13 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) void slotToKeyAdd(robj *key) { unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr)); - zslInsert(server.cluster->slots_to_keys,hashslot,key); - incrRefCount(key); + sds sdskey = sdsdup(key->ptr); + zslInsert(server.cluster->slots_to_keys,hashslot,sdskey); } void slotToKeyDel(robj *key) { unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr)); - - zslDelete(server.cluster->slots_to_keys,hashslot,key); + zslDelete(server.cluster->slots_to_keys,hashslot,key->ptr,NULL); } void slotToKeyFlush(void) { @@ -1196,6 +1195,9 @@ void slotToKeyFlush(void) { server.cluster->slots_to_keys = zslCreate(); } +/* Pupulate the specified array of objects with keys in the specified slot. + * New objects are returned to represent keys, it's up to the caller to + * decrement the reference count to release the keys names. */ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) { zskiplistNode *n; zrangespec range; @@ -1206,7 +1208,7 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun n = zslFirstInRange(server.cluster->slots_to_keys, &range); while(n && n->score == hashslot && count--) { - keys[j++] = n->obj; + keys[j++] = createStringObject(n->ele,sdslen(n->ele)); n = n->level[0].forward; } return j; @@ -1224,9 +1226,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) { n = zslFirstInRange(server.cluster->slots_to_keys, &range); while(n && n->score == hashslot) { - robj *key = n->obj; + sds sdskey = n->ele; + robj *key = createStringObject(sdskey,sdslen(sdskey)); n = n->level[0].forward; /* Go to the next item before freeing it. */ - incrRefCount(key); /* Protect the object while freeing it. */ dbDelete(&server.db[0],key); decrRefCount(key); j++; @@ -1248,7 +1250,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) { /* Use rank of first element, if any, to determine preliminary count */ if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->obj); + rank = zslGetRank(zsl, zn->score, zn->ele); count = (zsl->length - (rank - 1)); /* Find last element in range */ @@ -1256,7 +1258,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) { /* Use rank of last element, if any, to determine the actual count */ if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->obj); + rank = zslGetRank(zsl, zn->score, zn->ele); count -= (zsl->length - rank); } } diff --git a/src/debug.c b/src/debug.c index 61c1c3c8..fb13f798 100644 --- a/src/debug.c +++ b/src/debug.c @@ -163,12 +163,9 @@ void computeDatasetDigest(unsigned char *final) { listTypeReleaseIterator(li); } else if (o->type == OBJ_SET) { setTypeIterator *si = setTypeInitIterator(o); - robj *ele; sds sdsele; while((sdsele = setTypeNextObject(si)) != NULL) { - ele = createObject(OBJ_STRING,sdsele); - xorObjectDigest(digest,ele); - decrRefCount(ele); + xorDigest(digest,sdsele,sdslen(sdsele)); } setTypeReleaseIterator(si); } else if (o->type == OBJ_ZSET) { @@ -210,12 +207,12 @@ void computeDatasetDigest(unsigned char *final) { dictEntry *de; while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetKey(de); + sds sdsele = dictGetKey(de); double *score = dictGetVal(de); snprintf(buf,sizeof(buf),"%.17g",*score); memset(eledigest,0,20); - mixObjectDigest(eledigest,eleobj); + mixDigest(eledigest,sdsele,sdslen(sdsele)); mixDigest(eledigest,buf,strlen(buf)); xorDigest(digest,eledigest,20); } diff --git a/src/geo.c b/src/geo.c index ff507fe7..d0381ce5 100644 --- a/src/geo.c +++ b/src/geo.c @@ -112,7 +112,7 @@ int extractLongLatOrReply(client *c, robj **argv, int longLatFromMember(robj *zobj, robj *member, double *xy) { double score = 0; - if (zsetScore(zobj, member, &score) == C_ERR) return C_ERR; + if (zsetScore(zobj, member->ptr, &score) == C_ERR) return C_ERR; if (!decodeGeohash(score, xy)) return C_ERR; return C_OK; } @@ -261,16 +261,14 @@ int geoGetPointsInRange(robj *zobj, double min, double max, double lon, double l } while (ln) { - robj *o = ln->obj; + sds ele = ln->ele; /* Abort when the node is no longer in range. */ if (!zslValueLteMax(ln->score, &range)) break; - member = (o->encoding == OBJ_ENCODING_INT) ? - sdsfromlonglong((long)o->ptr) : - sdsdup(o->ptr); - if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,member) - == C_ERR) sdsfree(member); + ele = sdsdup(ele); + if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,ele) + == C_ERR) sdsfree(ele); ln = ln->level[0].forward; } } @@ -606,7 +604,7 @@ void geohashCommand(client *c) { addReplyMultiBulkLen(c,c->argc-2); for (j = 2; j < c->argc; j++) { double score; - if (zsetScore(zobj, c->argv[j], &score) == C_ERR) { + if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) { addReply(c,shared.nullbulk); } else { /* The internal format we use for geocoding is a bit different @@ -660,7 +658,7 @@ void geoposCommand(client *c) { addReplyMultiBulkLen(c,c->argc-2); for (j = 2; j < c->argc; j++) { double score; - if (zsetScore(zobj, c->argv[j], &score) == C_ERR) { + if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) { addReply(c,shared.nullmultibulk); } else { /* Decode... */ @@ -700,8 +698,8 @@ void geodistCommand(client *c) { /* Get the scores. We need both otherwise NULL is returned. */ double score1, score2, xyxy[4]; - if (zsetScore(zobj, c->argv[2], &score1) == C_ERR || - zsetScore(zobj, c->argv[3], &score2) == C_ERR) + if (zsetScore(zobj, c->argv[2]->ptr, &score1) == C_ERR || + zsetScore(zobj, c->argv[3]->ptr, &score2) == C_ERR) { addReply(c,shared.nullbulk); return; diff --git a/src/rdb.c b/src/rdb.c index 3d98a767..61ac354b 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -622,10 +622,11 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { nwritten += n; while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetKey(de); + sds ele = dictGetKey(de); double *score = dictGetVal(de); - if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele))) + == -1) return -1; nwritten += n; if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1; nwritten += n; @@ -992,7 +993,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { return NULL; if (o->encoding == OBJ_ENCODING_INTSET) { - /* Fetch integer value from element */ + /* Fetch integer value from element. */ if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) { o->ptr = intsetAdd(o->ptr,llval,NULL); } else { @@ -1002,7 +1003,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { } /* This will also be called when the set was just converted - * to a regular hash table encoded set */ + * to a regular hash table encoded set. */ if (o->encoding == OBJ_ENCODING_HT) { dictAdd((dict*)o->ptr,sdsele,NULL); } else { @@ -1010,7 +1011,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { } } } else if (rdbtype == RDB_TYPE_ZSET) { - /* Read list/set value */ + /* Read list/set value. */ size_t zsetlen; size_t maxelelen = 0; zset *zs; @@ -1019,23 +1020,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { o = createZsetObject(); zs = o->ptr; - /* Load every single element of the list/set */ + /* Load every single element of the sorted set. */ while(zsetlen--) { - robj *ele; + sds sdsele; double score; zskiplistNode *znode; - if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; - ele = tryObjectEncoding(ele); + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL) + return NULL; if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; /* Don't care about integer-encoded strings. */ - if (sdsEncodedObject(ele) && sdslen(ele->ptr) > maxelelen) - maxelelen = sdslen(ele->ptr); + if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele); - znode = zslInsert(zs->zsl,score,ele); - dictAdd(zs->dict,ele,&znode->score); - incrRefCount(ele); /* added to skiplist */ + znode = zslInsert(zs->zsl,score,sdsele); + dictAdd(zs->dict,sdsele,&znode->score); } /* Convert *after* loading, since sorted sets are not stored ordered. */ diff --git a/src/server.c b/src/server.c index bd45e7d2..bbec0296 100644 --- a/src/server.c +++ b/src/server.c @@ -554,11 +554,11 @@ dictType setDictType = { /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */ dictType zsetDictType = { - dictEncObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictEncObjKeyCompare, /* key compare */ - dictObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + NULL, /* Note: SDS string shared & freed by skiplist */ NULL /* val destructor */ }; @@ -1428,8 +1428,8 @@ void createSharedObjects(void) { * actually used for their value but as a special object meaning * respectively the minimum possible string and the maximum possible * string in string comparisons for the ZRANGEBYLEX command. */ - shared.minstring = createStringObject("minstring",9); - shared.maxstring = createStringObject("maxstring",9); + shared.minstring = sdsnew("minstring"); + shared.maxstring = sdsnew("maxstring"); } void initServerConfig(void) { diff --git a/src/server.h b/src/server.h index 50899f8f..ff5d6432 100644 --- a/src/server.h +++ b/src/server.h @@ -612,16 +612,17 @@ struct sharedObjectsStruct { *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop, - *lpush, *emptyscan, *minstring, *maxstring, + *lpush, *emptyscan, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ *bulkhdr[OBJ_SHARED_BULKHDR_LEN]; /* "$\r\n" */ + sds minstring, maxstring; }; /* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { - robj *obj; + sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { @@ -1261,15 +1262,15 @@ typedef struct { /* Struct to hold an inclusive/exclusive range spec by lexicographic comparison. */ typedef struct { - robj *min, *max; /* May be set to shared.(minstring|maxstring) */ + sds min, max; /* May be set to shared.(minstring|maxstring) */ int minex, maxex; /* are min or max exclusive? */ } zlexrangespec; zskiplist *zslCreate(void); void zslFree(zskiplist *zsl); -zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); -unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score); -int zslDelete(zskiplist *zsl, double score, robj *obj); +zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele); +unsigned char *zzlInsert(unsigned char *zl, sds ele, double score); +int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node); zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range); zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range); double zzlGetScore(unsigned char *sptr); @@ -1277,8 +1278,8 @@ void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); unsigned int zsetLength(robj *zobj); void zsetConvert(robj *zobj, int encoding); -int zsetScore(robj *zobj, robj *member, double *score); -unsigned long zslGetRank(zskiplist *zsl, double score, robj *o); +int zsetScore(robj *zobj, sds member, double *score); +unsigned long zslGetRank(zskiplist *zsl, double score, sds o); /* Core functions */ int freeMemoryIfNeeded(void); diff --git a/src/sort.c b/src/sort.c index af185dbd..536302bd 100644 --- a/src/sort.c +++ b/src/sort.c @@ -399,7 +399,7 @@ void sortCommand(client *c) { zset *zs = sortval->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; - robj *ele; + sds sdsele; int rangelen = vectorlen; /* Check if starting point is trivial, before doing log(N) lookup. */ @@ -417,8 +417,8 @@ void sortCommand(client *c) { while(rangelen--) { serverAssertWithInfo(c,sortval,ln != NULL); - ele = ln->obj; - vector[j].obj = ele; + sdsele = ln->ele; + vector[j].obj = createStringObject(sdsele,sdslen(sdsele)); vector[j].u.score = 0; vector[j].u.cmpobj = NULL; j++; @@ -431,9 +431,11 @@ void sortCommand(client *c) { dict *set = ((zset*)sortval->ptr)->dict; dictIterator *di; dictEntry *setele; + sds sdsele; di = dictGetIterator(set); while((setele = dictNext(di)) != NULL) { - vector[j].obj = dictGetKey(setele); + sdsele = dictGetKey(setele); + vector[j].obj = createStringObject(sdsele,sdslen(sdsele)); vector[j].u.score = 0; vector[j].u.cmpobj = NULL; j++; diff --git a/src/t_zset.c b/src/t_zset.c index 772b06ef..84bea5aa 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -38,9 +38,16 @@ * * The elements are added to a hash table mapping Redis objects to scores. * At the same time the elements are added to a skip list mapping scores - * to Redis objects (so objects are sorted by scores in this "view"). */ - -/* This skiplist implementation is almost a C translation of the original + * to Redis objects (so objects are sorted by scores in this "view"). + * + * Note that the SDS string representing the element is the same in both + * the hash table and skiplist in order to save memory. What we do in order + * to manage the shared SDS string more easily is to free the SDS string + * only in zslFreeNode(). The dictionary has no value free method set. + * So we should always remove an element from the dictionary, and later from + * the skiplist. + * + * This skiplist implementation is almost a C translation of the original * algorithm described by William Pugh in "Skip Lists: A Probabilistic * Alternative to Balanced Trees", modified in three ways: * a) this implementation allows for repeated scores. @@ -52,16 +59,24 @@ #include "server.h" #include -static int zslLexValueGteMin(robj *value, zlexrangespec *spec); -static int zslLexValueLteMax(robj *value, zlexrangespec *spec); +/*----------------------------------------------------------------------------- + * Skiplist implementation of the low level API + *----------------------------------------------------------------------------*/ -zskiplistNode *zslCreateNode(int level, double score, robj *obj) { - zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); +static int zslLexValueGteMin(sds value, zlexrangespec *spec); +static int zslLexValueLteMax(sds value, zlexrangespec *spec); + +/* Create a skiplist node with the specified number of levels. + * The SDS string 'ele' is referenced by the node after the call. */ +zskiplistNode *zslCreateNode(int level, double score, sds ele) { + zskiplistNode *zn = + zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; - zn->obj = obj; + zn->ele = ele; return zn; } +/* Create a new skiplist. */ zskiplist *zslCreate(void) { int j; zskiplist *zsl; @@ -79,11 +94,15 @@ zskiplist *zslCreate(void) { return zsl; } +/* Free the specified skiplist node. The referenced SDS string representation + * of the element is freed too, unless node->ele is set to NULL before calling + * this function. */ void zslFreeNode(zskiplistNode *node) { - decrRefCount(node->obj); + sdsfree(node->ele); zfree(node); } +/* Free a whole skiplist. */ void zslFree(zskiplist *zsl) { zskiplistNode *node = zsl->header->level[0].forward, *next; @@ -107,7 +126,10 @@ int zslRandomLevel(void) { return (levellevel-1) ? 0 : rank[i+1]; while (x->level[i].forward && - (x->level[i].forward->score < score || - (x->level[i].forward->score == score && - compareStringObjects(x->level[i].forward->obj,obj) < 0))) { + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + sdscmp(x->level[i].forward->ele,ele) < 0))) + { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } - /* we assume the key is not already inside, since we allow duplicated - * scores, and the re-insertion of score and redis object should never - * happen since the caller of zslInsert() should test in the hash table - * if the element is already inside or not. */ + /* we assume the element is not already inside, since we allow duplicated + * scores, reinserting the same element should never happen since the + * caller of zslInsert() should test in the hash table if the element is + * already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { @@ -139,7 +162,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) { } zsl->level = level; } - x = zslCreateNode(level,score,obj); + x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; @@ -184,26 +207,38 @@ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { zsl->length--; } -/* Delete an element with matching score/object from the skiplist. */ -int zslDelete(zskiplist *zsl, double score, robj *obj) { +/* Delete an element with matching score/element from the skiplist. + * The function returns 1 if the node was found and deleted, otherwise + * 0 is returned. + * + * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise + * it is not freed (but just unlinked) and *node is set to the node pointer, + * so that it is possible for the caller to reuse the node (including the + * referenced SDS string at node->ele). */ +int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && - (x->level[i].forward->score < score || - (x->level[i].forward->score == score && - compareStringObjects(x->level[i].forward->obj,obj) < 0))) + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + sdscmp(x->level[i].forward->ele,ele) < 0))) + { x = x->level[i].forward; + } update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ x = x->level[0].forward; - if (x && score == x->score && equalStringObjects(x->obj,obj)) { + if (x && score == x->score && sdscmp(x->ele,ele) == 0) { zslDeleteNode(zsl, x, update); - zslFreeNode(x); + if (!node) + zslFreeNode(x); + else + *node = x; return 1; } return 0; /* not found */ @@ -312,8 +347,8 @@ unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dic { zskiplistNode *next = x->level[0].forward; zslDeleteNode(zsl,x,update); - dictDelete(dict,x->obj); - zslFreeNode(x); + dictDelete(dict,x->ele); + zslFreeNode(x); /* Here is where x->ele is actually released. */ removed++; x = next; } @@ -329,7 +364,7 @@ unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *di x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && - !zslLexValueGteMin(x->level[i].forward->obj,range)) + !zslLexValueGteMin(x->level[i].forward->ele,range)) x = x->level[i].forward; update[i] = x; } @@ -338,11 +373,11 @@ unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *di x = x->level[0].forward; /* Delete nodes while in range. */ - while (x && zslLexValueLteMax(x->obj,range)) { + while (x && zslLexValueLteMax(x->ele,range)) { zskiplistNode *next = x->level[0].forward; zslDeleteNode(zsl,x,update); - dictDelete(dict,x->obj); - zslFreeNode(x); + dictDelete(dict,x->ele); + zslFreeNode(x); /* Here is where x->ele is actually released. */ removed++; x = next; } @@ -370,7 +405,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned while (x && traversed <= end) { zskiplistNode *next = x->level[0].forward; zslDeleteNode(zsl,x,update); - dictDelete(dict,x->obj); + dictDelete(dict,x->ele); zslFreeNode(x); removed++; traversed++; @@ -383,7 +418,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */ -unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { +unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) { zskiplistNode *x; unsigned long rank = 0; int i; @@ -393,13 +428,13 @@ unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && - compareStringObjects(x->level[i].forward->obj,o) <= 0))) { + sdscmp(x->level[i].forward->ele,ele) <= 0))) { rank += x->level[i].span; x = x->level[i].forward; } /* x might be equal to zsl->header, so test if obj is non-NULL */ - if (x->obj && equalStringObjects(x->obj,o)) { + if (x->ele && sdscmp(x->ele,ele) == 0) { return rank; } } @@ -478,7 +513,7 @@ static int zslParseRange(robj *min, robj *max, zrangespec *spec) { * * If the string is not a valid range C_ERR is returned, and the value * of *dest and *ex is undefined. */ -int zslParseLexRangeItem(robj *item, robj **dest, int *ex) { +int zslParseLexRangeItem(robj *item, sds *dest, int *ex) { char *c = item->ptr; switch(c[0]) { @@ -486,27 +521,34 @@ int zslParseLexRangeItem(robj *item, robj **dest, int *ex) { if (c[1] != '\0') return C_ERR; *ex = 0; *dest = shared.maxstring; - incrRefCount(shared.maxstring); return C_OK; case '-': if (c[1] != '\0') return C_ERR; *ex = 0; *dest = shared.minstring; - incrRefCount(shared.minstring); return C_OK; case '(': *ex = 1; - *dest = createStringObject(c+1,sdslen(c)-1); + *dest = sdsnewlen(c+1,sdslen(c)-1); return C_OK; case '[': *ex = 0; - *dest = createStringObject(c+1,sdslen(c)-1); + *dest = sdsnewlen(c+1,sdslen(c)-1); return C_OK; default: return C_ERR; } } +/* Free a lex range structure, must be called only after zelParseLexRange() + * populated the structure with success (C_OK returned). */ +void zslFreeLexRange(zlexrangespec *spec) { + if (spec->min != shared.minstring && + spec->min != shared.maxstring) sdsfree(spec->min); + if (spec->max != shared.minstring && + spec->max != shared.maxstring) sdsfree(spec->max); +} + /* Populate the rangespec according to the objects min and max. * * Return C_OK on success. On error C_ERR is returned. @@ -521,42 +563,33 @@ static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) { spec->min = spec->max = NULL; if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR || zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) { - if (spec->min) decrRefCount(spec->min); - if (spec->max) decrRefCount(spec->max); + zslFreeLexRange(spec); return C_ERR; } else { return C_OK; } } -/* Free a lex range structure, must be called only after zelParseLexRange() - * populated the structure with success (C_OK returned). */ -void zslFreeLexRange(zlexrangespec *spec) { - decrRefCount(spec->min); - decrRefCount(spec->max); -} - -/* This is just a wrapper to compareStringObjects() that is able to +/* This is just a wrapper to sdscmp() that is able to * handle shared.minstring and shared.maxstring as the equivalent of * -inf and +inf for strings */ -int compareStringObjectsForLexRange(robj *a, robj *b) { - if (a == b) return 0; /* This makes sure that we handle inf,inf and - -inf,-inf ASAP. One special case less. */ +int sdscmplex(sds a, sds b) { + if (a == b) return 0; if (a == shared.minstring || b == shared.maxstring) return -1; if (a == shared.maxstring || b == shared.minstring) return 1; - return compareStringObjects(a,b); + return sdscmp(a,b); } -static int zslLexValueGteMin(robj *value, zlexrangespec *spec) { +static int zslLexValueGteMin(sds value, zlexrangespec *spec) { return spec->minex ? - (compareStringObjectsForLexRange(value,spec->min) > 0) : - (compareStringObjectsForLexRange(value,spec->min) >= 0); + (sdscmplex(value,spec->min) > 0) : + (sdscmplex(value,spec->min) >= 0); } -static int zslLexValueLteMax(robj *value, zlexrangespec *spec) { +static int zslLexValueLteMax(sds value, zlexrangespec *spec) { return spec->maxex ? - (compareStringObjectsForLexRange(value,spec->max) < 0) : - (compareStringObjectsForLexRange(value,spec->max) <= 0); + (sdscmplex(value,spec->max) < 0) : + (sdscmplex(value,spec->max) <= 0); } /* Returns if there is a part of the zset is in the lex range. */ @@ -564,15 +597,15 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) { zskiplistNode *x; /* Test for ranges that will always be empty. */ - if (compareStringObjectsForLexRange(range->min,range->max) > 1 || - (compareStringObjects(range->min,range->max) == 0 && + if (sdscmplex(range->min,range->max) > 1 || + (sdscmp(range->min,range->max) == 0 && (range->minex || range->maxex))) return 0; x = zsl->tail; - if (x == NULL || !zslLexValueGteMin(x->obj,range)) + if (x == NULL || !zslLexValueGteMin(x->ele,range)) return 0; x = zsl->header->level[0].forward; - if (x == NULL || !zslLexValueLteMax(x->obj,range)) + if (x == NULL || !zslLexValueLteMax(x->ele,range)) return 0; return 1; } @@ -590,7 +623,7 @@ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) { for (i = zsl->level-1; i >= 0; i--) { /* Go forward while *OUT* of range. */ while (x->level[i].forward && - !zslLexValueGteMin(x->level[i].forward->obj,range)) + !zslLexValueGteMin(x->level[i].forward->ele,range)) x = x->level[i].forward; } @@ -599,7 +632,7 @@ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) { serverAssert(x != NULL); /* Check if score <= max. */ - if (!zslLexValueLteMax(x->obj,range)) return NULL; + if (!zslLexValueLteMax(x->ele,range)) return NULL; return x; } @@ -616,7 +649,7 @@ zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) { for (i = zsl->level-1; i >= 0; i--) { /* Go forward while *IN* range. */ while (x->level[i].forward && - zslLexValueLteMax(x->level[i].forward->obj,range)) + zslLexValueLteMax(x->level[i].forward->ele,range)) x = x->level[i].forward; } @@ -624,7 +657,7 @@ zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) { serverAssert(x != NULL); /* Check if score >= min. */ - if (!zslLexValueGteMin(x->obj,range)) return NULL; + if (!zslLexValueGteMin(x->ele,range)) return NULL; return x; } @@ -653,10 +686,8 @@ double zzlGetScore(unsigned char *sptr) { return score; } -/* Return a ziplist element as a Redis string object. - * This simple abstraction can be used to simplifies some code at the - * cost of some performance. */ -robj *ziplistGetObject(unsigned char *sptr) { +/* Return a ziplist element as an SDS string. */ +sds ziplistGetObject(unsigned char *sptr) { unsigned char *vstr; unsigned int vlen; long long vlong; @@ -665,9 +696,9 @@ robj *ziplistGetObject(unsigned char *sptr) { serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong)); if (vstr) { - return createStringObject((char*)vstr,vlen); + return sdsnewlen((char*)vstr,vlen); } else { - return createStringObjectFromLongLong(vlong); + return sdsfromlonglong(vlong); } } @@ -822,16 +853,16 @@ unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) { } static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) { - robj *value = ziplistGetObject(p); + sds value = ziplistGetObject(p); int res = zslLexValueGteMin(value,spec); - decrRefCount(value); + sdsfree(value); return res; } static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) { - robj *value = ziplistGetObject(p); + sds value = ziplistGetObject(p); int res = zslLexValueLteMax(value,spec); - decrRefCount(value); + sdsfree(value); return res; } @@ -841,8 +872,8 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) { unsigned char *p; /* Test for ranges that will always be empty. */ - if (compareStringObjectsForLexRange(range->min,range->max) > 1 || - (compareStringObjects(range->min,range->max) == 0 && + if (sdscmplex(range->min,range->max) > 1 || + (sdscmp(range->min,range->max) == 0 && (range->minex || range->maxex))) return 0; @@ -912,26 +943,22 @@ unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) { return NULL; } -unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) { +unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; - ele = getDecodedObject(ele); while (eptr != NULL) { sptr = ziplistNext(zl,eptr); - serverAssertWithInfo(NULL,ele,sptr != NULL); + serverAssert(sptr != NULL); - if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) { + if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) { /* Matching element, pull out score. */ if (score != NULL) *score = zzlGetScore(sptr); - decrRefCount(ele); return eptr; } /* Move to next element. */ eptr = ziplistNext(zl,sptr); } - - decrRefCount(ele); return NULL; } @@ -946,41 +973,38 @@ unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) { return zl; } -unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) { +unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) { unsigned char *sptr; char scorebuf[128]; int scorelen; size_t offset; - serverAssertWithInfo(NULL,ele,sdsEncodedObject(ele)); scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { - zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL); + zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL); zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL); } else { /* Keep offset relative to zl, as it might be re-allocated. */ offset = eptr-zl; - zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr)); + zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele)); eptr = zl+offset; /* Insert score after the element. */ - serverAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL); + serverAssert((sptr = ziplistNext(zl,eptr)) != NULL); zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } - return zl; } /* Insert (element,score) pair in ziplist. This function assumes the element is * not yet present in the list. */ -unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) { +unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double s; - ele = getDecodedObject(ele); while (eptr != NULL) { sptr = ziplistNext(zl,eptr); - serverAssertWithInfo(NULL,ele,sptr != NULL); + serverAssert(sptr != NULL); s = zzlGetScore(sptr); if (s > score) { @@ -991,7 +1015,7 @@ unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) { break; } else if (s == score) { /* Ensure lexicographical ordering for elements. */ - if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) { + if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) { zl = zzlInsertAt(zl,eptr,ele,score); break; } @@ -1004,8 +1028,6 @@ unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) { /* Push on tail of list when it was not yet inserted. */ if (eptr == NULL) zl = zzlInsertAt(zl,NULL,ele,score); - - decrRefCount(ele); return zl; } @@ -1093,7 +1115,7 @@ unsigned int zsetLength(robj *zobj) { void zsetConvert(robj *zobj, int encoding) { zset *zs; zskiplistNode *node, *next; - robj *ele; + sds ele; double score; if (zobj->encoding == encoding) return; @@ -1120,14 +1142,12 @@ void zsetConvert(robj *zobj, int encoding) { score = zzlGetScore(sptr); serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) - ele = createStringObjectFromLongLong(vlong); + ele = sdsfromlonglong(vlong); else - ele = createStringObject((char*)vstr,vlen); + ele = sdsnewlen((char*)vstr,vlen); - /* Has incremented refcount since it was just created. */ node = zslInsert(zs->zsl,score,ele); - serverAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK); - incrRefCount(ele); /* Added to dictionary. */ + serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); zzlNext(zl,&eptr,&sptr); } @@ -1149,10 +1169,7 @@ void zsetConvert(robj *zobj, int encoding) { zfree(zs->zsl); while (node) { - ele = getDecodedObject(node->obj); - zl = zzlInsertAt(zl,NULL,ele,node->score); - decrRefCount(ele); - + zl = zzlInsertAt(zl,NULL,node->ele,node->score); next = node->level[0].forward; zslFreeNode(node); node = next; @@ -1170,7 +1187,7 @@ void zsetConvert(robj *zobj, int encoding) { * storing it into *score. If the element does not exist C_ERR is returned * otherwise C_OK is returned and *score is correctly populated. * If 'zobj' or 'member' is NULL, C_ERR is returned. */ -int zsetScore(robj *zobj, robj *member, double *score) { +int zsetScore(robj *zobj, sds member, double *score) { if (!zobj || !member) return C_ERR; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { @@ -1199,9 +1216,8 @@ int zsetScore(robj *zobj, robj *member, double *score) { void zaddGenericCommand(client *c, int flags) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; - robj *ele; robj *zobj; - robj *curobj; + sds ele; double score = 0, *scores = NULL, curscore = 0.0; int j, elements; int scoreidx = 0; @@ -1285,11 +1301,10 @@ void zaddGenericCommand(client *c, int flags) { for (j = 0; j < elements; j++) { score = scores[j]; + ele = c->argv[scoreidx+1+j*2]->ptr; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; - /* Prefer non-encoded element when dealing with ziplists. */ - ele = c->argv[scoreidx+1+j*2]; if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { if (nx) continue; if (incr) { @@ -1314,7 +1329,7 @@ void zaddGenericCommand(client *c, int flags) { zobj->ptr = zzlInsert(zobj->ptr,ele,score); if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); - if (sdslen(ele->ptr) > server.zset_max_ziplist_value) + if (sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); server.dirty++; added++; @@ -1325,12 +1340,9 @@ void zaddGenericCommand(client *c, int flags) { zskiplistNode *znode; dictEntry *de; - ele = c->argv[scoreidx+1+j*2] = - tryObjectEncoding(c->argv[scoreidx+1+j*2]); de = dictFind(zs->dict,ele); if (de != NULL) { if (nx) continue; - curobj = dictGetKey(de); curscore = *(double*)dictGetVal(de); if (incr) { @@ -1343,23 +1355,27 @@ void zaddGenericCommand(client *c, int flags) { } } - /* Remove and re-insert when score changed. We can safely - * delete the key object from the skiplist, since the - * dictionary still has a reference to it. */ + /* Remove and re-insert when score changes. */ if (score != curscore) { - serverAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj)); - znode = zslInsert(zs->zsl,score,curobj); - incrRefCount(curobj); /* Re-inserted in skiplist. */ + zskiplistNode *node; + serverAssert(zslDelete(zs->zsl,curscore,ele,&node)); + znode = zslInsert(zs->zsl,score,node->ele); + /* We reused the node->ele SDS string, free the node now + * since zslInsert created a new one. */ + node->ele = NULL; + zslFreeNode(node); + /* Note that we did not removed the original element from + * the hash table representing the sorted set, so we just + * update the score. */ dictGetVal(de) = &znode->score; /* Update score ptr. */ server.dirty++; updated++; } processed++; } else if (!xx) { + ele = sdsdup(ele); znode = zslInsert(zs->zsl,score,ele); - incrRefCount(ele); /* Inserted in skiplist. */ - serverAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK); - incrRefCount(ele); /* Added to dictionary. */ + serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); server.dirty++; added++; processed++; @@ -1408,7 +1424,7 @@ void zremCommand(client *c) { unsigned char *eptr; for (j = 2; j < c->argc; j++) { - if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) { + if ((eptr = zzlFind(zobj->ptr,c->argv[j]->ptr,NULL)) != NULL) { deleted++; zobj->ptr = zzlDelete(zobj->ptr,eptr); if (zzlLength(zobj->ptr) == 0) { @@ -1424,16 +1440,26 @@ void zremCommand(client *c) { double score; for (j = 2; j < c->argc; j++) { - de = dictFind(zs->dict,c->argv[j]); + de = dictFind(zs->dict,c->argv[j]->ptr); if (de != NULL) { deleted++; - /* Delete from the skiplist */ + /* Get the score in order to delete from the skiplist later. */ score = *(double*)dictGetVal(de); - serverAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j])); - /* Delete from the hash table */ - dictDelete(zs->dict,c->argv[j]); + /* Delete from the hash table and later from the skiplist. + * Note that the order is important: deleting from the skiplist + * actually releases the SDS string representing the element, + * which is shared between the skiplist and the hash table, so + * we need to delete from the skiplist as the final step. */ + int retval1 = dictDelete(zs->dict,c->argv[j]->ptr); + + /* Delete from skiplist. */ + int retval2 = zslDelete(zs->zsl,score,c->argv[j]->ptr,NULL); + + serverAssertWithInfo(c,c->argv[j], + retval1 == DICT_OK && retval2); + if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) { dbDelete(c->db,key); @@ -1613,7 +1639,7 @@ typedef struct { * we already checked that "ell" holds a long long, or tried to convert another * representation into a long long value. When this was successful, * OPVAL_VALID_LL is set as well. */ -#define OPVAL_DIRTY_ROBJ 1 +#define OPVAL_DIRTY_SDS 1 #define OPVAL_DIRTY_LL 2 #define OPVAL_VALID_LL 4 @@ -1621,7 +1647,7 @@ typedef struct { typedef struct { int flags; unsigned char _buf[32]; /* Private buffer. */ - robj *ele; + sds ele; unsigned char *estr; unsigned int elen; long long ell; @@ -1728,8 +1754,8 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { if (op->subject == NULL) return 0; - if (val->flags & OPVAL_DIRTY_ROBJ) - decrRefCount(val->ele); + if (val->flags & OPVAL_DIRTY_SDS) + sdsfree(val->ele); memset(val,0,sizeof(zsetopval)); @@ -1770,7 +1796,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { } else if (op->encoding == OBJ_ENCODING_SKIPLIST) { if (it->sl.node == NULL) return 0; - val->ele = it->sl.node->obj; + val->ele = it->sl.node->ele; val->score = it->sl.node->score; /* Move to next element. */ @@ -1789,15 +1815,8 @@ int zuiLongLongFromValue(zsetopval *val) { val->flags |= OPVAL_DIRTY_LL; if (val->ele != NULL) { - if (val->ele->encoding == OBJ_ENCODING_INT) { - val->ell = (long)val->ele->ptr; + if (string2ll(val->ele,sdslen(val->ele),&val->ell)) val->flags |= OPVAL_VALID_LL; - } else if (sdsEncodedObject(val->ele)) { - if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell)) - val->flags |= OPVAL_VALID_LL; - } else { - serverPanic("Unsupported element encoding"); - } } else if (val->estr != NULL) { if (string2ll((char*)val->estr,val->elen,&val->ell)) val->flags |= OPVAL_VALID_LL; @@ -1809,30 +1828,41 @@ int zuiLongLongFromValue(zsetopval *val) { return val->flags & OPVAL_VALID_LL; } -robj *zuiObjectFromValue(zsetopval *val) { +sds zuiSdsFromValue(zsetopval *val) { if (val->ele == NULL) { if (val->estr != NULL) { - val->ele = createStringObject((char*)val->estr,val->elen); + val->ele = sdsnewlen((char*)val->estr,val->elen); } else { - val->ele = createStringObjectFromLongLong(val->ell); + val->ele = sdsfromlonglong(val->ell); } - val->flags |= OPVAL_DIRTY_ROBJ; + val->flags |= OPVAL_DIRTY_SDS; } return val->ele; } +/* This is different from zuiSdsFromValue since returns a new SDS string + * which is up to the caller to free. */ +sds zuiNewSdsFromValue(zsetopval *val) { + if (val->flags & OPVAL_DIRTY_SDS) { + /* We have already one to return! */ + sds ele = val->ele; + val->flags &= ~OPVAL_DIRTY_SDS; + val->ele = NULL; + return ele; + } else if (val->ele) { + return sdsdup(val->ele); + } else if (val->estr) { + return sdsnewlen((char*)val->estr,val->elen); + } else { + return sdsfromlonglong(val->ell); + } +} + int zuiBufferFromValue(zsetopval *val) { if (val->estr == NULL) { if (val->ele != NULL) { - if (val->ele->encoding == OBJ_ENCODING_INT) { - val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr); - val->estr = val->_buf; - } else if (sdsEncodedObject(val->ele)) { - val->elen = sdslen(val->ele->ptr); - val->estr = val->ele->ptr; - } else { - serverPanic("Unsupported element encoding"); - } + val->elen = sdslen(val->ele); + val->estr = (unsigned char*)val->ele; } else { val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell); val->estr = val->_buf; @@ -1859,7 +1889,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) { } } else if (op->encoding == OBJ_ENCODING_HT) { dict *ht = op->subject->ptr; - zuiObjectFromValue(val); + zuiSdsFromValue(val); if (dictFind(ht,val->ele) != NULL) { *score = 1.0; return 1; @@ -1870,7 +1900,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) { serverPanic("Unknown set encoding"); } } else if (op->type == OBJ_ZSET) { - zuiObjectFromValue(val); + zuiSdsFromValue(val); if (op->encoding == OBJ_ENCODING_ZIPLIST) { if (zzlFind(op->subject->ptr,val->ele,score) != NULL) { @@ -1922,13 +1952,25 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat } } +unsigned int dictSdsHash(const void *key); +int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2); + +dictType setAccumulatorDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL /* val destructor */ +}; + void zunionInterGenericCommand(client *c, robj *dstkey, int op) { int i, j; long setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; zsetopval zval; - robj *tmp; + sds tmp; unsigned int maxelelen = 0; robj *dstobj; zset *dstzset; @@ -1978,7 +2020,9 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { int remaining = c->argc - j; while (remaining) { - if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { + if (remaining >= (setnum + 1) && + !strcasecmp(c->argv[j]->ptr,"weights")) + { j++; remaining--; for (i = 0; i < setnum; i++, j++, remaining--) { if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight, @@ -1988,7 +2032,9 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { return; } } - } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) { + } else if (remaining >= 2 && + !strcasecmp(c->argv[j]->ptr,"aggregate")) + { j++; remaining--; if (!strcasecmp(c->argv[j]->ptr,"sum")) { aggregate = REDIS_AGGR_SUM; @@ -2046,22 +2092,16 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { /* Only continue when present in every input. */ if (j == setnum) { - tmp = zuiObjectFromValue(&zval); + tmp = zuiNewSdsFromValue(&zval); znode = zslInsert(dstzset->zsl,score,tmp); - incrRefCount(tmp); /* added to skiplist */ dictAdd(dstzset->dict,tmp,&znode->score); - incrRefCount(tmp); /* added to dictionary */ - - if (sdsEncodedObject(tmp)) { - if (sdslen(tmp->ptr) > maxelelen) - maxelelen = sdslen(tmp->ptr); - } + if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp); } } zuiClearIterator(&src[0]); } } else if (op == SET_OP_UNION) { - dict *accumulator = dictCreate(&setDictType,NULL); + dict *accumulator = dictCreate(&setAccumulatorDictType,NULL); dictIterator *di; dictEntry *de; double score; @@ -2084,20 +2124,16 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { if (isnan(score)) score = 0; /* Search for this element in the accumulating dictionary. */ - de = dictFind(accumulator,zuiObjectFromValue(&zval)); + de = dictFind(accumulator,zuiSdsFromValue(&zval)); /* If we don't have it, we need to create a new entry. */ if (de == NULL) { - tmp = zuiObjectFromValue(&zval); + tmp = zuiNewSdsFromValue(&zval); /* Remember the longest single element encountered, * to understand if it's possible to convert to ziplist * at the end. */ - if (sdsEncodedObject(tmp)) { - if (sdslen(tmp->ptr) > maxelelen) - maxelelen = sdslen(tmp->ptr); - } + if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp); /* Add the element with its initial score. */ de = dictAddRaw(accumulator,tmp); - incrRefCount(tmp); dictSetDoubleVal(de,score); } else { /* Update the score with the score of the new instance @@ -2121,16 +2157,12 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { dictExpand(dstzset->dict,dictSize(accumulator)); while((de = dictNext(di)) != NULL) { - robj *ele = dictGetKey(de); + sds ele = dictGetKey(de); score = dictGetDoubleVal(de); znode = zslInsert(dstzset->zsl,score,ele); - incrRefCount(ele); /* added to skiplist */ dictAdd(dstzset->dict,ele,&znode->score); - incrRefCount(ele); /* added to dictionary */ } dictReleaseIterator(di); - - /* We can free the accumulator dictionary now. */ dictRelease(accumulator); } else { serverPanic("Unknown operator"); @@ -2247,7 +2279,7 @@ void zrangeGenericCommand(client *c, int reverse) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; - robj *ele; + sds ele; /* Check if starting point is trivial, before doing log(N) lookup. */ if (reverse) { @@ -2262,8 +2294,8 @@ void zrangeGenericCommand(client *c, int reverse) { while(rangelen--) { serverAssertWithInfo(c,zobj,ln != NULL); - ele = ln->obj; - addReplyBulk(c,ele); + ele = ln->ele; + addReplyBulkCBuffer(c,ele,sdslen(ele)); if (withscores) addReplyDouble(c,ln->score); ln = reverse ? ln->backward : ln->level[0].forward; @@ -2317,8 +2349,13 @@ void genericZrangebyscoreCommand(client *c, int reverse) { pos++; remaining--; withscores = 1; } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { - if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) || - (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return; + if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) + != C_OK) || + (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) + != C_OK)) + { + return; + } pos += 3; remaining -= 3; } else { addReply(c,shared.syntaxerr); @@ -2444,7 +2481,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) { } rangelen++; - addReplyBulk(c,ln->obj); + addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele)); if (withscores) { addReplyDouble(c,ln->score); @@ -2534,7 +2571,7 @@ void zcountCommand(client *c) { /* Use rank of first element, if any, to determine preliminary count */ if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->obj); + rank = zslGetRank(zsl, zn->score, zn->ele); count = (zsl->length - (rank - 1)); /* Find last element in range */ @@ -2542,7 +2579,7 @@ void zcountCommand(client *c) { /* Use rank of last element, if any, to determine the actual count */ if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->obj); + rank = zslGetRank(zsl, zn->score, zn->ele); count -= (zsl->length - rank); } } @@ -2612,7 +2649,7 @@ void zlexcountCommand(client *c) { /* Use rank of first element, if any, to determine preliminary count */ if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->obj); + rank = zslGetRank(zsl, zn->score, zn->ele); count = (zsl->length - (rank - 1)); /* Find last element in range */ @@ -2620,7 +2657,7 @@ void zlexcountCommand(client *c) { /* Use rank of last element, if any, to determine the actual count */ if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->obj); + rank = zslGetRank(zsl, zn->score, zn->ele); count -= (zsl->length - rank); } } @@ -2786,13 +2823,13 @@ void genericZrangebylexCommand(client *c, int reverse) { while (ln && limit--) { /* Abort when the node is no longer in range. */ if (reverse) { - if (!zslLexValueGteMin(ln->obj,&range)) break; + if (!zslLexValueGteMin(ln->ele,&range)) break; } else { - if (!zslLexValueLteMax(ln->obj,&range)) break; + if (!zslLexValueLteMax(ln->ele,&range)) break; } rangelen++; - addReplyBulk(c,ln->obj); + addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele)); /* Move to next node */ if (reverse) { @@ -2835,7 +2872,7 @@ void zscoreCommand(client *c) { if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; - if (zsetScore(zobj,c->argv[2],&score) == C_ERR) { + if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) { addReply(c,shared.nullbulk); } else { addReplyDouble(c,score); @@ -2887,11 +2924,12 @@ void zrankGenericCommand(client *c, int reverse) { double score; ele = c->argv[2]; - de = dictFind(zs->dict,ele); + de = dictFind(zs->dict,ele->ptr); if (de != NULL) { score = *(double*)dictGetVal(de); - rank = zslGetRank(zsl,score,ele); - serverAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */ + rank = zslGetRank(zsl,score,ele->ptr); + /* Existing elements always have a rank. */ + serverAssertWithInfo(c,ele,rank); if (reverse) addReplyLongLong(c,llen-rank); else