From 3e7349fdaf8cdbf96e595750034af43e6d6c56f0 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 3 Sep 2018 18:17:25 +0200 Subject: [PATCH] Make pending buffer processing safe for CLIENT_MASTER client. Related to #5305. --- src/blocked.c | 2 +- src/networking.c | 32 ++++++++++++++++++++------------ src/server.h | 1 + 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 35c33d1c..aeca87a6 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -126,7 +126,7 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { if (c->querybuf && sdslen(c->querybuf) > 0) { - processInputBuffer(c); + processInputBufferAndReplicate(c); } } } diff --git a/src/networking.c b/src/networking.c index 27c69530..8e55ec90 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1425,6 +1425,25 @@ void processInputBuffer(client *c) { server.current_client = NULL; } +/* This is a wrapper for processInputBuffer that also cares about handling + * the replication forwarding to the sub-slaves, in case the client 'c' + * is flagged as master. Usually you want to call this instead of the + * raw processInputBuffer(). */ +void processInputBufferAndReplicate(client *c) { + if (!(c->flags & CLIENT_MASTER)) { + processInputBuffer(c); + } else { + size_t prev_offset = c->reploff; + processInputBuffer(c); + size_t applied = c->reploff - prev_offset; + if (applied) { + replicationFeedSlavesFromMasterStream(server.slaves, + c->pending_querybuf, applied); + sdsrange(c->pending_querybuf,applied,-1); + } + } +} + void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; @@ -1492,18 +1511,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * was actually applied to the master state: this quantity, and its * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ - if (!(c->flags & CLIENT_MASTER)) { - processInputBuffer(c); - } else { - size_t prev_offset = c->reploff; - processInputBuffer(c); - size_t applied = c->reploff - prev_offset; - if (applied) { - replicationFeedSlavesFromMasterStream(server.slaves, - c->pending_querybuf, applied); - sdsrange(c->pending_querybuf,applied,-1); - } - } + processInputBufferAndReplicate(c); } void getClientsMaxBuffers(unsigned long *longest_output_list, diff --git a/src/server.h b/src/server.h index 4fb8e383..4a5967f1 100644 --- a/src/server.h +++ b/src/server.h @@ -1418,6 +1418,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); void *addDeferredMultiBulkLength(client *c); void setDeferredMultiBulkLength(client *c, void *node, long length); void processInputBuffer(client *c); +void processInputBufferAndReplicate(client *c); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);