From e6b0e8d9ec4561a07864358af8d2d4e81ac413fc Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 19 Apr 2018 16:25:29 +0200 Subject: [PATCH] Streams: XTRIM command added. --- src/server.c | 1 + src/server.h | 1 + src/t_stream.c | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+) diff --git a/src/server.c b/src/server.c index 2cc6b6f5..404429be 100644 --- a/src/server.c +++ b/src/server.c @@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = { {"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0}, {"xinfo",xinfoCommand,-2,"r",0,NULL,2,2,1,0,0}, {"xdel",xdelCommand,-2,"wF",0,NULL,1,1,1,0,0}, + {"xtrim",xtrimCommand,-2,"wF",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index 66581eaa..172e99c8 100644 --- a/src/server.h +++ b/src/server.h @@ -2054,6 +2054,7 @@ void xpendingCommand(client *c); void xclaimCommand(client *c); void xinfoCommand(client *c); void xdelCommand(client *c); +void xtrimCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_stream.c b/src/t_stream.c index ad47941a..1e46c636 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2024,6 +2024,74 @@ void xdelCommand(client *c) { server.dirty += deleted; addReplyLongLong(c,deleted); } + +/* General form: XTRIM [... options ...] + * + * List of options: + * + * MAXLEN [~] -- Trim so that the stream will be capped at + * the specified length. Use ~ before the + * count in order to demand approximated trimming + * (like XADD MAXLEN option). + */ + +#define TRIM_STRATEGY_NONE 0 +#define TRIM_STRATEGY_MAXLEN 1 +void xtrimCommand(client *c) { + robj *o; + + /* If the key does not exist, we are ok returning zero, that is, the + * number of elements removed from the stream. */ + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL + || checkType(c,o,OBJ_STREAM)) return; + stream *s = o->ptr; + + /* Argument parsing. */ + int trim_strategy = TRIM_STRATEGY_NONE; + long long maxlen = 0; /* 0 means no maximum length. */ + int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so + the maxium length is not applied verbatim. */ + + /* Parse options. */ + int i = 2; /* Start of options. */ + for (; i < c->argc; i++) { + int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ + char *opt = c->argv[i]->ptr; + if (!strcasecmp(opt,"maxlen") && moreargs) { + trim_strategy = TRIM_STRATEGY_MAXLEN; + char *next = c->argv[i+1]->ptr; + /* Check for the form MAXLEN ~ . */ + if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { + approx_maxlen = 1; + i++; + } + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) + != C_OK) return; + i++; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + + /* Perform the trimming. */ + int64_t deleted = 0; + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + deleted = streamTrimByLength(s,maxlen,approx_maxlen); + } else { + addReplyError(c,"XTRIM called without an option to trim the stream"); + return; + } + + /* Propagate the write if needed. */ + if (deleted) { + signalModifiedKey(c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); +} + /* XINFO CONSUMERS key group * XINFO GROUPS * XINFO STREAM