diff --git a/src/ae.c b/src/ae.c index 65adb2ab..1ea67156 100644 --- a/src/ae.c +++ b/src/ae.c @@ -433,7 +433,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) * before replying to a client. */ int invert = fe->mask & AE_BARRIER; - /* Note the "fe->mask & mask & ..." code: maybe an already + /* Note the "fe->mask & mask & ..." code: maybe an already * processed event removed an element that fired and we still * didn't processed, so we check if the event is still valid. * @@ -485,7 +485,7 @@ int aeWait(int fd, int mask, long long milliseconds) { if ((retval = poll(&pfd, 1, milliseconds))== 1) { if (pfd.revents & POLLIN) retmask |= AE_READABLE; if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; - if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; + if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; return retmask; } else { diff --git a/src/blocked.c b/src/blocked.c index e722a4e8..d8ae596d 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -406,10 +406,13 @@ void handleClientsBlockedOnKeys(void) { /* Lookup the consumer for the group, if any. */ streamConsumer *consumer = NULL; + int noack = 0; + if (group) { consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr, 1); + noack = receiver->bpop.xread_group_noack; } /* Emit the two elements sub-array consisting of @@ -426,7 +429,7 @@ void handleClientsBlockedOnKeys(void) { }; streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, - 0, group, consumer, 0, &pi); + 0, group, consumer, noack, &pi); /* Note that after we unblock the client, 'gt' * and other receiver->bpop stuff are no longer diff --git a/src/networking.c b/src/networking.c index 5e0d783f..58b553fe 100644 --- a/src/networking.c +++ b/src/networking.c @@ -140,6 +140,7 @@ client *createClient(int fd) { c->bpop.target = NULL; c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL; + c->bpop.xread_group_noack = 0; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; c->woff = 0; diff --git a/src/server.c b/src/server.c index 41edab91..31019beb 100644 --- a/src/server.c +++ b/src/server.c @@ -1165,7 +1165,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } else { /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now. */ - for (j = 0; j < server.saveparamslen; j++) { + for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; /* Save if we reached the given amount of changes, @@ -1185,23 +1185,23 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { rdbSaveBackground(server.rdb_filename,rsiptr); break; } - } + } - /* Trigger an AOF rewrite if needed. */ - if (server.aof_state == AOF_ON && - server.rdb_child_pid == -1 && - server.aof_child_pid == -1 && - server.aof_rewrite_perc && - server.aof_current_size > server.aof_rewrite_min_size) - { + /* Trigger an AOF rewrite if needed. */ + if (server.aof_state == AOF_ON && + server.rdb_child_pid == -1 && + server.aof_child_pid == -1 && + server.aof_rewrite_perc && + server.aof_current_size > server.aof_rewrite_min_size) + { long long base = server.aof_rewrite_base_size ? - server.aof_rewrite_base_size : 1; + server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } - } + } } diff --git a/src/server.h b/src/server.h index 47efd033..f3df6b46 100644 --- a/src/server.h +++ b/src/server.h @@ -665,6 +665,7 @@ typedef struct blockingState { robj *xread_group; /* XREADGROUP group name. */ robj *xread_consumer; /* XREADGROUP consumer name. */ mstime_t xread_retry_time, xread_retry_ttl; + int xread_group_noack; /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ diff --git a/src/t_stream.c b/src/t_stream.c index 52687778..d44cc7e6 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1461,6 +1461,7 @@ void xreadCommand(client *c) { incrRefCount(consumername); c->bpop.xread_group = groupname; c->bpop.xread_consumer = consumername; + c->bpop.xread_group_noack = noack; } else { c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL;