From fdb3be939ec30be0d39ea639c3988be8b1516c2c Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 30 Sep 2015 16:41:48 +0200 Subject: [PATCH] Refactoring: new function to test if client has pending output. --- src/networking.c | 20 ++++++++++++++------ src/replication.c | 4 ++-- src/server.h | 1 + 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/networking.c b/src/networking.c index 9a69f0fb..d02d02b5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -163,9 +163,11 @@ int prepareClientToWrite(client *c) { if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */ - /* Only install the handler if not already installed and, in case of - * slaves, if the client can actually receive writes. */ - if (c->bufpos == 0 && listLength(c->reply) == 0 && + /* Schedule the client to write the output buffers to the socket only + * if not already done (there were no pending writes alreday and the client + * was yet not flagged), and, for slaves, if the slave can actually + * receive writes at this stage. */ + if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) @@ -591,6 +593,12 @@ void copyClientOutputBuffer(client *dst, client *src) { dst->reply_bytes = src->reply_bytes; } +/* Return true if the specified client has pending reply buffers to write to + * the socket. */ +int clientHasPendingReplies(client *c) { + return c->bufpos || listLength(c->reply); +} + #define MAX_ACCEPTS_PER_CALL 1000 static void acceptCommonHandler(int fd, int flags) { client *c; @@ -824,7 +832,7 @@ int writeToClient(int fd, client *c, int handler_installed) { size_t objmem; robj *o; - while(c->bufpos > 0 || listLength(c->reply)) { + while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; @@ -890,7 +898,7 @@ int writeToClient(int fd, client *c, int handler_installed) { * We just rely on data / pings received for timeout detection. */ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; } - if (c->bufpos == 0 && listLength(c->reply) == 0) { + if (!clientHasPendingReplies(c)) { c->sentlen = 0; if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); @@ -929,7 +937,7 @@ void handleClientsWithPendingWrites(void) { /* If there is nothing left, do nothing. Otherwise install * the write handler. */ - if ((c->bufpos || listLength(c->reply)) && + if (clientHasPendingReplies(c) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) { diff --git a/src/replication.c b/src/replication.c index ec33c57d..6b5f44dd 100644 --- a/src/replication.c +++ b/src/replication.c @@ -570,7 +570,7 @@ void syncCommand(client *c) { * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ - if (listLength(c->reply) != 0 || c->bufpos != 0) { + if (clientHasPendingReplies(c)) { addReplyError(c,"SYNC and PSYNC are invalid with pending output"); return; } @@ -1924,7 +1924,7 @@ void replicationResurrectCachedMaster(int newfd) { /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ - if (server.master->bufpos || listLength(server.master->reply)) { + if (clientHasPendingReplies(server.master)) { if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); diff --git a/src/server.h b/src/server.h index fdcc45c2..400f1ff8 100644 --- a/src/server.h +++ b/src/server.h @@ -1111,6 +1111,7 @@ void pauseClients(mstime_t duration); int clientsArePaused(void); int processEventsWhileBlocked(void); void handleClientsWithPendingWrites(void); +int clientHasPendingReplies(client *c); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...)