From b26f03bd695d19ea88fc7ca1849ff46be802a216 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 21 Feb 2018 11:42:51 +0100 Subject: [PATCH] CG: XCLAIM now updates the idle time of the message. --- src/server.c | 1 + src/server.h | 1 + src/t_stream.c | 24 ++++++++++++++++-------- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/server.c b/src/server.c index bebc67fa..8c830043 100644 --- a/src/server.c +++ b/src/server.c @@ -311,6 +311,7 @@ struct redisCommand redisCommandTable[] = { {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, {"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0}, {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, + {"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index 5863e0c7..76aaab88 100644 --- a/src/server.h +++ b/src/server.h @@ -2028,6 +2028,7 @@ void xreadCommand(client *c); void xgroupCommand(client *c); void xackCommand(client *c); void xpendingCommand(client *c); +void xclaimCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_stream.c b/src/t_stream.c index c97bf714..46228e74 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1611,7 +1611,9 @@ void xpendingCommand(client *c) { * the consumer group PEL. * * This command creates the consumer as side effect if it does not yet - * exists. + * exists. Moreover the command reset the idle time of the message to 0, + * even if by using the IDLE or TIME options, the user can control the + * new idle time. * * The options at the end can be used in order to specify more attributes * to set in the representation of the pending message: @@ -1639,7 +1641,8 @@ void xpendingCommand(client *c) { * 4. FORCE: * Creates the pending message entry in the PEL even if certain * specified IDs are not already in the PEL assigned to a different - * client. + * client. However the message must be exist in the stream, otherwise + * the IDs of non existing messages are ignored. * * 5. JUSTID: * Return just an array of IDs of messages successfully claimed, @@ -1723,12 +1726,16 @@ void xclaimCommand(client *c) { /* If a delivery time was passed, either with IDLE or TIME, we * do some sanity check on it, and set the deliverytime to now * (which is a sane choice usually) if the value is bogus. - * - * We could raise an error here, but it's not a sensible choice - * because the client may use it's local clock to compute the - * time, and in case of desynchronizations to fail is not a good - * idea most of the times. */ + * To raise an error here is not wise because clients may compute + * the idle time doing some math startin from their local time, + * and this is not a good excuse to fail in case, for instance, + * the computed time is a bit in the future from our POV. */ if (deliverytime < 0 || deliverytime > now) deliverytime = now; + } else { + /* If no IDLE/TIME option was passed, we want the last delivery + * time to be now, so that the idle time of the message will be + * zero. */ + deliverytime = now; } /* Do the actual claiming. */ @@ -1752,8 +1759,9 @@ void xclaimCommand(client *c) { } /* Remove the entry from the old consumer. */ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - /* Update the consumer. */ + /* Update the consumer and idle time. */ nack->consumer = consumer; + nack->delivery_time = deliverytime; /* Add the entry in the new cosnumer local PEL. */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Send the reply for this entry. */