mirror of
https://github.com/fluencelabs/redis
synced 2025-03-21 01:50:50 +00:00
Bugfix: PEL is incorrect when consumer is blocked using xreadgroup with NOACK option.
Save NOACK option into client.blockingState structure.
This commit is contained in:
parent
b67f027699
commit
61f12973f7
@ -382,6 +382,7 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
* consumer here. */
|
* consumer here. */
|
||||||
streamCG *group = NULL;
|
streamCG *group = NULL;
|
||||||
streamConsumer *consumer = NULL;
|
streamConsumer *consumer = NULL;
|
||||||
|
int noack = 0;
|
||||||
if (receiver->bpop.xread_group) {
|
if (receiver->bpop.xread_group) {
|
||||||
group = streamLookupCG(s,
|
group = streamLookupCG(s,
|
||||||
receiver->bpop.xread_group->ptr);
|
receiver->bpop.xread_group->ptr);
|
||||||
@ -396,6 +397,7 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
consumer = streamLookupConsumer(group,
|
consumer = streamLookupConsumer(group,
|
||||||
receiver->bpop.xread_consumer->ptr,
|
receiver->bpop.xread_consumer->ptr,
|
||||||
1);
|
1);
|
||||||
|
noack = receiver->bpop.xread_group_noack;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Emit the two elements sub-array consisting of
|
/* Emit the two elements sub-array consisting of
|
||||||
@ -412,7 +414,7 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
};
|
};
|
||||||
streamReplyWithRange(receiver,s,&start,NULL,
|
streamReplyWithRange(receiver,s,&start,NULL,
|
||||||
receiver->bpop.xread_count,
|
receiver->bpop.xread_count,
|
||||||
0, group, consumer, 0, &pi);
|
0, group, consumer, noack, &pi);
|
||||||
|
|
||||||
/* Note that after we unblock the client, 'gt'
|
/* Note that after we unblock the client, 'gt'
|
||||||
* and other receiver->bpop stuff are no longer
|
* and other receiver->bpop stuff are no longer
|
||||||
|
@ -140,6 +140,7 @@ client *createClient(int fd) {
|
|||||||
c->bpop.target = NULL;
|
c->bpop.target = NULL;
|
||||||
c->bpop.xread_group = NULL;
|
c->bpop.xread_group = NULL;
|
||||||
c->bpop.xread_consumer = NULL;
|
c->bpop.xread_consumer = NULL;
|
||||||
|
c->bpop.xread_group_noack = 0;
|
||||||
c->bpop.numreplicas = 0;
|
c->bpop.numreplicas = 0;
|
||||||
c->bpop.reploffset = 0;
|
c->bpop.reploffset = 0;
|
||||||
c->woff = 0;
|
c->woff = 0;
|
||||||
|
@ -665,6 +665,7 @@ typedef struct blockingState {
|
|||||||
robj *xread_group; /* XREADGROUP group name. */
|
robj *xread_group; /* XREADGROUP group name. */
|
||||||
robj *xread_consumer; /* XREADGROUP consumer name. */
|
robj *xread_consumer; /* XREADGROUP consumer name. */
|
||||||
mstime_t xread_retry_time, xread_retry_ttl;
|
mstime_t xread_retry_time, xread_retry_ttl;
|
||||||
|
int xread_group_noack;
|
||||||
|
|
||||||
/* BLOCKED_WAIT */
|
/* BLOCKED_WAIT */
|
||||||
int numreplicas; /* Number of replicas we are waiting for ACK. */
|
int numreplicas; /* Number of replicas we are waiting for ACK. */
|
||||||
|
@ -1426,6 +1426,7 @@ void xreadCommand(client *c) {
|
|||||||
incrRefCount(consumername);
|
incrRefCount(consumername);
|
||||||
c->bpop.xread_group = groupname;
|
c->bpop.xread_group = groupname;
|
||||||
c->bpop.xread_consumer = consumername;
|
c->bpop.xread_consumer = consumername;
|
||||||
|
c->bpop.xread_group_noack = noack;
|
||||||
} else {
|
} else {
|
||||||
c->bpop.xread_group = NULL;
|
c->bpop.xread_group = NULL;
|
||||||
c->bpop.xread_consumer = NULL;
|
c->bpop.xread_consumer = NULL;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user