mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 09:30:55 +00:00
Introduce protectClient() + some refactoring.
The idea is to have an API for the cases like -BUSY state and DEBUG RELOAD where we have to manually deinstall the read handler. See #4804.
This commit is contained in:
parent
6660458a4c
commit
69c30965eb
@ -160,6 +160,32 @@ client *createClient(int fd) {
|
|||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This funciton puts the client in the queue of clients that should write
|
||||||
|
* their output buffers to the socket. Note that it does not *yet* install
|
||||||
|
* the write handler, to start clients are put in a queue of clients that need
|
||||||
|
* to write, so we try to do that before returning in the event loop (see the
|
||||||
|
* handleClientsWithPendingWrites() function).
|
||||||
|
* If we fail and there is more data to write, compared to what the socket
|
||||||
|
* buffers can hold, then we'll really install the handler. */
|
||||||
|
void clientInstallWriteHandler(client *c) {
|
||||||
|
/* Schedule the client to write the output buffers to the socket only
|
||||||
|
* if not already done and, for slaves, if the slave can actually receive
|
||||||
|
* writes at this stage. */
|
||||||
|
if (!(c->flags & CLIENT_PENDING_WRITE) &&
|
||||||
|
(c->replstate == REPL_STATE_NONE ||
|
||||||
|
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
||||||
|
{
|
||||||
|
/* Here instead of installing the write handler, we just flag the
|
||||||
|
* client and put it into a list of clients that have something
|
||||||
|
* to write to the socket. This way before re-entering the event
|
||||||
|
* loop, we can try to directly write to the client sockets avoiding
|
||||||
|
* a system call. We'll only really install the write handler if
|
||||||
|
* we'll not be able to write the whole reply at once. */
|
||||||
|
c->flags |= CLIENT_PENDING_WRITE;
|
||||||
|
listAddNodeHead(server.clients_pending_write,c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* This function is called every time we are going to transmit new data
|
/* This function is called every time we are going to transmit new data
|
||||||
* to the client. The behavior is the following:
|
* to the client. The behavior is the following:
|
||||||
*
|
*
|
||||||
@ -197,24 +223,9 @@ int prepareClientToWrite(client *c) {
|
|||||||
|
|
||||||
if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
|
if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
|
||||||
|
|
||||||
/* Schedule the client to write the output buffers to the socket only
|
/* Schedule the client to write the output buffers to the socket, unless
|
||||||
* if not already done (there were no pending writes already and the client
|
* it should already be setup to do so (it has already pending data). */
|
||||||
* was yet not flagged), and, for slaves, if the slave can actually
|
if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
||||||
* receive writes at this stage. */
|
|
||||||
if (!clientHasPendingReplies(c) &&
|
|
||||||
!(c->flags & CLIENT_PENDING_WRITE) &&
|
|
||||||
(c->replstate == REPL_STATE_NONE ||
|
|
||||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
|
||||||
{
|
|
||||||
/* Here instead of installing the write handler, we just flag the
|
|
||||||
* client and put it into a list of clients that have something
|
|
||||||
* to write to the socket. This way before re-entering the event
|
|
||||||
* loop, we can try to directly write to the client sockets avoiding
|
|
||||||
* a system call. We'll only really install the write handler if
|
|
||||||
* we'll not be able to write the whole reply at once. */
|
|
||||||
c->flags |= CLIENT_PENDING_WRITE;
|
|
||||||
listAddNodeHead(server.clients_pending_write,c);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Authorize the caller to queue in the output buffer of this client. */
|
/* Authorize the caller to queue in the output buffer of this client. */
|
||||||
return C_OK;
|
return C_OK;
|
||||||
@ -1105,6 +1116,34 @@ void resetClient(client *c) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This funciton is used when we want to re-enter the event loop but there
|
||||||
|
* is the risk that the client we are dealing with will be freed in some
|
||||||
|
* way. This happens for instance in:
|
||||||
|
*
|
||||||
|
* * DEBUG RELOAD and similar.
|
||||||
|
* * When a Lua script is in -BUSY state.
|
||||||
|
*
|
||||||
|
* So the function will protect the client by doing two things:
|
||||||
|
*
|
||||||
|
* 1) It removes the file events. This way it is not possible that an
|
||||||
|
* error is signaled on the socket, freeing the client.
|
||||||
|
* 2) Moreover it makes sure that if the client is freed in a different code
|
||||||
|
* path, it is not really released, but only marked for later release. */
|
||||||
|
void protectClient(client *c) {
|
||||||
|
c->flags |= CLIENT_PROTECTED;
|
||||||
|
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
||||||
|
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This will undo the client protection done by protectClient() */
|
||||||
|
void unprotectClient(client *c) {
|
||||||
|
if (c->flags & CLIENT_PROTECTED) {
|
||||||
|
c->flags &= ~CLIENT_PROTECTED;
|
||||||
|
aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c);
|
||||||
|
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
|
/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
|
||||||
* this function consumes the client query buffer and creates a command ready
|
* this function consumes the client query buffer and creates a command ready
|
||||||
* to be executed inside the client structure. Returns C_OK if the command
|
* to be executed inside the client structure. Returns C_OK if the command
|
||||||
|
@ -256,6 +256,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define CLIENT_LUA_DEBUG (1<<25) /* Run EVAL in debug mode. */
|
#define CLIENT_LUA_DEBUG (1<<25) /* Run EVAL in debug mode. */
|
||||||
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
|
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
|
||||||
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
|
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
|
||||||
|
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
@ -1473,6 +1474,8 @@ int clientHasPendingReplies(client *c);
|
|||||||
void unlinkClient(client *c);
|
void unlinkClient(client *c);
|
||||||
int writeToClient(int fd, client *c, int handler_installed);
|
int writeToClient(int fd, client *c, int handler_installed);
|
||||||
void linkClient(client *c);
|
void linkClient(client *c);
|
||||||
|
void protectClient(client *c);
|
||||||
|
void unprotectClient(client *c);
|
||||||
|
|
||||||
#ifdef __GNUC__
|
#ifdef __GNUC__
|
||||||
void addReplyErrorFormat(client *c, const char *fmt, ...)
|
void addReplyErrorFormat(client *c, const char *fmt, ...)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user