diff --git a/src/t_stream.c b/src/t_stream.c index a1d3f8a1..03860b8e 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -208,6 +208,11 @@ typedef struct streamIterator { raxIterator ri; /* Rax iterator. */ unsigned char *lp; /* Current listpack. */ unsigned char *lp_ele; /* Current listpack cursor. */ + /* Buffers used to hold the string of lpGet() when the element is + * integer encoded, so that there is no string representation of the + * element inside the listpack itself. */ + unsigned char field_buf[LP_INTBUF_SIZE]; + unsigned char value_buf[LP_INTBUF_SIZE]; } streamIterator; /* Initialize the stream iterator, so that we can call iterating functions @@ -289,6 +294,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, size_t *numfields) { return 0; /* We are already out of range. */ streamDecodeID(e,id); *numfields = lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); return 1; /* Valid item returned. */ } else { /* If we do not emit, we have to discard. */ @@ -303,6 +309,19 @@ int streamIteratorGetID(streamIterator *si, streamID *id, size_t *numfields) { } } +/* Get the field and value of the current item we are iterating. This should + * be called immediately after streamIteratorGetID(), and for each field + * according to the number of fields returned by streamIteratorGetID(). + * The function populates the field and value pointers and the corresponding + * lengths by reference, that are valid until the next iterator call, assuming + * no one touches the stream meanwhile. */ +void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { + *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); + si->lp_ele = lpNext(si->lp,si->lp_ele); + *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); + si->lp_ele = lpNext(si->lp,si->lp_ele); +} + /* Stop the stream iterator. The only cleanup we need is to free the rax * itereator, since the stream iterator itself is supposed to be stack * allocated. */