diff --git a/src/server.c b/src/server.c index 7f51778d..2cc6b6f5 100644 --- a/src/server.c +++ b/src/server.c @@ -313,6 +313,7 @@ struct redisCommand redisCommandTable[] = { {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0}, {"xinfo",xinfoCommand,-2,"r",0,NULL,2,2,1,0,0}, + {"xdel",xdelCommand,-2,"wF",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index 0e9c3f28..66581eaa 100644 --- a/src/server.h +++ b/src/server.h @@ -2053,6 +2053,7 @@ void xackCommand(client *c); void xpendingCommand(client *c); void xclaimCommand(client *c); void xinfoCommand(client *c); +void xdelCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_stream.c b/src/t_stream.c index 2e4480bf..5891b774 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -715,10 +715,10 @@ void streamIteratorStop(streamIterator *si) { /* Delete the specified item ID from the stream, returning 1 if the item * was deleted 0 otherwise (if it does not exist). */ -int streamDeleteItem(stream *s, streamID id) { +int streamDeleteItem(stream *s, streamID *id) { int deleted = 0; streamIterator si; - streamIteratorStart(&si,s,&id,&id,0); + streamIteratorStart(&si,s,id,id,0); streamID myid; int64_t numfields; if (streamIteratorGetID(&si,&myid,&numfields)) { @@ -1992,6 +1992,35 @@ void xclaimCommand(client *c) { preventCommandPropagation(c); } + +/* XDEL [ ... ] + * + * Removes the specified entries from the stream. Returns the number + * of items actaully deleted, that may be different from the number + * of IDs passed in case certain IDs do not exist. */ +void xdelCommand(client *c) { + robj *o; + + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL + || checkType(c,o,OBJ_STREAM)) return; + stream *s = o->ptr; + + /* We need to sanity check the IDs passed to start. Even if not + * a big issue, it is not great that the command is only partially + * executed becuase at some point an invalid ID is parsed. */ + streamID id; + for (int j = 2; j < c->argc; j++) { + if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + } + + /* Actaully apply the command. */ + int deleted = 0; + for (int j = 2; j < c->argc; j++) { + streamParseIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */ + deleted += streamDeleteItem(s,&id); + } + addReplyLongLong(c,deleted); +} /* XINFO CONSUMERS key group * XINFO GROUPS * XINFO STREAM