From 8fadfe52a2d1abf3d4d12707004f1209703d446c Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Oct 2016 11:55:35 +0200 Subject: [PATCH] Module: API to block clients with threading support. Just a draft to align the main ideas, never executed code. Compiles. --- src/blocked.c | 7 +- src/module.c | 180 ++++++++++++++++++++++++++++++++++++++++++++++---- src/server.c | 4 ++ src/server.h | 9 +++ 4 files changed, 187 insertions(+), 13 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index d2287254..54b26b71 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -136,6 +136,8 @@ void unblockClient(client *c) { unblockClientWaitingData(c); } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); + } else if (c->btype == BLOCKED_MODULE) { + unblockClientFromModule(c); } else { serverPanic("Unknown btype in unblockClient()."); } @@ -153,12 +155,15 @@ void unblockClient(client *c) { } /* This function gets called when a blocked client timed out in order to - * send it a reply of some kind. */ + * send it a reply of some kind. After this function is called, + * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { if (c->btype == BLOCKED_LIST) { addReply(c,shared.nullmultibulk); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); + } else if (c->btype == BLOCKED_MODULE) { + moduleBlockedClientTimedOut(c); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } diff --git a/src/module.c b/src/module.c index 742d9b97..9a939d2f 100644 --- a/src/module.c +++ b/src/module.c @@ -105,6 +105,7 @@ struct RedisModuleCtx { int flags; /* REDISMODULE_CTX_... flags. */ void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ int postponed_arrays_count; /* Number of entries in postponed_arrays. */ + void *blocked_privdata; /* Privdata set when unblocking a clinet. */ /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ int *keys_pos; @@ -114,10 +115,12 @@ struct RedisModuleCtx { }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, 0, NULL} +#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL} #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) +#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3) +#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4) /* This represents a Redis key opened with RM_OpenKey(). */ struct RedisModuleKey { @@ -183,6 +186,23 @@ typedef struct RedisModuleCallReply { } val; } RedisModuleCallReply; +/* Structure representing a blocked client. We get a pointer to such + * an object when blocking from modules. */ +typedef struct RedisModuleBlockedClient { + client *client; /* Pointer to the blocked client. or NULL if the client + was destroyed during the life of this object. */ + RedisModule *module; /* Module blocking the client. */ + RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/ + RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */ + void (*free_privdata)(void *); /* privdata cleanup callback. */ + void *privdata; /* Module private data that may be used by the reply + or timeout callback. It is set via the + RedisModule_UnblockClient() API. */ +} RedisModuleBlockedClient; + +static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; +static list *moduleUnblockedClients; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -403,6 +423,26 @@ void moduleFreeContext(RedisModuleCtx *ctx) { } } +/* Helper function for when a command callback is called, in order to handle + * details needed to correctly replicate commands. */ +void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { + client *c = ctx->client; + + /* We don't want any automatic propagation here since in modules we handle + * replication / AOF propagation in explicit ways. */ + preventCommandPropagation(c); + + /* Handle the replication of the final EXEC, since whatever a command + * emits is always wrappered around MULTI/EXEC. */ + if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) { + robj *propargv[1]; + propargv[0] = createStringObject("EXEC",4); + alsoPropagate(server.execCommand,c->db->id,propargv,1, + PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(propargv[0]); + } +} + /* This Redis command binds the normal Redis command invocation with commands * exported by modules. */ void RedisModuleCommandDispatcher(client *c) { @@ -412,17 +452,7 @@ void RedisModuleCommandDispatcher(client *c) { ctx.module = cp->module; ctx.client = c; cp->func(&ctx,(void**)c->argv,c->argc); - preventCommandPropagation(c); - - /* Handle the replication of the final EXEC, since whatever a command - * emits is always wrappered around MULTI/EXEC. */ - if (ctx.flags & REDISMODULE_CTX_MULTI_EMITTED) { - robj *propargv[1]; - propargv[0] = createStringObject("EXEC",4); - alsoPropagate(server.execCommand,c->db->id,propargv,1, - PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(propargv[0]); - } + moduleHandlePropagationAfterCommandCallback(&ctx); moduleFreeContext(&ctx); } @@ -3034,6 +3064,130 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ... va_end(ap); } +/* -------------------------------------------------------------------------- + * Blocking clients from modules + * -------------------------------------------------------------------------- */ + +/* This is called from blocked.c in order to unblock a client: may be called + * for multiple reasons while the client is in the middle of being blocked + * because the client is terminated, but is also called for cleanup when a + * client is unblocked in a clean way after replaying. + * + * What we do here is just to set the client to NULL in the redis module + * blocked client handle. This way if the client is terminated while there + * is a pending threaded operation involving the blocked client, we'll know + * that the client no longer exists and no reply callback should be called. + * + * The structure RedisModuleBlockedClient will be always deallocated when + * running the list of clients blocked by a module that need to be unblocked. */ +void unblockClientFromModule(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + bc->client = NULL; +} + +int RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms) { + client *c = ctx->client; + c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + + bc->client = c; + bc->module = ctx->module; + bc->reply_callback = reply_callback; + bc->timeout_callback = timeout_callback; + bc->free_privdata = free_privdata; + bc->privdata = NULL; + c->bpop.timeout = timeout_ms; + + blockClient(c,BLOCKED_MODULE); + return REDISMODULE_OK; +} + +/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger + * the reply callbacks to be called in order to reply to the client. + * The 'privdata' argument will be accessible by the reply callback, so + * the caller of this function can pass any value that is needed in order to + * actually reply to the client. + * + * A common usage for 'privdata' is a thread that computes something that + * needs to be passed to the client, included but not limited some slow + * to compute reply or some reply obtained via networking. + * + * Note: this function can be called from threads spawned by the module. */ +int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { + pthread_mutex_lock(&moduleUnblockedClientsMutex); + bc->privdata = privdata; + listAddNodeTail(moduleUnblockedClients,bc); + pthread_mutex_unlock(&moduleUnblockedClientsMutex); + return REDISMODULE_OK; +} + +/* This function will check the moduleUnblockedClients queue in order to + * call the reply callback and really unblock the client. + * + * Clients end into this list because of calls to RM_UnblockClient(), + * however it is possible that while the module was doing work for the + * blocked client, it was terminated by Redis (for timeout or other reasons). + * When this happens the RedisModuleBlockedClient structure in the queue + * will have the 'client' field set to NULL. */ +void moduleHandleBlockedClients(void) { + listNode *ln; + RedisModuleBlockedClient *bc; + + pthread_mutex_lock(&moduleUnblockedClientsMutex); + while (listLength(moduleUnblockedClients)) { + ln = listFirst(moduleUnblockedClients); + bc = ln->value; + client *c = bc->client; + listDelNode(server.unblocked_clients,ln); + + if (c != NULL) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; + ctx.blocked_privdata = bc->privdata; + ctx.module = bc->module; + ctx.client = bc->client; + bc->reply_callback(&ctx,(void**)c->argv,c->argc); + moduleHandlePropagationAfterCommandCallback(&ctx); + moduleFreeContext(&ctx); + } + if (bc->privdata && bc->free_privdata) + bc->free_privdata(bc->privdata); + zfree(bc); + } + pthread_mutex_unlock(&moduleUnblockedClientsMutex); +} + +/* Called when our client timed out. After this function unblockClient() + * is called, and it will invalidate the blocked client. So this function + * does not need to do any cleanup. Eventually the module will call the + * API to unblock the client and the memory will be released. */ +void moduleBlockedClientTimedOut(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT; + ctx.module = bc->module; + ctx.client = bc->client; + bc->timeout_callback(&ctx,(void**)c->argv,c->argc); + moduleFreeContext(&ctx); +} + +/* Return non-zero if a module command was called in order to fill the + * reply for a blocked client. */ +int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) { + return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0; +} + +/* Return non-zero if a module command was called in order to fill the + * reply for a blocked client that timed out. */ +int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) { + return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0; +} + +/* Get the privata data set by RedisModule_UnblockClient() */ +void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { + return ctx->blocked_privdata; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -3070,6 +3224,8 @@ int moduleRegisterApi(const char *funcname, void *funcptr) { void moduleRegisterCoreAPI(void); void moduleInitModulesSystem(void) { + moduleUnblockedClients = listCreate(); + server.loadmodule_queue = listCreate(); modules = dictCreate(&modulesDictType,NULL); moduleRegisterCoreAPI(); diff --git a/src/server.c b/src/server.c index 36b04abf..a0549185 100644 --- a/src/server.c +++ b/src/server.c @@ -1195,6 +1195,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (listLength(server.clients_waiting_acks)) processClientsWaitingReplicas(); + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + moduleHandleBlockedClients(); + /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.unblocked_clients)) processUnblockedClients(); diff --git a/src/server.h b/src/server.h index b9c46b81..69ee52e6 100644 --- a/src/server.h +++ b/src/server.h @@ -245,6 +245,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */ #define BLOCKED_LIST 1 /* BLPOP & co. */ #define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ +#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -619,6 +620,11 @@ typedef struct blockingState { /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ long long reploffset; /* Replication offset to reach. */ + + /* BLOCKED_MODULE */ + void *module_blocked_handle; /* RedisModuleBlockedClient structure. + which is opaque for the Redis core, only + handled in module.c. */ } blockingState; /* The following structure represents a node in the server.ready_keys list, @@ -1226,6 +1232,9 @@ int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, moduleType *moduleTypeLookupModuleByID(uint64_t id); void moduleTypeNameByID(char *name, uint64_t moduleid); void moduleFreeContext(struct RedisModuleCtx *ctx); +void unblockClientFromModule(client *c); +void moduleHandleBlockedClients(void); +void moduleBlockedClientTimedOut(client *c); /* Utils */ long long ustime(void);