Streams: iterator entry deletion abilities.

This commit is contained in:
antirez 2018-04-17 17:18:00 +02:00
parent 7980d87c3c
commit 24ac2b4c74
2 changed files with 52 additions and 1 deletions

View File

@ -27,6 +27,7 @@ typedef struct stream {
* rewriting code that also needs to iterate the stream to emit the XADD * rewriting code that also needs to iterate the stream to emit the XADD
* commands. */ * commands. */
typedef struct streamIterator { typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */ streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */ uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */ unsigned char *master_fields_start; /* Master entries start in listpack. */
@ -38,6 +39,7 @@ typedef struct streamIterator {
raxIterator ri; /* Rax iterator. */ raxIterator ri; /* Rax iterator. */
unsigned char *lp; /* Current listpack. */ unsigned char *lp; /* Current listpack. */
unsigned char *lp_ele; /* Current listpack cursor. */ unsigned char *lp_ele; /* Current listpack cursor. */
unsigned char *lp_flags; /* Current entry flags pointer. */
/* Buffers used to hold the string of lpGet() when the element is /* Buffers used to hold the string of lpGet() when the element is
* integer encoded, so that there is no string representation of the * integer encoded, so that there is no string representation of the
* element inside the listpack itself. */ * element inside the listpack itself. */

View File

@ -265,7 +265,7 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
/* Update count and skip the deleted fields. */ /* Update count and skip the deleted fields. */
int64_t count = lpGetInteger(lp_ele); int64_t count = lpGetInteger(lp_ele);
lp = lpReplaceInteger(lp,&lp_ele,count+1); lp = lpReplaceInteger(lp,&lp_ele,count+1);
lp_ele = lpNext(lp,lp_ele); /* seek delted. */ lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */ lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
/* Check if the entry we are adding, have the same fields /* Check if the entry we are adding, have the same fields
@ -505,6 +505,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
raxSeek(&si->ri,"$",NULL,0); raxSeek(&si->ri,"$",NULL,0);
} }
} }
si->stream = s;
si->lp = NULL; /* There is no current listpack right now. */ si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */ si->lp_ele = NULL; /* Current listpack cursor. */
si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
@ -573,6 +574,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
} }
/* Get the flags entry. */ /* Get the flags entry. */
si->lp_flags = si->lp_ele;
int flags = lpGetInteger(si->lp_ele); int flags = lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
@ -657,6 +659,53 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign
si->lp_ele = lpNext(si->lp,si->lp_ele); si->lp_ele = lpNext(si->lp,si->lp_ele);
} }
/* Remove the current entry from the stream: can be called after the
* GetID() API or after any GetField() call, however we need to iterate
* a valid entry while calling this function. Moreover the function
* requires the entry ID we are currently iterating, that was previously
* returned by GetID().
*
* Note that after calling this function, next calls to GetField() can't
* be performed: the entry is now deleted. Instead the iterator will
* automatically re-seek to the next entry, so the caller should continue
* with GetID(). */
void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
unsigned char *lp = si->lp;
int64_t aux;
/* We do not really delete the entry here. Instead we mark it as
* deleted flagging it, and also incrementing the count of the
* deleted entries in the listpack header.
*
* We start flagging: */
int flags = lpGetInteger(si->lp_flags);
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&si->lp_flags,flags);
/* Change the valid/deleted entries count in the master entry. */
unsigned char *p = lpFirst(lp);
aux = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,aux-1);
p = lpNext(lp,p); /* Seek deleted field. */
aux = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,aux+1);
/* Re-seek the iterator to fix the now messed up state. */
streamID start, end;
if (si->rev) {
streamDecodeID(si->start_key,&start);
end = *current;
} else {
start = *current;
streamDecodeID(si->end_key,&end);
}
streamIteratorStop(si);
streamIteratorStart(si,si->stream,&start,&end,si->rev);
/* TODO: perform a garbage collection here if the ration between
* deleted and valid goes over a certain limit. */
}
/* Stop the stream iterator. The only cleanup we need is to free the rax /* Stop the stream iterator. The only cleanup we need is to free the rax
* itereator, since the stream iterator itself is supposed to be stack * itereator, since the stream iterator itself is supposed to be stack
* allocated. */ * allocated. */