From 4dc48a0d11fb7765a1a9b7a5908d9c9e64c10b8e Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Thu, 11 Oct 2018 21:12:09 +0800 Subject: [PATCH 1/4] Streams: bugfix XCLAIM should propagate group name not consumer name --- src/t_stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index a9474aff..ff95132c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2192,7 +2192,7 @@ void xclaimCommand(client *c) { arraylen++; /* Propagate this change. */ - streamPropagateXCLAIM(c,c->argv[1],group,c->argv[3],c->argv[j],nack); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); server.dirty++; } } From 183ef7ae9bbc91d3bba0735df1669e95dc53a24e Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Thu, 11 Oct 2018 21:20:46 +0800 Subject: [PATCH 2/4] Streams: XCLAIM ignore minidle if NACK is created by FORCE Because the NACK->consumer is NULL, if idletime < minidle the NACK does not belong to any consumer, then redis will crash in XPENDING. --- src/t_stream.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index ff95132c..323783a7 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2165,8 +2165,10 @@ void xclaimCommand(client *c) { if (nack != raxNotFound) { /* We need to check if the minimum idle time requested - * by the caller is satisfied by this entry. */ - if (minidle) { + * by the caller is satisfied by this entry. + * Note that if nack->consumer is NULL, means the NACK + * is created by FORCE, we should ignore minidle. */ + if (nack->consumer && minidle) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; } From 08ae522ff942cd9ff5386a73ead6b27846766a8f Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Thu, 11 Oct 2018 21:44:20 +0800 Subject: [PATCH 3/4] Streams: propagate lastid in XCLAIM when it has effect --- src/t_stream.c | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 323783a7..16817c21 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2075,6 +2075,8 @@ void xclaimCommand(client *c) { /* If we stopped because some IDs cannot be parsed, perhaps they * are trailing options. */ time_t now = mstime(); + streamID last_id = {0,0}; + int lastid_updated = 0; for (; j < c->argc; j++) { int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ char *opt = c->argv[j]->ptr; @@ -2100,18 +2102,18 @@ void xclaimCommand(client *c) { != C_OK) return; } else if (!strcasecmp(opt,"LASTID") && moreargs) { j++; - streamID id; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; - /* Technically it could be more correct to update that only after - * checking for syntax errors, but this option is only used by - * the replication command that outputs correct syntax. */ - if (streamCompareID(&id,&group->last_id) > 0) group->last_id = id; + if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return; } else { addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); return; } } + if (streamCompareID(&last_id,&group->last_id) > 0) { + group->last_id = last_id; + lastid_updated = 1; + } + if (deliverytime != -1) { /* If a delivery time was passed, either with IDLE or TIME, we * do some sanity check on it, and set the deliverytime to now @@ -2132,6 +2134,7 @@ void xclaimCommand(client *c) { streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); void *arraylenptr = addDeferredMultiBulkLength(c); size_t arraylen = 0; + long long dirty = server.dirty; for (int j = 5; j <= last_id_arg; j++) { streamID id; unsigned char buf[sizeof(streamID)]; @@ -2198,6 +2201,10 @@ void xclaimCommand(client *c) { server.dirty++; } } + if (server.dirty == dirty && lastid_updated) { + streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); + server.dirty++; + } setDeferredMultiBulkLength(c,arraylenptr,arraylen); preventCommandPropagation(c); } From 5cc052230355b61442fa911b5fc4150ea098b4de Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Thu, 11 Oct 2018 21:46:47 +0800 Subject: [PATCH 4/4] Streams: panic if streamID invalid after check, should not be possible. --- src/t_stream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 16817c21..43d3e4da 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2138,7 +2138,8 @@ void xclaimCommand(client *c) { for (int j = 5; j <= last_id_arg; j++) { streamID id; unsigned char buf[sizeof(streamID)]; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) + serverPanic("StreamID invalid after check. Should not be possible."); streamEncodeID(buf,&id); /* Lookup the ID in the group PEL. */