diff --git a/src/module.c b/src/module.c index 347ddabf..0e7fedc1 100644 --- a/src/module.c +++ b/src/module.c @@ -65,6 +65,10 @@ struct RedisModuleKey { robj *value; /* Value object, or NULL if the key was not found. */ void *iter; /* Iterator. */ int mode; /* Opening mode. */ + /* Zset iterator. */ + RedisModuleZsetRange *zr; /* Zset iterator range passed by user. */ + void *zcurrent; /* Zset iterator current node. */ + int zer; /* Zset iterator end reached flag (true if end was reached). */ }; typedef struct RedisModuleKey RedisModuleKey; @@ -115,6 +119,7 @@ void RM_CloseKey(RedisModuleKey *key); void RM_AutoMemoryCollect(RedisModuleCtx *ctx); robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap); void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx); +void RM_ZsetRangeStop(RedisModuleKey *key); /* -------------------------------------------------------------------------- * Helpers for modules API implementation @@ -380,7 +385,7 @@ const char *RM_StringPtrLen(RedisModuleString *str, size_t *len) { return str->ptr; } -/* Turn the string into a long long, storing it at *ll if not NULL. +/* Turn the string into a long long, storing it at *ll. * Returns REDISMODULE_OK on success. If the string can't be parsed * as a valid, strict long long (no spaces before/after), REDISMODULE_ERR * is returned. */ @@ -389,6 +394,14 @@ int RM_StringToLongLong(RedisModuleString *str, long long *ll) { REDISMODULE_ERR; } +/* Turn the string into a double, storing it at *d. + * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is + * not a valid string representation of a double value. */ +int RM_StringToDouble(RedisModuleString *str, double *d) { + int retval = getDoubleFromObject(str,d); + return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; +} + /* -------------------------------------------------------------------------- * Reply APIs * @@ -480,6 +493,12 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { return REDISMODULE_OK; } +/* Send a string reply obtained converting the double 'd' into a string. */ +int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { + addReplyDouble(ctx->client,d); + return REDISMODULE_OK; +} + /* -------------------------------------------------------------------------- * Commands replication API * -------------------------------------------------------------------------- */ @@ -592,6 +611,7 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { kp->value = value; kp->iter = NULL; kp->mode = mode; + RM_ZsetRangeStop(kp); RM_AutoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } @@ -971,6 +991,119 @@ int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) { return REDISMODULE_OK; } +/* -------------------------------------------------------------------------- + * Key API for Sorted Set iterator + * -------------------------------------------------------------------------- */ + +/* Stop a sorted set iteration. */ +void RM_ZsetRangeStop(RedisModuleKey *key) { + /* Setup sensible values so that misused iteration API calls when an + * iterator is not active will result into something more sensible + * than crashing. */ + key->zr = NULL; + key->zcurrent = NULL; + key->zer = 1; +} + +/* Return the "End of range" flag value to signal the end of the iteration. */ +int RM_ZsetRangeEndReached(RedisModuleKey *key) { + return key->zer; +} + +/* Setup a sorted set iterator seeking the first element in the specified + * range. Returns REDISMODULE_OK if the iterator was correctly initialized + * otherwise REDISMODULE_ERR is returned in the following conditions: + * + * 1. The value stored at key is not a sorted set or the key is empty. + * 2. The iterator type is unrecognized. */ +int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) { + if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; + key->zr = zr; + key->zcurrent = NULL; + key->zer = 0; + + if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) { + /* Setup the range structure used by the sorted set core implementation + * in order to seek at the specified element. */ + zrangespec zrs; + zrs.min = zr->score_start; + zrs.max = zr->score_end; + zrs.minex = (zr->flags & REDISMODULE_ZSET_RANGE_START_EX) != 0; + zrs.maxex = (zr->flags & REDISMODULE_ZSET_RANGE_END_EX) != 0; + + if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { + key->zcurrent = zzlFirstInRange(key->value->ptr,&zrs); + } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = key->value->ptr; + zskiplist *zsl = zs->zsl; + key->zcurrent = zslFirstInRange(zsl,&zrs); + } else { + serverPanic("Unsupported zset encoding"); + } + if (key->zcurrent == NULL) key->zer = 1; + return REDISMODULE_OK; + } else { + return REDISMODULE_ERR; + } +} + +/* Return the current sorted set element of an active sorted set iterator + * or NULL if the range specified in the iterator does not include any + * element. */ +RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) { + if (key->zcurrent == NULL) return NULL; + if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *eptr, *sptr; + eptr = key->zcurrent; + sds ele = ziplistGetObject(eptr); + if (score) { + sptr = ziplistNext(key->value->ptr,eptr); + *score = zzlGetScore(sptr); + } + return createObject(OBJ_STRING,ele); + } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { + zskiplistNode *ln = key->zcurrent; + if (score) *score = ln->score; + return createStringObject(ln->ele,sdslen(ln->ele)); + } else { + serverPanic("Unsupported zset encoding"); + } +} + +/* Go to the next element of the sorted set iterator. Returns 1 if there was + * a next element, 0 if we are already at the latest element or the range + * does not include any item at all. */ +int RM_ZsetRangeNext(RedisModuleKey *key) { + if (!key->zr || !key->zcurrent) return 0; /* No active iterator. */ + if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = key->value->ptr; + unsigned char *eptr = key->zcurrent; + unsigned char *next; + next = ziplistNext(zl,eptr); /* Skip element. */ + if (next) next = ziplistNext(zl,next); /* Skip score. */ + if (next == NULL) { + key->zer = 1; + return 0; + } else { + /* TODO: check if we are in range. */ + key->zcurrent = next; + return 1; + } + } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { + zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward; + if (next == NULL) { + key->zer = 1; + return 0; + } else { + /* TODO: check if we are in range. */ + key->zcurrent = next; + return 1; + } + } else { + serverPanic("Unsupported zset encoding"); + } +} + /* -------------------------------------------------------------------------- * Redis <-> Modules generic Call() API * -------------------------------------------------------------------------- */ @@ -1385,6 +1518,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ReplyWithStringBuffer); REGISTER_API(ReplyWithNull); REGISTER_API(ReplyWithCallReply); + REGISTER_API(ReplyWithDouble); REGISTER_API(GetSelectedDb); REGISTER_API(SelectDb); REGISTER_API(OpenKey); @@ -1394,6 +1528,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ListPush); REGISTER_API(ListPop); REGISTER_API(StringToLongLong); + REGISTER_API(StringToDouble); REGISTER_API(Call); REGISTER_API(CallReplyProto); REGISTER_API(FreeCallReply); @@ -1420,6 +1555,11 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ZsetIncrby); REGISTER_API(ZsetScore); REGISTER_API(ZsetRem); + REGISTER_API(ZsetRangeStop); + REGISTER_API(ZsetFirstInRange); + REGISTER_API(ZsetRangeCurrentElement); + REGISTER_API(ZsetRangeNext); + REGISTER_API(ZsetRangeEndReached); } /* Global initialization at Redis startup. */ diff --git a/src/modules/helloworld.c b/src/modules/helloworld.c index 7480415e..5f436609 100644 --- a/src/modules/helloworld.c +++ b/src/modules/helloworld.c @@ -326,6 +326,39 @@ int HelloMoreExpire_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, return RedisModule_ReplyWithSimpleString(ctx,"OK"); } +/* HELLO.ZSUMRANGE key startscore endscore + * Return the sum of all the scores elements between startscore and endscore. */ +int HelloZsumRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModuleZsetRange zrange = REDISMODULE_ZSET_RANGE_INIT; + zrange.type = REDISMODULE_ZSET_RANGE_SCORE; + if (RedisModule_StringToDouble(argv[2],&zrange.score_start) != REDISMODULE_OK || + RedisModule_StringToDouble(argv[3],&zrange.score_end) != REDISMODULE_OK) + { + return RedisModule_ReplyWithError(ctx,"ERR invalid range"); + } + zrange.flags = 0; + + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_ZSET) { + return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); + } + + RedisModule_ZsetFirstInRange(key,&zrange); + double scoresum = 0; + while(!RedisModule_ZsetRangeEndReached(key)) { + double score; + RedisModuleString *ele = RedisModule_ZsetRangeCurrentElement(key,&score); + RedisModule_FreeString(ctx,ele); + scoresum += score; + RedisModule_ZsetRangeNext(key); + } + RedisModule_ZsetRangeStop(key); + RedisModule_CloseKey(key); + RedisModule_ReplyWithDouble(ctx,scoresum); + return REDISMODULE_OK; +} + /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx) { @@ -380,5 +413,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx) { HelloMoreExpire_RedisCommand) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hello.zsumrange", + HelloZsumRange_RedisCommand) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/src/redismodule.h b/src/redismodule.h index fcc53e50..42058b99 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -2,6 +2,7 @@ #define REDISMODULE_H #include +#include #include /* ---------------- Defines common between core and modules --------------- */ @@ -49,6 +50,31 @@ /* Error messages. */ #define REDISMODULE_ERRORMSG_WRONGTYPE "WRONGTYPE Operation against a key holding the wrong kind of value" +/* Sorted set range structure. */ +typedef struct RedisModuleZsetRange { + uint32_t type; + uint32_t flags; + double score_start; + double score_end; + char *lex_start; + char *lex_end; + uint32_t lex_start_len; + uint32_t lex_end_len; + uint32_t pos_start; + uint32_t pos_end; +} RedisModuleZsetRange; + +#define REDISMODULE_POSITIVE_INFINITE (1.0/0.0) +#define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0) + +#define REDISMODULE_ZSET_RANGE_INIT {0,0,REDISMODULE_NEGATIVE_INFINITE,REDISMODULE_POSITIVE_INFINITE,"-","+",1,1,0,-1} +#define REDISMODULE_ZSET_RANGE_LEX 1 +#define REDISMODULE_ZSET_RANGE_SCORE 2 +#define REDISMODULE_ZSET_RANGE_POS 3 + +#define REDISMODULE_ZSET_RANGE_START_EX (1<<0) +#define REDISMODULE_ZSET_RANGE_END_EX (1<<1) + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -98,8 +124,10 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, int le int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx); +int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d); int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply); int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(RedisModuleString *str, long long *ll); +int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(RedisModuleString *str, double *d); void REDISMODULE_API_FUNC(RedisModule_AutoMemory)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...); int REDISMODULE_API_FUNC(RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx); @@ -115,6 +143,11 @@ int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore); int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score); int REDISMODULE_API_FUNC(RedisModule_ZsetRem)(RedisModuleKey *key, RedisModuleString *ele, int *deleted); +void REDISMODULE_API_FUNC(RedisModule_ZsetRangeStop)(RedisModuleKey *key); +int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInRange)(RedisModuleKey *key, RedisModuleZsetRange *zr); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ZsetRangeCurrentElement)(RedisModuleKey *key, double *score); +int REDISMODULE_API_FUNC(RedisModule_ZsetRangeNext)(RedisModuleKey *key); +int REDISMODULE_API_FUNC(RedisModule_ZsetRangeEndReached)(RedisModuleKey *key); /* This is included inline inside each Redis module. */ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) { @@ -131,6 +164,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ReplyWithString); REDISMODULE_GET_API(ReplyWithNull); REDISMODULE_GET_API(ReplyWithCallReply); + REDISMODULE_GET_API(ReplyWithDouble); REDISMODULE_GET_API(GetSelectedDb); REDISMODULE_GET_API(SelectDb); REDISMODULE_GET_API(OpenKey); @@ -140,6 +174,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ListPush); REDISMODULE_GET_API(ListPop); REDISMODULE_GET_API(StringToLongLong); + REDISMODULE_GET_API(StringToDouble); REDISMODULE_GET_API(Call); REDISMODULE_GET_API(CallReplyProto); REDISMODULE_GET_API(FreeCallReply); @@ -166,6 +201,11 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ZsetIncrby); REDISMODULE_GET_API(ZsetScore); REDISMODULE_GET_API(ZsetRem); + REDISMODULE_GET_API(ZsetRangeStop); + REDISMODULE_GET_API(ZsetFirstInRange); + REDISMODULE_GET_API(ZsetRangeCurrentElement); + REDISMODULE_GET_API(ZsetRangeNext); + REDISMODULE_GET_API(ZsetRangeEndReached); RedisModule_SetModuleAttribs(ctx,name,ver,apiver); return REDISMODULE_OK; diff --git a/src/server.h b/src/server.h index aa42b100..9cd8661f 100644 --- a/src/server.h +++ b/src/server.h @@ -1235,6 +1235,7 @@ int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg); int checkType(client *c, robj *o, int type); int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg); int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg); +int getDoubleFromObject(robj *o, double *target); int getLongLongFromObject(robj *o, long long *target); int getLongDoubleFromObject(robj *o, long double *target); int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg); @@ -1333,6 +1334,7 @@ zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range); double zzlGetScore(unsigned char *sptr); void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); +unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range); unsigned int zsetLength(robj *zobj); void zsetConvert(robj *zobj, int encoding); void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen); @@ -1341,6 +1343,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o); int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore); long zsetRank(robj *zobj, sds ele, int reverse); int zsetDel(robj *zobj, sds ele); +sds ziplistGetObject(unsigned char *sptr); /* Core functions */ int freeMemoryIfNeeded(void);