mirror of
https://github.com/fluencelabs/redis
synced 2025-03-18 08:30:51 +00:00
Lazyfree: Sorted sets convereted to plain SDS. (several commits squashed)
This commit is contained in:
parent
86d48efbfd
commit
a7c5be18a8
@ -826,7 +826,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
|
||||
dictEntry *de;
|
||||
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *eleobj = dictGetKey(de);
|
||||
sds ele = dictGetKey(de);
|
||||
if (count == 0) {
|
||||
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
|
||||
AOF_REWRITE_ITEMS_PER_CMD : items;
|
||||
@ -835,7 +835,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
|
||||
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
}
|
||||
if (rioWriteBulkObject(r,eleobj) == 0) return 0;
|
||||
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
|
||||
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
|
||||
items--;
|
||||
}
|
||||
@ -892,7 +892,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
|
||||
dictEntry *de;
|
||||
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *eleobj = dictGetKey(de);
|
||||
sds ele = dictGetKey(de);
|
||||
double *score = dictGetVal(de);
|
||||
|
||||
if (count == 0) {
|
||||
@ -904,7 +904,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
}
|
||||
if (rioWriteBulkDouble(r,*score) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,eleobj) == 0) return 0;
|
||||
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
|
||||
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
|
||||
items--;
|
||||
}
|
||||
|
@ -4087,7 +4087,10 @@ void clusterCommand(client *c) {
|
||||
keys = zmalloc(sizeof(robj*)*maxkeys);
|
||||
numkeys = getKeysInSlot(slot, keys, maxkeys);
|
||||
addReplyMultiBulkLen(c,numkeys);
|
||||
for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
addReplyBulk(c,keys[j]);
|
||||
decrRefCount(keys[j]);
|
||||
}
|
||||
zfree(keys);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
|
||||
/* CLUSTER FORGET <NODE ID> */
|
||||
|
24
src/db.c
24
src/db.c
@ -423,8 +423,8 @@ void scanCallback(void *privdata, const dictEntry *de) {
|
||||
val = dictGetVal(de);
|
||||
incrRefCount(val);
|
||||
} else if (o->type == OBJ_ZSET) {
|
||||
key = dictGetKey(de);
|
||||
incrRefCount(key);
|
||||
sds keysds = dictGetKey(de);
|
||||
key = createStringObject(keysds,sdslen(keysds));
|
||||
val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
|
||||
} else {
|
||||
serverPanic("Type not handled in SCAN callback.");
|
||||
@ -1181,14 +1181,13 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
|
||||
void slotToKeyAdd(robj *key) {
|
||||
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
|
||||
|
||||
zslInsert(server.cluster->slots_to_keys,hashslot,key);
|
||||
incrRefCount(key);
|
||||
sds sdskey = sdsdup(key->ptr);
|
||||
zslInsert(server.cluster->slots_to_keys,hashslot,sdskey);
|
||||
}
|
||||
|
||||
void slotToKeyDel(robj *key) {
|
||||
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
|
||||
|
||||
zslDelete(server.cluster->slots_to_keys,hashslot,key);
|
||||
zslDelete(server.cluster->slots_to_keys,hashslot,key->ptr,NULL);
|
||||
}
|
||||
|
||||
void slotToKeyFlush(void) {
|
||||
@ -1196,6 +1195,9 @@ void slotToKeyFlush(void) {
|
||||
server.cluster->slots_to_keys = zslCreate();
|
||||
}
|
||||
|
||||
/* Pupulate the specified array of objects with keys in the specified slot.
|
||||
* New objects are returned to represent keys, it's up to the caller to
|
||||
* decrement the reference count to release the keys names. */
|
||||
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
|
||||
zskiplistNode *n;
|
||||
zrangespec range;
|
||||
@ -1206,7 +1208,7 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun
|
||||
|
||||
n = zslFirstInRange(server.cluster->slots_to_keys, &range);
|
||||
while(n && n->score == hashslot && count--) {
|
||||
keys[j++] = n->obj;
|
||||
keys[j++] = createStringObject(n->ele,sdslen(n->ele));
|
||||
n = n->level[0].forward;
|
||||
}
|
||||
return j;
|
||||
@ -1224,9 +1226,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
|
||||
|
||||
n = zslFirstInRange(server.cluster->slots_to_keys, &range);
|
||||
while(n && n->score == hashslot) {
|
||||
robj *key = n->obj;
|
||||
sds sdskey = n->ele;
|
||||
robj *key = createStringObject(sdskey,sdslen(sdskey));
|
||||
n = n->level[0].forward; /* Go to the next item before freeing it. */
|
||||
incrRefCount(key); /* Protect the object while freeing it. */
|
||||
dbDelete(&server.db[0],key);
|
||||
decrRefCount(key);
|
||||
j++;
|
||||
@ -1248,7 +1250,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
|
||||
|
||||
/* Use rank of first element, if any, to determine preliminary count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->obj);
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count = (zsl->length - (rank - 1));
|
||||
|
||||
/* Find last element in range */
|
||||
@ -1256,7 +1258,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
|
||||
|
||||
/* Use rank of last element, if any, to determine the actual count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->obj);
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count -= (zsl->length - rank);
|
||||
}
|
||||
}
|
||||
|
@ -163,12 +163,9 @@ void computeDatasetDigest(unsigned char *final) {
|
||||
listTypeReleaseIterator(li);
|
||||
} else if (o->type == OBJ_SET) {
|
||||
setTypeIterator *si = setTypeInitIterator(o);
|
||||
robj *ele;
|
||||
sds sdsele;
|
||||
while((sdsele = setTypeNextObject(si)) != NULL) {
|
||||
ele = createObject(OBJ_STRING,sdsele);
|
||||
xorObjectDigest(digest,ele);
|
||||
decrRefCount(ele);
|
||||
xorDigest(digest,sdsele,sdslen(sdsele));
|
||||
}
|
||||
setTypeReleaseIterator(si);
|
||||
} else if (o->type == OBJ_ZSET) {
|
||||
@ -210,12 +207,12 @@ void computeDatasetDigest(unsigned char *final) {
|
||||
dictEntry *de;
|
||||
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *eleobj = dictGetKey(de);
|
||||
sds sdsele = dictGetKey(de);
|
||||
double *score = dictGetVal(de);
|
||||
|
||||
snprintf(buf,sizeof(buf),"%.17g",*score);
|
||||
memset(eledigest,0,20);
|
||||
mixObjectDigest(eledigest,eleobj);
|
||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||
mixDigest(eledigest,buf,strlen(buf));
|
||||
xorDigest(digest,eledigest,20);
|
||||
}
|
||||
|
20
src/geo.c
20
src/geo.c
@ -112,7 +112,7 @@ int extractLongLatOrReply(client *c, robj **argv,
|
||||
int longLatFromMember(robj *zobj, robj *member, double *xy) {
|
||||
double score = 0;
|
||||
|
||||
if (zsetScore(zobj, member, &score) == C_ERR) return C_ERR;
|
||||
if (zsetScore(zobj, member->ptr, &score) == C_ERR) return C_ERR;
|
||||
if (!decodeGeohash(score, xy)) return C_ERR;
|
||||
return C_OK;
|
||||
}
|
||||
@ -261,16 +261,14 @@ int geoGetPointsInRange(robj *zobj, double min, double max, double lon, double l
|
||||
}
|
||||
|
||||
while (ln) {
|
||||
robj *o = ln->obj;
|
||||
sds ele = ln->ele;
|
||||
/* Abort when the node is no longer in range. */
|
||||
if (!zslValueLteMax(ln->score, &range))
|
||||
break;
|
||||
|
||||
member = (o->encoding == OBJ_ENCODING_INT) ?
|
||||
sdsfromlonglong((long)o->ptr) :
|
||||
sdsdup(o->ptr);
|
||||
if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,member)
|
||||
== C_ERR) sdsfree(member);
|
||||
ele = sdsdup(ele);
|
||||
if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,ele)
|
||||
== C_ERR) sdsfree(ele);
|
||||
ln = ln->level[0].forward;
|
||||
}
|
||||
}
|
||||
@ -606,7 +604,7 @@ void geohashCommand(client *c) {
|
||||
addReplyMultiBulkLen(c,c->argc-2);
|
||||
for (j = 2; j < c->argc; j++) {
|
||||
double score;
|
||||
if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
|
||||
if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
|
||||
addReply(c,shared.nullbulk);
|
||||
} else {
|
||||
/* The internal format we use for geocoding is a bit different
|
||||
@ -660,7 +658,7 @@ void geoposCommand(client *c) {
|
||||
addReplyMultiBulkLen(c,c->argc-2);
|
||||
for (j = 2; j < c->argc; j++) {
|
||||
double score;
|
||||
if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
|
||||
if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
|
||||
addReply(c,shared.nullmultibulk);
|
||||
} else {
|
||||
/* Decode... */
|
||||
@ -700,8 +698,8 @@ void geodistCommand(client *c) {
|
||||
|
||||
/* Get the scores. We need both otherwise NULL is returned. */
|
||||
double score1, score2, xyxy[4];
|
||||
if (zsetScore(zobj, c->argv[2], &score1) == C_ERR ||
|
||||
zsetScore(zobj, c->argv[3], &score2) == C_ERR)
|
||||
if (zsetScore(zobj, c->argv[2]->ptr, &score1) == C_ERR ||
|
||||
zsetScore(zobj, c->argv[3]->ptr, &score2) == C_ERR)
|
||||
{
|
||||
addReply(c,shared.nullbulk);
|
||||
return;
|
||||
|
27
src/rdb.c
27
src/rdb.c
@ -622,10 +622,11 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
|
||||
nwritten += n;
|
||||
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *eleobj = dictGetKey(de);
|
||||
sds ele = dictGetKey(de);
|
||||
double *score = dictGetVal(de);
|
||||
|
||||
if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
|
||||
if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
|
||||
== -1) return -1;
|
||||
nwritten += n;
|
||||
if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
|
||||
nwritten += n;
|
||||
@ -992,7 +993,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
return NULL;
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_INTSET) {
|
||||
/* Fetch integer value from element */
|
||||
/* Fetch integer value from element. */
|
||||
if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {
|
||||
o->ptr = intsetAdd(o->ptr,llval,NULL);
|
||||
} else {
|
||||
@ -1002,7 +1003,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
}
|
||||
|
||||
/* This will also be called when the set was just converted
|
||||
* to a regular hash table encoded set */
|
||||
* to a regular hash table encoded set. */
|
||||
if (o->encoding == OBJ_ENCODING_HT) {
|
||||
dictAdd((dict*)o->ptr,sdsele,NULL);
|
||||
} else {
|
||||
@ -1010,7 +1011,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
}
|
||||
}
|
||||
} else if (rdbtype == RDB_TYPE_ZSET) {
|
||||
/* Read list/set value */
|
||||
/* Read list/set value. */
|
||||
size_t zsetlen;
|
||||
size_t maxelelen = 0;
|
||||
zset *zs;
|
||||
@ -1019,23 +1020,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
o = createZsetObject();
|
||||
zs = o->ptr;
|
||||
|
||||
/* Load every single element of the list/set */
|
||||
/* Load every single element of the sorted set. */
|
||||
while(zsetlen--) {
|
||||
robj *ele;
|
||||
sds sdsele;
|
||||
double score;
|
||||
zskiplistNode *znode;
|
||||
|
||||
if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
|
||||
ele = tryObjectEncoding(ele);
|
||||
if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL)
|
||||
return NULL;
|
||||
if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
|
||||
|
||||
/* Don't care about integer-encoded strings. */
|
||||
if (sdsEncodedObject(ele) && sdslen(ele->ptr) > maxelelen)
|
||||
maxelelen = sdslen(ele->ptr);
|
||||
if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);
|
||||
|
||||
znode = zslInsert(zs->zsl,score,ele);
|
||||
dictAdd(zs->dict,ele,&znode->score);
|
||||
incrRefCount(ele); /* added to skiplist */
|
||||
znode = zslInsert(zs->zsl,score,sdsele);
|
||||
dictAdd(zs->dict,sdsele,&znode->score);
|
||||
}
|
||||
|
||||
/* Convert *after* loading, since sorted sets are not stored ordered. */
|
||||
|
10
src/server.c
10
src/server.c
@ -554,11 +554,11 @@ dictType setDictType = {
|
||||
|
||||
/* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
|
||||
dictType zsetDictType = {
|
||||
dictEncObjHash, /* hash function */
|
||||
dictSdsHash, /* hash function */
|
||||
NULL, /* key dup */
|
||||
NULL, /* val dup */
|
||||
dictEncObjKeyCompare, /* key compare */
|
||||
dictObjectDestructor, /* key destructor */
|
||||
dictSdsKeyCompare, /* key compare */
|
||||
NULL, /* Note: SDS string shared & freed by skiplist */
|
||||
NULL /* val destructor */
|
||||
};
|
||||
|
||||
@ -1428,8 +1428,8 @@ void createSharedObjects(void) {
|
||||
* actually used for their value but as a special object meaning
|
||||
* respectively the minimum possible string and the maximum possible
|
||||
* string in string comparisons for the ZRANGEBYLEX command. */
|
||||
shared.minstring = createStringObject("minstring",9);
|
||||
shared.maxstring = createStringObject("maxstring",9);
|
||||
shared.minstring = sdsnew("minstring");
|
||||
shared.maxstring = sdsnew("maxstring");
|
||||
}
|
||||
|
||||
void initServerConfig(void) {
|
||||
|
17
src/server.h
17
src/server.h
@ -612,16 +612,17 @@ struct sharedObjectsStruct {
|
||||
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
|
||||
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
|
||||
*lpush, *emptyscan, *minstring, *maxstring,
|
||||
*lpush, *emptyscan,
|
||||
*select[PROTO_SHARED_SELECT_CMDS],
|
||||
*integers[OBJ_SHARED_INTEGERS],
|
||||
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
||||
*bulkhdr[OBJ_SHARED_BULKHDR_LEN]; /* "$<value>\r\n" */
|
||||
sds minstring, maxstring;
|
||||
};
|
||||
|
||||
/* ZSETs use a specialized version of Skiplists */
|
||||
typedef struct zskiplistNode {
|
||||
robj *obj;
|
||||
sds ele;
|
||||
double score;
|
||||
struct zskiplistNode *backward;
|
||||
struct zskiplistLevel {
|
||||
@ -1261,15 +1262,15 @@ typedef struct {
|
||||
|
||||
/* Struct to hold an inclusive/exclusive range spec by lexicographic comparison. */
|
||||
typedef struct {
|
||||
robj *min, *max; /* May be set to shared.(minstring|maxstring) */
|
||||
sds min, max; /* May be set to shared.(minstring|maxstring) */
|
||||
int minex, maxex; /* are min or max exclusive? */
|
||||
} zlexrangespec;
|
||||
|
||||
zskiplist *zslCreate(void);
|
||||
void zslFree(zskiplist *zsl);
|
||||
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
|
||||
unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
|
||||
int zslDelete(zskiplist *zsl, double score, robj *obj);
|
||||
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele);
|
||||
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score);
|
||||
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node);
|
||||
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range);
|
||||
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range);
|
||||
double zzlGetScore(unsigned char *sptr);
|
||||
@ -1277,8 +1278,8 @@ void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
|
||||
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
|
||||
unsigned int zsetLength(robj *zobj);
|
||||
void zsetConvert(robj *zobj, int encoding);
|
||||
int zsetScore(robj *zobj, robj *member, double *score);
|
||||
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o);
|
||||
int zsetScore(robj *zobj, sds member, double *score);
|
||||
unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
|
||||
|
||||
/* Core functions */
|
||||
int freeMemoryIfNeeded(void);
|
||||
|
10
src/sort.c
10
src/sort.c
@ -399,7 +399,7 @@ void sortCommand(client *c) {
|
||||
zset *zs = sortval->ptr;
|
||||
zskiplist *zsl = zs->zsl;
|
||||
zskiplistNode *ln;
|
||||
robj *ele;
|
||||
sds sdsele;
|
||||
int rangelen = vectorlen;
|
||||
|
||||
/* Check if starting point is trivial, before doing log(N) lookup. */
|
||||
@ -417,8 +417,8 @@ void sortCommand(client *c) {
|
||||
|
||||
while(rangelen--) {
|
||||
serverAssertWithInfo(c,sortval,ln != NULL);
|
||||
ele = ln->obj;
|
||||
vector[j].obj = ele;
|
||||
sdsele = ln->ele;
|
||||
vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
|
||||
vector[j].u.score = 0;
|
||||
vector[j].u.cmpobj = NULL;
|
||||
j++;
|
||||
@ -431,9 +431,11 @@ void sortCommand(client *c) {
|
||||
dict *set = ((zset*)sortval->ptr)->dict;
|
||||
dictIterator *di;
|
||||
dictEntry *setele;
|
||||
sds sdsele;
|
||||
di = dictGetIterator(set);
|
||||
while((setele = dictNext(di)) != NULL) {
|
||||
vector[j].obj = dictGetKey(setele);
|
||||
sdsele = dictGetKey(setele);
|
||||
vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
|
||||
vector[j].u.score = 0;
|
||||
vector[j].u.cmpobj = NULL;
|
||||
j++;
|
||||
|
442
src/t_zset.c
442
src/t_zset.c
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user