Hyperloglog: Support for PFMERGE sparse encoding as target.

This is a fix for #3819.
This commit is contained in:
antirez 2017-12-22 11:01:19 +01:00
parent cbe9d725a7
commit 27b81f3fbc

View File

@ -1298,6 +1298,7 @@ void pfmergeCommand(client *c) {
uint8_t max[HLL_REGISTERS];
struct hllhdr *hdr;
int j;
int use_dense = 0; /* Use dense representation as target? */
/* Compute an HLL with M[i] = MAX(M[i]_j).
* We store the maximum into the max array of registers. We'll write
@ -1309,6 +1310,11 @@ void pfmergeCommand(client *c) {
if (o == NULL) continue; /* Assume empty HLL for non existing var. */
if (isHLLObjectOrReply(c,o) != C_OK) return;
/* If at least one involved HLL is dense, use the dense representation
* as target ASAP to save time and avoid the conversion step. */
hdr = o->ptr;
if (hdr->encoding == HLL_DENSE) use_dense = 1;
/* Merge with this HLL with our 'max' HHL by setting max[i]
* to MAX(max[i],hll[i]). */
if (hllMerge(max,o) == C_ERR) {
@ -1332,8 +1338,9 @@ void pfmergeCommand(client *c) {
o = dbUnshareStringValue(c->db,c->argv[1],o);
}
/* Only support dense objects as destination. */
if (hllSparseToDense(o) == C_ERR) {
/* Convert the destination object to dense representation if at least
* one of the inputs was dense. */
if (use_dense && hllSparseToDense(o) == C_ERR) {
addReplySds(c,sdsnew(invalid_hll_err));
return;
}
@ -1342,7 +1349,11 @@ void pfmergeCommand(client *c) {
* invalidate the cached value. */
hdr = o->ptr;
for (j = 0; j < HLL_REGISTERS; j++) {
HLL_DENSE_SET_REGISTER(hdr->registers,j,max[j]);
if (max[j] == 0) continue;
switch(hdr->encoding) {
case HLL_DENSE: hllDenseSet(hdr->registers,j,max[j]); break;
case HLL_SPARSE: hllSparseSet(o,j,max[j]); break;
}
}
HLL_INVALIDATE_CACHE(hdr);