CG: XCLAIM now updates the idle time of the message.

This commit is contained in:
antirez 2018-02-21 11:42:51 +01:00
parent f7d4c3acdf
commit b26f03bd69
3 changed files with 18 additions and 8 deletions

View File

@ -311,6 +311,7 @@ struct redisCommand redisCommandTable[] = {
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0}, {"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}

View File

@ -2028,6 +2028,7 @@ void xreadCommand(client *c);
void xgroupCommand(client *c); void xgroupCommand(client *c);
void xackCommand(client *c); void xackCommand(client *c);
void xpendingCommand(client *c); void xpendingCommand(client *c);
void xclaimCommand(client *c);
#if defined(__GNUC__) #if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); void *calloc(size_t count, size_t size) __attribute__ ((deprecated));

View File

@ -1611,7 +1611,9 @@ void xpendingCommand(client *c) {
* the consumer group PEL. * the consumer group PEL.
* *
* This command creates the consumer as side effect if it does not yet * This command creates the consumer as side effect if it does not yet
* exists. * exists. Moreover the command reset the idle time of the message to 0,
* even if by using the IDLE or TIME options, the user can control the
* new idle time.
* *
* The options at the end can be used in order to specify more attributes * The options at the end can be used in order to specify more attributes
* to set in the representation of the pending message: * to set in the representation of the pending message:
@ -1639,7 +1641,8 @@ void xpendingCommand(client *c) {
* 4. FORCE: * 4. FORCE:
* Creates the pending message entry in the PEL even if certain * Creates the pending message entry in the PEL even if certain
* specified IDs are not already in the PEL assigned to a different * specified IDs are not already in the PEL assigned to a different
* client. * client. However the message must be exist in the stream, otherwise
* the IDs of non existing messages are ignored.
* *
* 5. JUSTID: * 5. JUSTID:
* Return just an array of IDs of messages successfully claimed, * Return just an array of IDs of messages successfully claimed,
@ -1723,12 +1726,16 @@ void xclaimCommand(client *c) {
/* If a delivery time was passed, either with IDLE or TIME, we /* If a delivery time was passed, either with IDLE or TIME, we
* do some sanity check on it, and set the deliverytime to now * do some sanity check on it, and set the deliverytime to now
* (which is a sane choice usually) if the value is bogus. * (which is a sane choice usually) if the value is bogus.
* * To raise an error here is not wise because clients may compute
* We could raise an error here, but it's not a sensible choice * the idle time doing some math startin from their local time,
* because the client may use it's local clock to compute the * and this is not a good excuse to fail in case, for instance,
* time, and in case of desynchronizations to fail is not a good * the computed time is a bit in the future from our POV. */
* idea most of the times. */
if (deliverytime < 0 || deliverytime > now) deliverytime = now; if (deliverytime < 0 || deliverytime > now) deliverytime = now;
} else {
/* If no IDLE/TIME option was passed, we want the last delivery
* time to be now, so that the idle time of the message will be
* zero. */
deliverytime = now;
} }
/* Do the actual claiming. */ /* Do the actual claiming. */
@ -1752,8 +1759,9 @@ void xclaimCommand(client *c) {
} }
/* Remove the entry from the old consumer. */ /* Remove the entry from the old consumer. */
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer. */ /* Update the consumer and idle time. */
nack->consumer = consumer; nack->consumer = consumer;
nack->delivery_time = deliverytime;
/* Add the entry in the new cosnumer local PEL. */ /* Add the entry in the new cosnumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Send the reply for this entry. */ /* Send the reply for this entry. */