From 69c30965eb15ed86acadd8e9db79feb7b7633497 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 9 Oct 2018 13:15:41 +0200 Subject: [PATCH] Introduce protectClient() + some refactoring. The idea is to have an API for the cases like -BUSY state and DEBUG RELOAD where we have to manually deinstall the read handler. See #4804. --- src/networking.c | 75 ++++++++++++++++++++++++++++++++++++------------ src/server.h | 3 ++ 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/networking.c b/src/networking.c index 06715ee4..abce50bb 100644 --- a/src/networking.c +++ b/src/networking.c @@ -160,6 +160,32 @@ client *createClient(int fd) { return c; } +/* This funciton puts the client in the queue of clients that should write + * their output buffers to the socket. Note that it does not *yet* install + * the write handler, to start clients are put in a queue of clients that need + * to write, so we try to do that before returning in the event loop (see the + * handleClientsWithPendingWrites() function). + * If we fail and there is more data to write, compared to what the socket + * buffers can hold, then we'll really install the handler. */ +void clientInstallWriteHandler(client *c) { + /* Schedule the client to write the output buffers to the socket only + * if not already done and, for slaves, if the slave can actually receive + * writes at this stage. */ + if (!(c->flags & CLIENT_PENDING_WRITE) && + (c->replstate == REPL_STATE_NONE || + (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) + { + /* Here instead of installing the write handler, we just flag the + * client and put it into a list of clients that have something + * to write to the socket. This way before re-entering the event + * loop, we can try to directly write to the client sockets avoiding + * a system call. We'll only really install the write handler if + * we'll not be able to write the whole reply at once. */ + c->flags |= CLIENT_PENDING_WRITE; + listAddNodeHead(server.clients_pending_write,c); + } +} + /* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * @@ -197,24 +223,9 @@ int prepareClientToWrite(client *c) { if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */ - /* Schedule the client to write the output buffers to the socket only - * if not already done (there were no pending writes already and the client - * was yet not flagged), and, for slaves, if the slave can actually - * receive writes at this stage. */ - if (!clientHasPendingReplies(c) && - !(c->flags & CLIENT_PENDING_WRITE) && - (c->replstate == REPL_STATE_NONE || - (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) - { - /* Here instead of installing the write handler, we just flag the - * client and put it into a list of clients that have something - * to write to the socket. This way before re-entering the event - * loop, we can try to directly write to the client sockets avoiding - * a system call. We'll only really install the write handler if - * we'll not be able to write the whole reply at once. */ - c->flags |= CLIENT_PENDING_WRITE; - listAddNodeHead(server.clients_pending_write,c); - } + /* Schedule the client to write the output buffers to the socket, unless + * it should already be setup to do so (it has already pending data). */ + if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; @@ -1105,6 +1116,34 @@ void resetClient(client *c) { } } +/* This funciton is used when we want to re-enter the event loop but there + * is the risk that the client we are dealing with will be freed in some + * way. This happens for instance in: + * + * * DEBUG RELOAD and similar. + * * When a Lua script is in -BUSY state. + * + * So the function will protect the client by doing two things: + * + * 1) It removes the file events. This way it is not possible that an + * error is signaled on the socket, freeing the client. + * 2) Moreover it makes sure that if the client is freed in a different code + * path, it is not really released, but only marked for later release. */ +void protectClient(client *c) { + c->flags |= CLIENT_PROTECTED; + aeDeleteFileEvent(server.el,c->fd,AE_READABLE); + aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); +} + +/* This will undo the client protection done by protectClient() */ +void unprotectClient(client *c) { + if (c->flags & CLIENT_PROTECTED) { + c->flags &= ~CLIENT_PROTECTED; + aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c); + if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); + } +} + /* Like processMultibulkBuffer(), but for the inline protocol instead of RESP, * this function consumes the client query buffer and creates a command ready * to be executed inside the client structure. Returns C_OK if the command diff --git a/src/server.h b/src/server.h index 4c4c0ce5..73630b89 100644 --- a/src/server.h +++ b/src/server.h @@ -256,6 +256,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CLIENT_LUA_DEBUG (1<<25) /* Run EVAL in debug mode. */ #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ +#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1473,6 +1474,8 @@ int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed); void linkClient(client *c); +void protectClient(client *c); +void unprotectClient(client *c); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...)