diff --git a/src/t_zset.c b/src/t_zset.c index ab094654..f5f2d7b5 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1981,51 +1981,78 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { zuiClearIterator(&src[0]); } } else if (op == REDIS_OP_UNION) { + dict *accumulator = dictCreate(&setDictType,NULL); + dictIterator *di; + dictEntry *de; + double score; + + if (setnum) { + /* Our union is at least as large as the largest set. + * Resize the dictionary ASAP to avoid useless rehashing. */ + int minlen = setnum ? zuiLength(&src[setnum-1]) : 0; + dictExpand(accumulator,minlen); + } + + /* Step 1: Create a dictionary of elements -> aggregated-scores + * by iterating one sorted set after the other. */ for (i = 0; i < setnum; i++) { - if (zuiLength(&src[i]) == 0) - continue; + if (zuiLength(&src[i]) == 0) continue; zuiInitIterator(&src[i]); while (zuiNext(&src[i],&zval)) { - double score, value; - - /* Skip an element that when already processed */ - if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL) - continue; - - /* Initialize score */ + /* Initialize value */ score = src[i].weight * zval.score; if (isnan(score)) score = 0; - /* We need to check only next sets to see if this element - * exists, since we process every element just one time so - * it can't exist in a previous set (otherwise it would be - * already processed). */ - for (j = (i+1); j < setnum; j++) { - /* It is not safe to access the zset we are - * iterating, so explicitly check for equal object. */ - if(src[j].subject == src[i].subject) { - value = zval.score*src[j].weight; - zunionInterAggregate(&score,value,aggregate); - } else if (zuiFind(&src[j],&zval,&value)) { - value *= src[j].weight; - zunionInterAggregate(&score,value,aggregate); + /* Search for this element in the accumulating dictionary. */ + de = dictFind(accumulator,zuiObjectFromValue(&zval)); + /* If we don't have it, we need to create a new entry. */ + if (de == NULL) { + tmp = zuiObjectFromValue(&zval); + /* Remember the longest single element encountered, + * to understand if it's possible to convert to ziplist + * at the end. */ + if (sdsEncodedObject(tmp)) { + if (sdslen(tmp->ptr) > maxelelen) + maxelelen = sdslen(tmp->ptr); } - } - - tmp = zuiObjectFromValue(&zval); - znode = zslInsert(dstzset->zsl,score,tmp); - incrRefCount(zval.ele); /* added to skiplist */ - dictAdd(dstzset->dict,tmp,&znode->score); - incrRefCount(zval.ele); /* added to dictionary */ - - if (sdsEncodedObject(tmp)) { - if (sdslen(tmp->ptr) > maxelelen) - maxelelen = sdslen(tmp->ptr); + /* Add the element with its initial score. */ + de = dictAddRaw(accumulator,tmp); + incrRefCount(tmp); + dictSetDoubleVal(de,score); + } else { + /* Update the score with the score of the new instance + * of the element found in the current sorted set. + * + * Here we access directly the dictEntry double + * value inside the union as it is a big speedup + * compared to using the getDouble/setDouble API. */ + zunionInterAggregate(&de->v.d,score,aggregate); } } zuiClearIterator(&src[i]); } + + /* Step 2: convert the dictionary into the final sorted set. */ + di = dictGetIterator(accumulator); + + /* We now are aware of the final size of the resulting sorted set, + * let's resize the dictionary embedded inside the sorted set to the + * right size, in order to save rehashing time. */ + dictExpand(dstzset->dict,dictSize(accumulator)); + + while((de = dictNext(di)) != NULL) { + robj *ele = dictGetKey(de); + score = dictGetDoubleVal(de); + znode = zslInsert(dstzset->zsl,score,ele); + incrRefCount(ele); /* added to skiplist */ + dictAdd(dstzset->dict,ele,&znode->score); + incrRefCount(ele); /* added to dictionary */ + } + dictReleaseIterator(di); + + /* We can free the accumulator dictionary now. */ + dictRelease(accumulator); } else { redisPanic("Unknown operator"); }