CG: XCLAIM initial draft.

This commit is contained in:
antirez 2018-02-16 17:25:35 +01:00
parent 00a29b1a81
commit 0a6780e560

View File

@ -1153,7 +1153,7 @@ void xreadCommand(client *c) {
if (o == NULL || if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{ {
addReplyErrorFormat(c, "No such key '%s' or consumer " addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
"group '%s' in XREADGROUP with GROUP " "group '%s' in XREADGROUP with GROUP "
"option", "option",
key->ptr,groupname->ptr); key->ptr,groupname->ptr);
@ -1491,7 +1491,7 @@ void xpendingCommand(client *c) {
if (o == NULL || if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{ {
addReplyErrorFormat(c, "No such key '%s' or consumer " addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
"group '%s'", "group '%s'",
key->ptr,groupname->ptr); key->ptr,groupname->ptr);
return; return;
@ -1593,7 +1593,167 @@ void xpendingCommand(client *c) {
} }
} }
/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ...*/ /* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
* [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
* [FORCE] [JUSTID]
*
* Gets ownership of one or multiple messages in the Pending Entries List
* of a given stream consumer group.
*
* If the message ID (among the specified ones) exists, and its idle
* time greater or equal to <min-idle-time>, then the message new owner
* becomes the specified <consumer>.
*
* All the messages that cannot be found inside the pending entires list
* are ignored, but in case the FORCE option is used. In that case we
* create the NACK (representing a not yet acknowledged message) entry in
* the consumer group PEL.
*
* This command creates the consumer as side effect if it does not yet
* exists.
*
* The options at the end can be used in order to specify more attributes
* to set in the representation of the pending message:
*
* 1. IDLE <ms>:
* Set the idle time (last time it was delivered) of the message.
* If IDLE is not specified, an IDLE of 0 is assumed, that is,
* the time count is reset because the message has now a new
* owner trying to process it.
*
* 2. TIME <ms-unix-time>:
* This is the same as IDLE but instead of a relative amount of
* milliseconds, it sets the idle time to a specific unix time
* (in milliseconds). This is useful in order to rewrite the AOF
* file generating XCLAIM commands.
*
* 3. RETRYCOUNT <count>:
* Set the retry counter to the specified value. This counter is
* incremented every time a message is delivered again. Normally
* XCLAIM does not alter this counter, which is just served to clients
* when the XPENDING command is called: this way clients can detect
* anomalies, like messages that are never processed for some reason
* after a big number of delivery attempts.
*
* 4. FORCE:
* Creates the pending message entry in the PEL even if certain
* specified IDs are not already in the PEL assigned to a different
* client.
*
* 5. JUSTID:
* Return just an array of IDs of messages successfully claimed,
* without returning the actual message.
*
* The command returns an array of messages that the user
* successfully claimed, so that the caller is able to understand
* what messages it is now in charge of. */
void xclaimCommand(client *c) {
streamCG *group;
robj *o = lookupKeyRead(c->db,c->argv[1]);
long long minidle; /* Minimum idle time argument. */
long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
int force = 0;
int justid = 0;
if (o) {
if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
}
/* No key or group? Send an error given that the group creation
* is mandatory. */
if (o == NULL || group == NULL) {
addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
"consumer group '%s'", c->argv[1]->ptr,
c->argv[2]->ptr);
return;
}
if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
"Invalid min-idle-time argument for XCLAIM")
!= C_OK) return;
if (minidle < 0) minidle = 0;
/* Start parsing the IDs, so that we abort ASAP if there is a syntax
* error: the return value of this command cannot be an error in case
* the client successfully claimed some message, so it should be
* executed in a "all or nothing" fashion. */
int j;
for (j = 4; j < c->argc; j++) {
streamID id;
if (streamParseIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
}
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
/* If we stopped because some IDs cannot be parsed, perhaps they
* are trailing options. */
time_t now = 0;
for (; j < c->argc; j++) {
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
char *opt = c->argv[j]->ptr;
if (!strcasecmp(opt,"FORCE")) {
force = 1;
} else if (!strcasecmp(opt,"JUSTID")) {
justid = 1;
} else if (!strcasecmp(opt,"IDLE") && moreargs) {
j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
"Invalid IDLE option argument for XCLAIM")
!= C_OK) return;
now = mstime();
deliverytime = now - deliverytime;
} else if (!strcasecmp(opt,"TIME") && moreargs) {
j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
"Invalid IDLE option argument for XCLAIM")
!= C_OK) return;
} else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
"Invalid IDLE option argument for XCLAIM")
!= C_OK) return;
} else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return;
}
}
if (deliverytime != -1) {
now = (now == 0) ? mstime() : now;
if (deliverytime < 0 || deliverytime > now) deliverytime = now;
}
/* Do the actual claiming. */
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
void *arraylenptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL. */
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
if (nack != raxNotFound) {
/* Remove the entry from the old consumer. */
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer. */
nack->consumer = consumer;
/* Add the entry in the new cosnumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Send the reply for this entry. */
if (justid) {
addReplyStreamID(c,&id);
} else {
streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES);
}
}
}
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
}
/* XREAD-GROUP will be implemented by xreadGenericCommand() */ /* XREAD-GROUP will be implemented by xreadGenericCommand() */