Add ziplistMerge()

This started out as #2158 by sunheehnus, but I kept rewriting it
until I could understand the code more easily and get a few more
correctness guarantees out of the more readable flow.

The original commit created and returned a new ziplist with the contents of
both input ziplists, but I prefer to grow one of the input ziplists
and destroy the other one.

So, instead of malloc+copy as in #2158, the merge now reallocs one of
the existing ziplists and copies the other ziplist into the new space.

Also added merge test cases to ziplistTest()
This commit is contained in:
Matt Stancliff 2014-11-21 14:52:10 -05:00
parent 5e362b84ab
commit 9d2dc0249c
3 changed files with 210 additions and 54 deletions

View File

@ -351,65 +351,28 @@ int quicklistReplaceAtIndex(quicklist *quicklist, long index, void *data,
static quicklistNode *_quicklistZiplistMerge(quicklist *quicklist, static quicklistNode *_quicklistZiplistMerge(quicklist *quicklist,
quicklistNode *a, quicklistNode *a,
quicklistNode *b) { quicklistNode *b) {
/* Merge into node with largest initial count */ D("Requested merge (a,b) (%u, %u)", a->count, b->count);
quicklistNode *target = a->count > b->count ? a : b;
if (a->count == 0 || b->count == 0) if ((ziplistMerge(&a->zl, &b->zl))) {
return NULL; /* We merged ziplists! Now remove the unused quicklistNode. */
quicklistNode *keep = NULL, *nokeep = NULL;
if (!a->zl) {
nokeep = a;
keep = b;
} else if (!b->zl) {
nokeep = b;
keep = a;
}
keep->count = ziplistLen(keep->zl);
D("Requested merge (a,b) (%u, %u) and picked target %u", a->count, b->count, nokeep->count = 0;
target->count); __quicklistDelNode(quicklist, nokeep);
int where; return keep;
unsigned char *p = NULL;
if (target == a) {
/* If target is node a, we append node b to node a, in-order */
where = ZIPLIST_TAIL;
p = ziplistIndex(b->zl, 0);
D("WILL TRAVERSE B WITH LENGTH: %u, %u", b->count, ziplistLen(b->zl));
} else { } else {
/* If target b, we prepend node a to node b, in reverse order of a */ /* else, the merge returned NULL and nothing changed. */
where = ZIPLIST_HEAD; return NULL;
p = ziplistIndex(a->zl, -1);
D("WILL TRAVERSE A WITH LENGTH: %u, %u", a->count, ziplistLen(a->zl));
} }
unsigned char *val;
unsigned int sz;
long long longval;
char lv[32] = { 0 };
/* NOTE: We could potentially create a built-in ziplist operation
* allowing direct merging of two ziplists. It would be more memory
* efficient (one big realloc instead of incremental), but it's more
* complex than using the existing ziplist API to read/push as below. */
while (ziplistGet(p, &val, &sz, &longval)) {
if (!val) {
sz = ll2string(lv, sizeof(lv), longval);
val = (unsigned char *)lv;
}
target->zl = ziplistPush(target->zl, val, sz, where);
if (target == a) {
p = ziplistNext(b->zl, p);
b->count--;
a->count++;
} else {
p = ziplistPrev(a->zl, p);
a->count--;
b->count++;
}
D("Loop A: %u, B: %u", a->count, b->count);
}
/* At this point, target is populated and not-target needs
* to be free'd and removed from the quicklist. */
if (target == a) {
D("Deleting node B with current count: %d", b->count);
__quicklistDelNode(quicklist, b);
} else if (target == b) {
D("Deleting node A with current count: %d", a->count);
__quicklistDelNode(quicklist, a);
}
return target;
} }
/* Attempt to merge ziplists within two nodes on either side of 'center'. /* Attempt to merge ziplists within two nodes on either side of 'center'.

View File

@ -143,6 +143,7 @@
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t)) #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
#define ZIPLIST_END_SIZE (sizeof(uint8_t))
#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE) #define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) #define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
#define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1) #define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
@ -176,6 +177,8 @@ typedef struct zlentry {
if ((encoding) < ZIP_STR_MASK) (encoding) &= ZIP_STR_MASK; \ if ((encoding) < ZIP_STR_MASK) (encoding) &= ZIP_STR_MASK; \
} while(0) } while(0)
void ziplistRepr(unsigned char *zl);
/* Return bytes needed to store integer encoded by 'encoding' */ /* Return bytes needed to store integer encoded by 'encoding' */
static unsigned int zipIntSize(unsigned char encoding) { static unsigned int zipIntSize(unsigned char encoding) {
switch(encoding) { switch(encoding) {
@ -670,6 +673,121 @@ static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsig
return zl; return zl;
} }
/* Merge ziplists 'first' and 'second' by appending 'second' to 'first'.
 *
 * The result always holds the entries of 'first' followed by the entries of
 * 'second', regardless of which input buffer ends up being reused.
 *
 * NOTE: The ziplist with the higher entry count (ties go to 'first') is
 * reallocated to contain the merged ziplist; the other one is copied into the
 * new space and then free'd.
 *
 * Parameters:
 *   first  - address of the pointer to the ziplist whose entries come first.
 *   second - address of the pointer to the ziplist whose entries come last.
 *
 * On failure: returns NULL and leaves both inputs untouched. This happens
 * when any parameter (or the ziplist it points to) is NULL, when both
 * parameters refer to the same ziplist, or when the merged size would not
 * fit in the 32 bit "total bytes" header field.
 *
 * On success: returns the merged ziplist. The parameter whose buffer was
 * reused is updated to the (possibly moved) reallocated ziplist, the other
 * parameter's ziplist is free'd and the parameter is set to NULL. Any copies
 * of the original pointers held by the caller are no longer valid. */
unsigned char *ziplistMerge(unsigned char **first, unsigned char **second) {
    /* If any params are null, we can't merge, so NULL. */
    if (first == NULL || *first == NULL || second == NULL || *second == NULL)
        return NULL;

    /* Can't merge same list into itself. */
    if (*first == *second)
        return NULL;

    size_t first_bytes = intrev32ifbe(ZIPLIST_BYTES(*first));
    size_t first_len = intrev16ifbe(ZIPLIST_LENGTH(*first));

    size_t second_bytes = intrev32ifbe(ZIPLIST_BYTES(*second));
    size_t second_len = intrev16ifbe(ZIPLIST_LENGTH(*second));

    /* The merged size is written back into a 32 bit header field. Refuse the
     * merge (before modifying anything) instead of silently truncating it. */
    uint64_t merged_bytes = (uint64_t)first_bytes + (uint64_t)second_bytes -
                            ZIPLIST_HEADER_SIZE - ZIPLIST_END_SIZE;
    if (merged_bytes > UINT32_MAX)
        return NULL;

    int append;
    unsigned char *source, *target;
    size_t target_bytes, source_bytes;
    /* Pick the ziplist with the most entries so we can resize in-place.
     * We must also track if we are now appending or prepending to
     * the target ziplist. */
    if (first_len >= second_len) {
        /* retain first, append second to first. */
        target = *first;
        target_bytes = first_bytes;
        source = *second;
        source_bytes = second_bytes;
        append = 1;
    } else {
        /* else, retain second, prepend first to second. */
        target = *second;
        target_bytes = second_bytes;
        source = *first;
        source_bytes = first_bytes;
        append = 0;
    }

    /* Final bytes: both ziplists minus one pair of metadata (HEADER + END). */
    size_t zlbytes = (size_t)merged_bytes;
    size_t zllength = first_len + second_len;
    /* Combined zl length should be limited within UINT16_MAX */
    zllength = zllength < UINT16_MAX ? zllength : UINT16_MAX;

    /* Save offset positions before we start ripping memory apart. */
    size_t first_offset = intrev32ifbe(ZIPLIST_TAIL_OFFSET(*first));
    size_t second_offset = intrev32ifbe(ZIPLIST_TAIL_OFFSET(*second));

    /* Extend target to new zlbytes then append or prepend source. */
    target = zrealloc(target, zlbytes);
    if (append) {
        /* append == appending to target */
        /* Copy source after target (copying over original [END]):
         *   [TARGET - END, SOURCE - HEADER] */
        memcpy(target + target_bytes - ZIPLIST_END_SIZE,
               source + ZIPLIST_HEADER_SIZE,
               source_bytes - ZIPLIST_HEADER_SIZE);
    } else {
        /* !append == prepending to target */
        /* Move target *contents* exactly size of (source - [END]),
         * then copy source into vacated space (source - [END]):
         *   [SOURCE - END, TARGET - HEADER] */
        memmove(target + source_bytes - ZIPLIST_END_SIZE,
                target + ZIPLIST_HEADER_SIZE,
                target_bytes - ZIPLIST_HEADER_SIZE);
        memcpy(target, source, source_bytes - ZIPLIST_END_SIZE);
    }

    /* Update header metadata. */
    ZIPLIST_BYTES(target) = intrev32ifbe(zlbytes);
    ZIPLIST_LENGTH(target) = intrev16ifbe(zllength);

    /* New tail offset.
     *
     * When 'second' has entries, the merged tail is the tail of 'second':
     *   + N bytes of first ziplist
     *   - 1 byte for [END] of first ziplist
     *   + M bytes for the offset of the original tail of the second ziplist
     *   - J bytes for HEADER because second_offset keeps no header.
     *
     * When 'second' is empty its tail offset points at its own [END], so the
     * formula above would make the merged tail point at the merged [END]
     * instead of at the last entry. The entries of 'first' keep their
     * offsets in the merged ziplist, so the tail of 'first' is still the
     * tail of the result. */
    size_t tail_offset;
    if (second_bytes == ZIPLIST_HEADER_SIZE + ZIPLIST_END_SIZE) {
        tail_offset = first_offset;
    } else {
        tail_offset = (first_bytes - ZIPLIST_END_SIZE) +
                      (second_offset - ZIPLIST_HEADER_SIZE);
    }
    ZIPLIST_TAIL_OFFSET(target) = intrev32ifbe(tail_offset);

    /* __ziplistCascadeUpdate just fixes the prev length values until it finds a
     * correct prev length value (then it assumes the rest of the list is okay).
     * We tell CascadeUpdate to start at the first ziplist's tail element to fix
     * the merge seam. */
    target = __ziplistCascadeUpdate(target, target + first_offset);

    /* Now free and NULL out what we didn't realloc */
    if (append) {
        zfree(*second);
        *second = NULL;
        *first = target;
    } else {
        zfree(*first);
        *first = NULL;
        *second = target;
    }
    return target;
}
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) { unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
unsigned char *p; unsigned char *p;
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl); p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
@ -1456,6 +1574,80 @@ int ziplistTest(int argc, char **argv) {
printf("SUCCESS\n\n"); printf("SUCCESS\n\n");
} }
/* Merge test: exercises ziplistMerge() on the self-merge rejection, on two
 * empty ziplists, and on two populated ziplists, then spot-checks the entries
 * on both sides of the merge seam. Any failed check prints an error and
 * returns 1 from the enclosing test function. */
printf("Merge test:\n");
{
/* create list gives us: [hello, foo, quux, 1024] */
zl = createList();
unsigned char *zl2 = createList();
unsigned char *zl3 = ziplistNew();
unsigned char *zl4 = ziplistNew();
/* Passing the same ziplist as both arguments must be refused (NULL). */
if (ziplistMerge(&zl4, &zl4)) {
printf("ERROR: Allowed merging of one ziplist into itself.\n");
return 1;
}
/* Merge two empty ziplists, get empty result back. */
zl4 = ziplistMerge(&zl3, &zl4);
ziplistRepr(zl4);
if (ziplistLen(zl4)) {
printf("ERROR: Merging two empty ziplists created entries.\n");
return 1;
}
/* zl4 was reassigned to the merge result above, so this releases the
 * merged (empty) ziplist. */
zfree(zl4);
/* The merge result is stored back into zl2; ziplistMerge() also updates
 * whichever argument it kept, so the pre-merge pointer values must not be
 * reused.
 * NOTE(review): the merged list does not appear to be freed in this
 * block -- confirm whether later code in the test releases it. */
zl2 = ziplistMerge(&zl, &zl2);
/* merge gives us: [hello, foo, quux, 1024, hello, foo, quux, 1024] */
ziplistRepr(zl2);
if (ziplistLen(zl2) != 8) {
printf("ERROR: Merged length not 8, but: %u\n", ziplistLen(zl2));
return 1;
}
/* Index 0: first entry of the merged list; must match "hello" and must
 * not match the near-miss "hella". */
p = ziplistIndex(zl2,0);
if (!ziplistCompare(p,(unsigned char*)"hello",5)) {
printf("ERROR: not \"hello\"\n");
return 1;
}
if (ziplistCompare(p,(unsigned char*)"hella",5)) {
printf("ERROR: \"hella\"\n");
return 1;
}
/* Index 3: last entry contributed by the first input list. */
p = ziplistIndex(zl2,3);
if (!ziplistCompare(p,(unsigned char*)"1024",4)) {
printf("ERROR: not \"1024\"\n");
return 1;
}
if (ziplistCompare(p,(unsigned char*)"1025",4)) {
printf("ERROR: \"1025\"\n");
return 1;
}
/* Index 4: first entry contributed by the second input list, i.e. the
 * entry right after the merge seam. */
p = ziplistIndex(zl2,4);
if (!ziplistCompare(p,(unsigned char*)"hello",5)) {
printf("ERROR: not \"hello\"\n");
return 1;
}
if (ziplistCompare(p,(unsigned char*)"hella",5)) {
printf("ERROR: \"hella\"\n");
return 1;
}
/* Index 7: last entry of the merged list. */
p = ziplistIndex(zl2,7);
if (!ziplistCompare(p,(unsigned char*)"1024",4)) {
printf("ERROR: not \"1024\"\n");
return 1;
}
if (ziplistCompare(p,(unsigned char*)"1025",4)) {
printf("ERROR: \"1025\"\n");
return 1;
}
printf("SUCCESS\n\n");
}
printf("Stress with random payloads of different encoding:\n"); printf("Stress with random payloads of different encoding:\n");
{ {
int i,j,len,where; int i,j,len,where;

View File

@ -32,6 +32,7 @@
#define ZIPLIST_TAIL 1 #define ZIPLIST_TAIL 1
unsigned char *ziplistNew(void); unsigned char *ziplistNew(void);
unsigned char *ziplistMerge(unsigned char **first, unsigned char **second);
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where); unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where);
unsigned char *ziplistIndex(unsigned char *zl, int index); unsigned char *ziplistIndex(unsigned char *zl, int index);
unsigned char *ziplistNext(unsigned char *zl, unsigned char *p); unsigned char *ziplistNext(unsigned char *zl, unsigned char *p);