From 6c001bfc0ddf8e7a93e86dad250e416166253d85 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 3 Sep 2018 18:39:18 +0200 Subject: [PATCH] Unblocked clients API refactoring. See #4418. --- src/blocked.c | 32 ++++++++++++++++++++++++++------ src/networking.c | 9 ++++----- src/scripting.c | 6 ++---- src/server.h | 1 + 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index aeca87a6..00212ed6 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -132,6 +132,31 @@ void processUnblockedClients(void) { } } +/* This function will schedule the client for reprocessing at a safe time. + * + * This is useful when a client was blocked for some reason (blocking opeation, + * CLIENT PAUSE, or whatever), because it may end with some accumulated query + * buffer that needs to be processed ASAP: + * + * 1. When a client is blocked, its readable handler is still active. + * 2. However in this case it only gets data into the query buffer, but the + * query is not parsed or executed once there is enough to proceed as + * usually (because the client is blocked... so we can't execute commands). + * 3. When the client is unblocked, without this function, the client would + * have to write some query in order for the readable handler to finally + * call processQueryBuffer*() on it. + * 4. With this function instead we can put the client in a queue that will + * process it for queries ready to be executed at a safe time. + */ +void queueClientForReprocessing(client *c) { + /* The client may already be into the unblocked list because of a previous + * blocking operation, don't add back it into the list multiple times. */ + if (!(c->flags & CLIENT_UNBLOCKED)) { + c->flags |= CLIENT_UNBLOCKED; + listAddNodeTail(server.unblocked_clients,c); + } +} + /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { @@ -152,12 +177,7 @@ void unblockClient(client *c) { server.blocked_clients_by_type[c->btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; - /* The client may already be into the unblocked list because of a previous - * blocking operation, don't add back it into the list multiple times. */ - if (!(c->flags & CLIENT_UNBLOCKED)) { - c->flags |= CLIENT_UNBLOCKED; - listAddNodeTail(server.unblocked_clients,c); - } + queueClientForReprocessing(c); } /* This function gets called when a blocked client timed out in order to diff --git a/src/networking.c b/src/networking.c index 824d8824..0c1b3016 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2134,11 +2134,10 @@ int clientsArePaused(void) { while ((ln = listNext(&li)) != NULL) { c = listNodeValue(ln); - /* Don't touch slaves and blocked or unblocked clients. - * The latter pending requests be processed when unblocked. */ - if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED|CLIENT_UNBLOCKED)) continue; - c->flags |= CLIENT_UNBLOCKED; - listAddNodeTail(server.unblocked_clients,c); + /* Don't touch slaves and blocked clients. + * The latter pending requests will be processed when unblocked. */ + if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue; + queueClientForReprocessing(c); } } return server.clients_paused; diff --git a/src/scripting.c b/src/scripting.c index 6c311dbe..8aa62071 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1367,10 +1367,8 @@ void evalGenericCommand(client *c, int evalsha) { * script timeout was detected. */ aeCreateFileEvent(server.el,c->fd,AE_READABLE, readQueryFromClient,c); - if (server.masterhost && server.master && !(server.master->flags & CLIENT_UNBLOCKED)) { - server.master->flags |= CLIENT_UNBLOCKED; - listAddNodeTail(server.unblocked_clients,server.master); - } + if (server.masterhost && server.master) + queueClientForReprocessing(server.master); } server.lua_caller = NULL; diff --git a/src/server.h b/src/server.h index 4a5967f1..09348585 100644 --- a/src/server.h +++ b/src/server.h @@ -1884,6 +1884,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body); void processUnblockedClients(void); void blockClient(client *c, int btype); void unblockClient(client *c); +void queueClientForReprocessing(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void);