diff --git a/src/aof.c b/src/aof.c index 9df1e9b9..aa726d33 100644 --- a/src/aof.c +++ b/src/aof.c @@ -693,6 +693,7 @@ int loadAppendOnlyFile(char *filename) { } /* Run the command in the context of a fake client */ + fakeClient->cmd = cmd; cmd->proc(fakeClient); /* The fake client should not have a reply */ @@ -703,6 +704,7 @@ int loadAppendOnlyFile(char *filename) { /* Clean up. Command code may have changed argv/argc so we use the * argv/argc of the client instead of the local variables. */ freeFakeClientArgv(fakeClient); + fakeClient->cmd = NULL; if (server.aof_load_truncated) valid_up_to = ftello(fp); } @@ -983,6 +985,18 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { return 1; } +/* Call the module type callback in order to rewrite a data type + * taht is exported by a module and is not handled by Redis itself. + * The function returns 0 on error, 1 on success. */ +int rewriteModuleObject(rio *r, robj *key, robj *o) { + RedisModuleIO io; + moduleValue *mv = o->ptr; + moduleType *mt = mv->type; + moduleInitIOContext(io,mt,r); + mt->aof_rewrite(&io,key,mv->value); + return io.error ? 0 : 1; +} + /* This function is called by the child rewriting the AOF file to read * the difference accumulated from the parent into a buffer, that is * concatenated at the end of the rewrite. */ @@ -1075,6 +1089,8 @@ int rewriteAppendOnlyFile(char *filename) { if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_HASH) { if (rewriteHashObject(&aof,&key,o) == 0) goto werr; + } else if (o->type == OBJ_MODULE) { + if (rewriteModuleObject(&aof,&key,o) == 0) goto werr; } else { serverPanic("Unknown object type"); } diff --git a/src/db.c b/src/db.c index 6f70a538..a7701c45 100644 --- a/src/db.c +++ b/src/db.c @@ -731,6 +731,10 @@ void typeCommand(client *c) { case OBJ_SET: type = "set"; break; case OBJ_ZSET: type = "zset"; break; case OBJ_HASH: type = "hash"; break; + case OBJ_MODULE: { + moduleValue *mv = o->ptr; + type = mv->type->name; + }; break; default: type = "unknown"; break; } } diff --git a/src/module.c b/src/module.c index 6a8a5f5b..0a16b940 100644 --- a/src/module.c +++ b/src/module.c @@ -17,6 +17,7 @@ struct RedisModule { char *name; /* Module name. */ int ver; /* Module version. We use just progressive integers. */ int apiver; /* Module API version as requested during initialization.*/ + list *types; /* Module data types. */ }; typedef struct RedisModule RedisModule; @@ -164,6 +165,35 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx); void RM_ZsetRangeStop(RedisModuleKey *key); +/* -------------------------------------------------------------------------- + * Heap allocation raw functions + * -------------------------------------------------------------------------- */ + +/* Use like malloc(). Memory allocated with this function is reported in + * Redis INFO memory, used for keys eviction according to maxmemory settings + * and in general is taken into account as memory allocated by Redis. + * You should avoid to use malloc(). */ +void *RM_Alloc(size_t bytes) { + return zmalloc(bytes); +} + +/* Use like realloc() for memory obtained with RedisModule_Alloc(). */ +void* RM_Realloc(void *ptr, size_t bytes) { + return zrealloc(ptr,bytes); +} + +/* Use like free() for memory obtained by RedisModule_Alloc() and + * RedisModule_Realloc(). However you should never try to free with + * RedisModule_Free() memory allocated with malloc() inside your module. */ +void RM_Free(void *ptr) { + zfree(ptr); +} + +/* Like strdup() but returns memory allocated with RedisModule_Alloc(). */ +char *RM_Strdup(const char *str) { + return zstrdup(str); +} + /* -------------------------------------------------------------------------- * Pool allocator * -------------------------------------------------------------------------- */ @@ -546,6 +576,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->name = sdsnew((char*)name); module->ver = ver; module->apiver = apiver; + module->types = listCreate(); ctx->module = module; } @@ -1044,6 +1075,7 @@ int RM_KeyType(RedisModuleKey *key) { case OBJ_SET: return REDISMODULE_KEYTYPE_SET; case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET; case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH; + case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE; default: return 0; } } @@ -2280,6 +2312,449 @@ const char *RM_CallReplyProto(RedisModuleCallReply *reply, size_t *len) { return reply->proto; } +/* -------------------------------------------------------------------------- + * Modules data types + * + * When String DMA or using existing data structures is not enough, it is + * possible to create new data types from scratch and export them to + * Redis. The module must provide a set of callbacks for handling the + * new values exported (for example in order to provide RDB saving/loading, + * AOF rewrite, and so forth). In this section we define this API. + * -------------------------------------------------------------------------- */ + +/* Turn a 9 chars name in the specified charset and a 10 bit encver into + * a single 64 bit unsigned integer that represents this exact module name + * and version. This final number is called a "type ID" and is used when + * writing module exported values to RDB files, in order to re-associate the + * value to the right module to load them during RDB loading. + * + * If the string is not of the right length or the charset is wrong, or + * if encver is outside the unsigned 10 bit integer range, 0 is returned, + * otherwise the function returns the right type ID. + * + * The resulting 64 bit integer is composed as follows: + * + * (high order bits) 6|6|6|6|6|6|6|6|6|10 (low order bits) + * + * The first 6 bits value is the first character, name[0], while the last + * 6 bits value, immediately before the 10 bits integer, is name[8]. + * The last 10 bits are the encoding version. + * + * Note that a name and encver combo of "AAAAAAAAA" and 0, will produce + * zero as return value, that is the same we use to signal errors, thus + * this combination is invalid, and also useless since type names should + * try to be vary to avoid collisions. */ + +const char *ModuleTypeNameCharSet = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789-_"; + +uint64_t moduleTypeEncodeId(const char *name, int encver) { + /* We use 64 symbols so that we can map each character into 6 bits + * of the final output. */ + const char *cset = ModuleTypeNameCharSet; + if (strlen(name) != 9) return 0; + if (encver < 0 || encver > 1023) return 0; + + uint64_t id = 0; + for (int j = 0; j < 9; j++) { + char *p = strchr(cset,name[j]); + if (!p) return 0; + unsigned long pos = p-cset; + id = (id << 6) | pos; + } + id = (id << 10) | encver; + return id; +} + +/* Search, in the list of exported data types of all the modules registered, + * a type with the same name as the one given. Returns the moduleType + * structure pointer if such a module is found, or NULL otherwise. */ +moduleType *moduleTypeLookupModuleByName(const char *name) { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + listIter li; + listNode *ln; + + listRewind(module->types,&li); + while((ln = listNext(&li))) { + moduleType *mt = ln->value; + if (memcmp(name,mt->name,sizeof(mt->name)) == 0) { + dictReleaseIterator(di); + return mt; + } + } + } + dictReleaseIterator(di); + return NULL; +} + +/* Lookup a module by ID, with caching. This function is used during RDB + * loading. Modules exporting data types should never be able to unload, so + * our cache does not need to expire. */ +#define MODULE_LOOKUP_CACHE_SIZE 3 + +moduleType *moduleTypeLookupModuleByID(uint64_t id) { + static struct { + uint64_t id; + moduleType *mt; + } cache[MODULE_LOOKUP_CACHE_SIZE]; + + /* Search in cache to start. */ + int j; + for (j = 0; j < MODULE_LOOKUP_CACHE_SIZE; j++) + if (cache[j].id == id) return cache[j].mt; + + /* Slow module by module lookup. */ + moduleType *mt = NULL; + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + listIter li; + listNode *ln; + + listRewind(module->types,&li); + while((ln = listNext(&li))) { + mt = ln->value; + /* Compare only the 54 bit module identifier and not the + * encoding version. */ + if (mt->id >> 10 == id >> 10) break; + } + } + dictReleaseIterator(di); + + /* Add to cache if possible. */ + if (mt && j < MODULE_LOOKUP_CACHE_SIZE) { + cache[j].id = id; + cache[j].mt = mt; + } + return mt; +} + +/* Turn an (unresolved) module ID into a type name, to show the user an + * error when RDB files contain module data we can't load. */ +void moduleTypeNameByID(char *name, uint64_t moduleid) { + const char *cset = ModuleTypeNameCharSet; + + name[0] = '\0'; + char *p = name+8; + moduleid >>= 10; + for (int j = 0; j < 9; j++) { + *p-- = cset[moduleid & 63]; + moduleid >>= 6; + } +} + +/* Register a new data type exported by the module. The parameters are the + * following. Please for in depth documentation check the modules API + * documentation, especially the INTRO.md file. + * + * * **name**: A 9 characters data type name that MUST be unique in the Redis + * Modules ecosystem. Be creative... and there will be no collisions. Use + * the charset A-Z a-z 9-0, plus the two "-_" characters. A good + * idea is to use, for example `-`. For example + * "tree-AntZ" may mean "Tree data structure by @antirez". To use both + * lower case and upper case letters helps in order to prevent collisions. + * * **encver**: Encoding version, which is, the version of the serialization + * that a module used in order to persist data. As long as the "name" + * matches, the RDB loading will be dispatched to the type callbacks + * whatever 'encver' is used, however the module can understand if + * the encoding it must load are of an older version of the module. + * For example the module "tree-AntZ" initially used encver=0. Later + * after an upgrade, it started to serialize data in a different format + * and to register the type with encver=1. However this module may + * still load old data produced by an older version if the rdb_load + * callback is able to check the encver value and act accordingly. + * The encver must be a positive value between 0 and 1023. + * * **rdb_load**: A callback function pointer that loads data from RDB files. + * * **rdb_save**: A callback function pointer that saves data to RDB files. + * * **aof_rewrite**: A callback function pointer that rewrites data as commands. + * * **digest**: A callback function pointer that is used for `DEBUG DIGEST`. + * * **free**: A callback function pointer that can free a type value. + * + * Note: the module name "AAAAAAAAA" is reserved and produces an error, it + * happens to be pretty lame as well. + * + * If there is already a module registering a type with the same name, + * and if the module name or encver is invalid, NULL is returned. + * Otherwise the new type is registered into Redis, and a reference of + * type RedisModuleType is returned: the caller of the function should store + * this reference into a gobal variable to make future use of it in the + * modules type API, since a single module may register multiple types. + * Example code fragment: + * + * static RedisModuleType *BalancedTreeType; + * + * int RedisModule_OnLoad(RedisModuleCtx *ctx) { + * // some code here ... + * BalancedTreeType = RM_CreateDataType(...); + * } + */ +moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeLoadFunc rdb_load, moduleTypeSaveFunc rdb_save, moduleTypeRewriteFunc aof_rewrite, moduleTypeDigestFunc digest, moduleTypeFreeFunc free) { + uint64_t id = moduleTypeEncodeId(name,encver); + if (id == 0) return NULL; + if (moduleTypeLookupModuleByName(name) != NULL) return NULL; + + moduleType *mt = zmalloc(sizeof(*mt)); + mt->id = id; + mt->module = ctx->module; + mt->rdb_load = rdb_load; + mt->rdb_save = rdb_save; + mt->aof_rewrite = aof_rewrite; + mt->digest = digest; + mt->free = free; + memcpy(mt->name,name,sizeof(mt->name)); + listAddNodeTail(ctx->module->types,mt); + return mt; +} + +/* If the key is open for writing, set the specified module type object + * as the value of the key, deleting the old value if any. + * On success REDISMODULE_OK is returned. If the key is not open for + * writing or there is an active iterator, REDISMODULE_ERR is returned. */ +int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) { + if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; + RM_DeleteKey(key); + robj *o = createModuleObject(mt,value); + setKey(key->db,key->key,o); + decrRefCount(o); + key->value = o; + return REDISMODULE_OK; +} + +/* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on + * the key, returns the moduel type pointer of the value stored at key. + * + * If the key is NULL, is not associated with a module type, or is empty, + * then NULL is returned instead. */ +moduleType *RM_ModuleTypeGetType(RedisModuleKey *key) { + if (key == NULL || + key->value == NULL || + RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL; + moduleValue *mv = key->value->ptr; + return mv->type; +} + +/* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on + * the key, returns the module type low-level value stored at key, as + * it was set by the user via RedisModule_ModuleTypeSet(). + * + * If the key is NULL, is not associated with a module type, or is empty, + * then NULL is returned instead. */ +void *RM_ModuleTypeGetValue(RedisModuleKey *key) { + if (key == NULL || + key->value == NULL || + RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL; + moduleValue *mv = key->value->ptr; + return mv->value; +} + +/* -------------------------------------------------------------------------- + * RDB loading and saving functions + * -------------------------------------------------------------------------- */ + +/* Called when there is a load error in the context of a module. This cannot + * be recovered like for the built-in types. */ +void moduleRDBLoadError(RedisModuleIO *io) { + serverLog(LL_WARNING, + "Error loading data from RDB (short read or EOF). " + "Read performed by module '%s' about type '%s' " + "after reading '%llu' bytes of a value.", + io->type->module->name, + io->type->name, + (unsigned long long)io->bytes); + exit(1); +} + +/* Save an unsigned 64 bit value into the RDB file. This function should only + * be called in the context of the rdb_save method of modules implementing new + * data types. */ +void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) { + if (io->error) return; + int retval = rdbSaveLen(io->rio, value); + if (retval == -1) { + io->error = 1; + } else { + io->bytes += retval; + } +} + +/* Load an unsigned 64 bit value from the RDB file. This function should only + * be called in the context of the rdb_load method of modules implementing + * new data types. */ +uint64_t RM_LoadUnsigned(RedisModuleIO *io) { + uint64_t value; + int retval = rdbLoadLenByRef(io->rio, NULL, &value); + if (retval == -1) { + moduleRDBLoadError(io); + return 0; /* Never reached. */ + } + return value; +} + +/* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */ +void RM_SaveSigned(RedisModuleIO *io, int64_t value) { + union {uint64_t u; int64_t i;} conv; + conv.i = value; + RM_SaveUnsigned(io,conv.u); +} + +/* Like RedisModule_LoadUnsigned() but for signed 64 bit values. */ +int64_t RM_LoadSigned(RedisModuleIO *io) { + union {uint64_t u; int64_t i;} conv; + conv.u = RM_LoadUnsigned(io); + return conv.i; +} + +/* In the context of the rdb_save method of a module type, saves a + * string into the RDB file taking as input a RedisModuleString. + * + * The string can be later loaded with RedisModule_LoadString() or + * other Load family functions expecting a serialized string inside + * the RDB file. */ +void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) { + if (io->error) return; + int retval = rdbSaveStringObject(io->rio,s); + if (retval == -1) { + io->error = 1; + } else { + io->bytes += retval; + } +} + +/* Like RedisModule_SaveString() but takes a raw C pointer and length + * as input. */ +void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) { + if (io->error) return; + int retval = rdbSaveRawString(io->rio,(unsigned char*)str,len); + if (retval == -1) { + io->error = 1; + } else { + io->bytes += retval; + } +} + +/* Implements RM_LoadString() and RM_LoadStringBuffer() */ +void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { + void *s = rdbGenericLoadStringObject(io->rio, + plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr); + if (s == NULL) { + moduleRDBLoadError(io); + return NULL; /* Never reached. */ + } + return s; +} + +/* In the context of the rdb_load method of a module data type, loads a string + * from the RDB file, that was previously saved with RedisModule_SaveString() + * functions family. + * + * The returned string is a newly allocated RedisModuleString object, and + * the user should at some point free it with a call to RedisModule_FreeString(). + * + * If the data structure does not store strings as RedisModuleString objects, + * the similar function RedisModule_LoadStringBuffer() could be used instead. */ +RedisModuleString *RM_LoadString(RedisModuleIO *io) { + return moduleLoadString(io,0,NULL); +} + +/* Like RedisModule_LoadString() but returns an heap allocated string that + * was allocated with RedisModule_Alloc(), and can be resized or freed with + * RedisModule_Realloc() or RedisModule_Free(). + * + * The size of the string is stored at '*lenptr' if not NULL. + * The returned string is not automatically NULL termianted, it is loaded + * exactly as it was stored inisde the RDB file. */ +char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) { + return moduleLoadString(io,1,lenptr); +} + +/* In the context of the rdb_save method of a module data type, saves a double + * value to the RDB file. The double can be a valid number, a NaN or infinity. + * It is possible to load back the value with RedisModule_LoadDouble(). */ +void RM_SaveDouble(RedisModuleIO *io, double value) { + if (io->error) return; + int retval = rdbSaveBinaryDoubleValue(io->rio, value); + if (retval == -1) { + io->error = 1; + } else { + io->bytes += retval; + } +} + +/* In the context of the rdb_save method of a module data type, loads back the + * double value saved by RedisModule_SaveDouble(). */ +double RM_LoadDouble(RedisModuleIO *io) { + double value; + int retval = rdbLoadBinaryDoubleValue(io->rio, &value); + if (retval == -1) { + moduleRDBLoadError(io); + return 0; /* Never reached. */ + } + return value; +} + +/* -------------------------------------------------------------------------- + * AOF API for modules data types + * -------------------------------------------------------------------------- */ + +/* Emits a command into the AOF during the AOF rewriting process. This function + * is only called in the context of the aof_rewrite method of data types exported + * by a module. The command works exactly like RedisModule_Call() in the way + * the parameters are passed, but it does not return anything as the error + * handling is performed by Redis itself. */ +void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) { + if (io->error) return; + struct redisCommand *cmd; + robj **argv = NULL; + int argc = 0, flags = 0, j; + va_list ap; + + cmd = lookupCommandByCString((char*)cmdname); + if (!cmd) { + serverLog(LL_WARNING, + "Fatal: AOF method for module data type '%s' tried to " + "emit unknown command '%s'", + io->type->name, cmdname); + io->error = 1; + errno = EINVAL; + return; + } + + /* Emit the arguments into the AOF in Redis protocol format. */ + va_start(ap, fmt); + argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap); + va_end(ap); + if (argv == NULL) { + serverLog(LL_WARNING, + "Fatal: AOF method for module data type '%s' tried to " + "call RedisModule_EmitAOF() with wrong format specifiers '%s'", + io->type->name, fmt); + io->error = 1; + errno = EINVAL; + return; + } + + /* Bulk count. */ + if (!io->error && rioWriteBulkCount(io->rio,'*',argc) == 0) + io->error = 1; + + /* Arguments. */ + for (j = 0; j < argc; j++) { + if (!io->error && rioWriteBulkObject(io->rio,argv[j]) == 0) + io->error = 1; + decrRefCount(argv[j]); + } + zfree(argv); + return; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -2315,6 +2790,10 @@ int moduleRegisterApi(const char *funcname, void *funcptr) { /* Register all the APIs we export. */ void moduleRegisterCoreAPI(void) { server.moduleapi = dictCreate(&moduleAPIDictType,NULL); + REGISTER_API(Alloc); + REGISTER_API(Realloc); + REGISTER_API(Free); + REGISTER_API(Strdup); REGISTER_API(CreateCommand); REGISTER_API(SetModuleAttribs); REGISTER_API(WrongArity); @@ -2379,6 +2858,21 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(KeyAtPos); REGISTER_API(GetClientId); REGISTER_API(PoolAlloc); + REGISTER_API(CreateDataType); + REGISTER_API(ModuleTypeSetValue); + REGISTER_API(ModuleTypeGetType); + REGISTER_API(ModuleTypeGetValue); + REGISTER_API(SaveUnsigned); + REGISTER_API(LoadUnsigned); + REGISTER_API(SaveSigned); + REGISTER_API(LoadSigned); + REGISTER_API(SaveString); + REGISTER_API(SaveStringBuffer); + REGISTER_API(LoadString); + REGISTER_API(LoadStringBuffer); + REGISTER_API(SaveDouble); + REGISTER_API(LoadDouble); + REGISTER_API(EmitAOF); } /* Global initialization at Redis startup. */ @@ -2414,6 +2908,7 @@ void moduleLoadFromQueue(void) { } void moduleFreeModuleStructure(struct RedisModule *module) { + listRelease(module->types); sdsfree(module->name); zfree(module); } @@ -2456,9 +2951,16 @@ int moduleLoad(const char *path) { * C_OK is returned, otherwise C_ERR is returned and errno is set * to the following values depending on the type of error: * - * ENONET: No such module having the specified name. */ + * ENONET: No such module having the specified name. + * EBUSY: The module exports a new data type and can only be reloaded. */ int moduleUnload(sds name) { struct RedisModule *module = dictFetchValue(modules,name); + + if (listLength(module->types)) { + errno = EBUSY; + return REDISMODULE_ERR; + } + if (module == NULL) { errno = ENOENT; return REDISMODULE_ERR; @@ -2497,9 +2999,7 @@ int moduleUnload(sds name) { /* Remove from list of modules. */ serverLog(LL_NOTICE,"Module %s unloaded",module->name); dictDelete(modules,module->name); - - /* Free the module structure. */ - zfree(module); + moduleFreeModuleStructure(module); return REDISMODULE_OK; } @@ -2523,6 +3023,7 @@ void moduleCommand(client *c) { char *errmsg = "operation not possible."; switch(errno) { case ENOENT: errmsg = "no such module with that name"; + case EBUSY: errmsg = "the module exports one or more module-side data types, can't unload"; } addReplyErrorFormat(c,"Error unloading module: %s",errmsg); } diff --git a/src/modules/Makefile b/src/modules/Makefile index 0c91361a..ecac4683 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -13,7 +13,7 @@ endif .SUFFIXES: .c .so .xo .o -all: helloworld.so +all: helloworld.so hellotype.so .c.xo: $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ @@ -23,5 +23,10 @@ helloworld.xo: ../redismodule.h helloworld.so: helloworld.xo $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc +hellotype.xo: ../redismodule.h + +hellotype.so: hellotype.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + clean: rm -rf *.xo *.so diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c new file mode 100644 index 00000000..f688939f --- /dev/null +++ b/src/modules/hellotype.c @@ -0,0 +1,221 @@ +#include "../redismodule.h" +#include +#include +#include +#include +#include + +static RedisModuleType *HelloType; + +/* ========================== Internal data structure ======================= + * This is just a linked list of 64 bit integers where elements are inserted + * in-place, so it's ordered. There is no pop/push operation but just insert + * because it is enough to show the implementation of new data types without + * making things complex. */ + +struct HelloTypeNode { + int64_t value; + struct HelloTypeNode *next; +}; + +struct HelloTypeObject { + struct HelloTypeNode *head; + size_t len; /* Number of elements added. */ +}; + +struct HelloTypeObject *createHelloTypeObject(void) { + struct HelloTypeObject *o; + o = RedisModule_Alloc(sizeof(*o)); + o->head = NULL; + o->len = 0; + return o; +} + +void HelloTypeInsert(struct HelloTypeObject *o, int64_t ele) { + struct HelloTypeNode *next = o->head, *newnode, *prev = NULL; + + while(next && next->value < ele) { + prev = next; + next = next->next; + } + newnode = RedisModule_Alloc(sizeof(*newnode)); + newnode->value = ele; + newnode->next = next; + if (prev) { + prev->next = newnode; + } else { + o->head = newnode; + } + o->len++; +} + +void HelloTypeReleaseObject(struct HelloTypeObject *o) { + struct HelloTypeNode *cur, *next; + cur = o->head; + while(cur) { + next = cur->next; + RedisModule_Free(cur); + cur = next; + } + RedisModule_Free(o); +} + +/* ========================= "hellotype" type commands ======================= */ + +/* HELLOTYPE.INSERT key value */ +int HelloTypeInsert_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + + if (argc != 3) return RedisModule_WrongArity(ctx); + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && + RedisModule_ModuleTypeGetType(key) != HelloType) + { + return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); + } + + long long value; + if ((RedisModule_StringToLongLong(argv[2],&value) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx,"ERR invalid value: must be a signed 64 bit integer"); + } + + /* Create an empty value object if the key is currently empty. */ + struct HelloTypeObject *hto; + if (type == REDISMODULE_KEYTYPE_EMPTY) { + hto = createHelloTypeObject(); + RedisModule_ModuleTypeSetValue(key,HelloType,hto); + } else { + hto = RedisModule_ModuleTypeGetValue(key); + } + + /* Insert the new element. */ + HelloTypeInsert(hto,value); + + RedisModule_ReplyWithLongLong(ctx,hto->len); + RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; +} + +/* HELLOTYPE.RANGE key first count */ +int HelloTypeRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + + if (argc != 4) return RedisModule_WrongArity(ctx); + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && + RedisModule_ModuleTypeGetType(key) != HelloType) + { + return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); + } + + long long first, count; + if (RedisModule_StringToLongLong(argv[2],&first) != REDISMODULE_OK || + RedisModule_StringToLongLong(argv[3],&count) != REDISMODULE_OK || + first < 0 || count < 0) + { + return RedisModule_ReplyWithError(ctx, + "ERR invalid first or count parameters"); + } + + struct HelloTypeObject *hto = RedisModule_ModuleTypeGetValue(key); + struct HelloTypeNode *node = hto ? hto->head : NULL; + RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN); + long long arraylen = 0; + while(node && count--) { + RedisModule_ReplyWithLongLong(ctx,node->value); + arraylen++; + node = node->next; + } + RedisModule_ReplySetArrayLength(ctx,arraylen); + return REDISMODULE_OK; +} + +/* HELLOTYPE.LEN key */ +int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + + if (argc != 2) return RedisModule_WrongArity(ctx); + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && + RedisModule_ModuleTypeGetType(key) != HelloType) + { + return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); + } + + struct HelloTypeObject *hto = RedisModule_ModuleTypeGetValue(key); + RedisModule_ReplyWithLongLong(ctx,hto ? hto->len : 0); + return REDISMODULE_OK; +} + + +/* ========================== "hellotype" type methods ======================= */ + +void *HelloTypeRdbLoad(RedisModuleIO *rdb, int encver) { + if (encver != 0) { + /* RedisModule_Log("warning","Can't load data with version %d", encver);*/ + return NULL; + } + uint64_t elements = RedisModule_LoadUnsigned(rdb); + struct HelloTypeObject *hto = createHelloTypeObject(); + while(elements--) { + int64_t ele = RedisModule_LoadSigned(rdb); + HelloTypeInsert(hto,ele); + } + return hto; +} + +void HelloTypeRdbSave(RedisModuleIO *rdb, void *value) { + struct HelloTypeObject *hto = value; + struct HelloTypeNode *node = hto->head; + RedisModule_SaveUnsigned(rdb,hto->len); + while(node) { + RedisModule_SaveSigned(rdb,node->value); + node = node->next; + } +} + +void HelloTypeAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) { + struct HelloTypeObject *hto = value; + struct HelloTypeNode *node = hto->head; + while(node) { + RedisModule_EmitAOF(aof,"HELLOTYPE.INSERT","sl",key,node->value); + node = node->next; + } +} + +void HelloTypeDigest(RedisModuleDigest *digest, void *value) { +} + +void HelloTypeFree(void *value) { + HelloTypeReleaseObject(value); +} + +/* This function must be present on each Redis module. It is used in order to + * register the commands into the Redis server. */ +int RedisModule_OnLoad(RedisModuleCtx *ctx) { + if (RedisModule_Init(ctx,"hellotype",1,REDISMODULE_APIVER_1) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + HelloType = RedisModule_CreateDataType(ctx,"hellotype",0,HelloTypeRdbLoad,HelloTypeRdbSave,HelloTypeAofRewrite,HelloTypeDigest,HelloTypeFree); + if (HelloType == NULL) return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"hellotype.insert", + HelloTypeInsert_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"hellotype.range", + HelloTypeRange_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"hellotype.len", + HelloTypeLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/src/object.c b/src/object.c index 167290d4..1d3d1566 100644 --- a/src/object.c +++ b/src/object.c @@ -221,6 +221,13 @@ robj *createZsetZiplistObject(void) { return o; } +robj *createModuleObject(moduleType *mt, void *value) { + moduleValue *mv = zmalloc(sizeof(*mv)); + mv->type = mt; + mv->value = value; + return createObject(OBJ_MODULE,mv); +} + void freeStringObject(robj *o) { if (o->encoding == OBJ_ENCODING_RAW) { sdsfree(o->ptr); @@ -281,6 +288,12 @@ void freeHashObject(robj *o) { } } +void freeModuleObject(robj *o) { + moduleValue *mv = o->ptr; + mv->type->free(mv->value); + zfree(mv); +} + void incrRefCount(robj *o) { if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++; } @@ -293,6 +306,7 @@ void decrRefCount(robj *o) { case OBJ_SET: freeSetObject(o); break; case OBJ_ZSET: freeZsetObject(o); break; case OBJ_HASH: freeHashObject(o); break; + case OBJ_MODULE: freeModuleObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o); diff --git a/src/rdb.c b/src/rdb.c index c30bd9fb..f3c9c501 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -41,11 +41,6 @@ #include #include -#define RDB_LOAD_NONE 0 -#define RDB_LOAD_ENC (1<<0) -#define RDB_LOAD_PLAIN (1<<1) -#define RDB_LOAD_SDS (1<<2) - #define rdbExitReportCorruptRDB(reason) rdbCheckThenExit(reason, __LINE__); void rdbCheckThenExit(char *reason, int where) { @@ -213,7 +208,7 @@ int rdbEncodeInteger(long long value, unsigned char *enc) { /* Loads an integer-encoded object with the specified encoding type "enctype". * The returned value changes according to the flags, see * rdbGenerincLoadStringObject() for more info. */ -void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags) { +void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { int plain = flags & RDB_LOAD_PLAIN; int sds = flags & RDB_LOAD_SDS; int encode = flags & RDB_LOAD_ENC; @@ -240,6 +235,7 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags) { if (plain || sds) { char buf[LONG_STR_SIZE], *p; int len = ll2string(buf,sizeof(buf),val); + if (lenptr) *lenptr = len; p = plain ? zmalloc(len) : sdsnewlen(NULL,len); memcpy(p,buf,len); return p; @@ -315,7 +311,7 @@ ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) { /* Load an LZF compressed string in RDB format. The returned value * changes according to 'flags'. For more info check the * rdbGenericLoadStringObject() function. */ -void *rdbLoadLzfStringObject(rio *rdb, int flags) { +void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { int plain = flags & RDB_LOAD_PLAIN; int sds = flags & RDB_LOAD_SDS; uint64_t len, clen; @@ -329,6 +325,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags) { /* Allocate our target according to the uncompressed size. */ if (plain) { val = zmalloc(len); + if (lenptr) *lenptr = len; } else { val = sdsnewlen(NULL,len); } @@ -427,8 +424,10 @@ int rdbSaveStringObject(rio *rdb, robj *obj) { * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc() * instead of a Redis object with an sds in it. * RDB_LOAD_SDS: Return an SDS string instead of a Redis object. -*/ -void *rdbGenericLoadStringObject(rio *rdb, int flags) { + * + * On I/O error NULL is returned. + */ +void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { int encode = flags & RDB_LOAD_ENC; int plain = flags & RDB_LOAD_PLAIN; int sds = flags & RDB_LOAD_SDS; @@ -441,9 +440,9 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags) { case RDB_ENC_INT8: case RDB_ENC_INT16: case RDB_ENC_INT32: - return rdbLoadIntegerObject(rdb,len,flags); + return rdbLoadIntegerObject(rdb,len,flags,lenptr); case RDB_ENC_LZF: - return rdbLoadLzfStringObject(rdb,flags); + return rdbLoadLzfStringObject(rdb,flags,lenptr); default: rdbExitReportCorruptRDB("Unknown RDB encoding type"); } @@ -452,6 +451,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags) { if (len == RDB_LENERR) return NULL; if (plain || sds) { void *buf = plain ? zmalloc(len) : sdsnewlen(NULL,len); + if (lenptr) *lenptr = len; if (len && rioRead(rdb,buf,len) == 0) { if (plain) zfree(buf); @@ -472,11 +472,11 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags) { } robj *rdbLoadStringObject(rio *rdb) { - return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE); + return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); } robj *rdbLoadEncodedStringObject(rio *rdb) { - return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC); + return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL); } /* Save a double value. Doubles are saved as strings prefixed by an unsigned @@ -541,14 +541,16 @@ int rdbLoadDoubleValue(rio *rdb, double *val) { /* Saves a double for RDB 8 or greater, where IE754 binary64 format is assumed. * We just make sure the integer is always stored in little endian, otherwise - * the value is copied verbatim from memory to disk. */ + * the value is copied verbatim from memory to disk. + * + * Return -1 on error, the size of the serialized value on success. */ int rdbSaveBinaryDoubleValue(rio *rdb, double val) { memrev64ifbe(&val); return rdbWriteRaw(rdb,&val,8); } /* Loads a double from RDB 8 or greater. See rdbSaveBinaryDoubleValue() for - * more info. */ + * more info. On error -1 is returned, otherwise 0. */ int rdbLoadBinaryDoubleValue(rio *rdb, double *val) { if (rioRead(rdb,val,8) == 0) return -1; memrev64ifbe(val); @@ -586,6 +588,8 @@ int rdbSaveObjectType(rio *rdb, robj *o) { return rdbSaveType(rdb,RDB_TYPE_HASH); else serverPanic("Unknown hash encoding"); + case OBJ_MODULE: + return rdbSaveType(rdb,RDB_TYPE_MODULE); default: serverPanic("Unknown object type"); } @@ -717,6 +721,22 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { serverPanic("Unknown hash encoding"); } + } else if (o->type == OBJ_MODULE) { + /* Save a module-specific value. */ + RedisModuleIO io; + moduleValue *mv = o->ptr; + moduleType *mt = mv->type; + moduleInitIOContext(io,mt,rdb); + + /* Write the "module" identifier as prefix, so that we'll be able + * to call the right module during loading. */ + int retval = rdbSaveLen(rdb,mt->id); + if (retval == -1) return -1; + io.bytes += retval; + + /* Then write the module-specific representation. */ + mt->rdb_save(&io,mv->value); + return io.error ? -1 : io.bytes; } else { serverPanic("Unknown object type"); } @@ -1055,8 +1075,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { long long llval; sds sdsele; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL) - return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) + == NULL) return NULL; if (o->encoding == OBJ_ENCODING_INTSET) { /* Fetch integer value from element. */ @@ -1092,8 +1112,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { double score; zskiplistNode *znode; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL) - return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) + == NULL) return NULL; if (rdbtype == RDB_TYPE_ZSET_2) { if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; @@ -1130,10 +1150,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) { len--; /* Load raw strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL) - return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL) - return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) + == NULL) return NULL; + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) + == NULL) return NULL; /* Add pair to ziplist */ o->ptr = ziplistPush(o->ptr, (unsigned char*)field, @@ -1158,10 +1178,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { while (o->encoding == OBJ_ENCODING_HT && len > 0) { len--; /* Load encoded strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL) - return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL) - return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) + == NULL) return NULL; + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) + == NULL) return NULL; /* Add pair to hash table */ ret = dictAdd((dict*)o->ptr, field, value); @@ -1179,7 +1199,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { server.list_compress_depth); while (len--) { - unsigned char *zl = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN); + unsigned char *zl = + rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); if (zl == NULL) return NULL; quicklistAppendZiplist(o->ptr, zl); } @@ -1189,7 +1210,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { rdbtype == RDB_TYPE_ZSET_ZIPLIST || rdbtype == RDB_TYPE_HASH_ZIPLIST) { - unsigned char *encoded = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN); + unsigned char *encoded = + rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); if (encoded == NULL) return NULL; o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */ @@ -1256,6 +1278,27 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { rdbExitReportCorruptRDB("Unknown encoding"); break; } + } else if (rdbtype == RDB_TYPE_MODULE) { + uint64_t moduleid = rdbLoadLen(rdb,NULL); + moduleType *mt = moduleTypeLookupModuleByID(moduleid); + char name[10]; + + if (mt == NULL) { + moduleTypeNameByID(name,moduleid); + serverLog(LL_WARNING,"The RDB file contains module data I can't load: no matching module '%s'", name); + exit(1); + } + RedisModuleIO io; + moduleInitIOContext(io,mt,rdb); + /* Call the rdb_load method of the module providing the 10 bit + * encoding version in the lower 10 bits of the module ID. */ + void *ptr = mt->rdb_load(&io,moduleid&1023); + if (ptr == NULL) { + moduleTypeNameByID(name,moduleid); + serverLog(LL_WARNING,"The RDB file contains module data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name); + exit(1); + } + o = createModuleObject(mt,ptr); } else { rdbExitReportCorruptRDB("Unknown object type"); } diff --git a/src/rdb.h b/src/rdb.h index 7ef6782d..a71ecb16 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -77,6 +77,7 @@ #define RDB_TYPE_ZSET 3 #define RDB_TYPE_HASH 4 #define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */ +#define RDB_TYPE_MODULE 6 /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ /* Object types for encoded objects. */ @@ -89,7 +90,7 @@ /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ /* Test if a type is an object type. */ -#define rdbIsObjectType(t) ((t >= 0 && t <= 5) || (t >= 9 && t <= 14)) +#define rdbIsObjectType(t) ((t >= 0 && t <= 6) || (t >= 9 && t <= 14)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_AUX 250 @@ -99,12 +100,19 @@ #define RDB_OPCODE_SELECTDB 254 #define RDB_OPCODE_EOF 255 +/* rdbLoad...() functions flags. */ +#define RDB_LOAD_NONE 0 +#define RDB_LOAD_ENC (1<<0) +#define RDB_LOAD_PLAIN (1<<1) +#define RDB_LOAD_SDS (1<<2) + int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); int rdbSaveTime(rio *rdb, time_t t); time_t rdbLoadTime(rio *rdb); int rdbSaveLen(rio *rdb, uint64_t len); uint64_t rdbLoadLen(rio *rdb, int *isencoded); +int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); int rdbLoad(char *filename); @@ -118,5 +126,10 @@ robj *rdbLoadObject(int type, rio *rdb); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, long long now); robj *rdbLoadStringObject(rio *rdb); +int rdbSaveStringObject(rio *rdb, robj *obj); +ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len); +void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr); +int rdbSaveBinaryDoubleValue(rio *rdb, double val); +int rdbLoadBinaryDoubleValue(rio *rdb, double *val); #endif diff --git a/src/redismodule.h b/src/redismodule.h index 80767ed4..0327487f 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -28,6 +28,7 @@ #define REDISMODULE_KEYTYPE_HASH 3 #define REDISMODULE_KEYTYPE_SET 4 #define REDISMODULE_KEYTYPE_ZSET 5 +#define REDISMODULE_KEYTYPE_MODULE 6 /* Reply types. */ #define REDISMODULE_REPLY_UNKNOWN -1 @@ -78,14 +79,27 @@ typedef struct RedisModuleCtx RedisModuleCtx; typedef struct RedisModuleKey RedisModuleKey; typedef struct RedisModuleString RedisModuleString; typedef struct RedisModuleCallReply RedisModuleCallReply; +typedef struct RedisModuleIO RedisModuleIO; +typedef struct RedisModuleType RedisModuleType; +typedef struct RedisModuleDigest RedisModuleDigest; typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); +typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); +typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); +typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); +typedef void (*RedisModuleTypeFreeFunc)(void *value); + #define REDISMODULE_GET_API(name) \ RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name)) #define REDISMODULE_API_FUNC(x) (*x) +void *REDISMODULE_API_FUNC(RedisModule_Alloc)(size_t bytes); +void *REDISMODULE_API_FUNC(RedisModule_Realloc)(void *ptr, size_t bytes); +void REDISMODULE_API_FUNC(RedisModule_Free)(void *ptr); +char *REDISMODULE_API_FUNC(RedisModule_Strdup)(const char *str); int REDISMODULE_API_FUNC(RedisModule_GetApi)(const char *, void *); int REDISMODULE_API_FUNC(RedisModule_CreateCommand)(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep); int REDISMODULE_API_FUNC(RedisModule_SetModuleAttribs)(RedisModuleCtx *ctx, const char *name, int ver, int apiver); @@ -151,11 +165,30 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx) void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos); unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); +RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeLoadFunc rdb_load, RedisModuleTypeSaveFunc rdb_save, RedisModuleTypeRewriteFunc aof_rewrite, RedisModuleTypeDigestFunc digest, RedisModuleTypeFreeFunc free); +int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); +RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); +void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); +void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value); +uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io); +void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value); +int64_t REDISMODULE_API_FUNC(RedisModule_LoadSigned)(RedisModuleIO *io); +void REDISMODULE_API_FUNC(RedisModule_EmitAOF)(RedisModuleIO *io, const char *cmdname, const char *fmt, ...); +void REDISMODULE_API_FUNC(RedisModule_SaveString)(RedisModuleIO *io, RedisModuleString *s); +void REDISMODULE_API_FUNC(RedisModule_SaveStringBuffer)(RedisModuleIO *io, const char *str, size_t len); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_LoadString)(RedisModuleIO *io); +char *REDISMODULE_API_FUNC(RedisModule_LoadStringBuffer)(RedisModuleIO *io, size_t *lenptr); +void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double value); +double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io); /* This is included inline inside each Redis module. */ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) { void *getapifuncptr = ((void**)ctx)[0]; RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr; + REDISMODULE_GET_API(Alloc); + REDISMODULE_GET_API(Free); + REDISMODULE_GET_API(Realloc); + REDISMODULE_GET_API(Strdup); REDISMODULE_GET_API(CreateCommand); REDISMODULE_GET_API(SetModuleAttribs); REDISMODULE_GET_API(WrongArity); @@ -221,6 +254,21 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(KeyAtPos); REDISMODULE_GET_API(GetClientId); REDISMODULE_GET_API(PoolAlloc); + REDISMODULE_GET_API(CreateDataType); + REDISMODULE_GET_API(ModuleTypeSetValue); + REDISMODULE_GET_API(ModuleTypeGetType); + REDISMODULE_GET_API(ModuleTypeGetValue); + REDISMODULE_GET_API(SaveUnsigned); + REDISMODULE_GET_API(LoadUnsigned); + REDISMODULE_GET_API(SaveSigned); + REDISMODULE_GET_API(LoadSigned); + REDISMODULE_GET_API(SaveString); + REDISMODULE_GET_API(SaveStringBuffer); + REDISMODULE_GET_API(LoadString); + REDISMODULE_GET_API(LoadStringBuffer); + REDISMODULE_GET_API(SaveDouble); + REDISMODULE_GET_API(LoadDouble); + REDISMODULE_GET_API(EmitAOF); RedisModule_SetModuleAttribs(ctx,name,ver,apiver); return REDISMODULE_OK; diff --git a/src/rio.h b/src/rio.h index 711308ce..6749723d 100644 --- a/src/rio.h +++ b/src/rio.h @@ -135,6 +135,9 @@ size_t rioWriteBulkString(rio *r, const char *buf, size_t len); size_t rioWriteBulkLongLong(rio *r, long long l); size_t rioWriteBulkDouble(rio *r, double d); +struct redisObject; +int rioWriteBulkObject(rio *r, struct redisObject *obj); + void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len); void rioSetAutoSync(rio *r, off_t bytes); diff --git a/src/server.h b/src/server.h index 07cada62..e5e4ea23 100644 --- a/src/server.h +++ b/src/server.h @@ -33,6 +33,7 @@ #include "fmacros.h" #include "config.h" #include "solarisfixes.h" +#include "rio.h" #include #include @@ -421,6 +422,90 @@ typedef long long mstime_t; /* millisecond time type. */ #define OBJ_ZSET 3 #define OBJ_HASH 4 +/* The "module" object type is a special one that signals that the object + * is one directly managed by a Redis module. In this case the value points + * to a moduleValue struct, which contains the object value (which is only + * handled by the module itself) and the RedisModuleType struct which lists + * function pointers in order to serialize, deserialize, AOF-rewrite and + * free the object. + * + * Inside the RDB file, module types are encoded as OBJ_MODULE followed + * by a 64 bit module type ID, which has a 54 bits module-specific signature + * in order to dispatch the loading to the right module, plus a 10 bits + * encoding version. */ +#define OBJ_MODULE 5 + +/* Extract encver / signature from a module type ID. */ +#define REDISMODULE_TYPE_ENCVER_BITS 10 +#define REDISMODULE_TYPE_ENCVER_MASK ((1<>REDISMODULE_TYPE_ENCVER_BITS) + +struct RedisModule; +struct RedisModuleIO; +struct RedisModuleDigest; +struct redisObject; + +/* Each module type implementation should export a set of methods in order + * to serialize and deserialize the value in the RDB file, rewrite the AOF + * log, create the digest for "DEBUG DIGEST", and free the value when a key + * is deleted. */ +typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver); +typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value); +typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value); +typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value); +typedef void (*moduleTypeFreeFunc)(void *value); + +/* The module type, which is referenced in each value of a given type, defines + * the methods and links to the module exporting the type. */ +typedef struct RedisModuleType { + uint64_t id; /* Higher 54 bits of type ID + 10 lower bits of encoding ver. */ + struct RedisModule *module; + moduleTypeLoadFunc rdb_load; + moduleTypeSaveFunc rdb_save; + moduleTypeRewriteFunc aof_rewrite; + moduleTypeDigestFunc digest; + moduleTypeFreeFunc free; + char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */ +} moduleType; + +/* In Redis objects 'robj' structures of type OBJ_MODULE, the value pointer + * is set to the following structure, referencing the moduleType structure + * in order to work with the value, and at the same time providing a raw + * pointer to the value, as created by the module commands operating with + * the module type. + * + * So for example in order to free such a value, it is possible to use + * the following code: + * + * if (robj->type == OBJ_MODULE) { + * moduleValue *mt = robj->ptr; + * mt->type->free(mt->value); + * zfree(mt); // We need to release this in-the-middle struct as well. + * } + */ +typedef struct moduleValue { + moduleType *type; + void *value; +} moduleValue; + +/* This is a wrapper for the 'rio' streams used inside rdb.c in Redis, so that + * the user does not have to take the total count of the written bytes nor + * to care about error conditions. */ +typedef struct RedisModuleIO { + size_t bytes; /* Bytes read / written so far. */ + rio *rio; /* Rio stream. */ + moduleType *type; /* Module type doing the operation. */ + int error; /* True if error condition happened. */ +} RedisModuleIO; + +#define moduleInitIOContext(iovar,mtype,rioptr) do { \ + iovar.rio = rioptr; \ + iovar.type = mtype; \ + iovar.bytes = 0; \ + iovar.error = 0; \ +} while(0); + /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. */ @@ -1074,6 +1159,8 @@ void moduleInitModulesSystem(void); int moduleLoad(const char *path); void moduleLoadFromQueue(void); int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); +moduleType *moduleTypeLookupModuleByID(uint64_t id); +void moduleTypeNameByID(char *name, uint64_t moduleid); /* Utils */ long long ustime(void); @@ -1207,6 +1294,7 @@ robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); robj *createZsetZiplistObject(void); +robj *createModuleObject(moduleType *mt, void *value); int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg); int checkType(client *c, robj *o, int type); int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg);