diff --git a/src/t_stream.c b/src/t_stream.c index 61b229a5..14eba44c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -199,9 +199,9 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, * * The master entry is composed like in the following example: * - * +-------+---------+------------+---------+--/--+---------+---------+ - * | count | deleted | num-fields | field_1 | field_2 | ... | field_N | - * +-------+---------+------------+---------+--/--+---------+---------+ + * +-------+---------+------------+---------+--/--+---------+---------+-+ + * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0| + * +-------+---------+------------+---------+--/--+---------+---------+-+ * * count and deleted just represent respectively the total number of * entires inside the listpack that are valid, and marked as deleted @@ -213,7 +213,11 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, * the radix tree node containing the listpack (delta encoding), and * if the fields of the entry are the same as the master enty fields, the * entry flags will specify this fact and the entry fields and number - * of fields will be omitted (see later in the code of this function). */ + * of fields will be omitted (see later in the code of this function). + * + * The "0" entry at the end is the same as the 'lp-count' entry in the + * regular stream entries (see below), and marks the fact that there are + * no more entires, when we scan the stream from right to left. */ int flags = STREAM_ITEM_FLAG_NONE; if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) { @@ -228,6 +232,7 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, sds field = argv[i*2]->ptr; lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); } + lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); /* The first entry we insert, has obviously the same fields of the * master entry. */ @@ -271,20 +276,25 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, /* Populate the listpack with the new entry. We use the following * encoding: * - * +-----+--------+----------+-------+-------+-/-+-------+-------+ - * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N| - * +-----+--------+----------+-------+-------+-/-+-------+-------+ + * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ + * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count| + * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ * * However if the SAMEFIELD flag is set, we have just to populate * the entry with the values, so it becomes: * - * +-----+--------+-------+-/-+-------+ - * |flags|entry-id|value-1|...|value-N| - * +-----+--------+-------+-/-+-------+ + * +-----+--------+-------+-/-+-------+--------+ + * |flags|entry-id|value-1|...|value-N|lp-count| + * +-----+--------+-------+-/-+-------+--------+ * * The entry-id field is actually two separated fields: the ms * and seq difference compared to the master entry. - */ + * + * The lp-count field is a number that states the number of listpack pieces + * that compose the entry, so that it's possible to travel the entry + * in reverse order: we can just start from the end of the listpack, read + * the entry, and jump back N times to seek the "flags" field to read + * the stream full entry. */ lp = lpAppendInteger(lp,flags); lp = lpAppendInteger(lp,id.ms - master_id.ms); lp = lpAppendInteger(lp,id.seq - master_id.seq); @@ -296,6 +306,11 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); } + /* Compute and store the lp-count field. */ + int lp_count = numfields; + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) lp_count *= 2; + lp_count += 3; /* Add the 3 fixed fileds flags + ms-diff + seq-diff. */ + lp = lpAppendInteger(lp,lp_count); /* Insert back into the tree in order to update the listpack pointer. */ raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); @@ -361,6 +376,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { p = lpNext(lp,p); /* Seek the first field. */ for (int64_t j = 0; j < master_fields_count; j++) p = lpNext(lp,p); /* Skip all master fields. */ + p = lpNext(lp,p); /* Skip the zero master entry terminator. */ /* 'p' is now pointing to the first entry inside the listpack. * We have to run entry after entry, marking entries as deleted @@ -389,6 +405,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { } while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */ + p = lpNext(lp,p); /* Skip the final lp-count field. */ } /* Here we should perform garbage collection in case at this point @@ -482,11 +499,17 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { /* Skip master fileds to seek the first entry. */ for (uint64_t i = 0; i < si->master_fields_count; i++) si->lp_ele = lpNext(si->lp,si->lp_ele); + /* We are now pointing the zero term of the master entry. */ } /* For every radix tree node, iterate the corresponding listpack, * returning elements when they are within range. */ - while(si->lp_ele) { + while(1) { + /* Skip the previous entry lp-count field, or in case of the + * master entry, the zero term field. */ + si->lp_ele = lpNext(si->lp,si->lp_ele); + if (si->lp_ele == NULL) break; + /* Get the flags entry. */ int flags = lpGetInteger(si->lp_ele); si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */