mirror of
https://github.com/fluencelabs/redis
synced 2025-03-30 22:31:03 +00:00
Avoid installing the client write handler when possible.
This commit is contained in:
parent
d1b6a17d1e
commit
1c7d87df0c
@ -166,16 +166,18 @@ int prepareClientToWrite(client *c) {
|
|||||||
/* Only install the handler if not already installed and, in case of
|
/* Only install the handler if not already installed and, in case of
|
||||||
* slaves, if the client can actually receive writes. */
|
* slaves, if the client can actually receive writes. */
|
||||||
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
|
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
|
||||||
|
!(c->flags & CLIENT_PENDING_WRITE) &&
|
||||||
(c->replstate == REPL_STATE_NONE ||
|
(c->replstate == REPL_STATE_NONE ||
|
||||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
||||||
{
|
{
|
||||||
/* Try to install the write handler. */
|
/* Here instead of installing the write handler, we just flag the
|
||||||
if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
|
* client and put it into a list of clients that have something
|
||||||
sendReplyToClient, c) == AE_ERR)
|
* to write to the socket. This way before re-entering the event
|
||||||
{
|
* loop, we can try to directly write to the client sockets avoiding
|
||||||
freeClientAsync(c);
|
* a system call. We'll only really install the write handler if
|
||||||
return C_ERR;
|
* we'll not be able to write the whole reply at once. */
|
||||||
}
|
c->flags |= CLIENT_PENDING_WRITE;
|
||||||
|
listAddNodeTail(server.clients_pending_write,c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Authorize the caller to queue in the output buffer of this client. */
|
/* Authorize the caller to queue in the output buffer of this client. */
|
||||||
@ -739,6 +741,12 @@ void freeClient(client *c) {
|
|||||||
listDelNode(server.clients,ln);
|
listDelNode(server.clients,ln);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Remove from the list of pending writes if needed. */
|
||||||
|
if (c->flags & CLIENT_PENDING_WRITE) {
|
||||||
|
ln = listSearchKey(server.clients_pending_write,c);
|
||||||
|
listDelNode(server.clients_pending_write,ln);
|
||||||
|
}
|
||||||
|
|
||||||
/* When client was just unblocked because of a blocking operation,
|
/* When client was just unblocked because of a blocking operation,
|
||||||
* remove it from the list of unblocked clients. */
|
* remove it from the list of unblocked clients. */
|
||||||
if (c->flags & CLIENT_UNBLOCKED) {
|
if (c->flags & CLIENT_UNBLOCKED) {
|
||||||
|
@ -1857,6 +1857,12 @@ void replicationCacheMaster(client *c) {
|
|||||||
serverAssert(ln != NULL);
|
serverAssert(ln != NULL);
|
||||||
listDelNode(server.clients,ln);
|
listDelNode(server.clients,ln);
|
||||||
|
|
||||||
|
/* Remove from the list of clients with pending writes as well. */
|
||||||
|
if (c->flags & CLIENT_PENDING_WRITE) {
|
||||||
|
ln = listSearchKey(server.clients_pending_write,c);
|
||||||
|
if (ln) listDelNode(server.clients_pending_write,ln);
|
||||||
|
}
|
||||||
|
|
||||||
/* Save the master. Server.master will be set to null later by
|
/* Save the master. Server.master will be set to null later by
|
||||||
* replicationHandleMasterDisconnection(). */
|
* replicationHandleMasterDisconnection(). */
|
||||||
server.cached_master = server.master;
|
server.cached_master = server.master;
|
||||||
|
32
src/server.c
32
src/server.c
@ -1274,6 +1274,34 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
return 1000/server.hz;
|
return 1000/server.hz;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function is called just before entering the event loop, in the hope
|
||||||
|
* we can just write the replies to the client output buffer without any
|
||||||
|
* need to use a syscall in order to install the writable event handler,
|
||||||
|
* get it called, and so forth. */
|
||||||
|
void handleClientsWithPendingWrites(void) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
|
||||||
|
listRewind(server.clients_pending_write,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
client *c = listNodeValue(ln);
|
||||||
|
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||||
|
listDelNode(server.clients_pending_write,ln);
|
||||||
|
|
||||||
|
/* Try to write buffers to the client socket. */
|
||||||
|
sendReplyToClient(server.el,c->fd,c,0);
|
||||||
|
|
||||||
|
/* If there is nothing left, do nothing. Otherwise install
|
||||||
|
* the write handler. */
|
||||||
|
if ((c->bufpos || listLength(c->reply)) &&
|
||||||
|
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
|
||||||
|
sendReplyToClient, c) == AE_ERR)
|
||||||
|
{
|
||||||
|
freeClientAsync(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* This function gets called every time Redis is entering the
|
/* This function gets called every time Redis is entering the
|
||||||
* main loop of the event driven library, that is, before to sleep
|
* main loop of the event driven library, that is, before to sleep
|
||||||
* for ready file descriptors. */
|
* for ready file descriptors. */
|
||||||
@ -1317,6 +1345,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
|
|
||||||
/* Write the AOF buffer on disk */
|
/* Write the AOF buffer on disk */
|
||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
|
|
||||||
|
/* Handle writes with pending output buffers. */
|
||||||
|
handleClientsWithPendingWrites();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* =========================== Server initialization ======================== */
|
/* =========================== Server initialization ======================== */
|
||||||
@ -1781,6 +1812,7 @@ void initServer(void) {
|
|||||||
server.clients_to_close = listCreate();
|
server.clients_to_close = listCreate();
|
||||||
server.slaves = listCreate();
|
server.slaves = listCreate();
|
||||||
server.monitors = listCreate();
|
server.monitors = listCreate();
|
||||||
|
server.clients_pending_write = listCreate();
|
||||||
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
||||||
server.unblocked_clients = listCreate();
|
server.unblocked_clients = listCreate();
|
||||||
server.ready_keys = listCreate();
|
server.ready_keys = listCreate();
|
||||||
|
@ -260,6 +260,8 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define CLIENT_READONLY (1<<17) /* Cluster client is in read-only state. */
|
#define CLIENT_READONLY (1<<17) /* Cluster client is in read-only state. */
|
||||||
#define CLIENT_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
|
#define CLIENT_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
|
||||||
#define CLIENT_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */
|
#define CLIENT_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */
|
||||||
|
#define CLIENT_PENDING_WRITE (1<<20) /* Client has output to send but a write
|
||||||
|
handler is yet not installed. */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
@ -714,6 +716,7 @@ struct redisServer {
|
|||||||
int cfd_count; /* Used slots in cfd[] */
|
int cfd_count; /* Used slots in cfd[] */
|
||||||
list *clients; /* List of active clients */
|
list *clients; /* List of active clients */
|
||||||
list *clients_to_close; /* Clients to close asynchronously */
|
list *clients_to_close; /* Clients to close asynchronously */
|
||||||
|
list *clients_pending_write; /* There is to write or install handler. */
|
||||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||||
client *current_client; /* Current client, only used on crash report */
|
client *current_client; /* Current client, only used on crash report */
|
||||||
int clients_paused; /* True if clients are currently paused */
|
int clients_paused; /* True if clients are currently paused */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user