From 2136035e47c5c50aec07270e0c87a9e5cd5e243d Mon Sep 17 00:00:00 2001 From: Dvir Volk Date: Mon, 27 Nov 2017 16:29:55 +0200 Subject: [PATCH] finished implementation of notifications. Tests unfinished --- src/module.c | 137 +++++++++++++++++++++++++++++++++++++- src/modules/Makefile | 7 ++ src/modules/hellonotify.c | 79 ++++++++++++++++++++++ src/modules/testmodule.c | 89 ++++++++++++++++++++++++- src/notify.c | 9 +++ src/redismodule.h | 16 +++++ src/server.h | 3 + 7 files changed, 338 insertions(+), 2 deletions(-) create mode 100644 src/modules/hellonotify.c diff --git a/src/module.c b/src/module.c index 545f1a74..9288dafc 100644 --- a/src/module.c +++ b/src/module.c @@ -216,6 +216,25 @@ static list *moduleUnblockedClients; * allow thread safe contexts to execute commands at a safe moment. */ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; + +/* Function pointer type for keyspace event notification subscriptions from modules. */ +typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); + +/* Keyspace notification subscriber information. See RM_SubscribeToKeyspaceEvents */ +typedef struct RedisModuleKeyspaceSubscriber { + /* The module subscribed to the event */ + RedisModule *module; + /* Notification callback in the module*/ + RedisModuleNotificationFunc notify_callback; + /* A bit mask of the events the module is interested in */ + int event_mask; + /* Active flag set on entry, to avoid reentrant subscribers calling themselves */ + int active; +} RedisModuleKeyspaceSubscriber; + +/* The module keyspace notification subscribers list */ +static list *moduleKeyspaceSubscribers; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -3669,6 +3688,114 @@ void moduleReleaseGIL(void) { pthread_mutex_unlock(&moduleGIL); } + +/* -------------------------------------------------------------------------- + * Module Keyspace Notifications API + * -------------------------------------------------------------------------- */ + +/* Subscribe to keyspace notifications. This is a low-level version of the + * keyspace-notifications API. A module cand register callbacks to be notified + * when keyspce events occur. + * Notification events are filtered by their type (string events, set events, + * etc), and the subsriber callback receives only events that match a specific + * mask of event types. + * We do not distinguish between key events and keyspace events, and it is up + * to the module to filter the actions taken based on the key. + * + * The subscriber signature is: + * + * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, + * const char *event, + * RedisModuleString *key); + * + * `type` is the event type bit, that must match the mask given at registration + * time. The event string is the actual command being executed, and key is the + * relevant Redis key. + * + * A notification callback gets executed with a redis context that can not be + * used to send anything to the client, and has the db number where the event + * occured as its selected db number. + * + * Notice that it is not necessary to enable norifications in redis.conf for + * module notifications to work. + * + * Warning: the notification callbacks are performed in a synchronous manner, + * so notification callbacks must to be fast, or they would slow Redis down. + * If you need to take long actions, use threads to offload them. + * + * See https://redis.io/topics/notifications for more information. + */ +int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) { + RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub)); + sub->module = ctx->module; + sub->event_mask = types; + sub->notify_callback = callback; + sub->active = 0; + + /* Let the notification system know that modules are interested in notifications */ + server.notify_keyspace_events |= NOTIFY_MODULE; + listAddNodeTail(moduleKeyspaceSubscribers, sub); + return REDISMODULE_OK; + +} + +/* Dispatcher for keyspace notifications to module subscriber functions. + * This gets called only if at least one module requested to be notified on + * keyspace notifications */ +void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) { + listIter li; + listNode *ln; + + listRewind(moduleKeyspaceSubscribers,&li); + + /* Remove irrelevant flags from the type mask */ + type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE); + + /* Setup a fake client, so we can have proper db selection when performing + * actions. We use one client for all handlers, writing to it will crash */ + client *c = createClient(-1); + c->flags |= CLIENT_MODULE; + + while((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + /* Only notify subscribers on events matching they registration, + * and avoid subscribers triggering themselves */ + if ((sub->event_mask & type) && sub->active == 0) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = sub->module; + selectDb(c, dbid); + ctx.client = c; + /* mark the handler as activer to avoid reentrant loops. + * If the subscriber performs an action triggering itself, + * it will not be notified about it. */ + sub->active = 1; + sub->notify_callback(&ctx, type, event, key); + sub->active = 0; + moduleFreeContext(&ctx); + } + } + freeClient(c); +} + +/* Unsubscribe any notification subscirbers this module has upon unloading */ +void moduleUnsubscribeNotifications(RedisModule *module) { + listIter li; + listNode *ln; + listRewind(moduleKeyspaceSubscribers,&li); + while((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + if (sub->module == module) { + listDelNode(moduleKeyspaceSubscribers, ln); + zfree(sub); + } + } + /* If no subscribers are left - do not call the module norification function */ + if (listLength(moduleKeyspaceSubscribers) == 0) { + server.notify_keyspace_events &= ~NOTIFY_MODULE; + } +} + + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -3706,9 +3833,11 @@ void moduleRegisterCoreAPI(void); void moduleInitModulesSystem(void) { moduleUnblockedClients = listCreate(); - + server.loadmodule_queue = listCreate(); modules = dictCreate(&modulesDictType,NULL); + moduleKeyspaceSubscribers = listCreate(); + moduleRegisterCoreAPI(); if (pipe(server.module_blocked_pipe) == -1) { serverLog(LL_WARNING, @@ -3759,6 +3888,7 @@ void moduleFreeModuleStructure(struct RedisModule *module) { zfree(module); } + void moduleUnregisterCommands(struct RedisModule *module) { /* Unregister all the commands registered by this module. */ dictIterator *di = dictGetSafeIterator(server.commands); @@ -3819,6 +3949,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { return C_OK; } + /* Unload the module registered with the specified name. On success * C_OK is returned, otherwise C_ERR is returned and errno is set * to the following values depending on the type of error: @@ -3840,6 +3971,9 @@ int moduleUnload(sds name) { moduleUnregisterCommands(module); + /* Remvoe any noification subscribers this module might have */ + moduleUnsubscribeNotifications(module); + /* Unregister all the hooks. TODO: Yet no hooks support here. */ /* Unload the dynamic library. */ @@ -4037,4 +4171,5 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DigestAddStringBuffer); REGISTER_API(DigestAddLongLong); REGISTER_API(DigestEndSequence); + REGISTER_API(SubscribeToKeyspaceEvents); } diff --git a/src/modules/Makefile b/src/modules/Makefile index 066e65e9..7f7b9a74 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -33,6 +33,13 @@ helloblock.xo: ../redismodule.h helloblock.so: helloblock.xo $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc + +hellonotify.xo: ../redismodule.h + +hellonotify.so: hellonotify.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc + + testmodule.xo: ../redismodule.h testmodule.so: testmodule.xo diff --git a/src/modules/hellonotify.c b/src/modules/hellonotify.c new file mode 100644 index 00000000..f5885923 --- /dev/null +++ b/src/modules/hellonotify.c @@ -0,0 +1,79 @@ +/* Helloworld module -- A few examples of the Redis Modules API in the form + * of commands showing how to accomplish common tasks. + * + * This module does not do anything useful, if not for a few commands. The + * examples are designed in order to show the API. + * + * ----------------------------------------------------------------------------- + * + * Copyright (c) 2016, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#define REDISMODULE_EXPERIMENTAL_API +#include +#include +#include +#include +#include "../redismodule.h" + +/* HELLO.SIMPLE is among the simplest commands you can implement. + * It just returns the currently selected DB id, a functionality which is + * missing in Redis. The command uses two important API calls: one to + * fetch the currently selected DB, the other in order to send the client + * an integer reply as response. */ +int HelloSimple_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, + int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ReplyWithLongLong(ctx, RedisModule_GetSelectedDb(ctx)); + return REDISMODULE_OK; +} + +int HelloNotify_Callback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(ctx); + RedisModule_Log(ctx, + "notice", + "Received notification! Event type: %d, event: %s, key: %s", + type, event, RedisModule_StringPtrLen(key, NULL)); + RedisModule_Call(ctx, "SET", "cc", "foo", "bar"); + return REDISMODULE_OK; +} +/* This function must be present on each Redis module. It is used in order to + * register the commands into the Redis server. */ +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, + int argc) { + if (RedisModule_Init(ctx, "notify", 1, REDISMODULE_APIVER_1) == + REDISMODULE_ERR) + return REDISMODULE_ERR; + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_ALL, HelloNotify_Callback); + + return REDISMODULE_OK; +} \ No newline at end of file diff --git a/src/modules/testmodule.c b/src/modules/testmodule.c index 956b2417..e360866f 100644 --- a/src/modules/testmodule.c +++ b/src/modules/testmodule.c @@ -30,6 +30,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#define REDISMODULE_EXPERIMENTAL_API #include "../redismodule.h" #include @@ -153,6 +154,80 @@ int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { } +int NotifyCallback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + // Increment a counter on the notifications + RedisModule_Log(ctx, "notice", "Got event type %d, event %s, key %s\n", type, event, RedisModule_StringPtrLen(key, NULL)); + + RedisModule_Call(ctx, "HINCRBY", "csc", "notifications", key, "1"); + return REDISMODULE_OK; +} + +/* TEST.NOTIFICATIONS -- Test Keyspace Notifications. */ +int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + +#define FAIL(msg, ...) \ + { \ + RedisModule_Log(ctx, "warning", "Failed NOTIFY Test. Reason: " #msg, \ + ##__VA_ARGS__); \ + goto err; \ + } + RedisModule_Call(ctx, "FLUSHDB", ""); + + RedisModule_Call(ctx, "SET", "cc", "foo", "bar"); + RedisModule_Call(ctx, "SET", "cc", "foo", "baz"); + RedisModule_Call(ctx, "SADD", "cc", "bar", "x"); + RedisModule_Call(ctx, "SADD", "cc", "bar", "y"); + + RedisModule_Call(ctx, "HSET", "ccc", "baz", "x", "y"); + // LPUSH should be ignored and not increment any counters + RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); + RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); + + size_t sz; + const char *rep; + RedisModuleCallReply *r = + RedisModule_Call(ctx, "HGET", "cc", "notifications", "foo"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for foo"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '2') { + FAIL("Got reply '%s'. expected '2'", + RedisModule_CallReplyStringPtr(r, NULL)); + } + } + + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "bar"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for bar"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '2') { + FAIL("Got reply '%s'. expected '2'", rep); + } + } + + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "baz"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for baz"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '1') { + FAIL("Got reply '%.*s'. expected '1'", sz, rep); + } + } + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "l"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_NULL) { + FAIL("Wrong reply for l"); + } + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +err: + return RedisModule_ReplyWithSimpleString(ctx, "ERR"); +} + /* TEST.CTXFLAGS -- Test GetContextFlags. */ int TestCtxFlags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argc); @@ -162,7 +237,7 @@ int TestCtxFlags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int ok = 1; const char *errString = NULL; - + #undef FAIL #define FAIL(msg) \ { \ ok = 0; \ @@ -310,6 +385,9 @@ int TestIt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { T("test.string.printf", "cc", "foo", "bar"); if (!TestAssertStringReply(ctx,reply,"Got 3 args. argv[1]: foo, argv[2]: bar",38)) goto fail; + T("test.notify", ""); + if (!TestAssertStringReply(ctx,reply,"OK",2)) goto fail; + RedisModule_ReplyWithSimpleString(ctx,"ALL TESTS PASSED"); return REDISMODULE_OK; @@ -354,5 +432,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) TestIt,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + RedisModule_SubscribeToKeyspaceEvents(ctx, + REDISMODULE_NOTIFY_HASH | + REDISMODULE_NOTIFY_SET | + REDISMODULE_NOTIFY_STRING, + NotifyCallback); + if (RedisModule_CreateCommand(ctx,"test.notify", + TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/src/notify.c b/src/notify.c index 9bbeb142..e77f6791 100644 --- a/src/notify.c +++ b/src/notify.c @@ -87,6 +87,8 @@ sds keyspaceEventsFlagsToString(int flags) { return res; } + + /* The API provided to the rest of the Redis core is a simple function: * * notifyKeyspaceEvent(char *event, robj *key, int dbid); @@ -100,6 +102,13 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) { int len = -1; char buf[24]; + /* If any modules are interested in events, notify the module system now. + * This bypasses the notifications configuration, but the module engine + * will only call event subscribers if the event type matches the types + * they are interested in. */ + if (server.notify_keyspace_events & NOTIFY_MODULE) + moduleNotifyKeyspaceEvent(type, event, key, dbid); + /* If notifications for this class of events are off, return ASAP. */ if (!(server.notify_keyspace_events & type)) return; diff --git a/src/redismodule.h b/src/redismodule.h index 09d681c2..4c512cfd 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -82,6 +82,17 @@ #define REDISMODULE_CTX_FLAGS_EVICT 0x0200 +#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */ +#define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */ +#define REDISMODULE_NOTIFY_LIST (1<<4) /* l */ +#define REDISMODULE_NOTIFY_SET (1<<5) /* s */ +#define REDISMODULE_NOTIFY_HASH (1<<6) /* h */ +#define REDISMODULE_NOTIFY_ZSET (1<<7) /* z */ +#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */ +#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ +#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED) /* A */ + + /* A special pointer that we can use between the core and the module to signal * field deletion, and that is impossible to be a valid pointer. */ #define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1) @@ -112,6 +123,7 @@ typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); @@ -251,6 +263,8 @@ RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModu void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx); +int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb); + #endif /* This is included inline inside each Redis module. */ @@ -372,6 +386,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(IsBlockedTimeoutRequest); REDISMODULE_GET_API(GetBlockedClientPrivateData); REDISMODULE_GET_API(AbortBlock); + REDISMODULE_GET_API(SubscribeToKeyspaceEvents); + #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/server.h b/src/server.h index 271f217f..f4cb7cde 100644 --- a/src/server.h +++ b/src/server.h @@ -429,6 +429,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define NOTIFY_EXPIRED (1<<8) /* x */ #define NOTIFY_EVICTED (1<<9) /* e */ #define NOTIFY_STREAM (1<<10) /* t */ +#define NOTIFY_MODULE (1<<11) /* modules are interested in notifications */ #define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ /* Get the first bind addr or NULL */ @@ -1335,6 +1336,8 @@ void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, in size_t moduleCount(void); void moduleAcquireGIL(void); void moduleReleaseGIL(void); +void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); + /* Utils */ long long ustime(void);