CG: XACK implementation.

This commit is contained in:
antirez 2018-01-25 16:39:49 +01:00
parent 5bbd117c29
commit 388c69fe4e
3 changed files with 56 additions and 4 deletions

View File

@ -309,6 +309,7 @@ struct redisCommand redisCommandTable[] = {
{"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0}, {"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0},
{"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0}, {"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0},
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}

View File

@ -2026,6 +2026,7 @@ void xrevrangeCommand(client *c);
void xlenCommand(client *c); void xlenCommand(client *c);
void xreadCommand(client *c); void xreadCommand(client *c);
void xgroupCommand(client *c); void xgroupCommand(client *c);
void xackCommand(client *c);
#if defined(__GNUC__) #if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); void *calloc(size_t count, size_t size) __attribute__ ((deprecated));

View File

@ -1393,12 +1393,62 @@ NULL
} }
} }
/* XPENDING <key> [<start> <stop>]. */ /* XACK <key> <group> <id> <id> ... <id>
*
* Acknowledge a message as processed. In practical terms we just check the
* pendine entries list (PEL) of the group, and delete the PEL entry both from
* the group and the consumer (pending messages are referenced in both places).
*
* Return value of the command is the number of messages successfully
* acknowledged, that is, the IDs we were actually able to resolve in the PEL.
*/
void xackCommand(client *c) {
streamCG *group;
robj *o = lookupKeyRead(c->db,c->argv[1]);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
}
/* XCLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ...*/ /* No key or group? Nothing to ack. */
if (o == NULL || group == NULL) {
addReply(c,shared.cone);
return;
}
/* XACK <stream-key> */ int acknowledged = 0;
for (int j = 3; j < c->argc; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL: it will have a reference to the
* NACK structure that will have a reference to the consumer, so that
* we are able to remove the entry from both PELs. */
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
if (nack != raxNotFound) {
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamFreeNACK(nack);
acknowledged++;
}
}
addReplyLongLong(c,acknowledged);
}
/* XPENDING <key> <group> [<start> <stop>]
*
* If start and stop are omitted, the command just outputs information about
* the amount of pending messages for the key/group pair, together with
* the minimum and maxium ID of pending messages.
*
* If start and stop are provided instead, the pending messages are returned
* with informations about the current owner, number of deliveries and last
* delivery time and so forth. */
/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ...*/
/* XREAD-GROUP will be implemented by xreadGenericCommand() */ /* XREAD-GROUP will be implemented by xreadGenericCommand() */
/* XINFO <key> [CONSUMERS|GROUPS|STREAM]. STREAM is the default */ /* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */