mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 01:20:50 +00:00
finished implementation of notifications. Tests unfinished
This commit is contained in:
parent
4f2d279dd7
commit
2136035e47
137
src/module.c
137
src/module.c
@ -216,6 +216,25 @@ static list *moduleUnblockedClients;
|
|||||||
* allow thread safe contexts to execute commands at a safe moment. */
|
* allow thread safe contexts to execute commands at a safe moment. */
|
||||||
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
|
|
||||||
|
/* Function pointer type for keyspace event notification subscriptions from modules. */
|
||||||
|
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
||||||
|
|
||||||
|
/* Keyspace notification subscriber information. See RM_SubscribeToKeyspaceEvents */
|
||||||
|
typedef struct RedisModuleKeyspaceSubscriber {
|
||||||
|
/* The module subscribed to the event */
|
||||||
|
RedisModule *module;
|
||||||
|
/* Notification callback in the module*/
|
||||||
|
RedisModuleNotificationFunc notify_callback;
|
||||||
|
/* A bit mask of the events the module is interested in */
|
||||||
|
int event_mask;
|
||||||
|
/* Active flag set on entry, to avoid reentrant subscribers calling themselves */
|
||||||
|
int active;
|
||||||
|
} RedisModuleKeyspaceSubscriber;
|
||||||
|
|
||||||
|
/* The module keyspace notification subscribers list */
|
||||||
|
static list *moduleKeyspaceSubscribers;
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Prototypes
|
* Prototypes
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -3669,6 +3688,114 @@ void moduleReleaseGIL(void) {
|
|||||||
pthread_mutex_unlock(&moduleGIL);
|
pthread_mutex_unlock(&moduleGIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* --------------------------------------------------------------------------
|
||||||
|
* Module Keyspace Notifications API
|
||||||
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* Subscribe to keyspace notifications. This is a low-level version of the
|
||||||
|
* keyspace-notifications API. A module cand register callbacks to be notified
|
||||||
|
* when keyspce events occur.
|
||||||
|
* Notification events are filtered by their type (string events, set events,
|
||||||
|
* etc), and the subsriber callback receives only events that match a specific
|
||||||
|
* mask of event types.
|
||||||
|
* We do not distinguish between key events and keyspace events, and it is up
|
||||||
|
* to the module to filter the actions taken based on the key.
|
||||||
|
*
|
||||||
|
* The subscriber signature is:
|
||||||
|
*
|
||||||
|
* int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type,
|
||||||
|
* const char *event,
|
||||||
|
* RedisModuleString *key);
|
||||||
|
*
|
||||||
|
* `type` is the event type bit, that must match the mask given at registration
|
||||||
|
* time. The event string is the actual command being executed, and key is the
|
||||||
|
* relevant Redis key.
|
||||||
|
*
|
||||||
|
* A notification callback gets executed with a redis context that can not be
|
||||||
|
* used to send anything to the client, and has the db number where the event
|
||||||
|
* occured as its selected db number.
|
||||||
|
*
|
||||||
|
* Notice that it is not necessary to enable norifications in redis.conf for
|
||||||
|
* module notifications to work.
|
||||||
|
*
|
||||||
|
* Warning: the notification callbacks are performed in a synchronous manner,
|
||||||
|
* so notification callbacks must to be fast, or they would slow Redis down.
|
||||||
|
* If you need to take long actions, use threads to offload them.
|
||||||
|
*
|
||||||
|
* See https://redis.io/topics/notifications for more information.
|
||||||
|
*/
|
||||||
|
int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
|
||||||
|
RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub));
|
||||||
|
sub->module = ctx->module;
|
||||||
|
sub->event_mask = types;
|
||||||
|
sub->notify_callback = callback;
|
||||||
|
sub->active = 0;
|
||||||
|
|
||||||
|
/* Let the notification system know that modules are interested in notifications */
|
||||||
|
server.notify_keyspace_events |= NOTIFY_MODULE;
|
||||||
|
listAddNodeTail(moduleKeyspaceSubscribers, sub);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Dispatcher for keyspace notifications to module subscriber functions.
|
||||||
|
* This gets called only if at least one module requested to be notified on
|
||||||
|
* keyspace notifications */
|
||||||
|
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
|
||||||
|
listRewind(moduleKeyspaceSubscribers,&li);
|
||||||
|
|
||||||
|
/* Remove irrelevant flags from the type mask */
|
||||||
|
type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);
|
||||||
|
|
||||||
|
/* Setup a fake client, so we can have proper db selection when performing
|
||||||
|
* actions. We use one client for all handlers, writing to it will crash */
|
||||||
|
client *c = createClient(-1);
|
||||||
|
c->flags |= CLIENT_MODULE;
|
||||||
|
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
RedisModuleKeyspaceSubscriber *sub = ln->value;
|
||||||
|
/* Only notify subscribers on events matching they registration,
|
||||||
|
* and avoid subscribers triggering themselves */
|
||||||
|
if ((sub->event_mask & type) && sub->active == 0) {
|
||||||
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||||
|
ctx.module = sub->module;
|
||||||
|
selectDb(c, dbid);
|
||||||
|
ctx.client = c;
|
||||||
|
/* mark the handler as activer to avoid reentrant loops.
|
||||||
|
* If the subscriber performs an action triggering itself,
|
||||||
|
* it will not be notified about it. */
|
||||||
|
sub->active = 1;
|
||||||
|
sub->notify_callback(&ctx, type, event, key);
|
||||||
|
sub->active = 0;
|
||||||
|
moduleFreeContext(&ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
freeClient(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Unsubscribe any notification subscirbers this module has upon unloading */
|
||||||
|
void moduleUnsubscribeNotifications(RedisModule *module) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(moduleKeyspaceSubscribers,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
RedisModuleKeyspaceSubscriber *sub = ln->value;
|
||||||
|
if (sub->module == module) {
|
||||||
|
listDelNode(moduleKeyspaceSubscribers, ln);
|
||||||
|
zfree(sub);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* If no subscribers are left - do not call the module norification function */
|
||||||
|
if (listLength(moduleKeyspaceSubscribers) == 0) {
|
||||||
|
server.notify_keyspace_events &= ~NOTIFY_MODULE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Modules API internals
|
* Modules API internals
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -3706,9 +3833,11 @@ void moduleRegisterCoreAPI(void);
|
|||||||
|
|
||||||
void moduleInitModulesSystem(void) {
|
void moduleInitModulesSystem(void) {
|
||||||
moduleUnblockedClients = listCreate();
|
moduleUnblockedClients = listCreate();
|
||||||
|
|
||||||
server.loadmodule_queue = listCreate();
|
server.loadmodule_queue = listCreate();
|
||||||
modules = dictCreate(&modulesDictType,NULL);
|
modules = dictCreate(&modulesDictType,NULL);
|
||||||
|
moduleKeyspaceSubscribers = listCreate();
|
||||||
|
|
||||||
moduleRegisterCoreAPI();
|
moduleRegisterCoreAPI();
|
||||||
if (pipe(server.module_blocked_pipe) == -1) {
|
if (pipe(server.module_blocked_pipe) == -1) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -3759,6 +3888,7 @@ void moduleFreeModuleStructure(struct RedisModule *module) {
|
|||||||
zfree(module);
|
zfree(module);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void moduleUnregisterCommands(struct RedisModule *module) {
|
void moduleUnregisterCommands(struct RedisModule *module) {
|
||||||
/* Unregister all the commands registered by this module. */
|
/* Unregister all the commands registered by this module. */
|
||||||
dictIterator *di = dictGetSafeIterator(server.commands);
|
dictIterator *di = dictGetSafeIterator(server.commands);
|
||||||
@ -3819,6 +3949,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Unload the module registered with the specified name. On success
|
/* Unload the module registered with the specified name. On success
|
||||||
* C_OK is returned, otherwise C_ERR is returned and errno is set
|
* C_OK is returned, otherwise C_ERR is returned and errno is set
|
||||||
* to the following values depending on the type of error:
|
* to the following values depending on the type of error:
|
||||||
@ -3840,6 +3971,9 @@ int moduleUnload(sds name) {
|
|||||||
|
|
||||||
moduleUnregisterCommands(module);
|
moduleUnregisterCommands(module);
|
||||||
|
|
||||||
|
/* Remvoe any noification subscribers this module might have */
|
||||||
|
moduleUnsubscribeNotifications(module);
|
||||||
|
|
||||||
/* Unregister all the hooks. TODO: Yet no hooks support here. */
|
/* Unregister all the hooks. TODO: Yet no hooks support here. */
|
||||||
|
|
||||||
/* Unload the dynamic library. */
|
/* Unload the dynamic library. */
|
||||||
@ -4037,4 +4171,5 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(DigestAddStringBuffer);
|
REGISTER_API(DigestAddStringBuffer);
|
||||||
REGISTER_API(DigestAddLongLong);
|
REGISTER_API(DigestAddLongLong);
|
||||||
REGISTER_API(DigestEndSequence);
|
REGISTER_API(DigestEndSequence);
|
||||||
|
REGISTER_API(SubscribeToKeyspaceEvents);
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,13 @@ helloblock.xo: ../redismodule.h
|
|||||||
helloblock.so: helloblock.xo
|
helloblock.so: helloblock.xo
|
||||||
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc
|
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc
|
||||||
|
|
||||||
|
|
||||||
|
hellonotify.xo: ../redismodule.h
|
||||||
|
|
||||||
|
hellonotify.so: hellonotify.xo
|
||||||
|
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc
|
||||||
|
|
||||||
|
|
||||||
testmodule.xo: ../redismodule.h
|
testmodule.xo: ../redismodule.h
|
||||||
|
|
||||||
testmodule.so: testmodule.xo
|
testmodule.so: testmodule.xo
|
||||||
|
79
src/modules/hellonotify.c
Normal file
79
src/modules/hellonotify.c
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
/* Helloworld module -- A few examples of the Redis Modules API in the form
|
||||||
|
* of commands showing how to accomplish common tasks.
|
||||||
|
*
|
||||||
|
* This module does not do anything useful, if not for a few commands. The
|
||||||
|
* examples are designed in order to show the API.
|
||||||
|
*
|
||||||
|
* -----------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with or without
|
||||||
|
* modification, are permitted provided that the following conditions are met:
|
||||||
|
*
|
||||||
|
* * Redistributions of source code must retain the above copyright notice,
|
||||||
|
* this list of conditions and the following disclaimer.
|
||||||
|
* * Redistributions in binary form must reproduce the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer in the
|
||||||
|
* documentation and/or other materials provided with the distribution.
|
||||||
|
* * Neither the name of Redis nor the names of its contributors may be used
|
||||||
|
* to endorse or promote products derived from this software without
|
||||||
|
* specific prior written permission.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||||
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||||
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||||
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||||
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||||
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||||
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||||
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||||
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||||
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||||
|
* POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define REDISMODULE_EXPERIMENTAL_API
|
||||||
|
#include <ctype.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include "../redismodule.h"
|
||||||
|
|
||||||
|
/* HELLO.SIMPLE is among the simplest commands you can implement.
|
||||||
|
* It just returns the currently selected DB id, a functionality which is
|
||||||
|
* missing in Redis. The command uses two important API calls: one to
|
||||||
|
* fetch the currently selected DB, the other in order to send the client
|
||||||
|
* an integer reply as response. */
|
||||||
|
int HelloSimple_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||||
|
int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, RedisModule_GetSelectedDb(ctx));
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int HelloNotify_Callback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
|
||||||
|
REDISMODULE_NOT_USED(ctx);
|
||||||
|
REDISMODULE_NOT_USED(ctx);
|
||||||
|
RedisModule_Log(ctx,
|
||||||
|
"notice",
|
||||||
|
"Received notification! Event type: %d, event: %s, key: %s",
|
||||||
|
type, event, RedisModule_StringPtrLen(key, NULL));
|
||||||
|
RedisModule_Call(ctx, "SET", "cc", "foo", "bar");
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
/* This function must be present on each Redis module. It is used in order to
|
||||||
|
* register the commands into the Redis server. */
|
||||||
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||||
|
int argc) {
|
||||||
|
if (RedisModule_Init(ctx, "notify", 1, REDISMODULE_APIVER_1) ==
|
||||||
|
REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_ALL, HelloNotify_Callback);
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
@ -30,6 +30,7 @@
|
|||||||
* POSSIBILITY OF SUCH DAMAGE.
|
* POSSIBILITY OF SUCH DAMAGE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define REDISMODULE_EXPERIMENTAL_API
|
||||||
#include "../redismodule.h"
|
#include "../redismodule.h"
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
@ -153,6 +154,80 @@ int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int NotifyCallback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
|
||||||
|
// Increment a counter on the notifications
|
||||||
|
RedisModule_Log(ctx, "notice", "Got event type %d, event %s, key %s\n", type, event, RedisModule_StringPtrLen(key, NULL));
|
||||||
|
|
||||||
|
RedisModule_Call(ctx, "HINCRBY", "csc", "notifications", key, "1");
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* TEST.NOTIFICATIONS -- Test Keyspace Notifications. */
|
||||||
|
int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
|
||||||
|
#define FAIL(msg, ...) \
|
||||||
|
{ \
|
||||||
|
RedisModule_Log(ctx, "warning", "Failed NOTIFY Test. Reason: " #msg, \
|
||||||
|
##__VA_ARGS__); \
|
||||||
|
goto err; \
|
||||||
|
}
|
||||||
|
RedisModule_Call(ctx, "FLUSHDB", "");
|
||||||
|
|
||||||
|
RedisModule_Call(ctx, "SET", "cc", "foo", "bar");
|
||||||
|
RedisModule_Call(ctx, "SET", "cc", "foo", "baz");
|
||||||
|
RedisModule_Call(ctx, "SADD", "cc", "bar", "x");
|
||||||
|
RedisModule_Call(ctx, "SADD", "cc", "bar", "y");
|
||||||
|
|
||||||
|
RedisModule_Call(ctx, "HSET", "ccc", "baz", "x", "y");
|
||||||
|
// LPUSH should be ignored and not increment any counters
|
||||||
|
RedisModule_Call(ctx, "LPUSH", "cc", "l", "y");
|
||||||
|
RedisModule_Call(ctx, "LPUSH", "cc", "l", "y");
|
||||||
|
|
||||||
|
size_t sz;
|
||||||
|
const char *rep;
|
||||||
|
RedisModuleCallReply *r =
|
||||||
|
RedisModule_Call(ctx, "HGET", "cc", "notifications", "foo");
|
||||||
|
if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) {
|
||||||
|
FAIL("Wrong or no reply for foo");
|
||||||
|
} else {
|
||||||
|
rep = RedisModule_CallReplyStringPtr(r, &sz);
|
||||||
|
if (sz != 1 || *rep != '2') {
|
||||||
|
FAIL("Got reply '%s'. expected '2'",
|
||||||
|
RedisModule_CallReplyStringPtr(r, NULL));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "bar");
|
||||||
|
if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) {
|
||||||
|
FAIL("Wrong or no reply for bar");
|
||||||
|
} else {
|
||||||
|
rep = RedisModule_CallReplyStringPtr(r, &sz);
|
||||||
|
if (sz != 1 || *rep != '2') {
|
||||||
|
FAIL("Got reply '%s'. expected '2'", rep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "baz");
|
||||||
|
if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) {
|
||||||
|
FAIL("Wrong or no reply for baz");
|
||||||
|
} else {
|
||||||
|
rep = RedisModule_CallReplyStringPtr(r, &sz);
|
||||||
|
if (sz != 1 || *rep != '1') {
|
||||||
|
FAIL("Got reply '%.*s'. expected '1'", sz, rep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "l");
|
||||||
|
if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_NULL) {
|
||||||
|
FAIL("Wrong reply for l");
|
||||||
|
}
|
||||||
|
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||||
|
err:
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx, "ERR");
|
||||||
|
}
|
||||||
|
|
||||||
/* TEST.CTXFLAGS -- Test GetContextFlags. */
|
/* TEST.CTXFLAGS -- Test GetContextFlags. */
|
||||||
int TestCtxFlags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
int TestCtxFlags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
REDISMODULE_NOT_USED(argc);
|
REDISMODULE_NOT_USED(argc);
|
||||||
@ -162,7 +237,7 @@ int TestCtxFlags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|||||||
|
|
||||||
int ok = 1;
|
int ok = 1;
|
||||||
const char *errString = NULL;
|
const char *errString = NULL;
|
||||||
|
#undef FAIL
|
||||||
#define FAIL(msg) \
|
#define FAIL(msg) \
|
||||||
{ \
|
{ \
|
||||||
ok = 0; \
|
ok = 0; \
|
||||||
@ -310,6 +385,9 @@ int TestIt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|||||||
T("test.string.printf", "cc", "foo", "bar");
|
T("test.string.printf", "cc", "foo", "bar");
|
||||||
if (!TestAssertStringReply(ctx,reply,"Got 3 args. argv[1]: foo, argv[2]: bar",38)) goto fail;
|
if (!TestAssertStringReply(ctx,reply,"Got 3 args. argv[1]: foo, argv[2]: bar",38)) goto fail;
|
||||||
|
|
||||||
|
T("test.notify", "");
|
||||||
|
if (!TestAssertStringReply(ctx,reply,"OK",2)) goto fail;
|
||||||
|
|
||||||
RedisModule_ReplyWithSimpleString(ctx,"ALL TESTS PASSED");
|
RedisModule_ReplyWithSimpleString(ctx,"ALL TESTS PASSED");
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
|
|
||||||
@ -354,5 +432,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||||||
TestIt,"readonly",1,1,1) == REDISMODULE_ERR)
|
TestIt,"readonly",1,1,1) == REDISMODULE_ERR)
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
RedisModule_SubscribeToKeyspaceEvents(ctx,
|
||||||
|
REDISMODULE_NOTIFY_HASH |
|
||||||
|
REDISMODULE_NOTIFY_SET |
|
||||||
|
REDISMODULE_NOTIFY_STRING,
|
||||||
|
NotifyCallback);
|
||||||
|
if (RedisModule_CreateCommand(ctx,"test.notify",
|
||||||
|
TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
@ -87,6 +87,8 @@ sds keyspaceEventsFlagsToString(int flags) {
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* The API provided to the rest of the Redis core is a simple function:
|
/* The API provided to the rest of the Redis core is a simple function:
|
||||||
*
|
*
|
||||||
* notifyKeyspaceEvent(char *event, robj *key, int dbid);
|
* notifyKeyspaceEvent(char *event, robj *key, int dbid);
|
||||||
@ -100,6 +102,13 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
|
|||||||
int len = -1;
|
int len = -1;
|
||||||
char buf[24];
|
char buf[24];
|
||||||
|
|
||||||
|
/* If any modules are interested in events, notify the module system now.
|
||||||
|
* This bypasses the notifications configuration, but the module engine
|
||||||
|
* will only call event subscribers if the event type matches the types
|
||||||
|
* they are interested in. */
|
||||||
|
if (server.notify_keyspace_events & NOTIFY_MODULE)
|
||||||
|
moduleNotifyKeyspaceEvent(type, event, key, dbid);
|
||||||
|
|
||||||
/* If notifications for this class of events are off, return ASAP. */
|
/* If notifications for this class of events are off, return ASAP. */
|
||||||
if (!(server.notify_keyspace_events & type)) return;
|
if (!(server.notify_keyspace_events & type)) return;
|
||||||
|
|
||||||
|
@ -82,6 +82,17 @@
|
|||||||
#define REDISMODULE_CTX_FLAGS_EVICT 0x0200
|
#define REDISMODULE_CTX_FLAGS_EVICT 0x0200
|
||||||
|
|
||||||
|
|
||||||
|
#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */
|
||||||
|
#define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */
|
||||||
|
#define REDISMODULE_NOTIFY_LIST (1<<4) /* l */
|
||||||
|
#define REDISMODULE_NOTIFY_SET (1<<5) /* s */
|
||||||
|
#define REDISMODULE_NOTIFY_HASH (1<<6) /* h */
|
||||||
|
#define REDISMODULE_NOTIFY_ZSET (1<<7) /* z */
|
||||||
|
#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */
|
||||||
|
#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */
|
||||||
|
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED) /* A */
|
||||||
|
|
||||||
|
|
||||||
/* A special pointer that we can use between the core and the module to signal
|
/* A special pointer that we can use between the core and the module to signal
|
||||||
* field deletion, and that is impossible to be a valid pointer. */
|
* field deletion, and that is impossible to be a valid pointer. */
|
||||||
#define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1)
|
#define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1)
|
||||||
@ -112,6 +123,7 @@ typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
|
|||||||
|
|
||||||
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||||
|
|
||||||
|
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
||||||
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
|
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
|
||||||
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
|
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
|
||||||
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
|
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
|
||||||
@ -251,6 +263,8 @@ RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModu
|
|||||||
void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx);
|
void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx);
|
||||||
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx);
|
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx);
|
||||||
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx);
|
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx);
|
||||||
|
int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* This is included inline inside each Redis module. */
|
/* This is included inline inside each Redis module. */
|
||||||
@ -372,6 +386,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(IsBlockedTimeoutRequest);
|
REDISMODULE_GET_API(IsBlockedTimeoutRequest);
|
||||||
REDISMODULE_GET_API(GetBlockedClientPrivateData);
|
REDISMODULE_GET_API(GetBlockedClientPrivateData);
|
||||||
REDISMODULE_GET_API(AbortBlock);
|
REDISMODULE_GET_API(AbortBlock);
|
||||||
|
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
|
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
|
||||||
|
@ -429,6 +429,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define NOTIFY_EXPIRED (1<<8) /* x */
|
#define NOTIFY_EXPIRED (1<<8) /* x */
|
||||||
#define NOTIFY_EVICTED (1<<9) /* e */
|
#define NOTIFY_EVICTED (1<<9) /* e */
|
||||||
#define NOTIFY_STREAM (1<<10) /* t */
|
#define NOTIFY_STREAM (1<<10) /* t */
|
||||||
|
#define NOTIFY_MODULE (1<<11) /* modules are interested in notifications */
|
||||||
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
|
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
|
||||||
|
|
||||||
/* Get the first bind addr or NULL */
|
/* Get the first bind addr or NULL */
|
||||||
@ -1335,6 +1336,8 @@ void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, in
|
|||||||
size_t moduleCount(void);
|
size_t moduleCount(void);
|
||||||
void moduleAcquireGIL(void);
|
void moduleAcquireGIL(void);
|
||||||
void moduleReleaseGIL(void);
|
void moduleReleaseGIL(void);
|
||||||
|
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||||
|
|
||||||
|
|
||||||
/* Utils */
|
/* Utils */
|
||||||
long long ustime(void);
|
long long ustime(void);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user