Fix DEBUG DIGEST, SORT and AOF rewrite

This commit is contained in:
Pieter Noordhuis 2011-03-14 13:30:06 +01:00
parent 9ec4ea20a7
commit dddf5335f4
4 changed files with 100 additions and 25 deletions

View File

@ -429,21 +429,55 @@ int rewriteAppendOnlyFile(char *filename) {
}
} else if (o->type == REDIS_ZSET) {
/* Emit the ZADDs needed to rebuild the sorted set */
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
char cmd[]="*4\r\n$4\r\nZADD\r\n";
while((de = dictNext(di)) != NULL) {
char cmd[]="*4\r\n$4\r\nZADD\r\n";
robj *eleobj = dictGetEntryKey(de);
double *score = dictGetEntryVal(de);
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = o->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vll;
double score;
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkDouble(fp,*score) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
eptr = ziplistIndex(zl,0);
redisAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssert(sptr != NULL);
while (eptr != NULL) {
redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
score = zzlGetScore(sptr);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkDouble(fp,score) == 0) goto werr;
if (vstr != NULL) {
if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
goto werr;
} else {
if (fwriteBulkLongLong(fp,vll) == 0)
goto werr;
}
zzlNext(zl,&eptr,&sptr);
}
} else if (o->encoding == REDIS_ENCODING_RAW) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
double *score = dictGetEntryVal(de);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkDouble(fp,*score) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
}
dictReleaseIterator(di);
} else {
redisPanic("Unknown sorted set encoding");
}
dictReleaseIterator(di);
} else if (o->type == REDIS_HASH) {
char cmd[]="*4\r\n$4\r\nHSET\r\n";

View File

@ -127,22 +127,57 @@ void computeDatasetDigest(unsigned char *final) {
}
setTypeReleaseIterator(si);
} else if (o->type == REDIS_ZSET) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
unsigned char eledigest[20];
while((de = dictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
double *score = dictGetEntryVal(de);
unsigned char eledigest[20];
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = o->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vll;
double score;
snprintf(buf,sizeof(buf),"%.17g",*score);
memset(eledigest,0,20);
mixObjectDigest(eledigest,eleobj);
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
eptr = ziplistIndex(zl,0);
redisAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssert(sptr != NULL);
while (eptr != NULL) {
redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
score = zzlGetScore(sptr);
memset(eledigest,0,20);
if (vstr != NULL) {
mixDigest(eledigest,vstr,vlen);
} else {
ll2string(buf,sizeof(buf),vll);
mixDigest(eledigest,buf,strlen(buf));
}
snprintf(buf,sizeof(buf),"%.17g",score);
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
zzlNext(zl,&eptr,&sptr);
}
} else if (o->encoding == REDIS_ENCODING_RAW) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
double *score = dictGetEntryVal(de);
snprintf(buf,sizeof(buf),"%.17g",*score);
memset(eledigest,0,20);
mixObjectDigest(eledigest,eleobj);
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
}
dictReleaseIterator(di);
} else {
redisPanic("Unknown sorted set encoding");
}
dictReleaseIterator(di);
} else if (o->type == REDIS_HASH) {
hashTypeIterator *hi;
robj *obj;

View File

@ -800,6 +800,9 @@ zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
double zzlGetScore(unsigned char *sptr);
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
unsigned int zsetLength(robj *zobj);
void zsetConvert(robj *zobj, int encoding);

View File

@ -199,6 +199,9 @@ void sortCommand(redisClient *c) {
j++;
}
/* Destructively convert encoded sorted sets for SORT. */
if (sortval->type == REDIS_ZSET) zsetConvert(sortval, REDIS_ENCODING_RAW);
/* Load the sorting vector with all the objects to sort */
switch(sortval->type) {
case REDIS_LIST: vectorlen = listTypeLength(sortval); break;