diff --git a/src/ae.c b/src/ae.c index e66808a8..ecbaa94f 100644 --- a/src/ae.c +++ b/src/ae.c @@ -75,6 +75,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; + eventLoop->aftersleep = NULL; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ @@ -397,7 +398,14 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) } } + /* Call the multiplexing API, will return only on timeout or when + * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); + + /* After sleep callback. */ + if (eventLoop->aftersleep != NULL) + eventLoop->aftersleep(eventLoop); + for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; @@ -463,3 +471,7 @@ char *aeGetApiName(void) { void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { eventLoop->beforesleep = beforesleep; } + +void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) { + eventLoop->aftersleep = aftersleep; +} diff --git a/src/ae.h b/src/ae.h index 827c4c9e..e3617759 100644 --- a/src/ae.h +++ b/src/ae.h @@ -98,6 +98,7 @@ typedef struct aeEventLoop { int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; + aeBeforeSleepProc *aftersleep; } aeEventLoop; /* Prototypes */ @@ -117,6 +118,7 @@ int aeWait(int fd, int mask, long long milliseconds); void aeMain(aeEventLoop *eventLoop); char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); +void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); diff --git a/src/module.c b/src/module.c index e9b95f97..9b78a4a5 100644 --- a/src/module.c +++ b/src/module.c @@ -433,6 +433,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) { "calls.", ctx->module->name); } + if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client); } /* Helper function for when a command callback is called, in order to handle @@ -967,8 +968,10 @@ int RM_WrongArity(RedisModuleCtx *ctx) { * context of a thread safe context that was not initialized with a blocked * client object. */ client *moduleGetReplyClient(RedisModuleCtx *ctx) { - if (ctx->client) return ctx->client; - if (ctx->blocked_client) return ctx->blocked_client->reply_client; + if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE) && ctx->client) + return ctx->client; + if (ctx->blocked_client) + return ctx->blocked_client->reply_client; return NULL; } @@ -3351,20 +3354,6 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { * Thread Safe Contexts * -------------------------------------------------------------------------- */ -/* Operations executed in thread safe contexts use a global lock in order to - * be ran at a safe time. This function unlocks and re-acquire the locks: - * hopefully with *any* sane implementation of pthreads, this will allow the - * modules to make progresses. - * - * This function is called in beforeSleep(). */ -void moduleCooperativeMultiTaskingCycle(void) { - if (dictSize(modules) == 0) return; /* No modules, no async ops. */ - pthread_mutex_unlock(&moduleGIL); - /* Here hopefully thread modules waiting to be executed at a safe time - * should be able to acquire the lock. */ - pthread_mutex_lock(&moduleGIL); -} - /* Return a context which can be used inside threads to make Redis context * calls with certain modules APIs. If 'bc' is not NULL then the module will * be bound to a blocked client, and it will be possible to use the @@ -3381,7 +3370,9 @@ void moduleCooperativeMultiTaskingCycle(void) { * This is not needed when using `RedisModule_Reply*` functions, assuming * that a blocked client was used when the context was created, otherwise * no RedisModule_Reply* call should be made at all. - */ + * + * TODO: thread safe contexts do not inherit the blocked client + * selected database. */ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { RedisModuleCtx *ctx = zmalloc(sizeof(*ctx)); RedisModuleCtx empty = REDISMODULE_CTX_INIT; @@ -3391,6 +3382,11 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { ctx->module = bc->module; } ctx->flags |= REDISMODULE_CTX_THREAD_SAFE; + /* Even when the context is associated with a blocked client, we can't + * access it safely from another thread, so we create a fake client here + * in order to keep things like the currently selected database and similar + * things. */ + ctx->client = createClient(-1); return ctx; } @@ -3405,12 +3401,20 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { * a blocked client connected to the thread safe context. */ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { DICT_NOTUSED(ctx); - pthread_mutex_lock(&moduleGIL); + moduleAcquireGIL(); } /* Release the server lock after a thread safe API call was executed. */ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { DICT_NOTUSED(ctx); + moduleReleaseGIL(); +} + +void moduleAcquireGIL(void) { + pthread_mutex_lock(&moduleGIL); +} + +void moduleReleaseGIL(void) { pthread_mutex_unlock(&moduleGIL); } @@ -3655,6 +3659,11 @@ void moduleCommand(client *c) { } } +/* Return the number of registered modules. */ +size_t moduleCount(void) { + return dictSize(modules); +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { diff --git a/src/modules/helloblock.c b/src/modules/helloblock.c index 71ec9b12..e760e33f 100644 --- a/src/modules/helloblock.c +++ b/src/modules/helloblock.c @@ -105,6 +105,45 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a return REDISMODULE_OK; } +/* The thread entry point that actually executes the blocking part + * of the command HELLO.KEYS. */ +void *HelloKeys_ThreadMain(void *arg) { + RedisModuleBlockedClient *bc = arg; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); + + RedisModule_ThreadSafeContextLock(ctx); + RedisModule_ReplyWithLongLong(ctx,1234); + RedisModule_ThreadSafeContextUnlock(ctx); + + RedisModule_UnblockClient(bc,NULL); + return NULL; +} + +/* HELLO.KEYS -- Return all the keys in the current database without blocking + * the server. The keys do not represent a point-in-time state so only the keys + * that were in the database from the start to the end are guaranteed to be + * there. */ +int HelloKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + if (argc != 1) return RedisModule_WrongArity(ctx); + + pthread_t tid; + + /* Note that when blocking the client we do not set any callback: no + * timeout is possible since we passed '0', nor we need a reply callback + * because we'll use the thread safe context to accumulate a reply. */ + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); + + /* Now that we setup a blocking client, we need to pass the control + * to the thread. However we need to pass arguments to the thread: + * the reference to the blocked client handle. */ + if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) { + RedisModule_AbortBlock(bc); + return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + } + return REDISMODULE_OK; +} + /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { @@ -117,6 +156,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"hello.block", HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hello.keys", + HelloKeys_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/src/server.c b/src/server.c index e9013bf6..6be12cff 100644 --- a/src/server.c +++ b/src/server.c @@ -1172,9 +1172,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - /* Give some run time to modules threads using thread safe contexts. */ - moduleCooperativeMultiTaskingCycle(); - /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients @@ -1219,6 +1216,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(); + + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(); +} + +/* This function is called immadiately after the event loop multiplexing + * API returned, and the control is going to soon return to Redis by invoking + * the different events callbacks. */ +void afterSleep(struct aeEventLoop *eventLoop) { + UNUSED(eventLoop); + if (moduleCount()) moduleAcquireGIL(); } /* =========================== Server initialization ======================== */ @@ -3808,6 +3818,7 @@ int main(int argc, char **argv) { } aeSetBeforeSleepProc(server.el,beforeSleep); + aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0; diff --git a/src/server.h b/src/server.h index 95637029..2bc49299 100644 --- a/src/server.h +++ b/src/server.h @@ -1294,7 +1294,9 @@ void unblockClientFromModule(client *c); void moduleHandleBlockedClients(void); void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); -void moduleCooperativeMultiTaskingCycle(void); +size_t moduleCount(void); +void moduleAcquireGIL(void); +void moduleReleaseGIL(void); /* Utils */ long long ustime(void);