From 61f12973f763b1af4e4737a0985df8991451070e Mon Sep 17 00:00:00 2001 From: "dejun.xdj" Date: Mon, 9 Jul 2018 19:26:40 +0800 Subject: [PATCH] Bugfix: PEL is incorrect when consumer is blocked using xreadgroup with NOACK option. Save NOACK option into client.blockingState structure. --- src/blocked.c | 4 +++- src/networking.c | 1 + src/server.h | 1 + src/t_stream.c | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/blocked.c b/src/blocked.c index e0dd5672..58772f1e 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -382,6 +382,7 @@ void handleClientsBlockedOnKeys(void) { * consumer here. */ streamCG *group = NULL; streamConsumer *consumer = NULL; + int noack = 0; if (receiver->bpop.xread_group) { group = streamLookupCG(s, receiver->bpop.xread_group->ptr); @@ -396,6 +397,7 @@ void handleClientsBlockedOnKeys(void) { consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr, 1); + noack = receiver->bpop.xread_group_noack; } /* Emit the two elements sub-array consisting of @@ -412,7 +414,7 @@ void handleClientsBlockedOnKeys(void) { }; streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, - 0, group, consumer, 0, &pi); + 0, group, consumer, noack, &pi); /* Note that after we unblock the client, 'gt' * and other receiver->bpop stuff are no longer diff --git a/src/networking.c b/src/networking.c index 5e0d783f..58b553fe 100644 --- a/src/networking.c +++ b/src/networking.c @@ -140,6 +140,7 @@ client *createClient(int fd) { c->bpop.target = NULL; c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL; + c->bpop.xread_group_noack = 0; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; c->woff = 0; diff --git a/src/server.h b/src/server.h index 47efd033..f3df6b46 100644 --- a/src/server.h +++ b/src/server.h @@ -665,6 +665,7 @@ typedef struct blockingState { robj *xread_group; /* XREADGROUP group name. */ robj *xread_consumer; /* XREADGROUP consumer name. */ mstime_t xread_retry_time, xread_retry_ttl; + int xread_group_noack; /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ diff --git a/src/t_stream.c b/src/t_stream.c index 5c03c972..db0b4fb9 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1426,6 +1426,7 @@ void xreadCommand(client *c) { incrRefCount(consumername); c->bpop.xread_group = groupname; c->bpop.xread_consumer = consumername; + c->bpop.xread_group_noack = noack; } else { c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL;