1
0
mirror of https://github.com/fluencelabs/redis synced 2025-03-18 00:20:50 +00:00

Make pending buffer processing safe for CLIENT_MASTER client.

Related to .
This commit is contained in:
antirez 2018-09-03 18:17:25 +02:00
parent febe102bf6
commit 3e7349fdaf
3 changed files with 22 additions and 13 deletions

@ -126,7 +126,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */ * the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) { if (!(c->flags & CLIENT_BLOCKED)) {
if (c->querybuf && sdslen(c->querybuf) > 0) { if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBuffer(c); processInputBufferAndReplicate(c);
} }
} }
} }

@ -1425,6 +1425,25 @@ void processInputBuffer(client *c) {
server.current_client = NULL; server.current_client = NULL;
} }
/* This is a wrapper for processInputBuffer that also cares about handling
* the replication forwarding to the sub-slaves, in case the client 'c'
* is flagged as master. Usually you want to call this instead of the
* raw processInputBuffer(). */
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata; client *c = (client*) privdata;
int nread, readlen; int nread, readlen;
@ -1492,18 +1511,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
* was actually applied to the master state: this quantity, and its * was actually applied to the master state: this quantity, and its
* corresponding part of the replication stream, will be propagated to * corresponding part of the replication stream, will be propagated to
* the sub-slaves and to the replication backlog. */ * the sub-slaves and to the replication backlog. */
if (!(c->flags & CLIENT_MASTER)) { processInputBufferAndReplicate(c);
processInputBuffer(c);
} else {
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
} }
void getClientsMaxBuffers(unsigned long *longest_output_list, void getClientsMaxBuffers(unsigned long *longest_output_list,

@ -1418,6 +1418,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void *addDeferredMultiBulkLength(client *c); void *addDeferredMultiBulkLength(client *c);
void setDeferredMultiBulkLength(client *c, void *node, long length); void setDeferredMultiBulkLength(client *c, void *node, long length);
void processInputBuffer(client *c); void processInputBuffer(client *c);
void processInputBufferAndReplicate(client *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);