From b4dc782e4e991b8b4f95f1f28305109fcd7b2d70 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 30 Mar 2018 11:05:47 +0200 Subject: [PATCH] Modules Cluster API: sending / receiving API first implementation. --- src/module.c | 78 ++++++++++++++++++++++++++++++++++++++++++++--- src/redismodule.h | 8 ++++- 2 files changed, 81 insertions(+), 5 deletions(-) diff --git a/src/module.c b/src/module.c index fc00e9f9..c1e5f0f5 100644 --- a/src/module.c +++ b/src/module.c @@ -3812,23 +3812,91 @@ void moduleUnsubscribeNotifications(RedisModule *module) { * Modules Cluster API * -------------------------------------------------------------------------- */ +/* The Cluster message callback function pointer type. */ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); /* This structure identifies a registered caller: it must match a given module * ID, for a given message type. The callback function is just the function * that was registered as receiver. */ -struct moduleClusterReceiver { +typedef struct moduleClusterReceiver { uint64_t module_id; - uint8_t msg_type; RedisModuleClusterMessageReceiver callback; + struct RedisModule *module; struct moduleClusterReceiver *next; -}; +} moduleClusterReceiver; /* We have an array of message types: each bucket is a linked list of * configured receivers. */ -static struct moduleClusterReceiver *clusterReceivers[UINT8_MAX]; +static moduleClusterReceiver *clusterReceivers[UINT8_MAX]; +/* Dispatch the message to the right module receiver. */ void moduleCallClusterReceivers(char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) { + moduleClusterReceiver *r = clusterReceivers[type]; + while(r) { + if (r->module_id == module_id) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = r->module; + r->callback(&ctx,sender_id,type,payload,len); + moduleFreeContext(&ctx); + return; + } + r = r->next; + } +} + +/* Register a callback receiver for cluster messages of type 'type'. If there + * was already a registered callback, this will replace the callback function + * with the one provided, otherwise if the callback is set to NULL and there + * is already a callback for this function, the callback is unregistered + * (so this API call is also used in order to delete the receiver). */ +void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) { + uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0); + moduleClusterReceiver *r = clusterReceivers[type], *prev = NULL; + while(r) { + if (r->module_id == module_id) { + /* Found! Set or delete. */ + if (callback) { + r->callback = callback; + } else { + /* Delete the receiver entry if the user is setting + * it to NULL. Just unlink the receiver node from the + * linked list. */ + if (prev) + prev->next = r->next; + else + clusterReceivers[type]->next = r->next; + zfree(r); + } + return; + } + prev = r; + r = r->next; + } + + /* Not found, let's add it. */ + if (callback) { + r = zmalloc(sizeof(*r)); + r->module_id = module_id; + r->module = ctx->module; + r->callback = callback; + r->next = clusterReceivers[type]; + clusterReceivers[type] = r; + } +} + +/* Send a message to all the nodes in the cluster if `target` is NULL, otherwise + * at the specified target, which is a REDISMODULE_NODE_ID_LEN bytes node ID, as + * returned by the receiver callback or by the nodes iteration functions. + * + * The function returns REDISMODULE_OK if the message was successfully sent, + * otherwise if the node is not connected or such node ID does not map to any + * known cluster node, REDISMODULE_ERR is returned. */ +int RM_SendClusterMessage(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len) { + uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0); + if (clusterSendModuleMessageToTarget(target_id,module_id,type,msg,len) == C_OK) + return REDISMODULE_OK; + else + return REDISMODULE_ERR; } /* -------------------------------------------------------------------------- @@ -4210,4 +4278,6 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DigestAddLongLong); REGISTER_API(DigestEndSequence); REGISTER_API(SubscribeToKeyspaceEvents); + REGISTER_API(RegisterClusterMessageReceiver); + REGISTER_API(SendClusterMessage); } diff --git a/src/redismodule.h b/src/redismodule.h index 8c14fd0d..4af92d9e 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -104,6 +104,9 @@ #define REDISMODULE_POSITIVE_INFINITE (1.0/0.0) #define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0) +/* Cluster API defines. */ +#define REDISMODULE_NODE_ID_LEN 40 + #define REDISMODULE_NOT_USED(V) ((void) V) /* ------------------------- End of common defines ------------------------ */ @@ -123,7 +126,6 @@ typedef struct RedisModuleDigest RedisModuleDigest; typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc); - typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); @@ -131,6 +133,7 @@ typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); typedef void (*RedisModuleTypeFreeFunc)(void *value); +typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { @@ -251,6 +254,7 @@ long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void); void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len); void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele); void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md); +int REDISMODULE_API(RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -265,6 +269,7 @@ void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb); +void REDISMODULE_API_FUNC(RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback); #endif @@ -375,6 +380,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(DigestAddStringBuffer); REDISMODULE_GET_API(DigestAddLongLong); REDISMODULE_GET_API(DigestEndSequence); + REDISMODULE_GET_API(SendClusterMessage); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext);