Streams: state machine for reverse iteration WIP 1.

This commit is contained in:
antirez 2017-11-17 13:24:20 +01:00
parent 3c5d773f82
commit ee3490ec48
4 changed files with 98 additions and 42 deletions

View File

@ -1035,7 +1035,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
* The function returns 0 on error, 1 on success. */
int rewriteStreamObject(rio *r, robj *key, robj *o) {
streamIterator si;
streamIteratorStart(&si,o->ptr,NULL,NULL);
streamIteratorStart(&si,o->ptr,NULL,NULL,0);
streamID id;
int64_t numfields;

View File

@ -326,7 +326,7 @@ void handleClientsBlockedOnKeys(void) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,rl->key);
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count);
receiver->bpop.xread_count,0);
}
}
}

View File

@ -28,9 +28,10 @@ typedef struct stream {
typedef struct streamIterator {
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listapck. */
unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
@ -49,8 +50,8 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end);
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si);

View File

@ -426,7 +426,9 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
/* Initialize the stream iterator, so that we can call iterating functions
* to get the next items. This requires a corresponding streamIteratorStop()
* at the end.
* at the end. The 'rev' parameter controls the direction. If it's zero the
* iteration is from the start to the end element (inclusive), otherwise
* if rev is non-zero, the iteration is reversed.
*
* Once the iterator is initalized, we iterate like this:
*
@ -443,7 +445,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
* }
* }
* streamIteratorStop(&myiterator); */
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) {
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
/* Intialize the iterator and translates the iteration start/stop
* elements into a 128 big big-endian number. */
if (start) {
@ -462,17 +464,26 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
/* Seek the correct node in the radix tree. */
raxStart(&si->ri,s->rax);
if (start && (start->ms || start->seq)) {
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
sizeof(si->start_key));
if (raxEOF(&si->ri))
raxSeek(&si->ri,">",(unsigned char*)si->start_key,
if (!rev) {
if (start && (start->ms || start->seq)) {
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
sizeof(si->start_key));
if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
} else {
raxSeek(&si->ri,"^",NULL,0);
}
} else {
raxSeek(&si->ri,"^",NULL,0);
if (end && (end->ms || end->seq)) {
raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
sizeof(si->end_key));
if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
} else {
raxSeek(&si->ri,"$",NULL,0);
}
}
si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */
si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
}
/* Return 1 and store the current item ID at 'id' if there are still
@ -484,7 +495,8 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
* iteration or the previous listpack was completely iterated.
* Go to the next node. */
if (si->lp == NULL || si->lp_ele == NULL) {
if (!raxNext(&si->ri)) return 0;
if (!si->rev && !raxNext(&si->ri)) return 0;
else if (si->rev && !raxPrev(&si->ri)) return 0;
serverAssert(si->ri.key_len == sizeof(streamID));
/* Get the master ID. */
streamDecodeID(si->ri.key,&si->master_id);
@ -499,16 +511,38 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
/* Skip master fileds to seek the first entry. */
for (uint64_t i = 0; i < si->master_fields_count; i++)
si->lp_ele = lpNext(si->lp,si->lp_ele);
/* We are now pointing the zero term of the master entry. */
/* We are now pointing the zero term of the master entry. If
* we are iterating in reverse order, we need to seek the
* end of the listpack. */
if (si->rev) si->lp_ele = lpLast(si->lp);
} else if (si->rev) {
/* If we are itereating in the reverse order, and this is not
* the first entry emitted for this listpack, then we already
* emitted the current entry, and have to go back to the previous
* one. */
int lp_count = lpGetInteger(si->lp_ele);
while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
/* Seek lp-count of prev entry. */
si->lp_ele = lpPrev(si->lp,si->lp_ele);
}
/* For every radix tree node, iterate the corresponding listpack,
* returning elements when they are within range. */
while(1) {
/* Skip the previous entry lp-count field, or in case of the
* master entry, the zero term field. */
si->lp_ele = lpNext(si->lp,si->lp_ele);
if (si->lp_ele == NULL) break;
if (!si->rev) {
/* If we are going forward, skip the previous entry
* lp-count field (or in case of the master entry, the zero
* term field) */
si->lp_ele = lpNext(si->lp,si->lp_ele);
if (si->lp_ele == NULL) break;
} else {
/* If we are going backward, read the number of elements this
* entry is composed of, and jump backward N times to seek
* its start. */
int lp_count = lpGetInteger(si->lp_ele);
if (lp_count == 0) break; /* We reached the master entry. */
while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
}
/* Get the flags entry. */
int flags = lpGetInteger(si->lp_ele);
@ -535,15 +569,28 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
/* If current >= start, and the entry is not marked as
* deleted, emit it. */
if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
!(flags & STREAM_ITEM_FLAG_DELETED))
{
if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
return 0; /* We are already out of range. */
si->entry_flags = flags;
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
si->master_fields_ptr = si->master_fields_start;
return 1; /* Valid item returned. */
if (!si->rev) {
if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
!(flags & STREAM_ITEM_FLAG_DELETED))
{
if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
return 0; /* We are already out of range. */
si->entry_flags = flags;
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
si->master_fields_ptr = si->master_fields_start;
return 1; /* Valid item returned. */
}
} else {
if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
!(flags & STREAM_ITEM_FLAG_DELETED))
{
if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
return 0; /* We are already out of range. */
si->entry_flags = flags;
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
si->master_fields_ptr = si->master_fields_start;
return 1; /* Valid item returned. */
}
}
/* If we do not emit, we have to discard. */
@ -553,7 +600,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
si->lp_ele = lpNext(si->lp,si->lp_ele);
}
/* End of listpack reached. Try the next radix tree node. */
/* End of listpack reached. Try the next/prev radix tree node. */
}
}
@ -585,15 +632,16 @@ void streamIteratorStop(streamIterator *si) {
/* Send the specified range to the client 'c'. The range the client will
* receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
* we want all the elements from 'start' till the end of the stream. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) {
* we want all the elements from 'start' till the end of the stream. If 'rev'
* is non zero, elements are produced in reversed order from end to start. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) {
void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
streamIterator si;
int64_t numfields;
streamID id;
streamIteratorStart(&si,s,start,end);
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
@ -797,25 +845,32 @@ void xaddCommand(client *c) {
signalKeyAsReady(c->db, c->argv[1]);
}
/* XRANGE key start end [COUNT <n>] */
/* XRANGE key start end [COUNT <n>] [REV] */
void xrangeCommand(client *c) {
robj *o;
stream *s;
streamID startid, endid;
long long count = 0;
int rev = 0;
if (streamParseIDOrReply(c,c->argv[2],&startid,0) == C_ERR) return;
if (streamParseIDOrReply(c,c->argv[3],&endid,UINT64_MAX) == C_ERR) return;
/* Parse the COUNT option if any. */
if (c->argc > 5) {
if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) {
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK)
if (c->argc > 4) {
for (int j = 4; j < c->argc; j++) {
int additional = c->argc-j-1;
if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
!= C_OK) return;
if (count < 0) count = 0;
j++; /* Consume additional arg. */
} else if (strcasecmp(c->argv[j]->ptr,"REV") == 0) {
rev = 1;
} else {
addReply(c,shared.syntaxerr);
return;
if (count < 0) count = 0;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
}
@ -823,7 +878,7 @@ void xrangeCommand(client *c) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
streamReplyWithRange(c,s,&startid,&endid,count);
streamReplyWithRange(c,s,&startid,&endid,count,rev);
}
/* XLEN */
@ -931,7 +986,7 @@ void xreadCommand(client *c) {
* of the stream and the data we extracted from it. */
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[i+streams_arg]);
streamReplyWithRange(c,s,&start,NULL,count);
streamReplyWithRange(c,s,&start,NULL,count,0);
}
}