From 0540803288dd137c0a0f3fc345165c6a87f0957e Mon Sep 17 00:00:00 2001
From: antirez
Date: Fri, 29 Sep 2017 12:40:29 +0200
Subject: [PATCH] Streams: XADD MAXLEN implementation.

The core of this change is the implementation of stream trimming, and
the resulting MAXLEN option of XADD as a trivial result of having
trimming functionalities. MAXLEN already works but in order to be more
efficient listpack GC should be implemented, currently marked as a TODO
item inside the comments.
---
 src/t_stream.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 142 insertions(+), 3 deletions(-)

diff --git a/src/t_stream.c b/src/t_stream.c
index 956a9af1..a7505d15 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -305,6 +305,120 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
     return C_OK;
 }
 
+/* Trim the stream 's' to have no more than maxlen elements, and return the
+ * number of elements removed from the stream. The 'approx' option, if non-zero,
+ * specifies that the trimming must be performed in an approximated way in
+ * order to maximize performance. This means that the stream may contain
+ * more elements than 'maxlen', and elements are only removed if we can remove
+ * a *whole* node of the radix tree. The elements are removed from the head
+ * of the stream (older elements).
+ *
+ * The return value counts both the entries dropped together with a whole
+ * node and the entries individually flagged as deleted inside a listpack.
+ *
+ * The function may return zero if:
+ *
+ * 1) The stream is already shorter or equal to the specified max length.
+ * 2) The 'approx' option is true and removing the whole head node would
+ *    leave the stream with fewer than 'maxlen' elements, so nothing is
+ *    removed and the stream keeps a number of elements >= maxlen.
+ */
+int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
+    if (s->length <= maxlen) return 0;
+
+    raxIterator ri;
+    raxStart(&ri,s->rax);
+    raxSeek(&ri,"^",NULL,0);
+
+    int64_t deleted = 0;
+    while(s->length > maxlen && raxNext(&ri)) {
+        unsigned char *lp = ri.data, *p = lpFirst(lp);
+        int64_t entries = lpGetInteger(p);
+
+        /* Check if we can remove the whole node, and still have at
+         * least maxlen elements. */
+        if (s->length - entries >= maxlen) {
+            /* The radix tree only stores the pointer: release the listpack
+             * before dropping the node, otherwise it is leaked. */
+            lpFree(lp);
+            raxRemove(s->rax,ri.key,ri.key_len,NULL);
+            raxSeek(&ri,">=",ri.key,ri.key_len);
+            s->length -= entries;
+            deleted += entries;
+            continue;
+        }
+
+        /* If we cannot remove a whole element, and approx is true,
+         * stop here. */
+        if (approx) break;
+
+        /* Otherwise, we have to mark single entries inside the listpack
+         * as deleted. We start by updating the entries/deleted counters. */
+        int64_t to_delete = s->length - maxlen;
+        serverAssert(to_delete < entries);
+        lp = lpReplaceInteger(lp,&p,entries-to_delete);
+        p = lpNext(lp,p); /* Seek deleted field. */
+        /* Use a name distinct from 'deleted': shadowing the return counter
+         * here would make the entries flagged below go unreported. */
+        int64_t marked_deleted = lpGetInteger(p);
+        lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
+        p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
+
+        /* Skip all the master fields. */
+        int64_t master_fields_count = lpGetInteger(p);
+        p = lpNext(lp,p); /* Seek the first field. */
+        for (int64_t j = 0; j < master_fields_count; j++)
+            p = lpNext(lp,p); /* Skip all master fields. */
+
+        /* 'p' is now pointing to the first entry inside the listpack.
+         * We have to run entry after entry, marking entries as deleted
+         * if they are already not deleted. */
+        while(p) {
+            int flags = lpGetInteger(p);
+            int to_skip;
+
+            /* Mark the entry as deleted. */
+            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
+                flags |= STREAM_ITEM_FLAG_DELETED;
+                lp = lpReplaceInteger(lp,&p,flags);
+                deleted++;
+                s->length--;
+                if (s->length <= maxlen) break; /* Enough entries deleted. */
+            }
+
+            p = lpNext(lp,p); /* Skip ID ms delta. */
+            p = lpNext(lp,p); /* Skip ID seq delta. */
+            p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
+            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+                to_skip = master_fields_count;
+            } else {
+                to_skip = lpGetInteger(p); p = lpNext(lp,p);
+                to_skip = 1+(to_skip*2);
+            }
+
+            while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
+        }
+
+        /* Here we should perform garbage collection in case at this point
+         * there are too many entries deleted inside the listpack. */
+        entries -= to_delete;
+        marked_deleted += to_delete;
+        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
+            /* TODO: perform a garbage collection. */
+        }
+
+        /* lpReplaceInteger() may return a different listpack pointer:
+         * store it back so the node does not keep a stale one. */
+        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
+
+        break; /* If we are here, there was enough to delete in the current
+                  node, so no need to go to the next node. */
+    }
+
+    raxStop(&ri);
+    return deleted;
+}
+
 /* Initialize the stream iterator, so that we can call iterating functions
  * to get the next items. This requires a corresponding streamIteratorStop()
  * at the end.
@@ -578,21 +692,32 @@ invalid:
 void xaddCommand(client *c) {
     streamID id;
     int id_given = 0; /* Was an ID different than "*" specified? */
+    long long maxlen = 0;   /* 0 means no maximum length. */
+    int approx_maxlen = 0;  /* If 1 only delete whole radix tree nodes, so
+                               the maximum length is not applied verbatim. */
+    int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
 
     /* Parse options. */
     int i = 2; /* This is the first argument position where we could
                   find an option, or the ID. */
     for (; i < c->argc; i++) {
-        int moreargs = i != c->argc-1;
+        int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
         char *opt = c->argv[i]->ptr;
         if (opt[0] == '*' && opt[1] == '\0') {
             /* This is just a fast path for the common case of auto-ID
              * creation. */
             break;
         } else if (!strcasecmp(opt,"maxlen") && moreargs) {
-            addReplyError(c,"Sorry, MAXLEN is still not implemented");
+            char *next = c->argv[i+1]->ptr;
+            /* Check for the form MAXLEN ~ <count>. */
+            if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
+                approx_maxlen = 1;
+                i++;
+            }
+            if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
+                != C_OK) return;
             i++;
-            return;
+            maxlen_arg_idx = i;
         } else {
             /* If we are here is a syntax error or a valid ID. */
             if (streamParseIDOrReply(NULL,c->argv[i],&id,0) == C_OK) {
@@ -634,6 +759,20 @@ void xaddCommand(client *c) {
     notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
     server.dirty++;
 
+    /* Remove older elements if MAXLEN was specified. */
+    if (maxlen) {
+        if (!streamTrimByLength(s,maxlen,approx_maxlen)) {
+            /* If no trimming was performed, for instance because approximated
+             * trimming length was specified, rewrite the MAXLEN argument
+             * as zero, so that the command is propagated without trimming. */
+            robj *zeroobj = createStringObjectFromLongLong(0);
+            rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj);
+            decrRefCount(zeroobj);
+        } else {
+            notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+        }
+    }
+
     /* Let's rewrite the ID argument with the one actually generated for
      * AOF/replication propagation. */
     robj *idarg = createObject(OBJ_STRING,