diff --git a/src/module.c b/src/module.c index cbc6ff62..171887ff 100644 --- a/src/module.c +++ b/src/module.c @@ -95,10 +95,15 @@ typedef struct RedisModulePoolAllocBlock { * * Note that not all the context structure is always filled with actual values * but only the fields needed in a given context. */ + +struct RedisModuleBlockedClient; + struct RedisModuleCtx { void *getapifuncptr; /* NOTE: Must be the first field. */ struct RedisModule *module; /* Module reference. */ client *client; /* Client calling a command. */ + struct RedisModuleBlockedClient *blocked_client; /* Blocked client for + thread safe context. */ struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */ int amqueue_len; /* Number of slots in amqueue. */ int amqueue_used; /* Number of used slots in amqueue. */ @@ -115,12 +120,13 @@ struct RedisModuleCtx { }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL} +#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL} #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) #define REDISMODULE_CTX_BLOCKED_REPLY (1<<3) #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4) +#define REDISMODULE_CTX_THREAD_SAFE (1<<5) /* This represents a Redis key opened with RM_OpenKey(). */ struct RedisModuleKey { @@ -198,6 +204,8 @@ typedef struct RedisModuleBlockedClient { void *privdata; /* Module private data that may be used by the reply or timeout callback. It is set via the RedisModule_UnblockClient() API. */ + client *reply_client; /* Fake client used to accumulate replies + in thread safe contexts. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -3165,6 +3173,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc bc->timeout_callback = timeout_callback; bc->free_privdata = free_privdata; bc->privdata = NULL; + bc->reply_client = NULL; c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0; blockClient(c,BLOCKED_MODULE); @@ -3300,6 +3309,56 @@ void moduleCooperativeMultiTaskingCycle(void) { pthread_mutex_lock(&moduleGIL); } +/* Return a context which can be used inside threads to make Redis context + * calls with certain modules APIs. If 'bc' is not NULL then the module will + * be bound to a blocked client, and it will be possible to use the + * `RedisModule_Reply*` family of functions to accumulate a reply for when the + * client will be unblocked. Otherwise the thread safe context will be + * detached by a specific client. + * + * To call non-reply APIs, the thread safe context must be prepared with: + * + * RedisModule_ThreadSafeCallStart(ctx); + * ... make your call here ... + * RedisModule_ThreadSafeCallStop(ctx); + * + * This is not needed when using `RedisModule_Reply*` functions, assuming + * that a blocked client was used when the context was created, otherwise + * no RedisModule_Reply* call should be made at all. + */ +RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { + RedisModuleCtx *ctx = zmalloc(sizeof(*ctx)); + RedisModuleCtx empty = REDISMODULE_CTX_INIT; + memcpy(ctx,&empty,sizeof(empty)); + if (bc) { + ctx->blocked_client = bc; + if (bc->reply_client == NULL) + bc->reply_client = createClient(-1); + } + ctx->flags |= REDISMODULE_CTX_THREAD_SAFE; + return ctx; +} + +/* Release a thread safe context. */ +void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { + moduleFreeContext(ctx); + zfree(ctx); +} + +/* Acquire the server lock before executing a thread safe API call. + * This is not needed for `RedisModule_Reply*` calls when there is + * a blocked client connected to the thread safe context. */ +void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { + DICT_NOTUSED(ctx); + pthread_mutex_lock(&moduleGIL); +} + +/* Release the server lock after a thread safe API call was executed. */ +void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { + DICT_NOTUSED(ctx); + pthread_mutex_unlock(&moduleGIL); +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */