From 24ac2b4c74a454fabe1b2aa1134e9551a1e32625 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 17 Apr 2018 17:18:00 +0200 Subject: [PATCH] Streams: iterator entry deletion abilities. --- src/stream.h | 2 ++ src/t_stream.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/stream.h b/src/stream.h index 8a019e93..61210f95 100644 --- a/src/stream.h +++ b/src/stream.h @@ -27,6 +27,7 @@ typedef struct stream { * rewriting code that also needs to iterate the stream to emit the XADD * commands. */ typedef struct streamIterator { + stream *stream; /* The stream we are iterating. */ streamID master_id; /* ID of the master entry at listpack head. */ uint64_t master_fields_count; /* Master entries # of fields. */ unsigned char *master_fields_start; /* Master entries start in listpack. */ @@ -38,6 +39,7 @@ typedef struct streamIterator { raxIterator ri; /* Rax iterator. */ unsigned char *lp; /* Current listpack. */ unsigned char *lp_ele; /* Current listpack cursor. */ + unsigned char *lp_flags; /* Current entry flags pointer. */ /* Buffers used to hold the string of lpGet() when the element is * integer encoded, so that there is no string representation of the * element inside the listpack itself. */ diff --git a/src/t_stream.c b/src/t_stream.c index 4640f0b2..b6779a95 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -265,7 +265,7 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, /* Update count and skip the deleted fields. */ int64_t count = lpGetInteger(lp_ele); lp = lpReplaceInteger(lp,&lp_ele,count+1); - lp_ele = lpNext(lp,lp_ele); /* seek delted. */ + lp_ele = lpNext(lp,lp_ele); /* seek deleted. */ lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */ /* Check if the entry we are adding, have the same fields @@ -505,6 +505,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI raxSeek(&si->ri,"$",NULL,0); } } + si->stream = s; si->lp = NULL; /* There is no current listpack right now. */ si->lp_ele = NULL; /* Current listpack cursor. */ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ @@ -573,6 +574,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { } /* Get the flags entry. */ + si->lp_flags = si->lp_ele; int flags = lpGetInteger(si->lp_ele); si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ @@ -657,6 +659,53 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign si->lp_ele = lpNext(si->lp,si->lp_ele); } +/* Remove the current entry from the stream: can be called after the + * GetID() API or after any GetField() call, however we need to iterate + * a valid entry while calling this function. Moreover the function + * requires the entry ID we are currently iterating, that was previously + * returned by GetID(). + * + * Note that after calling this function, next calls to GetField() can't + * be performed: the entry is now deleted. Instead the iterator will + * automatically re-seek to the next entry, so the caller should continue + * with GetID(). */ +void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { + unsigned char *lp = si->lp; + int64_t aux; + + /* We do not really delete the entry here. Instead we mark it as + * deleted flagging it, and also incrementing the count of the + * deleted entries in the listpack header. + * + * We start flagging: */ + int flags = lpGetInteger(si->lp_flags); + flags |= STREAM_ITEM_FLAG_DELETED; + lp = lpReplaceInteger(lp,&si->lp_flags,flags); + + /* Change the valid/deleted entries count in the master entry. */ + unsigned char *p = lpFirst(lp); + aux = lpGetInteger(p); + lp = lpReplaceInteger(lp,&p,aux-1); + p = lpNext(lp,p); /* Seek deleted field. */ + aux = lpGetInteger(p); + lp = lpReplaceInteger(lp,&p,aux+1); + + /* Re-seek the iterator to fix the now messed up state. */ + streamID start, end; + if (si->rev) { + streamDecodeID(si->start_key,&start); + end = *current; + } else { + start = *current; + streamDecodeID(si->end_key,&end); + } + streamIteratorStop(si); + streamIteratorStart(si,si->stream,&start,&end,si->rev); + + /* TODO: perform a garbage collection here if the ration between + * deleted and valid goes over a certain limit. */ +} + /* Stop the stream iterator. The only cleanup we need is to free the rax * itereator, since the stream iterator itself is supposed to be stack * allocated. */