Merge pull request #5437 from soloestoy/fix-xclaim

Fix xclaim
This commit is contained in:
Salvatore Sanfilippo 2018-10-15 11:47:21 +02:00 committed by GitHub
commit 440574d624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2075,6 +2075,8 @@ void xclaimCommand(client *c) {
/* If we stopped because some IDs cannot be parsed, perhaps they /* If we stopped because some IDs cannot be parsed, perhaps they
* are trailing options. */ * are trailing options. */
time_t now = mstime(); time_t now = mstime();
streamID last_id = {0,0};
int lastid_updated = 0;
for (; j < c->argc; j++) { for (; j < c->argc; j++) {
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
char *opt = c->argv[j]->ptr; char *opt = c->argv[j]->ptr;
@ -2100,18 +2102,18 @@ void xclaimCommand(client *c) {
!= C_OK) return; != C_OK) return;
} else if (!strcasecmp(opt,"LASTID") && moreargs) { } else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++; j++;
streamID id; if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
/* Technically it could be more correct to update that only after
* checking for syntax errors, but this option is only used by
* the replication command that outputs correct syntax. */
if (streamCompareID(&id,&group->last_id) > 0) group->last_id = id;
} else { } else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return; return;
} }
} }
if (streamCompareID(&last_id,&group->last_id) > 0) {
group->last_id = last_id;
lastid_updated = 1;
}
if (deliverytime != -1) { if (deliverytime != -1) {
/* If a delivery time was passed, either with IDLE or TIME, we /* If a delivery time was passed, either with IDLE or TIME, we
* do some sanity check on it, and set the deliverytime to now * do some sanity check on it, and set the deliverytime to now
@ -2132,10 +2134,12 @@ void xclaimCommand(client *c) {
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
void *arraylenptr = addDeferredMultiBulkLength(c); void *arraylenptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0; size_t arraylen = 0;
long long dirty = server.dirty;
for (int j = 5; j <= last_id_arg; j++) { for (int j = 5; j <= last_id_arg; j++) {
streamID id; streamID id;
unsigned char buf[sizeof(streamID)]; unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id); streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL. */ /* Lookup the ID in the group PEL. */
@ -2165,8 +2169,10 @@ void xclaimCommand(client *c) {
if (nack != raxNotFound) { if (nack != raxNotFound) {
/* We need to check if the minimum idle time requested /* We need to check if the minimum idle time requested
* by the caller is satisfied by this entry. */ * by the caller is satisfied by this entry.
if (minidle) { * Note that if nack->consumer is NULL, means the NACK
* is created by FORCE, we should ignore minidle. */
if (nack->consumer && minidle) {
mstime_t this_idle = now - nack->delivery_time; mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue; if (this_idle < minidle) continue;
} }
@ -2192,10 +2198,14 @@ void xclaimCommand(client *c) {
arraylen++; arraylen++;
/* Propagate this change. */ /* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[3],c->argv[j],nack); streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
server.dirty++; server.dirty++;
} }
} }
if (server.dirty == dirty && lastid_updated) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
server.dirty++;
}
setDeferredMultiBulkLength(c,arraylenptr,arraylen); setDeferredMultiBulkLength(c,arraylenptr,arraylen);
preventCommandPropagation(c); preventCommandPropagation(c);
} }