From 81ceef7d22bcc1a30bce029ffae1a53f4752b7ed Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 14 Apr 2014 16:09:32 +0200 Subject: [PATCH] PFMERGE fixed to work with sparse encoding. --- src/hyperloglog.c | 53 ++++++++++++++++++++++++++++++++------ tests/unit/hyperloglog.tcl | 4 +-- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/src/hyperloglog.c b/src/hyperloglog.c index fe0a5d7a..9b8968c1 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -1130,7 +1130,7 @@ void pfcountCommand(redisClient *c) { /* Recompute it and update the cached value. */ card = hllCount(hdr,&invalid); if (invalid) { - addReplyError(c,"Invalid HLL object"); + addReplyError(c,"Invalid HLL object detected"); return; } hdr->card[0] = card & 0xff; @@ -1163,8 +1163,6 @@ void pfmergeCommand(redisClient *c) { * it to the target variable later. */ memset(max,0,sizeof(max)); for (j = 1; j < c->argc; j++) { - uint8_t val; - /* Check type and size. */ robj *o = lookupKeyRead(c->db,c->argv[j]); if (o == NULL) continue; /* Assume empty HLL for non existing var. */ @@ -1173,14 +1171,47 @@ void pfmergeCommand(redisClient *c) { /* Merge with this HLL with our 'max' HHL by setting max[i] * to MAX(max[i],hll[i]). */ hdr = o->ptr; - for (i = 0; i < HLL_REGISTERS; i++) { - HLL_DENSE_GET_REGISTER(val,hdr->registers,i); - if (val > max[i]) max[i] = val; + if (hdr->encoding == HLL_DENSE) { + uint8_t val; + + for (i = 0; i < HLL_REGISTERS; i++) { + HLL_DENSE_GET_REGISTER(val,hdr->registers,i); + if (val > max[i]) max[i] = val; + } + } else { + uint8_t *p = o->ptr, *end = p + sdslen(o->ptr); + long runlen, regval; + + p += HLL_HDR_SIZE; + i = 0; + while(p < end) { + if (HLL_SPARSE_IS_ZERO(p)) { + runlen = HLL_SPARSE_ZERO_LEN(p); + i += runlen; + p++; + } else if (HLL_SPARSE_IS_XZERO(p)) { + runlen = HLL_SPARSE_XZERO_LEN(p); + i += runlen; + p += 2; + } else { + runlen = HLL_SPARSE_VAL_LEN(p); + regval = HLL_SPARSE_VAL_VALUE(p); + while(runlen--) { + if (regval > max[i]) max[i] = regval; + i++; + } + p++; + } + } + if (i != HLL_REGISTERS) { + addReplyError(c,"Invalid HLL object detected"); + return; + } } } /* Create / unshare the destination key's value if needed. */ - robj *o = lookupKeyRead(c->db,c->argv[1]); + robj *o = lookupKeyWrite(c->db,c->argv[1]); if (o == NULL) { /* Create the key with a string value of the exact length to * hold our HLL data structure. sdsnewlen() when NULL is passed @@ -1194,6 +1225,12 @@ void pfmergeCommand(redisClient *c) { o = dbUnshareStringValue(c->db,c->argv[1],o); } + /* Only support dense objects as destination. */ + if (hllSparseToDense(o) == REDIS_ERR) { + addReplyError(c,"Invalid HLL object detected"); + return; + } + /* Write the resulting HLL to the destination HLL registers and * invalidate the cached value. */ hdr = o->ptr; @@ -1308,7 +1345,7 @@ void pfdebugCommand(redisClient *c) { if (hdr->encoding == HLL_SPARSE) { if (hllSparseToDense(o) == REDIS_ERR) { - addReplyError(c,"HLL sparse encoding is corrupted"); + addReplyError(c,"Invalid HLL object detected"); return; } server.dirty++; /* Force propagation on encoding change. */ diff --git a/tests/unit/hyperloglog.tcl b/tests/unit/hyperloglog.tcl index e241288d..07f8b777 100644 --- a/tests/unit/hyperloglog.tcl +++ b/tests/unit/hyperloglog.tcl @@ -60,9 +60,9 @@ start_server {tags {"hll"}} { r pfcount hll } {5} - test {PFGETREG returns the HyperLogLog raw registers} { + test {PFDEBUG GETREG returns the HyperLogLog raw registers} { r del hll r pfadd hll 1 2 3 - llength [r pfgetreg hll] + llength [r pfdebug getreg hll] } {16384} }