diff --git a/src/aof.c b/src/aof.c index 5fbfdd69..79962fd0 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1035,7 +1035,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { * The function returns 0 on error, 1 on success. */ int rewriteStreamObject(rio *r, robj *key, robj *o) { streamIterator si; - streamIteratorStart(&si,o->ptr,NULL,NULL); + streamIteratorStart(&si,o->ptr,NULL,NULL,0); streamID id; int64_t numfields; diff --git a/src/blocked.c b/src/blocked.c index 734e6ffd..f438c335 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -326,7 +326,7 @@ void handleClientsBlockedOnKeys(void) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,rl->key); streamReplyWithRange(receiver,s,&start,NULL, - receiver->bpop.xread_count); + receiver->bpop.xread_count,0); } } } diff --git a/src/stream.h b/src/stream.h index df29e9e7..214b6d9a 100644 --- a/src/stream.h +++ b/src/stream.h @@ -28,9 +28,10 @@ typedef struct stream { typedef struct streamIterator { streamID master_id; /* ID of the master entry at listpack head. */ uint64_t master_fields_count; /* Master entries # of fields. */ - unsigned char *master_fields_start; /* Master entries start in listapck. */ + unsigned char *master_fields_start; /* Master entries start in listpack. */ unsigned char *master_fields_ptr; /* Master field to emit next. */ int entry_flags; /* Flags of entry we are emitting. */ + int rev; /* True if iterating end to start (reverse). */ uint64_t start_key[2]; /* Start key as 128 bit big endian. */ uint64_t end_key[2]; /* End key as 128 bit big endian. */ raxIterator ri; /* Rax iterator. */ @@ -49,8 +50,8 @@ struct client; stream *streamNew(void); void freeStream(stream *s); -size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count); -void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end); +size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev); +void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); diff --git a/src/t_stream.c b/src/t_stream.c index 14eba44c..945fc28c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -426,7 +426,9 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { /* Initialize the stream iterator, so that we can call iterating functions * to get the next items. This requires a corresponding streamIteratorStop() - * at the end. + * at the end. The 'rev' parameter controls the direction. If it's zero the + * iteration is from the start to the end element (inclusive), otherwise + * if rev is non-zero, the iteration is reversed. * * Once the iterator is initalized, we iterate like this: * @@ -443,7 +445,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { * } * } * streamIteratorStop(&myiterator); */ -void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) { +void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { /* Intialize the iterator and translates the iteration start/stop * elements into a 128 big big-endian number. */ if (start) { @@ -462,17 +464,26 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI /* Seek the correct node in the radix tree. */ raxStart(&si->ri,s->rax); - if (start && (start->ms || start->seq)) { - raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, - sizeof(si->start_key)); - if (raxEOF(&si->ri)) - raxSeek(&si->ri,">",(unsigned char*)si->start_key, + if (!rev) { + if (start && (start->ms || start->seq)) { + raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, sizeof(si->start_key)); + if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0); + } else { + raxSeek(&si->ri,"^",NULL,0); + } } else { - raxSeek(&si->ri,"^",NULL,0); + if (end && (end->ms || end->seq)) { + raxSeek(&si->ri,"<=",(unsigned char*)si->end_key, + sizeof(si->end_key)); + if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0); + } else { + raxSeek(&si->ri,"$",NULL,0); + } } si->lp = NULL; /* There is no current listpack right now. */ si->lp_ele = NULL; /* Current listpack cursor. */ + si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ } /* Return 1 and store the current item ID at 'id' if there are still @@ -484,7 +495,8 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { * iteration or the previous listpack was completely iterated. * Go to the next node. */ if (si->lp == NULL || si->lp_ele == NULL) { - if (!raxNext(&si->ri)) return 0; + if (!si->rev && !raxNext(&si->ri)) return 0; + else if (si->rev && !raxPrev(&si->ri)) return 0; serverAssert(si->ri.key_len == sizeof(streamID)); /* Get the master ID. */ streamDecodeID(si->ri.key,&si->master_id); @@ -499,16 +511,38 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { /* Skip master fileds to seek the first entry. */ for (uint64_t i = 0; i < si->master_fields_count; i++) si->lp_ele = lpNext(si->lp,si->lp_ele); - /* We are now pointing the zero term of the master entry. */ + /* We are now pointing the zero term of the master entry. If + * we are iterating in reverse order, we need to seek the + * end of the listpack. */ + if (si->rev) si->lp_ele = lpLast(si->lp); + } else if (si->rev) { + /* If we are itereating in the reverse order, and this is not + * the first entry emitted for this listpack, then we already + * emitted the current entry, and have to go back to the previous + * one. */ + int lp_count = lpGetInteger(si->lp_ele); + while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); + /* Seek lp-count of prev entry. */ + si->lp_ele = lpPrev(si->lp,si->lp_ele); } /* For every radix tree node, iterate the corresponding listpack, * returning elements when they are within range. */ while(1) { - /* Skip the previous entry lp-count field, or in case of the - * master entry, the zero term field. */ - si->lp_ele = lpNext(si->lp,si->lp_ele); - if (si->lp_ele == NULL) break; + if (!si->rev) { + /* If we are going forward, skip the previous entry + * lp-count field (or in case of the master entry, the zero + * term field) */ + si->lp_ele = lpNext(si->lp,si->lp_ele); + if (si->lp_ele == NULL) break; + } else { + /* If we are going backward, read the number of elements this + * entry is composed of, and jump backward N times to seek + * its start. */ + int lp_count = lpGetInteger(si->lp_ele); + if (lp_count == 0) break; /* We reached the master entry. */ + while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); + } /* Get the flags entry. */ int flags = lpGetInteger(si->lp_ele); @@ -535,15 +569,28 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { /* If current >= start, and the entry is not marked as * deleted, emit it. */ - if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && - !(flags & STREAM_ITEM_FLAG_DELETED)) - { - if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) - return 0; /* We are already out of range. */ - si->entry_flags = flags; - if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) - si->master_fields_ptr = si->master_fields_start; - return 1; /* Valid item returned. */ + if (!si->rev) { + if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && + !(flags & STREAM_ITEM_FLAG_DELETED)) + { + if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) + return 0; /* We are already out of range. */ + si->entry_flags = flags; + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) + si->master_fields_ptr = si->master_fields_start; + return 1; /* Valid item returned. */ + } + } else { + if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 && + !(flags & STREAM_ITEM_FLAG_DELETED)) + { + if (memcmp(buf,si->start_key,sizeof(streamID)) < 0) + return 0; /* We are already out of range. */ + si->entry_flags = flags; + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) + si->master_fields_ptr = si->master_fields_start; + return 1; /* Valid item returned. */ + } } /* If we do not emit, we have to discard. */ @@ -553,7 +600,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { si->lp_ele = lpNext(si->lp,si->lp_ele); } - /* End of listpack reached. Try the next radix tree node. */ + /* End of listpack reached. Try the next/prev radix tree node. */ } } @@ -585,15 +632,16 @@ void streamIteratorStop(streamIterator *si) { /* Send the specified range to the client 'c'. The range the client will * receive is between start and end inclusive, if 'count' is non zero, no more * than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that - * we want all the elements from 'start' till the end of the stream. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) { + * we want all the elements from 'start' till the end of the stream. If 'rev' + * is non zero, elements are produced in reversed order from end to start. */ +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) { void *arraylen_ptr = addDeferredMultiBulkLength(c); size_t arraylen = 0; streamIterator si; int64_t numfields; streamID id; - streamIteratorStart(&si,s,start,end); + streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { /* Emit a two elements array for each item. The first is * the ID, the second is an array of field-value pairs. */ @@ -797,25 +845,32 @@ void xaddCommand(client *c) { signalKeyAsReady(c->db, c->argv[1]); } -/* XRANGE key start end [COUNT ] */ +/* XRANGE key start end [COUNT ] [REV] */ void xrangeCommand(client *c) { robj *o; stream *s; streamID startid, endid; long long count = 0; + int rev = 0; if (streamParseIDOrReply(c,c->argv[2],&startid,0) == C_ERR) return; if (streamParseIDOrReply(c,c->argv[3],&endid,UINT64_MAX) == C_ERR) return; /* Parse the COUNT option if any. */ - if (c->argc > 5) { - if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) { - if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK) + if (c->argc > 4) { + for (int j = 4; j < c->argc; j++) { + int additional = c->argc-j-1; + if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) + != C_OK) return; + if (count < 0) count = 0; + j++; /* Consume additional arg. */ + } else if (strcasecmp(c->argv[j]->ptr,"REV") == 0) { + rev = 1; + } else { + addReply(c,shared.syntaxerr); return; - if (count < 0) count = 0; - } else { - addReply(c,shared.syntaxerr); - return; + } } } @@ -823,7 +878,7 @@ void xrangeCommand(client *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; - streamReplyWithRange(c,s,&startid,&endid,count); + streamReplyWithRange(c,s,&startid,&endid,count,rev); } /* XLEN */ @@ -931,7 +986,7 @@ void xreadCommand(client *c) { * of the stream and the data we extracted from it. */ addReplyMultiBulkLen(c,2); addReplyBulk(c,c->argv[i+streams_arg]); - streamReplyWithRange(c,s,&start,NULL,count); + streamReplyWithRange(c,s,&start,NULL,count,0); } }