mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 01:20:50 +00:00
Modules API: blocked client disconnection callback.
This commit is contained in:
parent
005c932f22
commit
404160a271
44
src/module.c
44
src/module.c
@ -158,7 +158,9 @@ typedef struct RedisModuleKey RedisModuleKey;
|
|||||||
|
|
||||||
/* Function pointer type of a function representing a command inside
|
/* Function pointer type of a function representing a command inside
|
||||||
* a Redis module. */
|
* a Redis module. */
|
||||||
|
struct RedisModuleBlockedClient;
|
||||||
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
|
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
|
||||||
|
typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc);
|
||||||
|
|
||||||
/* This struct holds the information about a command registered by a module.*/
|
/* This struct holds the information about a command registered by a module.*/
|
||||||
struct RedisModuleCommandProxy {
|
struct RedisModuleCommandProxy {
|
||||||
@ -201,6 +203,7 @@ typedef struct RedisModuleBlockedClient {
|
|||||||
RedisModule *module; /* Module blocking the client. */
|
RedisModule *module; /* Module blocking the client. */
|
||||||
RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
|
RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
|
||||||
RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
|
RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
|
||||||
|
RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/
|
||||||
void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
|
void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
|
||||||
void *privdata; /* Module private data that may be used by the reply
|
void *privdata; /* Module private data that may be used by the reply
|
||||||
or timeout callback. It is set via the
|
or timeout callback. It is set via the
|
||||||
@ -3438,6 +3441,17 @@ void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, in
|
|||||||
* running the list of clients blocked by a module that need to be unblocked. */
|
* running the list of clients blocked by a module that need to be unblocked. */
|
||||||
void unblockClientFromModule(client *c) {
|
void unblockClientFromModule(client *c) {
|
||||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
|
||||||
|
/* Call the disconnection callback if any. */
|
||||||
|
if (bc->disconnect_callback) {
|
||||||
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||||
|
ctx.blocked_privdata = bc->privdata;
|
||||||
|
ctx.module = bc->module;
|
||||||
|
ctx.client = bc->client;
|
||||||
|
bc->disconnect_callback(&ctx,bc);
|
||||||
|
moduleFreeContext(&ctx);
|
||||||
|
}
|
||||||
|
|
||||||
bc->client = NULL;
|
bc->client = NULL;
|
||||||
/* Reset the client for a new query since, for blocking commands implemented
|
/* Reset the client for a new query since, for blocking commands implemented
|
||||||
* into modules, we do not it immediately after the command returns (and
|
* into modules, we do not it immediately after the command returns (and
|
||||||
@ -3478,6 +3492,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
|||||||
bc->module = ctx->module;
|
bc->module = ctx->module;
|
||||||
bc->reply_callback = reply_callback;
|
bc->reply_callback = reply_callback;
|
||||||
bc->timeout_callback = timeout_callback;
|
bc->timeout_callback = timeout_callback;
|
||||||
|
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
||||||
bc->free_privdata = free_privdata;
|
bc->free_privdata = free_privdata;
|
||||||
bc->privdata = NULL;
|
bc->privdata = NULL;
|
||||||
bc->reply_client = createClient(-1);
|
bc->reply_client = createClient(-1);
|
||||||
@ -3525,6 +3540,26 @@ int RM_AbortBlock(RedisModuleBlockedClient *bc) {
|
|||||||
return RM_UnblockClient(bc,NULL);
|
return RM_UnblockClient(bc,NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Set a callback that will be called if a blocked client disconnects
|
||||||
|
* before the module has a chance to call RedisModule_UnblockClient()
|
||||||
|
*
|
||||||
|
* Usually what you want to do there, is to cleanup your module state
|
||||||
|
* so that you can call RedisModule_UnblockClient() safely, otherwise
|
||||||
|
* the client will remain blocked forever if the timeout is large.
|
||||||
|
*
|
||||||
|
* Notes:
|
||||||
|
*
|
||||||
|
* 1. It is not safe to call Reply* family functions here, it is also
|
||||||
|
* useless since the client is gone.
|
||||||
|
*
|
||||||
|
* 2. This callback is not called if the client disconnects because of
|
||||||
|
* a timeout. In such a case, the client is unblocked automatically
|
||||||
|
* and the timeout callback is called.
|
||||||
|
*/
|
||||||
|
void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) {
|
||||||
|
bc->disconnect_callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
/* This function will check the moduleUnblockedClients queue in order to
|
/* This function will check the moduleUnblockedClients queue in order to
|
||||||
* call the reply callback and really unblock the client.
|
* call the reply callback and really unblock the client.
|
||||||
*
|
*
|
||||||
@ -3592,6 +3627,10 @@ void moduleHandleBlockedClients(void) {
|
|||||||
freeClient(bc->reply_client);
|
freeClient(bc->reply_client);
|
||||||
|
|
||||||
if (c != NULL) {
|
if (c != NULL) {
|
||||||
|
/* Before unblocking the client, set the disconnect callback
|
||||||
|
* to NULL, because if we reached this point, the client was
|
||||||
|
* properly unblocked by the module. */
|
||||||
|
bc->disconnect_callback = NULL;
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
/* Put the client in the list of clients that need to write
|
/* Put the client in the list of clients that need to write
|
||||||
* if there are pending replies here. This is needed since
|
* if there are pending replies here. This is needed since
|
||||||
@ -3627,6 +3666,10 @@ void moduleBlockedClientTimedOut(client *c) {
|
|||||||
ctx.client = bc->client;
|
ctx.client = bc->client;
|
||||||
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
|
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
|
||||||
moduleFreeContext(&ctx);
|
moduleFreeContext(&ctx);
|
||||||
|
/* For timeout events, we do not want to call the disconnect callback,
|
||||||
|
* because the blocekd client will be automatically disconnected in
|
||||||
|
* this case, and the user can still hook using the timeout callback. */
|
||||||
|
bc->disconnect_callback = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return non-zero if a module command was called in order to fill the
|
/* Return non-zero if a module command was called in order to fill the
|
||||||
@ -4625,4 +4668,5 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(GetRandomBytes);
|
REGISTER_API(GetRandomBytes);
|
||||||
REGISTER_API(GetRandomHexChars);
|
REGISTER_API(GetRandomHexChars);
|
||||||
REGISTER_API(BlockedClientDisconnected);
|
REGISTER_API(BlockedClientDisconnected);
|
||||||
|
REGISTER_API(SetDisconnectCallback);
|
||||||
}
|
}
|
||||||
|
@ -74,6 +74,23 @@ void *HelloBlock_ThreadMain(void *arg) {
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* An example blocked client disconnection callback.
|
||||||
|
*
|
||||||
|
* Note that in the case of the HELLO.BLOCK command, the blocked client is now
|
||||||
|
* owned by the thread calling sleep(). In this speciifc case, there is not
|
||||||
|
* much we can do, however normally we could instead implement a way to
|
||||||
|
* signal the thread that the client disconnected, and sleep the specified
|
||||||
|
* amount of seconds with a while loop calling sleep(1), so that once we
|
||||||
|
* detect the client disconnection, we can terminate the thread ASAP. */
|
||||||
|
void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
|
||||||
|
RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
|
||||||
|
(void*)bc);
|
||||||
|
|
||||||
|
/* Here you should cleanup your state / threads, and if possible
|
||||||
|
* call RedisModule_UnblockClient(), or notify the thread that will
|
||||||
|
* call the function ASAP. */
|
||||||
|
}
|
||||||
|
|
||||||
/* HELLO.BLOCK <delay> <timeout> -- Block for <count> seconds, then reply with
|
/* HELLO.BLOCK <delay> <timeout> -- Block for <count> seconds, then reply with
|
||||||
* a random number. Timeout is the command timeout, so that you can test
|
* a random number. Timeout is the command timeout, so that you can test
|
||||||
* what happens when the delay is greater than the timeout. */
|
* what happens when the delay is greater than the timeout. */
|
||||||
@ -93,6 +110,11 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a
|
|||||||
pthread_t tid;
|
pthread_t tid;
|
||||||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
||||||
|
|
||||||
|
/* Here we set a disconnection handler, however since this module will
|
||||||
|
* block in sleep() in a thread, there is not much we can do in the
|
||||||
|
* callback, so this is just to show you the API. */
|
||||||
|
RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
|
||||||
|
|
||||||
/* Now that we setup a blocking client, we need to pass the control
|
/* Now that we setup a blocking client, we need to pass the control
|
||||||
* to the thread. However we need to pass arguments to the thread:
|
* to the thread. However we need to pass arguments to the thread:
|
||||||
* the delay and a reference to the blocked client handle. */
|
* the delay and a reference to the blocked client handle. */
|
||||||
|
@ -142,8 +142,9 @@ typedef struct RedisModuleDigest RedisModuleDigest;
|
|||||||
typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
|
typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
|
||||||
typedef struct RedisModuleClusterInfo RedisModuleClusterInfo;
|
typedef struct RedisModuleClusterInfo RedisModuleClusterInfo;
|
||||||
|
|
||||||
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||||
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
|
||||||
|
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
||||||
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
|
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
|
||||||
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
|
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
|
||||||
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
|
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
|
||||||
@ -275,7 +276,7 @@ void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md);
|
|||||||
|
|
||||||
/* Experimental APIs */
|
/* Experimental APIs */
|
||||||
#ifdef REDISMODULE_EXPERIMENTAL_API
|
#ifdef REDISMODULE_EXPERIMENTAL_API
|
||||||
#define REDISMODULE_EXPERIMENTAL_API_VERSION 2
|
#define REDISMODULE_EXPERIMENTAL_API_VERSION 3
|
||||||
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms);
|
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
|
int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
|
int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
|
||||||
@ -300,6 +301,7 @@ const char *REDISMODULE_API_FUNC(RedisModule_GetMyClusterID)(void);
|
|||||||
size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void);
|
size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void);
|
||||||
void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len);
|
void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len);
|
||||||
void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len);
|
void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len);
|
||||||
|
void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* This is included inline inside each Redis module. */
|
/* This is included inline inside each Redis module. */
|
||||||
@ -421,6 +423,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(IsBlockedTimeoutRequest);
|
REDISMODULE_GET_API(IsBlockedTimeoutRequest);
|
||||||
REDISMODULE_GET_API(GetBlockedClientPrivateData);
|
REDISMODULE_GET_API(GetBlockedClientPrivateData);
|
||||||
REDISMODULE_GET_API(AbortBlock);
|
REDISMODULE_GET_API(AbortBlock);
|
||||||
|
REDISMODULE_GET_API(SetDisconnectCallback);
|
||||||
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
|
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
|
||||||
REDISMODULE_GET_API(BlockedClientDisconnected);
|
REDISMODULE_GET_API(BlockedClientDisconnected);
|
||||||
REDISMODULE_GET_API(RegisterClusterMessageReceiver);
|
REDISMODULE_GET_API(RegisterClusterMessageReceiver);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user