From 46f5a2ca077dbc9c4edab1f68feaa2c68244e0fc Mon Sep 17 00:00:00 2001 From: "dejun.xdj" Date: Wed, 4 Jul 2018 20:04:06 +0800 Subject: [PATCH 1/2] Fix indentation. --- src/ae.c | 4 ++-- src/server.c | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/ae.c b/src/ae.c index 65adb2ab..1ea67156 100644 --- a/src/ae.c +++ b/src/ae.c @@ -433,7 +433,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) * before replying to a client. */ int invert = fe->mask & AE_BARRIER; - /* Note the "fe->mask & mask & ..." code: maybe an already + /* Note the "fe->mask & mask & ..." code: maybe an already * processed event removed an element that fired and we still * didn't processed, so we check if the event is still valid. * @@ -485,7 +485,7 @@ int aeWait(int fd, int mask, long long milliseconds) { if ((retval = poll(&pfd, 1, milliseconds))== 1) { if (pfd.revents & POLLIN) retmask |= AE_READABLE; if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; - if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; + if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; return retmask; } else { diff --git a/src/server.c b/src/server.c index 347ebcef..f7fab5fd 100644 --- a/src/server.c +++ b/src/server.c @@ -1151,7 +1151,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } else { /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now. */ - for (j = 0; j < server.saveparamslen; j++) { + for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; /* Save if we reached the given amount of changes, @@ -1171,23 +1171,23 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { rdbSaveBackground(server.rdb_filename,rsiptr); break; } - } + } - /* Trigger an AOF rewrite if needed. */ - if (server.aof_state == AOF_ON && - server.rdb_child_pid == -1 && - server.aof_child_pid == -1 && - server.aof_rewrite_perc && - server.aof_current_size > server.aof_rewrite_min_size) - { + /* Trigger an AOF rewrite if needed. */ + if (server.aof_state == AOF_ON && + server.rdb_child_pid == -1 && + server.aof_child_pid == -1 && + server.aof_rewrite_perc && + server.aof_current_size > server.aof_rewrite_min_size) + { long long base = server.aof_rewrite_base_size ? - server.aof_rewrite_base_size : 1; + server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } - } + } } From 61f12973f763b1af4e4737a0985df8991451070e Mon Sep 17 00:00:00 2001 From: "dejun.xdj" Date: Mon, 9 Jul 2018 19:26:40 +0800 Subject: [PATCH 2/2] Bugfix: PEL is incorrect when consumer is blocked using xreadgroup with NOACK option. Save NOACK option into client.blockingState structure. --- src/blocked.c | 4 +++- src/networking.c | 1 + src/server.h | 1 + src/t_stream.c | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/blocked.c b/src/blocked.c index e0dd5672..58772f1e 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -382,6 +382,7 @@ void handleClientsBlockedOnKeys(void) { * consumer here. */ streamCG *group = NULL; streamConsumer *consumer = NULL; + int noack = 0; if (receiver->bpop.xread_group) { group = streamLookupCG(s, receiver->bpop.xread_group->ptr); @@ -396,6 +397,7 @@ void handleClientsBlockedOnKeys(void) { consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr, 1); + noack = receiver->bpop.xread_group_noack; } /* Emit the two elements sub-array consisting of @@ -412,7 +414,7 @@ void handleClientsBlockedOnKeys(void) { }; streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, - 0, group, consumer, 0, &pi); + 0, group, consumer, noack, &pi); /* Note that after we unblock the client, 'gt' * and other receiver->bpop stuff are no longer diff --git a/src/networking.c b/src/networking.c index 5e0d783f..58b553fe 100644 --- a/src/networking.c +++ b/src/networking.c @@ -140,6 +140,7 @@ client *createClient(int fd) { c->bpop.target = NULL; c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL; + c->bpop.xread_group_noack = 0; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; c->woff = 0; diff --git a/src/server.h b/src/server.h index 47efd033..f3df6b46 100644 --- a/src/server.h +++ b/src/server.h @@ -665,6 +665,7 @@ typedef struct blockingState { robj *xread_group; /* XREADGROUP group name. */ robj *xread_consumer; /* XREADGROUP consumer name. */ mstime_t xread_retry_time, xread_retry_ttl; + int xread_group_noack; /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ diff --git a/src/t_stream.c b/src/t_stream.c index 5c03c972..db0b4fb9 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1426,6 +1426,7 @@ void xreadCommand(client *c) { incrRefCount(consumername); c->bpop.xread_group = groupname; c->bpop.xread_consumer = consumername; + c->bpop.xread_group_noack = noack; } else { c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL;