diff --git a/src/t_stream.c b/src/t_stream.c index 87200594..dc6ac8c6 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1153,7 +1153,7 @@ void xreadCommand(client *c) { if (o == NULL || (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) { - addReplyErrorFormat(c, "No such key '%s' or consumer " + addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " "group '%s' in XREADGROUP with GROUP " "option", key->ptr,groupname->ptr); @@ -1491,7 +1491,7 @@ void xpendingCommand(client *c) { if (o == NULL || (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) { - addReplyErrorFormat(c, "No such key '%s' or consumer " + addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " "group '%s'", key->ptr,groupname->ptr); return; @@ -1593,7 +1593,167 @@ void xpendingCommand(client *c) { } } -/* XCLAIM ...*/ +/* XCLAIM + * [IDLE ] [TIME ] [RETRYCOUNT ] + * [FORCE] [JUSTID] + * + * Gets ownership of one or multiple messages in the Pending Entries List + * of a given stream consumer group. + * + * If the message ID (among the specified ones) exists, and its idle + * time greater or equal to , then the message new owner + * becomes the specified . + * + * All the messages that cannot be found inside the pending entires list + * are ignored, but in case the FORCE option is used. In that case we + * create the NACK (representing a not yet acknowledged message) entry in + * the consumer group PEL. + * + * This command creates the consumer as side effect if it does not yet + * exists. + * + * The options at the end can be used in order to specify more attributes + * to set in the representation of the pending message: + * + * 1. IDLE : + * Set the idle time (last time it was delivered) of the message. + * If IDLE is not specified, an IDLE of 0 is assumed, that is, + * the time count is reset because the message has now a new + * owner trying to process it. + * + * 2. TIME : + * This is the same as IDLE but instead of a relative amount of + * milliseconds, it sets the idle time to a specific unix time + * (in milliseconds). This is useful in order to rewrite the AOF + * file generating XCLAIM commands. + * + * 3. RETRYCOUNT : + * Set the retry counter to the specified value. This counter is + * incremented every time a message is delivered again. Normally + * XCLAIM does not alter this counter, which is just served to clients + * when the XPENDING command is called: this way clients can detect + * anomalies, like messages that are never processed for some reason + * after a big number of delivery attempts. + * + * 4. FORCE: + * Creates the pending message entry in the PEL even if certain + * specified IDs are not already in the PEL assigned to a different + * client. + * + * 5. JUSTID: + * Return just an array of IDs of messages successfully claimed, + * without returning the actual message. + * + * The command returns an array of messages that the user + * successfully claimed, so that the caller is able to understand + * what messages it is now in charge of. */ +void xclaimCommand(client *c) { + streamCG *group; + robj *o = lookupKeyRead(c->db,c->argv[1]); + long long minidle; /* Minimum idle time argument. */ + long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ + mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ + int force = 0; + int justid = 0; + + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Send an error given that the group creation + * is mandatory. */ + if (o == NULL || group == NULL) { + addReplyErrorFormat(c,"-NOGROUP No such key '%s' or " + "consumer group '%s'", c->argv[1]->ptr, + c->argv[2]->ptr); + return; + } + + if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle, + "Invalid min-idle-time argument for XCLAIM") + != C_OK) return; + if (minidle < 0) minidle = 0; + + /* Start parsing the IDs, so that we abort ASAP if there is a syntax + * error: the return value of this command cannot be an error in case + * the client successfully claimed some message, so it should be + * executed in a "all or nothing" fashion. */ + int j; + for (j = 4; j < c->argc; j++) { + streamID id; + if (streamParseIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break; + } + int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ + + /* If we stopped because some IDs cannot be parsed, perhaps they + * are trailing options. */ + time_t now = 0; + for (; j < c->argc; j++) { + int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ + char *opt = c->argv[j]->ptr; + if (!strcasecmp(opt,"FORCE")) { + force = 1; + } else if (!strcasecmp(opt,"JUSTID")) { + justid = 1; + } else if (!strcasecmp(opt,"IDLE") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, + "Invalid IDLE option argument for XCLAIM") + != C_OK) return; + now = mstime(); + deliverytime = now - deliverytime; + } else if (!strcasecmp(opt,"TIME") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, + "Invalid IDLE option argument for XCLAIM") + != C_OK) return; + } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, + "Invalid IDLE option argument for XCLAIM") + != C_OK) return; + } else { + addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); + return; + } + } + + if (deliverytime != -1) { + now = (now == 0) ? mstime() : now; + if (deliverytime < 0 || deliverytime > now) deliverytime = now; + } + + /* Do the actual claiming. */ + streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); + void *arraylenptr = addDeferredMultiBulkLength(c); + size_t arraylen = 0; + for (int j = 5; j <= last_id_arg; j++) { + streamID id; + unsigned char buf[sizeof(streamID)]; + if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + streamEncodeID(buf,&id); + + /* Lookup the ID in the group PEL. */ + streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + if (nack != raxNotFound) { + /* Remove the entry from the old consumer. */ + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + /* Update the consumer. */ + nack->consumer = consumer; + /* Add the entry in the new cosnumer local PEL. */ + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + /* Send the reply for this entry. */ + if (justid) { + addReplyStreamID(c,&id); + } else { + streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES); + } + } + } + setDeferredMultiBulkLength(c,arraylenptr,arraylen); +} /* XREAD-GROUP will be implemented by xreadGenericCommand() */