diff --git a/src/module.c b/src/module.c index a6b4ae4e..9effe21c 100644 --- a/src/module.c +++ b/src/module.c @@ -158,7 +158,9 @@ typedef struct RedisModuleKey RedisModuleKey; /* Function pointer type of a function representing a command inside * a Redis module. */ +struct RedisModuleBlockedClient; typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc); +typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc); /* This struct holds the information about a command registered by a module.*/ struct RedisModuleCommandProxy { @@ -201,6 +203,7 @@ typedef struct RedisModuleBlockedClient { RedisModule *module; /* Module blocking the client. */ RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/ RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */ + RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/ void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/ void *privdata; /* Module private data that may be used by the reply or timeout callback. It is set via the @@ -3438,6 +3441,17 @@ void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, in * running the list of clients blocked by a module that need to be unblocked. */ void unblockClientFromModule(client *c) { RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + + /* Call the disconnection callback if any. */ + if (bc->disconnect_callback) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.blocked_privdata = bc->privdata; + ctx.module = bc->module; + ctx.client = bc->client; + bc->disconnect_callback(&ctx,bc); + moduleFreeContext(&ctx); + } + bc->client = NULL; /* Reset the client for a new query since, for blocking commands implemented * into modules, we do not it immediately after the command returns (and @@ -3478,6 +3492,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc bc->module = ctx->module; bc->reply_callback = reply_callback; bc->timeout_callback = timeout_callback; + bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ bc->free_privdata = free_privdata; bc->privdata = NULL; bc->reply_client = createClient(-1); @@ -3525,6 +3540,26 @@ int RM_AbortBlock(RedisModuleBlockedClient *bc) { return RM_UnblockClient(bc,NULL); } +/* Set a callback that will be called if a blocked client disconnects + * before the module has a chance to call RedisModule_UnblockClient() + * + * Usually what you want to do there, is to cleanup your module state + * so that you can call RedisModule_UnblockClient() safely, otherwise + * the client will remain blocked forever if the timeout is large. + * + * Notes: + * + * 1. It is not safe to call Reply* family functions here, it is also + * useless since the client is gone. + * + * 2. This callback is not called if the client disconnects because of + * a timeout. In such a case, the client is unblocked automatically + * and the timeout callback is called. + */ +void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) { + bc->disconnect_callback = callback; +} + /* This function will check the moduleUnblockedClients queue in order to * call the reply callback and really unblock the client. * @@ -3592,6 +3627,10 @@ void moduleHandleBlockedClients(void) { freeClient(bc->reply_client); if (c != NULL) { + /* Before unblocking the client, set the disconnect callback + * to NULL, because if we reached this point, the client was + * properly unblocked by the module. */ + bc->disconnect_callback = NULL; unblockClient(c); /* Put the client in the list of clients that need to write * if there are pending replies here. This is needed since @@ -3627,6 +3666,10 @@ void moduleBlockedClientTimedOut(client *c) { ctx.client = bc->client; bc->timeout_callback(&ctx,(void**)c->argv,c->argc); moduleFreeContext(&ctx); + /* For timeout events, we do not want to call the disconnect callback, + * because the blocekd client will be automatically disconnected in + * this case, and the user can still hook using the timeout callback. */ + bc->disconnect_callback = NULL; } /* Return non-zero if a module command was called in order to fill the @@ -4625,4 +4668,5 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetRandomBytes); REGISTER_API(GetRandomHexChars); REGISTER_API(BlockedClientDisconnected); + REGISTER_API(SetDisconnectCallback); } diff --git a/src/modules/helloblock.c b/src/modules/helloblock.c index cabaeff6..6bba17d3 100644 --- a/src/modules/helloblock.c +++ b/src/modules/helloblock.c @@ -74,6 +74,23 @@ void *HelloBlock_ThreadMain(void *arg) { return NULL; } +/* An example blocked client disconnection callback. + * + * Note that in the case of the HELLO.BLOCK command, the blocked client is now + * owned by the thread calling sleep(). In this speciifc case, there is not + * much we can do, however normally we could instead implement a way to + * signal the thread that the client disconnected, and sleep the specified + * amount of seconds with a while loop calling sleep(1), so that once we + * detect the client disconnection, we can terminate the thread ASAP. */ +void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) { + RedisModule_Log(ctx,"warning","Blocked client %p disconnected!", + (void*)bc); + + /* Here you should cleanup your state / threads, and if possible + * call RedisModule_UnblockClient(), or notify the thread that will + * call the function ASAP. */ +} + /* HELLO.BLOCK -- Block for seconds, then reply with * a random number. Timeout is the command timeout, so that you can test * what happens when the delay is greater than the timeout. */ @@ -93,6 +110,11 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a pthread_t tid; RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); + /* Here we set a disconnection handler, however since this module will + * block in sleep() in a thread, there is not much we can do in the + * callback, so this is just to show you the API. */ + RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected); + /* Now that we setup a blocking client, we need to pass the control * to the thread. However we need to pass arguments to the thread: * the delay and a reference to the blocked client handle. */ diff --git a/src/redismodule.h b/src/redismodule.h index d500d28b..9c52012d 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -142,8 +142,9 @@ typedef struct RedisModuleDigest RedisModuleDigest; typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef struct RedisModuleClusterInfo RedisModuleClusterInfo; -typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc); -typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); +typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); +typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); @@ -275,7 +276,7 @@ void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API -#define REDISMODULE_EXPERIMENTAL_API_VERSION 2 +#define REDISMODULE_EXPERIMENTAL_API_VERSION 3 RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms); int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata); int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx); @@ -300,6 +301,7 @@ const char *REDISMODULE_API_FUNC(RedisModule_GetMyClusterID)(void); size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void); void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len); void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len); +void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback); #endif /* This is included inline inside each Redis module. */ @@ -421,6 +423,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(IsBlockedTimeoutRequest); REDISMODULE_GET_API(GetBlockedClientPrivateData); REDISMODULE_GET_API(AbortBlock); + REDISMODULE_GET_API(SetDisconnectCallback); REDISMODULE_GET_API(SubscribeToKeyspaceEvents); REDISMODULE_GET_API(BlockedClientDisconnected); REDISMODULE_GET_API(RegisterClusterMessageReceiver);