mirror of
https://github.com/fluencelabs/redis
synced 2025-03-17 16:10:50 +00:00
Generic iterator code for usage in ZUNIONSTORE/ZINTERSTORE
This commit is contained in:
parent
bbfe232f60
commit
56ce42faf1
431
src/t_zset.c
431
src/t_zset.c
@ -810,6 +810,12 @@ void zsConvert(robj *zobj, int encoding) {
|
||||
* Sorted set commands
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
// if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
// } else if (zobj->encoding == REDIS_ENCODING_RAW) {
|
||||
// } else {
|
||||
// redisPanic("Unknown sorted set encoding");
|
||||
// }
|
||||
|
||||
/* This generic command implements both ZADD and ZINCRBY. */
|
||||
void zaddGenericCommand(redisClient *c, int incr) {
|
||||
static char *nanerr = "resulting score is not a number (NaN)";
|
||||
@ -1075,16 +1081,327 @@ void zremrangebyrankCommand(redisClient *c) {
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
dict *dict;
|
||||
robj *subject;
|
||||
int type; /* Set, sorted set */
|
||||
int encoding;
|
||||
double weight;
|
||||
|
||||
union {
|
||||
/* Set iterators. */
|
||||
union _iterset {
|
||||
struct {
|
||||
intset *is;
|
||||
int ii;
|
||||
} is;
|
||||
struct {
|
||||
dict *dict;
|
||||
dictIterator *di;
|
||||
dictEntry *de;
|
||||
} ht;
|
||||
} set;
|
||||
|
||||
/* Sorted set iterators. */
|
||||
union _iterzset {
|
||||
struct {
|
||||
unsigned char *zl;
|
||||
unsigned char *eptr, *sptr;
|
||||
} zl;
|
||||
struct {
|
||||
zset *zs;
|
||||
zskiplistNode *node;
|
||||
} sl;
|
||||
} zset;
|
||||
} iter;
|
||||
} zsetopsrc;
|
||||
|
||||
int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
|
||||
zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
|
||||
unsigned long size1, size2;
|
||||
size1 = d1->dict ? dictSize(d1->dict) : 0;
|
||||
size2 = d2->dict ? dictSize(d2->dict) : 0;
|
||||
return size1 - size2;
|
||||
|
||||
/* Use dirty flags for pointers that need to be cleaned up in the next
|
||||
* iteration over the zsetopval. The dirty flag for the long long value is
|
||||
* special, since long long values don't need cleanup. Instead, it means that
|
||||
* we already checked that "ell" holds a long long, or tried to convert another
|
||||
* representation into a long long value. When this was successful,
|
||||
* OPVAL_VALID_LL is set as well. */
|
||||
#define OPVAL_DIRTY_ROBJ 1
|
||||
#define OPVAL_DIRTY_LL 2
|
||||
#define OPVAL_VALID_LL 4
|
||||
|
||||
/* Store value retrieved from the iterator. */
|
||||
typedef struct {
|
||||
int flags;
|
||||
unsigned char _buf[32]; /* Private buffer. */
|
||||
robj *ele;
|
||||
unsigned char *estr;
|
||||
unsigned int elen;
|
||||
long long ell;
|
||||
double score;
|
||||
} zsetopval;
|
||||
|
||||
typedef union _iterset iterset;
|
||||
typedef union _iterzset iterzset;
|
||||
|
||||
void zuiInitIterator(zsetopsrc *op) {
|
||||
if (op->subject == NULL)
|
||||
return;
|
||||
|
||||
if (op->type == REDIS_SET) {
|
||||
iterset *it = &op->iter.set;
|
||||
if (op->encoding == REDIS_ENCODING_INTSET) {
|
||||
it->is.is = op->subject->ptr;
|
||||
it->is.ii = 0;
|
||||
} else if (op->encoding == REDIS_ENCODING_HT) {
|
||||
it->ht.dict = op->subject->ptr;
|
||||
it->ht.di = dictGetIterator(op->subject->ptr);
|
||||
it->ht.de = dictNext(it->ht.di);
|
||||
} else {
|
||||
redisPanic("Unknown set encoding");
|
||||
}
|
||||
} else if (op->type == REDIS_ZSET) {
|
||||
iterzset *it = &op->iter.zset;
|
||||
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
it->zl.zl = op->subject->ptr;
|
||||
it->zl.eptr = ziplistIndex(it->zl.zl,0);
|
||||
if (it->zl.eptr != NULL) {
|
||||
it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
|
||||
redisAssert(it->zl.sptr != NULL);
|
||||
}
|
||||
} else if (op->encoding == REDIS_ENCODING_RAW) {
|
||||
it->sl.zs = op->subject->ptr;
|
||||
it->sl.node = it->sl.zs->zsl->header->level[0].forward;
|
||||
} else {
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unsupported type");
|
||||
}
|
||||
}
|
||||
|
||||
void zuiClearIterator(zsetopsrc *op) {
|
||||
if (op->subject == NULL)
|
||||
return;
|
||||
|
||||
if (op->type == REDIS_SET) {
|
||||
iterset *it = &op->iter.set;
|
||||
if (op->encoding == REDIS_ENCODING_INTSET) {
|
||||
REDIS_NOTUSED(it); /* skip */
|
||||
} else if (op->encoding == REDIS_ENCODING_HT) {
|
||||
dictReleaseIterator(it->ht.di);
|
||||
} else {
|
||||
redisPanic("Unknown set encoding");
|
||||
}
|
||||
} else if (op->type == REDIS_ZSET) {
|
||||
iterzset *it = &op->iter.zset;
|
||||
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
REDIS_NOTUSED(it); /* skip */
|
||||
} else if (op->encoding == REDIS_ENCODING_RAW) {
|
||||
REDIS_NOTUSED(it); /* skip */
|
||||
} else {
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unsupported type");
|
||||
}
|
||||
}
|
||||
|
||||
int zuiLength(zsetopsrc *op) {
|
||||
if (op->subject == NULL)
|
||||
return 0;
|
||||
|
||||
if (op->type == REDIS_SET) {
|
||||
iterset *it = &op->iter.set;
|
||||
if (op->encoding == REDIS_ENCODING_INTSET) {
|
||||
return intsetLen(it->is.is);
|
||||
} else if (op->encoding == REDIS_ENCODING_HT) {
|
||||
return dictSize(it->ht.dict);
|
||||
} else {
|
||||
redisPanic("Unknown set encoding");
|
||||
}
|
||||
} else if (op->type == REDIS_ZSET) {
|
||||
iterzset *it = &op->iter.zset;
|
||||
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
return zzlLength(it->zl.zl);
|
||||
} else if (op->encoding == REDIS_ENCODING_RAW) {
|
||||
return it->sl.zs->zsl->length;
|
||||
} else {
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unsupported type");
|
||||
}
|
||||
}
|
||||
|
||||
/* Check if the current value is valid. If so, store it in the passed structure
|
||||
* and move to the next element. If not valid, this means we have reached the
|
||||
* end of the structure and can abort. */
|
||||
int zuiNext(zsetopsrc *op, zsetopval *val) {
|
||||
if (op->subject == NULL)
|
||||
return 0;
|
||||
|
||||
if (val->flags & OPVAL_DIRTY_ROBJ)
|
||||
decrRefCount(val->ele);
|
||||
|
||||
bzero(val,sizeof(zsetopval));
|
||||
|
||||
if (op->type == REDIS_SET) {
|
||||
iterset *it = &op->iter.set;
|
||||
if (op->encoding == REDIS_ENCODING_INTSET) {
|
||||
if (!intsetGet(it->is.is,it->is.ii,&val->ell))
|
||||
return 0;
|
||||
val->score = 1.0;
|
||||
|
||||
/* Move to next element. */
|
||||
it->is.ii++;
|
||||
} else if (op->encoding == REDIS_ENCODING_HT) {
|
||||
if (it->ht.de == NULL)
|
||||
return 0;
|
||||
val->ele = dictGetEntryKey(it->ht.de);
|
||||
val->score = 1.0;
|
||||
|
||||
/* Move to next element. */
|
||||
it->ht.de = dictNext(it->ht.di);
|
||||
} else {
|
||||
redisPanic("Unknown set encoding");
|
||||
}
|
||||
} else if (op->type == REDIS_ZSET) {
|
||||
iterzset *it = &op->iter.zset;
|
||||
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
/* No need to check both, but better be explicit. */
|
||||
if (it->zl.eptr == NULL || it->zl.sptr == NULL)
|
||||
return 0;
|
||||
redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
|
||||
val->score = zzlGetScore(it->zl.sptr);
|
||||
|
||||
/* Move to next element. */
|
||||
zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
|
||||
} else if (op->encoding == REDIS_ENCODING_RAW) {
|
||||
if (it->sl.node == NULL)
|
||||
return 0;
|
||||
val->ele = it->sl.node->obj;
|
||||
val->score = it->sl.node->score;
|
||||
|
||||
/* Move to next element. */
|
||||
it->sl.node = it->sl.node->level[0].forward;
|
||||
} else {
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unsupported type");
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
int zuiLongLongFromValue(zsetopval *val) {
|
||||
if (!(val->flags & OPVAL_DIRTY_LL)) {
|
||||
val->flags |= OPVAL_DIRTY_LL;
|
||||
|
||||
if (val->ele != NULL) {
|
||||
if (val->ele->encoding == REDIS_ENCODING_INT) {
|
||||
val->ell = (long)val->ele->ptr;
|
||||
val->flags |= OPVAL_VALID_LL;
|
||||
} else if (val->ele->encoding == REDIS_ENCODING_RAW) {
|
||||
if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
|
||||
val->flags |= OPVAL_VALID_LL;
|
||||
} else {
|
||||
redisPanic("Unsupported element encoding");
|
||||
}
|
||||
} else if (val->estr != NULL) {
|
||||
if (string2ll((char*)val->estr,val->elen,&val->ell))
|
||||
val->flags |= OPVAL_VALID_LL;
|
||||
} else {
|
||||
/* The long long was already set, flag as valid. */
|
||||
val->flags |= OPVAL_VALID_LL;
|
||||
}
|
||||
}
|
||||
return val->flags & OPVAL_VALID_LL;
|
||||
}
|
||||
|
||||
robj *zuiObjectFromValue(zsetopval *val) {
|
||||
if (val->ele == NULL) {
|
||||
if (val->estr != NULL) {
|
||||
val->ele = createStringObject((char*)val->estr,val->elen);
|
||||
} else {
|
||||
val->ele = createStringObjectFromLongLong(val->ell);
|
||||
}
|
||||
val->flags |= OPVAL_DIRTY_ROBJ;
|
||||
}
|
||||
return val->ele;
|
||||
}
|
||||
|
||||
int zuiBufferFromValue(zsetopval *val) {
|
||||
if (val->estr == NULL) {
|
||||
if (val->ele != NULL) {
|
||||
if (val->ele->encoding == REDIS_ENCODING_INT) {
|
||||
val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
|
||||
val->estr = val->_buf;
|
||||
} else if (val->ele->encoding == REDIS_ENCODING_RAW) {
|
||||
val->elen = sdslen(val->ele->ptr);
|
||||
val->estr = val->ele->ptr;
|
||||
} else {
|
||||
redisPanic("Unsupported element encoding");
|
||||
}
|
||||
} else {
|
||||
val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
|
||||
val->estr = val->_buf;
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Find value pointed to by val in the source pointer to by op. When found,
|
||||
* return 1 and store its score in target. Return 0 otherwise. */
|
||||
int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
|
||||
if (op->subject == NULL)
|
||||
return 0;
|
||||
|
||||
if (op->type == REDIS_SET) {
|
||||
iterset *it = &op->iter.set;
|
||||
|
||||
if (op->encoding == REDIS_ENCODING_INTSET) {
|
||||
if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
|
||||
*score = 1.0;
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} else if (op->encoding == REDIS_ENCODING_HT) {
|
||||
zuiObjectFromValue(val);
|
||||
if (dictFind(it->ht.dict,val->ele) != NULL) {
|
||||
*score = 1.0;
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unknown set encoding");
|
||||
}
|
||||
} else if (op->type == REDIS_ZSET) {
|
||||
iterzset *it = &op->iter.zset;
|
||||
zuiObjectFromValue(val);
|
||||
|
||||
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
if (zzlFind(op->subject,val->ele,score) != NULL) {
|
||||
/* Score is already set by zzlFind. */
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} else if (op->encoding == REDIS_ENCODING_RAW) {
|
||||
dictEntry *de;
|
||||
if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
|
||||
*score = *(double*)dictGetEntryVal(de);
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unsupported type");
|
||||
}
|
||||
}
|
||||
|
||||
int zuiCompareByCardinality(const void *s1, const void *s2) {
|
||||
return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
|
||||
}
|
||||
|
||||
#define REDIS_AGGR_SUM 1
|
||||
@ -1113,11 +1430,11 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
|
||||
int i, j, setnum;
|
||||
int aggregate = REDIS_AGGR_SUM;
|
||||
zsetopsrc *src;
|
||||
zsetopval zval;
|
||||
robj *tmp;
|
||||
robj *dstobj;
|
||||
zset *dstzset;
|
||||
zskiplistNode *znode;
|
||||
dictIterator *di;
|
||||
dictEntry *de;
|
||||
int touched = 0;
|
||||
|
||||
/* expect setnum input keys to be given */
|
||||
@ -1135,24 +1452,24 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
|
||||
}
|
||||
|
||||
/* read keys to be used for input */
|
||||
src = zmalloc(sizeof(zsetopsrc) * setnum);
|
||||
src = zcalloc(sizeof(zsetopsrc) * setnum);
|
||||
for (i = 0, j = 3; i < setnum; i++, j++) {
|
||||
robj *obj = lookupKeyWrite(c->db,c->argv[j]);
|
||||
if (!obj) {
|
||||
src[i].dict = NULL;
|
||||
} else {
|
||||
if (obj->type == REDIS_ZSET) {
|
||||
src[i].dict = ((zset*)obj->ptr)->dict;
|
||||
} else if (obj->type == REDIS_SET) {
|
||||
src[i].dict = (obj->ptr);
|
||||
} else {
|
||||
if (obj != NULL) {
|
||||
if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
|
||||
zfree(src);
|
||||
addReply(c,shared.wrongtypeerr);
|
||||
return;
|
||||
}
|
||||
|
||||
src[i].subject = obj;
|
||||
src[i].type = obj->type;
|
||||
src[i].encoding = obj->encoding;
|
||||
} else {
|
||||
src[i].subject = NULL;
|
||||
}
|
||||
|
||||
/* default all weights to 1 */
|
||||
/* Default all weights to 1. */
|
||||
src[i].weight = 1.0;
|
||||
}
|
||||
|
||||
@ -1193,82 +1510,82 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
|
||||
}
|
||||
}
|
||||
|
||||
for (i = 0; i < setnum; i++)
|
||||
zuiInitIterator(&src[i]);
|
||||
|
||||
/* sort sets from the smallest to largest, this will improve our
|
||||
* algorithm's performance */
|
||||
qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
|
||||
qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
|
||||
|
||||
dstobj = createZsetObject();
|
||||
dstzset = dstobj->ptr;
|
||||
|
||||
if (op == REDIS_OP_INTER) {
|
||||
/* skip going over all entries if the smallest zset is NULL or empty */
|
||||
if (src[0].dict && dictSize(src[0].dict) > 0) {
|
||||
/* precondition: as src[0].dict is non-empty and the zsets are ordered
|
||||
* from small to large, all src[i > 0].dict are non-empty too */
|
||||
di = dictGetIterator(src[0].dict);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
/* Skip everything if the smallest input is empty. */
|
||||
if (zuiLength(&src[0]) > 0) {
|
||||
/* Precondition: as src[0] is non-empty and the inputs are ordered
|
||||
* by size, all src[i > 0] are non-empty too. */
|
||||
while (zuiNext(&src[0],&zval)) {
|
||||
double score, value;
|
||||
|
||||
score = src[0].weight * zunionInterDictValue(de);
|
||||
score = src[0].weight * zval.score;
|
||||
for (j = 1; j < setnum; j++) {
|
||||
dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
|
||||
if (other) {
|
||||
value = src[j].weight * zunionInterDictValue(other);
|
||||
if (zuiFind(&src[j],&zval,&value)) {
|
||||
value *= src[j].weight;
|
||||
zunionInterAggregate(&score,value,aggregate);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Only continue when present in every source dict. */
|
||||
/* Only continue when present in every input. */
|
||||
if (j == setnum) {
|
||||
robj *o = dictGetEntryKey(de);
|
||||
znode = zslInsert(dstzset->zsl,score,o);
|
||||
incrRefCount(o); /* added to skiplist */
|
||||
dictAdd(dstzset->dict,o,&znode->score);
|
||||
incrRefCount(o); /* added to dictionary */
|
||||
tmp = zuiObjectFromValue(&zval);
|
||||
znode = zslInsert(dstzset->zsl,score,tmp);
|
||||
incrRefCount(tmp); /* added to skiplist */
|
||||
dictAdd(dstzset->dict,tmp,&znode->score);
|
||||
incrRefCount(tmp); /* added to dictionary */
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
} else if (op == REDIS_OP_UNION) {
|
||||
for (i = 0; i < setnum; i++) {
|
||||
if (!src[i].dict) continue;
|
||||
if (zuiLength(&src[0]) == 0)
|
||||
continue;
|
||||
|
||||
di = dictGetIterator(src[i].dict);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
while (zuiNext(&src[i],&zval)) {
|
||||
double score, value;
|
||||
|
||||
/* skip key when already processed */
|
||||
if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL)
|
||||
/* Skip key when already processed */
|
||||
if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
|
||||
continue;
|
||||
|
||||
/* initialize score */
|
||||
score = src[i].weight * zunionInterDictValue(de);
|
||||
/* Initialize score */
|
||||
score = src[i].weight * zval.score;
|
||||
|
||||
/* because the zsets are sorted by size, its only possible
|
||||
* for sets at larger indices to hold this entry */
|
||||
/* Because the inputs are sorted by size, it's only possible
|
||||
* for sets at larger indices to hold this element. */
|
||||
for (j = (i+1); j < setnum; j++) {
|
||||
dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
|
||||
if (other) {
|
||||
value = src[j].weight * zunionInterDictValue(other);
|
||||
if (zuiFind(&src[j],&zval,&value)) {
|
||||
value *= src[j].weight;
|
||||
zunionInterAggregate(&score,value,aggregate);
|
||||
}
|
||||
}
|
||||
|
||||
robj *o = dictGetEntryKey(de);
|
||||
znode = zslInsert(dstzset->zsl,score,o);
|
||||
incrRefCount(o); /* added to skiplist */
|
||||
dictAdd(dstzset->dict,o,&znode->score);
|
||||
incrRefCount(o); /* added to dictionary */
|
||||
tmp = zuiObjectFromValue(&zval);
|
||||
znode = zslInsert(dstzset->zsl,score,tmp);
|
||||
incrRefCount(zval.ele); /* added to skiplist */
|
||||
dictAdd(dstzset->dict,tmp,&znode->score);
|
||||
incrRefCount(zval.ele); /* added to dictionary */
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
} else {
|
||||
/* unknown operator */
|
||||
redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
|
||||
redisPanic("Unknown operator");
|
||||
}
|
||||
|
||||
for (i = 0; i < setnum; i++)
|
||||
zuiClearIterator(&src[i]);
|
||||
|
||||
if (dbDelete(c->db,dstkey)) {
|
||||
signalModifiedKey(c->db,dstkey);
|
||||
touched = 1;
|
||||
|
Loading…
x
Reference in New Issue
Block a user