mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 01:20:50 +00:00
Modules TSC: Basic TS context creeation and handling.
This commit is contained in:
parent
59b06b14c9
commit
9c500b89fb
61
src/module.c
61
src/module.c
@ -95,10 +95,15 @@ typedef struct RedisModulePoolAllocBlock {
|
|||||||
*
|
*
|
||||||
* Note that not all the context structure is always filled with actual values
|
* Note that not all the context structure is always filled with actual values
|
||||||
* but only the fields needed in a given context. */
|
* but only the fields needed in a given context. */
|
||||||
|
|
||||||
|
struct RedisModuleBlockedClient;
|
||||||
|
|
||||||
struct RedisModuleCtx {
|
struct RedisModuleCtx {
|
||||||
void *getapifuncptr; /* NOTE: Must be the first field. */
|
void *getapifuncptr; /* NOTE: Must be the first field. */
|
||||||
struct RedisModule *module; /* Module reference. */
|
struct RedisModule *module; /* Module reference. */
|
||||||
client *client; /* Client calling a command. */
|
client *client; /* Client calling a command. */
|
||||||
|
struct RedisModuleBlockedClient *blocked_client; /* Blocked client for
|
||||||
|
thread safe context. */
|
||||||
struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */
|
struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */
|
||||||
int amqueue_len; /* Number of slots in amqueue. */
|
int amqueue_len; /* Number of slots in amqueue. */
|
||||||
int amqueue_used; /* Number of used slots in amqueue. */
|
int amqueue_used; /* Number of used slots in amqueue. */
|
||||||
@ -115,12 +120,13 @@ struct RedisModuleCtx {
|
|||||||
};
|
};
|
||||||
typedef struct RedisModuleCtx RedisModuleCtx;
|
typedef struct RedisModuleCtx RedisModuleCtx;
|
||||||
|
|
||||||
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
|
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
|
||||||
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
|
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
|
||||||
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
|
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
|
||||||
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
|
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
|
||||||
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
|
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
|
||||||
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
|
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
|
||||||
|
#define REDISMODULE_CTX_THREAD_SAFE (1<<5)
|
||||||
|
|
||||||
/* This represents a Redis key opened with RM_OpenKey(). */
|
/* This represents a Redis key opened with RM_OpenKey(). */
|
||||||
struct RedisModuleKey {
|
struct RedisModuleKey {
|
||||||
@ -198,6 +204,8 @@ typedef struct RedisModuleBlockedClient {
|
|||||||
void *privdata; /* Module private data that may be used by the reply
|
void *privdata; /* Module private data that may be used by the reply
|
||||||
or timeout callback. It is set via the
|
or timeout callback. It is set via the
|
||||||
RedisModule_UnblockClient() API. */
|
RedisModule_UnblockClient() API. */
|
||||||
|
client *reply_client; /* Fake client used to accumulate replies
|
||||||
|
in thread safe contexts. */
|
||||||
} RedisModuleBlockedClient;
|
} RedisModuleBlockedClient;
|
||||||
|
|
||||||
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
@ -3165,6 +3173,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
|||||||
bc->timeout_callback = timeout_callback;
|
bc->timeout_callback = timeout_callback;
|
||||||
bc->free_privdata = free_privdata;
|
bc->free_privdata = free_privdata;
|
||||||
bc->privdata = NULL;
|
bc->privdata = NULL;
|
||||||
|
bc->reply_client = NULL;
|
||||||
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
||||||
|
|
||||||
blockClient(c,BLOCKED_MODULE);
|
blockClient(c,BLOCKED_MODULE);
|
||||||
@ -3300,6 +3309,56 @@ void moduleCooperativeMultiTaskingCycle(void) {
|
|||||||
pthread_mutex_lock(&moduleGIL);
|
pthread_mutex_lock(&moduleGIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Return a context which can be used inside threads to make Redis context
|
||||||
|
* calls with certain modules APIs. If 'bc' is not NULL then the module will
|
||||||
|
* be bound to a blocked client, and it will be possible to use the
|
||||||
|
* `RedisModule_Reply*` family of functions to accumulate a reply for when the
|
||||||
|
* client will be unblocked. Otherwise the thread safe context will be
|
||||||
|
* detached by a specific client.
|
||||||
|
*
|
||||||
|
* To call non-reply APIs, the thread safe context must be prepared with:
|
||||||
|
*
|
||||||
|
* RedisModule_ThreadSafeCallStart(ctx);
|
||||||
|
* ... make your call here ...
|
||||||
|
* RedisModule_ThreadSafeCallStop(ctx);
|
||||||
|
*
|
||||||
|
* This is not needed when using `RedisModule_Reply*` functions, assuming
|
||||||
|
* that a blocked client was used when the context was created, otherwise
|
||||||
|
* no RedisModule_Reply* call should be made at all.
|
||||||
|
*/
|
||||||
|
RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
|
||||||
|
RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
|
||||||
|
RedisModuleCtx empty = REDISMODULE_CTX_INIT;
|
||||||
|
memcpy(ctx,&empty,sizeof(empty));
|
||||||
|
if (bc) {
|
||||||
|
ctx->blocked_client = bc;
|
||||||
|
if (bc->reply_client == NULL)
|
||||||
|
bc->reply_client = createClient(-1);
|
||||||
|
}
|
||||||
|
ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Release a thread safe context. */
|
||||||
|
void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
|
||||||
|
moduleFreeContext(ctx);
|
||||||
|
zfree(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Acquire the server lock before executing a thread safe API call.
|
||||||
|
* This is not needed for `RedisModule_Reply*` calls when there is
|
||||||
|
* a blocked client connected to the thread safe context. */
|
||||||
|
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
||||||
|
DICT_NOTUSED(ctx);
|
||||||
|
pthread_mutex_lock(&moduleGIL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Release the server lock after a thread safe API call was executed. */
|
||||||
|
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
||||||
|
DICT_NOTUSED(ctx);
|
||||||
|
pthread_mutex_unlock(&moduleGIL);
|
||||||
|
}
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Modules API internals
|
* Modules API internals
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user