From 1c7d87df0cd64fd786bebd5cb9636912504359d5 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 28 Sep 2015 18:25:57 +0200 Subject: [PATCH] Avoid installing the client write handler when possible. --- src/networking.c | 22 +++++++++++++++------- src/replication.c | 6 ++++++ src/server.c | 32 ++++++++++++++++++++++++++++++++ src/server.h | 3 +++ 4 files changed, 56 insertions(+), 7 deletions(-) diff --git a/src/networking.c b/src/networking.c index 336561e1..4f6441de 100644 --- a/src/networking.c +++ b/src/networking.c @@ -166,16 +166,18 @@ int prepareClientToWrite(client *c) { /* Only install the handler if not already installed and, in case of * slaves, if the client can actually receive writes. */ if (c->bufpos == 0 && listLength(c->reply) == 0 && + !(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { - /* Try to install the write handler. */ - if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, - sendReplyToClient, c) == AE_ERR) - { - freeClientAsync(c); - return C_ERR; - } + /* Here instead of installing the write handler, we just flag the + * client and put it into a list of clients that have something + * to write to the socket. This way before re-entering the event + * loop, we can try to directly write to the client sockets avoiding + * a system call. We'll only really install the write handler if + * we'll not be able to write the whole reply at once. */ + c->flags |= CLIENT_PENDING_WRITE; + listAddNodeTail(server.clients_pending_write,c); } /* Authorize the caller to queue in the output buffer of this client. */ @@ -739,6 +741,12 @@ void freeClient(client *c) { listDelNode(server.clients,ln); } + /* Remove from the list of pending writes if needed. */ + if (c->flags & CLIENT_PENDING_WRITE) { + ln = listSearchKey(server.clients_pending_write,c); + listDelNode(server.clients_pending_write,ln); + } + /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { diff --git a/src/replication.c b/src/replication.c index 24157eb6..ec33c57d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1857,6 +1857,12 @@ void replicationCacheMaster(client *c) { serverAssert(ln != NULL); listDelNode(server.clients,ln); + /* Remove from the list of clients with pending writes as well. */ + if (c->flags & CLIENT_PENDING_WRITE) { + ln = listSearchKey(server.clients_pending_write,c); + if (ln) listDelNode(server.clients_pending_write,ln); + } + /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). */ server.cached_master = server.master; diff --git a/src/server.c b/src/server.c index 5ceef9ae..316eba49 100644 --- a/src/server.c +++ b/src/server.c @@ -1274,6 +1274,34 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { return 1000/server.hz; } +/* This function is called just before entering the event loop, in the hope + * we can just write the replies to the client output buffer without any + * need to use a syscall in order to install the writable event handler, + * get it called, and so forth. */ +void handleClientsWithPendingWrites(void) { + listIter li; + listNode *ln; + + listRewind(server.clients_pending_write,&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + c->flags &= ~CLIENT_PENDING_WRITE; + listDelNode(server.clients_pending_write,ln); + + /* Try to write buffers to the client socket. */ + sendReplyToClient(server.el,c->fd,c,0); + + /* If there is nothing left, do nothing. Otherwise install + * the write handler. */ + if ((c->bufpos || listLength(c->reply)) && + aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, + sendReplyToClient, c) == AE_ERR) + { + freeClientAsync(c); + } + } +} + /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. */ @@ -1317,6 +1345,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); + + /* Handle writes with pending output buffers. */ + handleClientsWithPendingWrites(); } /* =========================== Server initialization ======================== */ @@ -1781,6 +1812,7 @@ void initServer(void) { server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); + server.clients_pending_write = listCreate(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); diff --git a/src/server.h b/src/server.h index 4e99179e..99811289 100644 --- a/src/server.h +++ b/src/server.h @@ -260,6 +260,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define CLIENT_READONLY (1<<17) /* Cluster client is in read-only state. */ #define CLIENT_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */ #define CLIENT_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */ +#define CLIENT_PENDING_WRITE (1<<20) /* Client has output to send but a write + handler is yet not installed. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -714,6 +716,7 @@ struct redisServer { int cfd_count; /* Used slots in cfd[] */ list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ + list *clients_pending_write; /* There is to write or install handler. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client, only used on crash report */ int clients_paused; /* True if clients are currently paused */