Merge branch 'unstable' of github.com:/antirez/redis into unstable

This commit is contained in:
antirez 2018-07-10 12:06:44 +02:00
commit 0420c3276f
6 changed files with 20 additions and 14 deletions

View File

@ -433,7 +433,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
* before replying to a client. */ * before replying to a client. */
int invert = fe->mask & AE_BARRIER; int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already /* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still * processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid. * didn't processed, so we check if the event is still valid.
* *
@ -485,7 +485,7 @@ int aeWait(int fd, int mask, long long milliseconds) {
if ((retval = poll(&pfd, 1, milliseconds))== 1) { if ((retval = poll(&pfd, 1, milliseconds))== 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE; if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask; return retmask;
} else { } else {

View File

@ -406,10 +406,13 @@ void handleClientsBlockedOnKeys(void) {
/* Lookup the consumer for the group, if any. */ /* Lookup the consumer for the group, if any. */
streamConsumer *consumer = NULL; streamConsumer *consumer = NULL;
int noack = 0;
if (group) { if (group) {
consumer = streamLookupConsumer(group, consumer = streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr, receiver->bpop.xread_consumer->ptr,
1); 1);
noack = receiver->bpop.xread_group_noack;
} }
/* Emit the two elements sub-array consisting of /* Emit the two elements sub-array consisting of
@ -426,7 +429,7 @@ void handleClientsBlockedOnKeys(void) {
}; };
streamReplyWithRange(receiver,s,&start,NULL, streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count, receiver->bpop.xread_count,
0, group, consumer, 0, &pi); 0, group, consumer, noack, &pi);
/* Note that after we unblock the client, 'gt' /* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer * and other receiver->bpop stuff are no longer

View File

@ -140,6 +140,7 @@ client *createClient(int fd) {
c->bpop.target = NULL; c->bpop.target = NULL;
c->bpop.xread_group = NULL; c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL; c->bpop.xread_consumer = NULL;
c->bpop.xread_group_noack = 0;
c->bpop.numreplicas = 0; c->bpop.numreplicas = 0;
c->bpop.reploffset = 0; c->bpop.reploffset = 0;
c->woff = 0; c->woff = 0;

View File

@ -1165,7 +1165,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
} else { } else {
/* If there is not a background saving/rewrite in progress check if /* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */ * we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) { for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j; struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes, /* Save if we reached the given amount of changes,
@ -1185,23 +1185,23 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
rdbSaveBackground(server.rdb_filename,rsiptr); rdbSaveBackground(server.rdb_filename,rsiptr);
break; break;
} }
} }
/* Trigger an AOF rewrite if needed. */ /* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON && if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 && server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_perc && server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size) server.aof_current_size > server.aof_rewrite_min_size)
{ {
long long base = server.aof_rewrite_base_size ? long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1; server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100; long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) { if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground(); rewriteAppendOnlyFileBackground();
} }
} }
} }

View File

@ -665,6 +665,7 @@ typedef struct blockingState {
robj *xread_group; /* XREADGROUP group name. */ robj *xread_group; /* XREADGROUP group name. */
robj *xread_consumer; /* XREADGROUP consumer name. */ robj *xread_consumer; /* XREADGROUP consumer name. */
mstime_t xread_retry_time, xread_retry_ttl; mstime_t xread_retry_time, xread_retry_ttl;
int xread_group_noack;
/* BLOCKED_WAIT */ /* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */ int numreplicas; /* Number of replicas we are waiting for ACK. */

View File

@ -1461,6 +1461,7 @@ void xreadCommand(client *c) {
incrRefCount(consumername); incrRefCount(consumername);
c->bpop.xread_group = groupname; c->bpop.xread_group = groupname;
c->bpop.xread_consumer = consumername; c->bpop.xread_consumer = consumername;
c->bpop.xread_group_noack = noack;
} else { } else {
c->bpop.xread_group = NULL; c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL; c->bpop.xread_consumer = NULL;