Streams: XDEL command.

This commit is contained in:
antirez 2018-04-18 13:12:09 +02:00
parent 9c149bf1f1
commit aba76320d5
3 changed files with 33 additions and 2 deletions

View File

@ -313,6 +313,7 @@ struct redisCommand redisCommandTable[] = {
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0}, {"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0},
{"xinfo",xinfoCommand,-2,"r",0,NULL,2,2,1,0,0}, {"xinfo",xinfoCommand,-2,"r",0,NULL,2,2,1,0,0},
{"xdel",xdelCommand,-2,"wF",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}

View File

@ -2053,6 +2053,7 @@ void xackCommand(client *c);
void xpendingCommand(client *c); void xpendingCommand(client *c);
void xclaimCommand(client *c); void xclaimCommand(client *c);
void xinfoCommand(client *c); void xinfoCommand(client *c);
void xdelCommand(client *c);
#if defined(__GNUC__) #if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); void *calloc(size_t count, size_t size) __attribute__ ((deprecated));

View File

@ -715,10 +715,10 @@ void streamIteratorStop(streamIterator *si) {
/* Delete the specified item ID from the stream, returning 1 if the item /* Delete the specified item ID from the stream, returning 1 if the item
* was deleted 0 otherwise (if it does not exist). */ * was deleted 0 otherwise (if it does not exist). */
int streamDeleteItem(stream *s, streamID id) { int streamDeleteItem(stream *s, streamID *id) {
int deleted = 0; int deleted = 0;
streamIterator si; streamIterator si;
streamIteratorStart(&si,s,&id,&id,0); streamIteratorStart(&si,s,id,id,0);
streamID myid; streamID myid;
int64_t numfields; int64_t numfields;
if (streamIteratorGetID(&si,&myid,&numfields)) { if (streamIteratorGetID(&si,&myid,&numfields)) {
@ -1992,6 +1992,35 @@ void xclaimCommand(client *c) {
preventCommandPropagation(c); preventCommandPropagation(c);
} }
/* XDEL <key> [<ID1> <ID2> ... <IDN>]
*
* Removes the specified entries from the stream. Returns the number
* of items actaully deleted, that may be different from the number
* of IDs passed in case certain IDs do not exist. */
void xdelCommand(client *c) {
robj *o;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
/* We need to sanity check the IDs passed to start. Even if not
* a big issue, it is not great that the command is only partially
* executed becuase at some point an invalid ID is parsed. */
streamID id;
for (int j = 2; j < c->argc; j++) {
if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
}
/* Actaully apply the command. */
int deleted = 0;
for (int j = 2; j < c->argc; j++) {
streamParseIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
deleted += streamDeleteItem(s,&id);
}
addReplyLongLong(c,deleted);
}
/* XINFO CONSUMERS key group /* XINFO CONSUMERS key group
* XINFO GROUPS <key> * XINFO GROUPS <key>
* XINFO STREAM <key> * XINFO STREAM <key>