ZUNIONSTORE reimplemented for speed.

The user @kjmph provided excellent ideas to improve speed of ZUNIONSTORE
(in certain cases by many order of magnitude), together with an
implementation of the ideas.

While the ideas were sounding, the implementation could be improved both
in terms of speed and clearness, so that's my attempt at reimplementing
the speedup proposed, trying to improve by directly using just a
dictionary with an embedded score inside, and reusing the single-pass
aggregate + order-later approach.

Note that you can't apply this commit without applying the previous
commit in this branch that adds a double in the dictEntry value union.

Issue #1786.
This commit is contained in:
antirez 2014-06-06 18:04:04 +02:00
parent d1cb6a0fc4
commit 119813e968

View File

@ -1981,51 +1981,78 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
zuiClearIterator(&src[0]); zuiClearIterator(&src[0]);
} }
} else if (op == REDIS_OP_UNION) { } else if (op == REDIS_OP_UNION) {
dict *accumulator = dictCreate(&setDictType,NULL);
dictIterator *di;
dictEntry *de;
double score;
if (setnum) {
/* Our union is at least as large as the largest set.
* Resize the dictionary ASAP to avoid useless rehashing. */
int minlen = setnum ? zuiLength(&src[setnum-1]) : 0;
dictExpand(accumulator,minlen);
}
/* Step 1: Create a dictionary of elements -> aggregated-scores
* by iterating one sorted set after the other. */
for (i = 0; i < setnum; i++) { for (i = 0; i < setnum; i++) {
if (zuiLength(&src[i]) == 0) if (zuiLength(&src[i]) == 0) continue;
continue;
zuiInitIterator(&src[i]); zuiInitIterator(&src[i]);
while (zuiNext(&src[i],&zval)) { while (zuiNext(&src[i],&zval)) {
double score, value; /* Initialize value */
/* Skip an element that when already processed */
if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
continue;
/* Initialize score */
score = src[i].weight * zval.score; score = src[i].weight * zval.score;
if (isnan(score)) score = 0; if (isnan(score)) score = 0;
/* We need to check only next sets to see if this element /* Search for this element in the accumulating dictionary. */
* exists, since we process every element just one time so de = dictFind(accumulator,zuiObjectFromValue(&zval));
* it can't exist in a previous set (otherwise it would be /* If we don't have it, we need to create a new entry. */
* already processed). */ if (de == NULL) {
for (j = (i+1); j < setnum; j++) { tmp = zuiObjectFromValue(&zval);
/* It is not safe to access the zset we are /* Remember the longest single element encountered,
* iterating, so explicitly check for equal object. */ * to understand if it's possible to convert to ziplist
if(src[j].subject == src[i].subject) { * at the end. */
value = zval.score*src[j].weight; if (sdsEncodedObject(tmp)) {
zunionInterAggregate(&score,value,aggregate); if (sdslen(tmp->ptr) > maxelelen)
} else if (zuiFind(&src[j],&zval,&value)) { maxelelen = sdslen(tmp->ptr);
value *= src[j].weight;
zunionInterAggregate(&score,value,aggregate);
} }
} /* Add the element with its initial score. */
de = dictAddRaw(accumulator,tmp);
tmp = zuiObjectFromValue(&zval); incrRefCount(tmp);
znode = zslInsert(dstzset->zsl,score,tmp); dictSetDoubleVal(de,score);
incrRefCount(zval.ele); /* added to skiplist */ } else {
dictAdd(dstzset->dict,tmp,&znode->score); /* Update the score with the score of the new instance
incrRefCount(zval.ele); /* added to dictionary */ * of the element found in the current sorted set.
*
if (sdsEncodedObject(tmp)) { * Here we access directly the dictEntry double
if (sdslen(tmp->ptr) > maxelelen) * value inside the union as it is a big speedup
maxelelen = sdslen(tmp->ptr); * compared to using the getDouble/setDouble API. */
zunionInterAggregate(&de->v.d,score,aggregate);
} }
} }
zuiClearIterator(&src[i]); zuiClearIterator(&src[i]);
} }
/* Step 2: convert the dictionary into the final sorted set. */
di = dictGetIterator(accumulator);
/* We now are aware of the final size of the resulting sorted set,
* let's resize the dictionary embedded inside the sorted set to the
* right size, in order to save rehashing time. */
dictExpand(dstzset->dict,dictSize(accumulator));
while((de = dictNext(di)) != NULL) {
robj *ele = dictGetKey(de);
score = dictGetDoubleVal(de);
znode = zslInsert(dstzset->zsl,score,ele);
incrRefCount(ele); /* added to skiplist */
dictAdd(dstzset->dict,ele,&znode->score);
incrRefCount(ele); /* added to dictionary */
}
dictReleaseIterator(di);
/* We can free the accumulator dictionary now. */
dictRelease(accumulator);
} else { } else {
redisPanic("Unknown operator"); redisPanic("Unknown operator");
} }