Modules Cluster API: sending / receiving API first implementation.

This commit is contained in:
antirez 2018-03-30 11:05:47 +02:00
parent 0701cad3de
commit b4dc782e4e
2 changed files with 81 additions and 5 deletions

View File

@ -3812,23 +3812,91 @@ void moduleUnsubscribeNotifications(RedisModule *module) {
* Modules Cluster API
* -------------------------------------------------------------------------- */
/* The Cluster message callback function pointer type. */
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
/* This structure identifies a registered caller: it must match a given module
* ID, for a given message type. The callback function is just the function
* that was registered as receiver. */
struct moduleClusterReceiver {
typedef struct moduleClusterReceiver {
uint64_t module_id;
uint8_t msg_type;
RedisModuleClusterMessageReceiver callback;
struct RedisModule *module;
struct moduleClusterReceiver *next;
};
} moduleClusterReceiver;
/* We have an array of message types: each bucket is a linked list of
* configured receivers. */
static struct moduleClusterReceiver *clusterReceivers[UINT8_MAX];
static moduleClusterReceiver *clusterReceivers[UINT8_MAX];
/* Dispatch the message to the right module receiver. */
void moduleCallClusterReceivers(char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
moduleClusterReceiver *r = clusterReceivers[type];
while(r) {
if (r->module_id == module_id) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = r->module;
r->callback(&ctx,sender_id,type,payload,len);
moduleFreeContext(&ctx);
return;
}
r = r->next;
}
}
/* Register a callback receiver for cluster messages of type 'type'. If there
* was already a registered callback, this will replace the callback function
* with the one provided, otherwise if the callback is set to NULL and there
* is already a callback for this function, the callback is unregistered
* (so this API call is also used in order to delete the receiver). */
void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) {
uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
moduleClusterReceiver *r = clusterReceivers[type], *prev = NULL;
while(r) {
if (r->module_id == module_id) {
/* Found! Set or delete. */
if (callback) {
r->callback = callback;
} else {
/* Delete the receiver entry if the user is setting
* it to NULL. Just unlink the receiver node from the
* linked list. */
if (prev)
prev->next = r->next;
else
clusterReceivers[type]->next = r->next;
zfree(r);
}
return;
}
prev = r;
r = r->next;
}
/* Not found, let's add it. */
if (callback) {
r = zmalloc(sizeof(*r));
r->module_id = module_id;
r->module = ctx->module;
r->callback = callback;
r->next = clusterReceivers[type];
clusterReceivers[type] = r;
}
}
/* Send a message to all the nodes in the cluster if `target` is NULL, otherwise
* at the specified target, which is a REDISMODULE_NODE_ID_LEN bytes node ID, as
* returned by the receiver callback or by the nodes iteration functions.
*
* The function returns REDISMODULE_OK if the message was successfully sent,
* otherwise if the node is not connected or such node ID does not map to any
* known cluster node, REDISMODULE_ERR is returned. */
int RM_SendClusterMessage(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len) {
uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
if (clusterSendModuleMessageToTarget(target_id,module_id,type,msg,len) == C_OK)
return REDISMODULE_OK;
else
return REDISMODULE_ERR;
}
/* --------------------------------------------------------------------------
@ -4210,4 +4278,6 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(DigestAddLongLong);
REGISTER_API(DigestEndSequence);
REGISTER_API(SubscribeToKeyspaceEvents);
REGISTER_API(RegisterClusterMessageReceiver);
REGISTER_API(SendClusterMessage);
}

View File

@ -104,6 +104,9 @@
#define REDISMODULE_POSITIVE_INFINITE (1.0/0.0)
#define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0)
/* Cluster API defines. */
#define REDISMODULE_NODE_ID_LEN 40
#define REDISMODULE_NOT_USED(V) ((void) V)
/* ------------------------- End of common defines ------------------------ */
@ -123,7 +126,6 @@ typedef struct RedisModuleDigest RedisModuleDigest;
typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
@ -131,6 +133,7 @@ typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
#define REDISMODULE_TYPE_METHOD_VERSION 1
typedef struct RedisModuleTypeMethods {
@ -251,6 +254,7 @@ long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void);
void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len);
void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele);
void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md);
int REDISMODULE_API(RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
@ -265,6 +269,7 @@ void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb);
void REDISMODULE_API_FUNC(RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback);
#endif
@ -375,6 +380,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(DigestAddStringBuffer);
REDISMODULE_GET_API(DigestAddLongLong);
REDISMODULE_GET_API(DigestEndSequence);
REDISMODULE_GET_API(SendClusterMessage);
#ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext);