1
0
mirror of https://github.com/fluencelabs/redis synced 2025-03-26 04:11:04 +00:00

Streams: augment stream entries to allow backward scanning.

This commit is contained in:
antirez 2017-11-17 10:16:30 +01:00
parent 0381931b4c
commit 3c5d773f82

@ -199,9 +199,9 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
* *
* The master entry is composed like in the following example: * The master entry is composed like in the following example:
* *
* +-------+---------+------------+---------+--/--+---------+---------+ * +-------+---------+------------+---------+--/--+---------+---------+-+
* | count | deleted | num-fields | field_1 | field_2 | ... | field_N | * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
* +-------+---------+------------+---------+--/--+---------+---------+ * +-------+---------+------------+---------+--/--+---------+---------+-+
* *
* count and deleted just represent respectively the total number of * count and deleted just represent respectively the total number of
* entires inside the listpack that are valid, and marked as deleted * entires inside the listpack that are valid, and marked as deleted
@ -213,7 +213,11 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
* the radix tree node containing the listpack (delta encoding), and * the radix tree node containing the listpack (delta encoding), and
* if the fields of the entry are the same as the master enty fields, the * if the fields of the entry are the same as the master enty fields, the
* entry flags will specify this fact and the entry fields and number * entry flags will specify this fact and the entry fields and number
* of fields will be omitted (see later in the code of this function). */ * of fields will be omitted (see later in the code of this function).
*
* The "0" entry at the end is the same as the 'lp-count' entry in the
* regular stream entries (see below), and marks the fact that there are
* no more entires, when we scan the stream from right to left. */
int flags = STREAM_ITEM_FLAG_NONE; int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) { if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
@ -228,6 +232,7 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
sds field = argv[i*2]->ptr; sds field = argv[i*2]->ptr;
lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
} }
lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
/* The first entry we insert, has obviously the same fields of the /* The first entry we insert, has obviously the same fields of the
* master entry. */ * master entry. */
@ -271,20 +276,25 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
/* Populate the listpack with the new entry. We use the following /* Populate the listpack with the new entry. We use the following
* encoding: * encoding:
* *
* +-----+--------+----------+-------+-------+-/-+-------+-------+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
* |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N| * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
* +-----+--------+----------+-------+-------+-/-+-------+-------+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
* *
* However if the SAMEFIELD flag is set, we have just to populate * However if the SAMEFIELD flag is set, we have just to populate
* the entry with the values, so it becomes: * the entry with the values, so it becomes:
* *
* +-----+--------+-------+-/-+-------+ * +-----+--------+-------+-/-+-------+--------+
* |flags|entry-id|value-1|...|value-N| * |flags|entry-id|value-1|...|value-N|lp-count|
* +-----+--------+-------+-/-+-------+ * +-----+--------+-------+-/-+-------+--------+
* *
* The entry-id field is actually two separated fields: the ms * The entry-id field is actually two separated fields: the ms
* and seq difference compared to the master entry. * and seq difference compared to the master entry.
*/ *
* The lp-count field is a number that states the number of listpack pieces
* that compose the entry, so that it's possible to travel the entry
* in reverse order: we can just start from the end of the listpack, read
* the entry, and jump back N times to seek the "flags" field to read
* the stream full entry. */
lp = lpAppendInteger(lp,flags); lp = lpAppendInteger(lp,flags);
lp = lpAppendInteger(lp,id.ms - master_id.ms); lp = lpAppendInteger(lp,id.ms - master_id.ms);
lp = lpAppendInteger(lp,id.seq - master_id.seq); lp = lpAppendInteger(lp,id.seq - master_id.seq);
@ -296,6 +306,11 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
} }
/* Compute and store the lp-count field. */
int lp_count = numfields;
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) lp_count *= 2;
lp_count += 3; /* Add the 3 fixed fileds flags + ms-diff + seq-diff. */
lp = lpAppendInteger(lp,lp_count);
/* Insert back into the tree in order to update the listpack pointer. */ /* Insert back into the tree in order to update the listpack pointer. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
@ -361,6 +376,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
p = lpNext(lp,p); /* Seek the first field. */ p = lpNext(lp,p); /* Seek the first field. */
for (int64_t j = 0; j < master_fields_count; j++) for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p); /* Skip all master fields. */ p = lpNext(lp,p); /* Skip all master fields. */
p = lpNext(lp,p); /* Skip the zero master entry terminator. */
/* 'p' is now pointing to the first entry inside the listpack. /* 'p' is now pointing to the first entry inside the listpack.
* We have to run entry after entry, marking entries as deleted * We have to run entry after entry, marking entries as deleted
@ -389,6 +405,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
} }
while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */ while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
p = lpNext(lp,p); /* Skip the final lp-count field. */
} }
/* Here we should perform garbage collection in case at this point /* Here we should perform garbage collection in case at this point
@ -482,11 +499,17 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
/* Skip master fileds to seek the first entry. */ /* Skip master fileds to seek the first entry. */
for (uint64_t i = 0; i < si->master_fields_count; i++) for (uint64_t i = 0; i < si->master_fields_count; i++)
si->lp_ele = lpNext(si->lp,si->lp_ele); si->lp_ele = lpNext(si->lp,si->lp_ele);
/* We are now pointing the zero term of the master entry. */
} }
/* For every radix tree node, iterate the corresponding listpack, /* For every radix tree node, iterate the corresponding listpack,
* returning elements when they are within range. */ * returning elements when they are within range. */
while(si->lp_ele) { while(1) {
/* Skip the previous entry lp-count field, or in case of the
* master entry, the zero term field. */
si->lp_ele = lpNext(si->lp,si->lp_ele);
if (si->lp_ele == NULL) break;
/* Get the flags entry. */ /* Get the flags entry. */
int flags = lpGetInteger(si->lp_ele); int flags = lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */