mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
CG: XCLAIM, use minidle and fix array len.
This commit is contained in:
parent
09e3b3b975
commit
8727b4845b
@ -1602,7 +1602,8 @@ void xpendingCommand(client *c) {
|
|||||||
*
|
*
|
||||||
* If the message ID (among the specified ones) exists, and its idle
|
* If the message ID (among the specified ones) exists, and its idle
|
||||||
* time greater or equal to <min-idle-time>, then the message new owner
|
* time greater or equal to <min-idle-time>, then the message new owner
|
||||||
* becomes the specified <consumer>.
|
* becomes the specified <consumer>. If the minimum idle time specified
|
||||||
|
* is zero, messages are claimed regardless of their idle time.
|
||||||
*
|
*
|
||||||
* All the messages that cannot be found inside the pending entires list
|
* All the messages that cannot be found inside the pending entires list
|
||||||
* are ignored, but in case the FORCE option is used. In that case we
|
* are ignored, but in case the FORCE option is used. In that case we
|
||||||
@ -1688,7 +1689,7 @@ void xclaimCommand(client *c) {
|
|||||||
|
|
||||||
/* If we stopped because some IDs cannot be parsed, perhaps they
|
/* If we stopped because some IDs cannot be parsed, perhaps they
|
||||||
* are trailing options. */
|
* are trailing options. */
|
||||||
time_t now = 0;
|
time_t now = mstime();
|
||||||
for (; j < c->argc; j++) {
|
for (; j < c->argc; j++) {
|
||||||
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
|
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
|
||||||
char *opt = c->argv[j]->ptr;
|
char *opt = c->argv[j]->ptr;
|
||||||
@ -1701,7 +1702,6 @@ void xclaimCommand(client *c) {
|
|||||||
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
|
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
|
||||||
"Invalid IDLE option argument for XCLAIM")
|
"Invalid IDLE option argument for XCLAIM")
|
||||||
!= C_OK) return;
|
!= C_OK) return;
|
||||||
now = mstime();
|
|
||||||
deliverytime = now - deliverytime;
|
deliverytime = now - deliverytime;
|
||||||
} else if (!strcasecmp(opt,"TIME") && moreargs) {
|
} else if (!strcasecmp(opt,"TIME") && moreargs) {
|
||||||
j++;
|
j++;
|
||||||
@ -1720,7 +1720,14 @@ void xclaimCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (deliverytime != -1) {
|
if (deliverytime != -1) {
|
||||||
now = (now == 0) ? mstime() : now;
|
/* If a delivery time was passed, either with IDLE or TIME, we
|
||||||
|
* do some sanity check on it, and set the deliverytime to now
|
||||||
|
* (which is a sane choice usually) if the value is bogus.
|
||||||
|
*
|
||||||
|
* We could raise an error here, but it's not a sensible choice
|
||||||
|
* because the client may use it's local clock to compute the
|
||||||
|
* time, and in case of desynchronizations to fail is not a good
|
||||||
|
* idea most of the times. */
|
||||||
if (deliverytime < 0 || deliverytime > now) deliverytime = now;
|
if (deliverytime < 0 || deliverytime > now) deliverytime = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1737,6 +1744,12 @@ void xclaimCommand(client *c) {
|
|||||||
/* Lookup the ID in the group PEL. */
|
/* Lookup the ID in the group PEL. */
|
||||||
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
|
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
|
||||||
if (nack != raxNotFound) {
|
if (nack != raxNotFound) {
|
||||||
|
/* We need to check if the minimum idle time requested
|
||||||
|
* by the caller is satisfied by this entry. */
|
||||||
|
if (minidle) {
|
||||||
|
mstime_t this_idle = now - nack->delivery_time;
|
||||||
|
if (this_idle < minidle) continue;
|
||||||
|
}
|
||||||
/* Remove the entry from the old consumer. */
|
/* Remove the entry from the old consumer. */
|
||||||
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
|
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
|
||||||
/* Update the consumer. */
|
/* Update the consumer. */
|
||||||
@ -1750,6 +1763,7 @@ void xclaimCommand(client *c) {
|
|||||||
streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
|
streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
|
||||||
STREAM_RWR_RAWENTRIES);
|
STREAM_RWR_RAWENTRIES);
|
||||||
}
|
}
|
||||||
|
arraylen++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
|
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user