mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 09:00:51 +00:00
Modules TSC: Release the GIL for all the time we are blocked.
Instead of giving the module background operations just a small time to run in the beforeSleep() function, we can have the lock released for all the time we are blocked in the multiplexing syscall.
This commit is contained in:
parent
ba4a5a3255
commit
3fcf959e60
12
src/ae.c
12
src/ae.c
@ -75,6 +75,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
|
|||||||
eventLoop->stop = 0;
|
eventLoop->stop = 0;
|
||||||
eventLoop->maxfd = -1;
|
eventLoop->maxfd = -1;
|
||||||
eventLoop->beforesleep = NULL;
|
eventLoop->beforesleep = NULL;
|
||||||
|
eventLoop->aftersleep = NULL;
|
||||||
if (aeApiCreate(eventLoop) == -1) goto err;
|
if (aeApiCreate(eventLoop) == -1) goto err;
|
||||||
/* Events with mask == AE_NONE are not set. So let's initialize the
|
/* Events with mask == AE_NONE are not set. So let's initialize the
|
||||||
* vector with it. */
|
* vector with it. */
|
||||||
@ -397,7 +398,14 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Call the multiplexing API, will return only on timeout or when
|
||||||
|
* some event fires. */
|
||||||
numevents = aeApiPoll(eventLoop, tvp);
|
numevents = aeApiPoll(eventLoop, tvp);
|
||||||
|
|
||||||
|
/* After sleep callback. */
|
||||||
|
if (eventLoop->aftersleep != NULL)
|
||||||
|
eventLoop->aftersleep(eventLoop);
|
||||||
|
|
||||||
for (j = 0; j < numevents; j++) {
|
for (j = 0; j < numevents; j++) {
|
||||||
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
|
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
|
||||||
int mask = eventLoop->fired[j].mask;
|
int mask = eventLoop->fired[j].mask;
|
||||||
@ -463,3 +471,7 @@ char *aeGetApiName(void) {
|
|||||||
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
|
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
|
||||||
eventLoop->beforesleep = beforesleep;
|
eventLoop->beforesleep = beforesleep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
|
||||||
|
eventLoop->aftersleep = aftersleep;
|
||||||
|
}
|
||||||
|
2
src/ae.h
2
src/ae.h
@ -98,6 +98,7 @@ typedef struct aeEventLoop {
|
|||||||
int stop;
|
int stop;
|
||||||
void *apidata; /* This is used for polling API specific data */
|
void *apidata; /* This is used for polling API specific data */
|
||||||
aeBeforeSleepProc *beforesleep;
|
aeBeforeSleepProc *beforesleep;
|
||||||
|
aeBeforeSleepProc *aftersleep;
|
||||||
} aeEventLoop;
|
} aeEventLoop;
|
||||||
|
|
||||||
/* Prototypes */
|
/* Prototypes */
|
||||||
@ -117,6 +118,7 @@ int aeWait(int fd, int mask, long long milliseconds);
|
|||||||
void aeMain(aeEventLoop *eventLoop);
|
void aeMain(aeEventLoop *eventLoop);
|
||||||
char *aeGetApiName(void);
|
char *aeGetApiName(void);
|
||||||
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
|
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
|
||||||
|
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
|
||||||
int aeGetSetSize(aeEventLoop *eventLoop);
|
int aeGetSetSize(aeEventLoop *eventLoop);
|
||||||
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
|
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
|
||||||
|
|
||||||
|
45
src/module.c
45
src/module.c
@ -433,6 +433,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
|
|||||||
"calls.",
|
"calls.",
|
||||||
ctx->module->name);
|
ctx->module->name);
|
||||||
}
|
}
|
||||||
|
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Helper function for when a command callback is called, in order to handle
|
/* Helper function for when a command callback is called, in order to handle
|
||||||
@ -967,8 +968,10 @@ int RM_WrongArity(RedisModuleCtx *ctx) {
|
|||||||
* context of a thread safe context that was not initialized with a blocked
|
* context of a thread safe context that was not initialized with a blocked
|
||||||
* client object. */
|
* client object. */
|
||||||
client *moduleGetReplyClient(RedisModuleCtx *ctx) {
|
client *moduleGetReplyClient(RedisModuleCtx *ctx) {
|
||||||
if (ctx->client) return ctx->client;
|
if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE) && ctx->client)
|
||||||
if (ctx->blocked_client) return ctx->blocked_client->reply_client;
|
return ctx->client;
|
||||||
|
if (ctx->blocked_client)
|
||||||
|
return ctx->blocked_client->reply_client;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3351,20 +3354,6 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
|
|||||||
* Thread Safe Contexts
|
* Thread Safe Contexts
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
/* Operations executed in thread safe contexts use a global lock in order to
|
|
||||||
* be ran at a safe time. This function unlocks and re-acquire the locks:
|
|
||||||
* hopefully with *any* sane implementation of pthreads, this will allow the
|
|
||||||
* modules to make progresses.
|
|
||||||
*
|
|
||||||
* This function is called in beforeSleep(). */
|
|
||||||
void moduleCooperativeMultiTaskingCycle(void) {
|
|
||||||
if (dictSize(modules) == 0) return; /* No modules, no async ops. */
|
|
||||||
pthread_mutex_unlock(&moduleGIL);
|
|
||||||
/* Here hopefully thread modules waiting to be executed at a safe time
|
|
||||||
* should be able to acquire the lock. */
|
|
||||||
pthread_mutex_lock(&moduleGIL);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Return a context which can be used inside threads to make Redis context
|
/* Return a context which can be used inside threads to make Redis context
|
||||||
* calls with certain modules APIs. If 'bc' is not NULL then the module will
|
* calls with certain modules APIs. If 'bc' is not NULL then the module will
|
||||||
* be bound to a blocked client, and it will be possible to use the
|
* be bound to a blocked client, and it will be possible to use the
|
||||||
@ -3381,7 +3370,9 @@ void moduleCooperativeMultiTaskingCycle(void) {
|
|||||||
* This is not needed when using `RedisModule_Reply*` functions, assuming
|
* This is not needed when using `RedisModule_Reply*` functions, assuming
|
||||||
* that a blocked client was used when the context was created, otherwise
|
* that a blocked client was used when the context was created, otherwise
|
||||||
* no RedisModule_Reply* call should be made at all.
|
* no RedisModule_Reply* call should be made at all.
|
||||||
*/
|
*
|
||||||
|
* TODO: thread safe contexts do not inherit the blocked client
|
||||||
|
* selected database. */
|
||||||
RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
|
RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
|
||||||
RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
|
RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
|
||||||
RedisModuleCtx empty = REDISMODULE_CTX_INIT;
|
RedisModuleCtx empty = REDISMODULE_CTX_INIT;
|
||||||
@ -3391,6 +3382,11 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
|
|||||||
ctx->module = bc->module;
|
ctx->module = bc->module;
|
||||||
}
|
}
|
||||||
ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
|
ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
|
||||||
|
/* Even when the context is associated with a blocked client, we can't
|
||||||
|
* access it safely from another thread, so we create a fake client here
|
||||||
|
* in order to keep things like the currently selected database and similar
|
||||||
|
* things. */
|
||||||
|
ctx->client = createClient(-1);
|
||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3405,12 +3401,20 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
|
|||||||
* a blocked client connected to the thread safe context. */
|
* a blocked client connected to the thread safe context. */
|
||||||
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
||||||
DICT_NOTUSED(ctx);
|
DICT_NOTUSED(ctx);
|
||||||
pthread_mutex_lock(&moduleGIL);
|
moduleAcquireGIL();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Release the server lock after a thread safe API call was executed. */
|
/* Release the server lock after a thread safe API call was executed. */
|
||||||
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
||||||
DICT_NOTUSED(ctx);
|
DICT_NOTUSED(ctx);
|
||||||
|
moduleReleaseGIL();
|
||||||
|
}
|
||||||
|
|
||||||
|
void moduleAcquireGIL(void) {
|
||||||
|
pthread_mutex_lock(&moduleGIL);
|
||||||
|
}
|
||||||
|
|
||||||
|
void moduleReleaseGIL(void) {
|
||||||
pthread_mutex_unlock(&moduleGIL);
|
pthread_mutex_unlock(&moduleGIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3655,6 +3659,11 @@ void moduleCommand(client *c) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Return the number of registered modules. */
|
||||||
|
size_t moduleCount(void) {
|
||||||
|
return dictSize(modules);
|
||||||
|
}
|
||||||
|
|
||||||
/* Register all the APIs we export. Keep this function at the end of the
|
/* Register all the APIs we export. Keep this function at the end of the
|
||||||
* file so that's easy to seek it to add new entries. */
|
* file so that's easy to seek it to add new entries. */
|
||||||
void moduleRegisterCoreAPI(void) {
|
void moduleRegisterCoreAPI(void) {
|
||||||
|
@ -105,6 +105,45 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* The thread entry point that actually executes the blocking part
|
||||||
|
* of the command HELLO.KEYS. */
|
||||||
|
void *HelloKeys_ThreadMain(void *arg) {
|
||||||
|
RedisModuleBlockedClient *bc = arg;
|
||||||
|
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
|
||||||
|
|
||||||
|
RedisModule_ThreadSafeContextLock(ctx);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx,1234);
|
||||||
|
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||||
|
|
||||||
|
RedisModule_UnblockClient(bc,NULL);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* HELLO.KEYS -- Return all the keys in the current database without blocking
|
||||||
|
* the server. The keys do not represent a point-in-time state so only the keys
|
||||||
|
* that were in the database from the start to the end are guaranteed to be
|
||||||
|
* there. */
|
||||||
|
int HelloKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
if (argc != 1) return RedisModule_WrongArity(ctx);
|
||||||
|
|
||||||
|
pthread_t tid;
|
||||||
|
|
||||||
|
/* Note that when blocking the client we do not set any callback: no
|
||||||
|
* timeout is possible since we passed '0', nor we need a reply callback
|
||||||
|
* because we'll use the thread safe context to accumulate a reply. */
|
||||||
|
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
|
||||||
|
|
||||||
|
/* Now that we setup a blocking client, we need to pass the control
|
||||||
|
* to the thread. However we need to pass arguments to the thread:
|
||||||
|
* the reference to the blocked client handle. */
|
||||||
|
if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) {
|
||||||
|
RedisModule_AbortBlock(bc);
|
||||||
|
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
||||||
|
}
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* This function must be present on each Redis module. It is used in order to
|
/* This function must be present on each Redis module. It is used in order to
|
||||||
* register the commands into the Redis server. */
|
* register the commands into the Redis server. */
|
||||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
@ -117,6 +156,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||||||
if (RedisModule_CreateCommand(ctx,"hello.block",
|
if (RedisModule_CreateCommand(ctx,"hello.block",
|
||||||
HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
|
if (RedisModule_CreateCommand(ctx,"hello.keys",
|
||||||
|
HelloKeys_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
17
src/server.c
17
src/server.c
@ -1172,9 +1172,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||||
UNUSED(eventLoop);
|
UNUSED(eventLoop);
|
||||||
|
|
||||||
/* Give some run time to modules threads using thread safe contexts. */
|
|
||||||
moduleCooperativeMultiTaskingCycle();
|
|
||||||
|
|
||||||
/* Call the Redis Cluster before sleep function. Note that this function
|
/* Call the Redis Cluster before sleep function. Note that this function
|
||||||
* may change the state of Redis Cluster (from ok to fail or vice versa),
|
* may change the state of Redis Cluster (from ok to fail or vice versa),
|
||||||
* so it's a good idea to call it before serving the unblocked clients
|
* so it's a good idea to call it before serving the unblocked clients
|
||||||
@ -1219,6 +1216,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
|
|
||||||
/* Handle writes with pending output buffers. */
|
/* Handle writes with pending output buffers. */
|
||||||
handleClientsWithPendingWrites();
|
handleClientsWithPendingWrites();
|
||||||
|
|
||||||
|
/* Before we are going to sleep, let the threads access the dataset by
|
||||||
|
* releasing the GIL. Redis main thread will not touch anything at this
|
||||||
|
* time. */
|
||||||
|
if (moduleCount()) moduleReleaseGIL();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This function is called immadiately after the event loop multiplexing
|
||||||
|
* API returned, and the control is going to soon return to Redis by invoking
|
||||||
|
* the different events callbacks. */
|
||||||
|
void afterSleep(struct aeEventLoop *eventLoop) {
|
||||||
|
UNUSED(eventLoop);
|
||||||
|
if (moduleCount()) moduleAcquireGIL();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* =========================== Server initialization ======================== */
|
/* =========================== Server initialization ======================== */
|
||||||
@ -3808,6 +3818,7 @@ int main(int argc, char **argv) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
aeSetBeforeSleepProc(server.el,beforeSleep);
|
aeSetBeforeSleepProc(server.el,beforeSleep);
|
||||||
|
aeSetAfterSleepProc(server.el,afterSleep);
|
||||||
aeMain(server.el);
|
aeMain(server.el);
|
||||||
aeDeleteEventLoop(server.el);
|
aeDeleteEventLoop(server.el);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1294,7 +1294,9 @@ void unblockClientFromModule(client *c);
|
|||||||
void moduleHandleBlockedClients(void);
|
void moduleHandleBlockedClients(void);
|
||||||
void moduleBlockedClientTimedOut(client *c);
|
void moduleBlockedClientTimedOut(client *c);
|
||||||
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||||
void moduleCooperativeMultiTaskingCycle(void);
|
size_t moduleCount(void);
|
||||||
|
void moduleAcquireGIL(void);
|
||||||
|
void moduleReleaseGIL(void);
|
||||||
|
|
||||||
/* Utils */
|
/* Utils */
|
||||||
long long ustime(void);
|
long long ustime(void);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user