diff --git a/src/rdb.c b/src/rdb.c
index 5d15539c..17a93275 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -781,6 +781,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
         while (raxNext(&ri)) {
             unsigned char *lp = ri.data;
             size_t lp_bytes = lpBytes(lp);
+            if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
+            nwritten += n;
             if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
             nwritten += n;
         }
@@ -1448,27 +1450,37 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
         uint64_t listpacks = rdbLoadLen(rdb,NULL);
 
         while(listpacks--) {
+            /* Get the master ID, the one we'll use as key of the radix tree
+             * node: the entries inside the listpack itself are delta-encoded
+             * relatively to this ID. */
+            sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
+            /* The loader may hand back NULL (the listpack load below is
+             * checked the same way): stop before sdslen() dereferences it. */
+            if (nodekey == NULL) return NULL;
+            if (sdslen(nodekey) != sizeof(streamID)) {
+                rdbExitReportCorruptRDB("Stream node key entry is not the "
+                                        "size of a stream ID");
+            }
+
+            /* Load the listpack. */
             unsigned char *lp =
                 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
-            if (lp == NULL) return NULL;
+            if (lp == NULL) {
+                sdsfree(nodekey); /* Don't leak the key on the error path. */
+                return NULL;
+            }
             unsigned char *first = lpFirst(lp);
             if (first == NULL) {
-                /* Serialized listpacks should never be free, since on
+                /* Serialized listpacks should never be empty, since on
                  * deletion we should remove the radix tree key if the
                  * resulting listpack is emtpy. */
                 rdbExitReportCorruptRDB("Empty listpack inside stream");
             }
 
-            /* Get the ID of the first entry: we'll use it as key to add the
-             * listpack into the radix tree. */
-            int64_t e_len;
-            unsigned char buf[LP_INTBUF_SIZE];
-            unsigned char *e = lpGet(first,&e_len,buf);
-            if (e_len != sizeof(streamID)) {
-                rdbExitReportCorruptRDB("Listpack first entry is not the "
-                                        "size of a stream ID");
-            }
-            int retval = raxInsert(s->rax,e,sizeof(streamID),lp,NULL);
+            /* Insert the key in the radix tree. */
+            int retval = raxInsert(s->rax,
+                (unsigned char*)nodekey,sizeof(streamID),lp,NULL);
+            sdsfree(nodekey);
             if (!retval)
                 rdbExitReportCorruptRDB("Listpack re-added with existing key");
         }
diff --git a/src/t_stream.c b/src/t_stream.c
index bfc6e4c9..00d07ac5 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -83,6 +83,16 @@ unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
     return lpAppend(lp,(unsigned char*)buf,slen);
 }
 
+/* This is just a wrapper for lpReplace() to directly use a 64 bit integer
+ * instead of a string to replace the current element. The function returns
+ * the new listpack as return value, and also updates the current cursor
+ * by updating '*pos'. */
+unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) {
+    char buf[LONG_STR_SIZE];
+    int slen = ll2string(buf,sizeof(buf),value);
+    return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE, pos);
+}
+
 /* This is a wrapper function for lpGet() to directly get an integer value
  * from the listpack (that may store numbers as a string), converting
  * the string if needed. */
@@ -179,26 +189,31 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
 
     /* Create a new listpack and radix tree node if needed. Note that when
      * a new listpack is created, we populate it with a "master entry". This
-     * is just an ID and a set of fields that is taken as refernce in order
-     * to compress the stream entries that we'll add inside the listpack.
+     * is just a set of fields that is taken as reference in order to compress
+     * the stream entries that we'll add inside the listpack.
      *
-     * Note that while we use the first added entry ID and fields to create
+     * Note that while we use the first added entry fields to create
      * the master entry, the first added entry is NOT represented in the master
      * entry, which is a stand alone object. But of course, the first entry
      * will compress well because it's used as reference.
      *
-     * The master entry is composed of just: an ID and a set of fields, like:
+     * The master entry is composed like in the following example:
      *
-     * +------------+------------+---------+---------+--/--+---------+
-     * | 128 bit ID | num-fields | field_1 | field_2 | ... | field_N |
-     * +------------+------------+---------+---------+--/--+---------+
+     * +-------+---------+------------+---------+---------+--/--+---------+
+     * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |
+     * +-------+---------+------------+---------+---------+--/--+---------+
+     *
+     * count and deleted just represent respectively the total number of
+     * entries inside the listpack that are valid, and marked as deleted
+     * (deleted flag in the entry flags set). So the total number of items
+     * actually inside the listpack (both deleted and not) is count+deleted.
      *
      * The real entries will be encoded with an ID that is just the
-     * millisecond and sequence difference compared to the master entry
-     * (delta encoding), and if the fields of the entry are the same as
-     * the master enty fields, the entry flags will specify this fact
-     * and the entry fields and number of fields will be omitted (see later
-     * in the code of this function). */
+     * millisecond and sequence difference compared to the key stored at
+     * the radix tree node containing the listpack (delta encoding), and
+     * if the fields of the entry are the same as the master entry fields, the
+     * entry flags will specify this fact and the entry fields and number
+     * of fields will be omitted (see later in the code of this function). */
 
     int flags = STREAM_ITEM_FLAG_NONE;
     if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
@@ -206,7 +221,8 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
         streamEncodeID(rax_key,&id);
         /* Create the listpack having the master entry ID and fields. */
         lp = lpNew();
-        lp = lpAppend(lp,(unsigned char*)rax_key,sizeof(rax_key));
+        lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
+        lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
         lp = lpAppendInteger(lp,numfields);
         for (int i = 0; i < numfields; i++) {
             sds field = argv[i*2]->ptr;
@@ -220,14 +236,15 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
         serverAssert(ri.key_len == sizeof(rax_key));
         memcpy(rax_key,ri.key,sizeof(rax_key));
 
-        /* Read the master entry ID. */
-        int64_t e_len;
+        /* Read the master ID from the radix tree key. */
+        streamDecodeID(rax_key,&master_id);
         unsigned char *lp_ele = lpFirst(lp);
-        unsigned char buf[LP_INTBUF_SIZE];
-        unsigned char *e = lpGet(lp_ele,&e_len,buf);
-        serverAssert(e_len == sizeof(streamID));
-        streamDecodeID(e,&master_id);
-        lp_ele = lpNext(lp,lp_ele);
+
+        /* Update count and skip the deleted fields. */
+        int64_t count = lpGetInteger(lp_ele);
+        lp = lpReplaceInteger(lp,&lp_ele,count+1);
+        lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
+        lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
 
         /* Check if the entry we are adding, have the same fields
          * as the master entry. */
@@ -237,6 +254,8 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
         int i;
         for (i = 0; i < master_fields_count; i++) {
             sds field = argv[i*2]->ptr;
+            int64_t e_len;
+            unsigned char buf[LP_INTBUF_SIZE];
             unsigned char *e = lpGet(lp_ele,&e_len,buf);
             /* Stop if there is a mismatch. */
             if (sdslen(field) != (size_t)e_len ||
@@ -348,16 +367,13 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
         if (si->lp == NULL || si->lp_ele == NULL) {
             if (!raxNext(&si->ri)) return 0;
             serverAssert(si->ri.key_len == sizeof(streamID));
-            si->lp = si->ri.data;
-            si->lp_ele = lpFirst(si->lp); /* Seek the master ID. */
             /* Get the master ID. */
-            int64_t e_len;
-            unsigned char buf[LP_INTBUF_SIZE];
-            unsigned char *e = lpGet(si->lp_ele,&e_len,buf);
-            serverAssert(e_len == sizeof(streamID));
-            streamDecodeID(e,&si->master_id);
-            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek fields count. */
+            streamDecodeID(si->ri.key,&si->master_id);
             /* Get the master fields count. */
+            si->lp = si->ri.data;
+            si->lp_ele = lpFirst(si->lp); /* Seek items count */
+            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
+            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
             si->master_fields_count = lpGetInteger(si->lp_ele);
             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
             si->master_fields_start = si->lp_ele;