diff --git a/src/module.c b/src/module.c index 545f1a74..96e65f0b 100644 --- a/src/module.c +++ b/src/module.c @@ -216,6 +216,28 @@ static list *moduleUnblockedClients; * allow thread safe contexts to execute commands at a safe moment. */ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; + +/* Function pointer type for keyspace event notification subscriptions from modules. */ +typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); + +/* Keyspace notification subscriber information. See RM_SubscribeToKeyspaceEvents */ +typedef struct RedisModuleKeyspaceSubscriber { + /* The module subscribed to the event */ + RedisModule *module; + /* Notification callback in the module*/ + RedisModuleNotificationFunc notify_callback; + /* A bit mask of the events the module is interested in */ + int event_mask; + /* Active flag set on entry, to avoid reentrant subscribers calling themselves */ + int active; +} RedisModuleKeyspaceSubscriber; + +/* The module keyspace notification subscribers list */ +static list *moduleKeyspaceSubscribers; + +/* Static client recycled for all notification clients, to avoid allocating per round. */ +static client *moduleKeyspaceSubscribersClient; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -3669,6 +3691,126 @@ void moduleReleaseGIL(void) { pthread_mutex_unlock(&moduleGIL); } + +/* -------------------------------------------------------------------------- + * Module Keyspace Notifications API + * -------------------------------------------------------------------------- */ + +/* Subscribe to keyspace notifications. This is a low-level version of the + * keyspace-notifications API. A module cand register callbacks to be notified + * when keyspce events occur. + * + * Notification events are filtered by their type (string events, set events, + * etc), and the subsriber callback receives only events that match a specific + * mask of event types. + * + * When subscribing to notifications with RedisModule_SubscribeToKeyspaceEvents + * the module must provide an event type-mask, denoting the events the subscriber + * is interested in. This can be an ORed mask of any of the following flags: + * + * - REDISMODULE_NOTIFY_GENERIC: Generic commands like DEL, EXPIRE, RENAME + * - REDISMODULE_NOTIFY_STRING: String events + * - REDISMODULE_NOTIFY_LIST: List events + * - REDISMODULE_NOTIFY_SET: Set events + * - REDISMODULE_NOTIFY_HASH: Hash events + * - REDISMODULE_NOTIFY_ZSET: Sorted Set events + * - REDISMODULE_NOTIFY_EXPIRED: Expiration events + * - REDISMODULE_NOTIFY_EVICTED: Eviction events + * - REDISMODULE_NOTIFY_STREAM: Stream events + * - REDISMODULE_NOTIFY_ALL: All events + * + * We do not distinguish between key events and keyspace events, and it is up + * to the module to filter the actions taken based on the key. + * + * The subscriber signature is: + * + * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, + * const char *event, + * RedisModuleString *key); + * + * `type` is the event type bit, that must match the mask given at registration + * time. The event string is the actual command being executed, and key is the + * relevant Redis key. + * + * Notification callback gets executed with a redis context that can not be + * used to send anything to the client, and has the db number where the event + * occured as its selected db number. + * + * Notice that it is not necessary to enable norifications in redis.conf for + * module notifications to work. + * + * Warning: the notification callbacks are performed in a synchronous manner, + * so notification callbacks must to be fast, or they would slow Redis down. + * If you need to take long actions, use threads to offload them. + * + * See https://redis.io/topics/notifications for more information. + */ +int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) { + RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub)); + sub->module = ctx->module; + sub->event_mask = types; + sub->notify_callback = callback; + sub->active = 0; + + listAddNodeTail(moduleKeyspaceSubscribers, sub); + return REDISMODULE_OK; + +} + +/* Dispatcher for keyspace notifications to module subscriber functions. + * This gets called only if at least one module requested to be notified on + * keyspace notifications */ +void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) { + + /* Don't do anything if there aren't any subscribers */ + if (listLength(moduleKeyspaceSubscribers) == 0) return; + + listIter li; + listNode *ln; + + listRewind(moduleKeyspaceSubscribers,&li); + + /* Remove irrelevant flags from the type mask */ + type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE); + + + while((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + /* Only notify subscribers on events matching they registration, + * and avoid subscribers triggering themselves */ + if ((sub->event_mask & type) && sub->active == 0) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = sub->module; + ctx.client = moduleKeyspaceSubscribersClient; + selectDb(ctx.client, dbid); + + /* mark the handler as activer to avoid reentrant loops. + * If the subscriber performs an action triggering itself, + * it will not be notified about it. */ + sub->active = 1; + sub->notify_callback(&ctx, type, event, key); + sub->active = 0; + moduleFreeContext(&ctx); + } + } + +} + +/* Unsubscribe any notification subscirbers this module has upon unloading */ +void moduleUnsubscribeNotifications(RedisModule *module) { + listIter li; + listNode *ln; + listRewind(moduleKeyspaceSubscribers,&li); + while((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + if (sub->module == module) { + listDelNode(moduleKeyspaceSubscribers, ln); + zfree(sub); + } + } +} + + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -3706,9 +3848,15 @@ void moduleRegisterCoreAPI(void); void moduleInitModulesSystem(void) { moduleUnblockedClients = listCreate(); - + server.loadmodule_queue = listCreate(); modules = dictCreate(&modulesDictType,NULL); + + /* Set up the keyspace notification susbscriber list and static client */ + moduleKeyspaceSubscribers = listCreate(); + moduleKeyspaceSubscribersClient = createClient(-1); + moduleKeyspaceSubscribersClient->flags |= CLIENT_MODULE; + moduleRegisterCoreAPI(); if (pipe(server.module_blocked_pipe) == -1) { serverLog(LL_WARNING, @@ -3759,6 +3907,7 @@ void moduleFreeModuleStructure(struct RedisModule *module) { zfree(module); } + void moduleUnregisterCommands(struct RedisModule *module) { /* Unregister all the commands registered by this module. */ dictIterator *di = dictGetSafeIterator(server.commands); @@ -3819,6 +3968,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { return C_OK; } + /* Unload the module registered with the specified name. On success * C_OK is returned, otherwise C_ERR is returned and errno is set * to the following values depending on the type of error: @@ -3840,6 +3990,9 @@ int moduleUnload(sds name) { moduleUnregisterCommands(module); + /* Remvoe any noification subscribers this module might have */ + moduleUnsubscribeNotifications(module); + /* Unregister all the hooks. TODO: Yet no hooks support here. */ /* Unload the dynamic library. */ @@ -4037,4 +4190,5 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DigestAddStringBuffer); REGISTER_API(DigestAddLongLong); REGISTER_API(DigestEndSequence); + REGISTER_API(SubscribeToKeyspaceEvents); } diff --git a/src/modules/testmodule.c b/src/modules/testmodule.c index 956b2417..67a86170 100644 --- a/src/modules/testmodule.c +++ b/src/modules/testmodule.c @@ -30,6 +30,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#define REDISMODULE_EXPERIMENTAL_API #include "../redismodule.h" #include @@ -124,6 +125,7 @@ int failTest(RedisModuleCtx *ctx, const char *msg) { RedisModule_ReplyWithError(ctx, msg); return REDISMODULE_ERR; } + int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); REDISMODULE_NOT_USED(argv); @@ -153,80 +155,153 @@ int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { } +int NotifyCallback(RedisModuleCtx *ctx, int type, const char *event, + RedisModuleString *key) { + /* Increment a counter on the notifications: for each key notified we + * increment a counter */ + RedisModule_Log(ctx, "notice", "Got event type %d, event %s, key %s", type, + event, RedisModule_StringPtrLen(key, NULL)); + + RedisModule_Call(ctx, "HINCRBY", "csc", "notifications", key, "1"); + return REDISMODULE_OK; +} + +/* TEST.NOTIFICATIONS -- Test Keyspace Notifications. */ +int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + +#define FAIL(msg, ...) \ + { \ + RedisModule_Log(ctx, "warning", "Failed NOTIFY Test. Reason: " #msg, ##__VA_ARGS__); \ + goto err; \ + } + RedisModule_Call(ctx, "FLUSHDB", ""); + + RedisModule_Call(ctx, "SET", "cc", "foo", "bar"); + RedisModule_Call(ctx, "SET", "cc", "foo", "baz"); + RedisModule_Call(ctx, "SADD", "cc", "bar", "x"); + RedisModule_Call(ctx, "SADD", "cc", "bar", "y"); + + RedisModule_Call(ctx, "HSET", "ccc", "baz", "x", "y"); + /* LPUSH should be ignored and not increment any counters */ + RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); + RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); + + size_t sz; + const char *rep; + RedisModuleCallReply *r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "foo"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for foo"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '2') { + FAIL("Got reply '%s'. expected '2'", RedisModule_CallReplyStringPtr(r, NULL)); + } + } + + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "bar"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for bar"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '2') { + FAIL("Got reply '%s'. expected '2'", rep); + } + } + + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "baz"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for baz"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '1') { + FAIL("Got reply '%.*s'. expected '1'", sz, rep); + } + } + /* For l we expect nothing since we didn't subscribe to list events */ + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "l"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_NULL) { + FAIL("Wrong reply for l"); + } + + RedisModule_Call(ctx, "FLUSHDB", ""); + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +err: + RedisModule_Call(ctx, "FLUSHDB", ""); + + return RedisModule_ReplyWithSimpleString(ctx, "ERR"); +} + /* TEST.CTXFLAGS -- Test GetContextFlags. */ int TestCtxFlags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argc); REDISMODULE_NOT_USED(argv); - + RedisModule_AutoMemory(ctx); - + int ok = 1; const char *errString = NULL; - - #define FAIL(msg) \ - { \ - ok = 0; \ - errString = msg; \ - goto end; \ +#undef FAIL +#define FAIL(msg) \ + { \ + ok = 0; \ + errString = msg; \ + goto end; \ } - + int flags = RedisModule_GetContextFlags(ctx); if (flags == 0) { - FAIL("Got no flags"); + FAIL("Got no flags"); } - + if (flags & REDISMODULE_CTX_FLAGS_LUA) FAIL("Lua flag was set"); if (flags & REDISMODULE_CTX_FLAGS_MULTI) FAIL("Multi flag was set"); - + if (flags & REDISMODULE_CTX_FLAGS_AOF) FAIL("AOF Flag was set") /* Enable AOF to test AOF flags */ RedisModule_Call(ctx, "config", "ccc", "set", "appendonly", "yes"); flags = RedisModule_GetContextFlags(ctx); - if (!(flags & REDISMODULE_CTX_FLAGS_AOF)) - FAIL("AOF Flag not set after config set"); - + if (!(flags & REDISMODULE_CTX_FLAGS_AOF)) FAIL("AOF Flag not set after config set"); + if (flags & REDISMODULE_CTX_FLAGS_RDB) FAIL("RDB Flag was set"); /* Enable RDB to test RDB flags */ RedisModule_Call(ctx, "config", "ccc", "set", "save", "900 1"); flags = RedisModule_GetContextFlags(ctx); - if (!(flags & REDISMODULE_CTX_FLAGS_RDB)) - FAIL("RDB Flag was not set after config set"); - + if (!(flags & REDISMODULE_CTX_FLAGS_RDB)) FAIL("RDB Flag was not set after config set"); + if (!(flags & REDISMODULE_CTX_FLAGS_MASTER)) FAIL("Master flag was not set"); if (flags & REDISMODULE_CTX_FLAGS_SLAVE) FAIL("Slave flag was set"); if (flags & REDISMODULE_CTX_FLAGS_READONLY) FAIL("Read-only flag was set"); if (flags & REDISMODULE_CTX_FLAGS_CLUSTER) FAIL("Cluster flag was set"); - + if (flags & REDISMODULE_CTX_FLAGS_MAXMEMORY) FAIL("Maxmemory flag was set"); - ; + RedisModule_Call(ctx, "config", "ccc", "set", "maxmemory", "100000000"); flags = RedisModule_GetContextFlags(ctx); if (!(flags & REDISMODULE_CTX_FLAGS_MAXMEMORY)) - FAIL("Maxmemory flag was not set after config set"); - + FAIL("Maxmemory flag was not set after config set"); + if (flags & REDISMODULE_CTX_FLAGS_EVICT) FAIL("Eviction flag was set"); - RedisModule_Call(ctx, "config", "ccc", "set", "maxmemory-policy", - "allkeys-lru"); + RedisModule_Call(ctx, "config", "ccc", "set", "maxmemory-policy", "allkeys-lru"); flags = RedisModule_GetContextFlags(ctx); - if (!(flags & REDISMODULE_CTX_FLAGS_EVICT)) - FAIL("Eviction flag was not set after config set"); - - end: + if (!(flags & REDISMODULE_CTX_FLAGS_EVICT)) FAIL("Eviction flag was not set after config set"); + +end: /* Revert config changes */ RedisModule_Call(ctx, "config", "ccc", "set", "appendonly", "no"); RedisModule_Call(ctx, "config", "ccc", "set", "save", ""); RedisModule_Call(ctx, "config", "ccc", "set", "maxmemory", "0"); RedisModule_Call(ctx, "config", "ccc", "set", "maxmemory-policy", "noeviction"); - - if (!ok) { - RedisModule_Log(ctx, "warning", "Failed CTXFLAGS Test. Reason: %s", - errString); - return RedisModule_ReplyWithSimpleString(ctx, "ERR"); - } - - return RedisModule_ReplyWithSimpleString(ctx, "OK"); - } + if (!ok) { + RedisModule_Log(ctx, "warning", "Failed CTXFLAGS Test. Reason: %s", errString); + return RedisModule_ReplyWithSimpleString(ctx, "ERR"); + } + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} /* ----------------------------- Test framework ----------------------------- */ @@ -310,6 +385,9 @@ int TestIt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { T("test.string.printf", "cc", "foo", "bar"); if (!TestAssertStringReply(ctx,reply,"Got 3 args. argv[1]: foo, argv[2]: bar",38)) goto fail; + T("test.notify", ""); + if (!TestAssertStringReply(ctx,reply,"OK",2)) goto fail; + RedisModule_ReplyWithSimpleString(ctx,"ALL TESTS PASSED"); return REDISMODULE_OK; @@ -354,5 +432,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) TestIt,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + RedisModule_SubscribeToKeyspaceEvents(ctx, + REDISMODULE_NOTIFY_HASH | + REDISMODULE_NOTIFY_SET | + REDISMODULE_NOTIFY_STRING, + NotifyCallback); + if (RedisModule_CreateCommand(ctx,"test.notify", + TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/src/notify.c b/src/notify.c index f98cb920..6dd72f0a 100644 --- a/src/notify.c +++ b/src/notify.c @@ -100,6 +100,12 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) { int len = -1; char buf[24]; + /* If any modules are interested in events, notify the module system now. + * This bypasses the notifications configuration, but the module engine + * will only call event subscribers if the event type matches the types + * they are interested in. */ + moduleNotifyKeyspaceEvent(type, event, key, dbid); + /* If notifications for this class of events are off, return ASAP. */ if (!(server.notify_keyspace_events & type)) return; diff --git a/src/redismodule.h b/src/redismodule.h index 09d681c2..8c14fd0d 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -82,6 +82,18 @@ #define REDISMODULE_CTX_FLAGS_EVICT 0x0200 +#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */ +#define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */ +#define REDISMODULE_NOTIFY_LIST (1<<4) /* l */ +#define REDISMODULE_NOTIFY_SET (1<<5) /* s */ +#define REDISMODULE_NOTIFY_HASH (1<<6) /* h */ +#define REDISMODULE_NOTIFY_ZSET (1<<7) /* z */ +#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */ +#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ +#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ +#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ + + /* A special pointer that we can use between the core and the module to signal * field deletion, and that is impossible to be a valid pointer. */ #define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1) @@ -112,6 +124,7 @@ typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); @@ -251,6 +264,8 @@ RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModu void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx); +int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb); + #endif /* This is included inline inside each Redis module. */ @@ -372,6 +387,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(IsBlockedTimeoutRequest); REDISMODULE_GET_API(GetBlockedClientPrivateData); REDISMODULE_GET_API(AbortBlock); + REDISMODULE_GET_API(SubscribeToKeyspaceEvents); + #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/server.h b/src/server.h index 271f217f..5b6074a8 100644 --- a/src/server.h +++ b/src/server.h @@ -1335,6 +1335,8 @@ void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, in size_t moduleCount(void); void moduleAcquireGIL(void); void moduleReleaseGIL(void); +void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); + /* Utils */ long long ustime(void);