mirror of
https://github.com/fluencelabs/redis
synced 2025-03-18 16:40:50 +00:00
Modules: sorted set iterators WIP.
This commit is contained in:
parent
556d593d37
commit
eac5a13cb7
142
src/module.c
142
src/module.c
@ -65,6 +65,10 @@ struct RedisModuleKey {
|
||||
robj *value; /* Value object, or NULL if the key was not found. */
|
||||
void *iter; /* Iterator. */
|
||||
int mode; /* Opening mode. */
|
||||
/* Zset iterator. */
|
||||
RedisModuleZsetRange *zr; /* Zset iterator range passed by user. */
|
||||
void *zcurrent; /* Zset iterator current node. */
|
||||
int zer; /* Zset iterator end reached flag (true if end was reached). */
|
||||
};
|
||||
typedef struct RedisModuleKey RedisModuleKey;
|
||||
|
||||
@ -115,6 +119,7 @@ void RM_CloseKey(RedisModuleKey *key);
|
||||
void RM_AutoMemoryCollect(RedisModuleCtx *ctx);
|
||||
robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap);
|
||||
void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
|
||||
void RM_ZsetRangeStop(RedisModuleKey *key);
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Helpers for modules API implementation
|
||||
@ -380,7 +385,7 @@ const char *RM_StringPtrLen(RedisModuleString *str, size_t *len) {
|
||||
return str->ptr;
|
||||
}
|
||||
|
||||
/* Turn the string into a long long, storing it at *ll if not NULL.
|
||||
/* Turn the string into a long long, storing it at *ll.
|
||||
* Returns REDISMODULE_OK on success. If the string can't be parsed
|
||||
* as a valid, strict long long (no spaces before/after), REDISMODULE_ERR
|
||||
* is returned. */
|
||||
@ -389,6 +394,14 @@ int RM_StringToLongLong(RedisModuleString *str, long long *ll) {
|
||||
REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
/* Turn the string into a double, storing it at *d.
|
||||
* Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is
|
||||
* not a valid string representation of a double value. */
|
||||
int RM_StringToDouble(RedisModuleString *str, double *d) {
|
||||
int retval = getDoubleFromObject(str,d);
|
||||
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Reply APIs
|
||||
*
|
||||
@ -480,6 +493,12 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Send a string reply obtained converting the double 'd' into a string. */
|
||||
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
|
||||
addReplyDouble(ctx->client,d);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Commands replication API
|
||||
* -------------------------------------------------------------------------- */
|
||||
@ -592,6 +611,7 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
|
||||
kp->value = value;
|
||||
kp->iter = NULL;
|
||||
kp->mode = mode;
|
||||
RM_ZsetRangeStop(kp);
|
||||
RM_AutoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
|
||||
return (void*)kp;
|
||||
}
|
||||
@ -971,6 +991,119 @@ int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Key API for Sorted Set iterator
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* Stop a sorted set iteration. */
|
||||
void RM_ZsetRangeStop(RedisModuleKey *key) {
|
||||
/* Setup sensible values so that misused iteration API calls when an
|
||||
* iterator is not active will result into something more sensible
|
||||
* than crashing. */
|
||||
key->zr = NULL;
|
||||
key->zcurrent = NULL;
|
||||
key->zer = 1;
|
||||
}
|
||||
|
||||
/* Return the "End of range" flag value to signal the end of the iteration. */
|
||||
int RM_ZsetRangeEndReached(RedisModuleKey *key) {
|
||||
return key->zer;
|
||||
}
|
||||
|
||||
/* Setup a sorted set iterator seeking the first element in the specified
|
||||
* range. Returns REDISMODULE_OK if the iterator was correctly initialized
|
||||
* otherwise REDISMODULE_ERR is returned in the following conditions:
|
||||
*
|
||||
* 1. The value stored at key is not a sorted set or the key is empty.
|
||||
* 2. The iterator type is unrecognized. */
|
||||
int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) {
|
||||
if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
|
||||
key->zr = zr;
|
||||
key->zcurrent = NULL;
|
||||
key->zer = 0;
|
||||
|
||||
if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) {
|
||||
/* Setup the range structure used by the sorted set core implementation
|
||||
* in order to seek at the specified element. */
|
||||
zrangespec zrs;
|
||||
zrs.min = zr->score_start;
|
||||
zrs.max = zr->score_end;
|
||||
zrs.minex = (zr->flags & REDISMODULE_ZSET_RANGE_START_EX) != 0;
|
||||
zrs.maxex = (zr->flags & REDISMODULE_ZSET_RANGE_END_EX) != 0;
|
||||
|
||||
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
key->zcurrent = zzlFirstInRange(key->value->ptr,&zrs);
|
||||
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
zset *zs = key->value->ptr;
|
||||
zskiplist *zsl = zs->zsl;
|
||||
key->zcurrent = zslFirstInRange(zsl,&zrs);
|
||||
} else {
|
||||
serverPanic("Unsupported zset encoding");
|
||||
}
|
||||
if (key->zcurrent == NULL) key->zer = 1;
|
||||
return REDISMODULE_OK;
|
||||
} else {
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
}
|
||||
|
||||
/* Return the current sorted set element of an active sorted set iterator
|
||||
* or NULL if the range specified in the iterator does not include any
|
||||
* element. */
|
||||
RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) {
|
||||
if (key->zcurrent == NULL) return NULL;
|
||||
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
unsigned char *eptr, *sptr;
|
||||
eptr = key->zcurrent;
|
||||
sds ele = ziplistGetObject(eptr);
|
||||
if (score) {
|
||||
sptr = ziplistNext(key->value->ptr,eptr);
|
||||
*score = zzlGetScore(sptr);
|
||||
}
|
||||
return createObject(OBJ_STRING,ele);
|
||||
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
zskiplistNode *ln = key->zcurrent;
|
||||
if (score) *score = ln->score;
|
||||
return createStringObject(ln->ele,sdslen(ln->ele));
|
||||
} else {
|
||||
serverPanic("Unsupported zset encoding");
|
||||
}
|
||||
}
|
||||
|
||||
/* Go to the next element of the sorted set iterator. Returns 1 if there was
|
||||
* a next element, 0 if we are already at the latest element or the range
|
||||
* does not include any item at all. */
|
||||
int RM_ZsetRangeNext(RedisModuleKey *key) {
|
||||
if (!key->zr || !key->zcurrent) return 0; /* No active iterator. */
|
||||
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
unsigned char *zl = key->value->ptr;
|
||||
unsigned char *eptr = key->zcurrent;
|
||||
unsigned char *next;
|
||||
next = ziplistNext(zl,eptr); /* Skip element. */
|
||||
if (next) next = ziplistNext(zl,next); /* Skip score. */
|
||||
if (next == NULL) {
|
||||
key->zer = 1;
|
||||
return 0;
|
||||
} else {
|
||||
/* TODO: check if we are in range. */
|
||||
key->zcurrent = next;
|
||||
return 1;
|
||||
}
|
||||
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward;
|
||||
if (next == NULL) {
|
||||
key->zer = 1;
|
||||
return 0;
|
||||
} else {
|
||||
/* TODO: check if we are in range. */
|
||||
key->zcurrent = next;
|
||||
return 1;
|
||||
}
|
||||
} else {
|
||||
serverPanic("Unsupported zset encoding");
|
||||
}
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Redis <-> Modules generic Call() API
|
||||
* -------------------------------------------------------------------------- */
|
||||
@ -1385,6 +1518,7 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(ReplyWithStringBuffer);
|
||||
REGISTER_API(ReplyWithNull);
|
||||
REGISTER_API(ReplyWithCallReply);
|
||||
REGISTER_API(ReplyWithDouble);
|
||||
REGISTER_API(GetSelectedDb);
|
||||
REGISTER_API(SelectDb);
|
||||
REGISTER_API(OpenKey);
|
||||
@ -1394,6 +1528,7 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(ListPush);
|
||||
REGISTER_API(ListPop);
|
||||
REGISTER_API(StringToLongLong);
|
||||
REGISTER_API(StringToDouble);
|
||||
REGISTER_API(Call);
|
||||
REGISTER_API(CallReplyProto);
|
||||
REGISTER_API(FreeCallReply);
|
||||
@ -1420,6 +1555,11 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(ZsetIncrby);
|
||||
REGISTER_API(ZsetScore);
|
||||
REGISTER_API(ZsetRem);
|
||||
REGISTER_API(ZsetRangeStop);
|
||||
REGISTER_API(ZsetFirstInRange);
|
||||
REGISTER_API(ZsetRangeCurrentElement);
|
||||
REGISTER_API(ZsetRangeNext);
|
||||
REGISTER_API(ZsetRangeEndReached);
|
||||
}
|
||||
|
||||
/* Global initialization at Redis startup. */
|
||||
|
@ -326,6 +326,39 @@ int HelloMoreExpire_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||
return RedisModule_ReplyWithSimpleString(ctx,"OK");
|
||||
}
|
||||
|
||||
/* HELLO.ZSUMRANGE key startscore endscore
|
||||
* Return the sum of all the scores elements between startscore and endscore. */
|
||||
int HelloZsumRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
RedisModuleZsetRange zrange = REDISMODULE_ZSET_RANGE_INIT;
|
||||
zrange.type = REDISMODULE_ZSET_RANGE_SCORE;
|
||||
if (RedisModule_StringToDouble(argv[2],&zrange.score_start) != REDISMODULE_OK ||
|
||||
RedisModule_StringToDouble(argv[3],&zrange.score_end) != REDISMODULE_OK)
|
||||
{
|
||||
return RedisModule_ReplyWithError(ctx,"ERR invalid range");
|
||||
}
|
||||
zrange.flags = 0;
|
||||
|
||||
RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
|
||||
REDISMODULE_READ|REDISMODULE_WRITE);
|
||||
if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_ZSET) {
|
||||
return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
|
||||
}
|
||||
|
||||
RedisModule_ZsetFirstInRange(key,&zrange);
|
||||
double scoresum = 0;
|
||||
while(!RedisModule_ZsetRangeEndReached(key)) {
|
||||
double score;
|
||||
RedisModuleString *ele = RedisModule_ZsetRangeCurrentElement(key,&score);
|
||||
RedisModule_FreeString(ctx,ele);
|
||||
scoresum += score;
|
||||
RedisModule_ZsetRangeNext(key);
|
||||
}
|
||||
RedisModule_ZsetRangeStop(key);
|
||||
RedisModule_CloseKey(key);
|
||||
RedisModule_ReplyWithDouble(ctx,scoresum);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* This function must be present on each Redis module. It is used in order to
|
||||
* register the commands into the Redis server. */
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx) {
|
||||
@ -380,5 +413,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx) {
|
||||
HelloMoreExpire_RedisCommand) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"hello.zsumrange",
|
||||
HelloZsumRange_RedisCommand) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#define REDISMODULE_H
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
|
||||
/* ---------------- Defines common between core and modules --------------- */
|
||||
@ -49,6 +50,31 @@
|
||||
/* Error messages. */
|
||||
#define REDISMODULE_ERRORMSG_WRONGTYPE "WRONGTYPE Operation against a key holding the wrong kind of value"
|
||||
|
||||
/* Sorted set range structure. */
|
||||
typedef struct RedisModuleZsetRange {
|
||||
uint32_t type;
|
||||
uint32_t flags;
|
||||
double score_start;
|
||||
double score_end;
|
||||
char *lex_start;
|
||||
char *lex_end;
|
||||
uint32_t lex_start_len;
|
||||
uint32_t lex_end_len;
|
||||
uint32_t pos_start;
|
||||
uint32_t pos_end;
|
||||
} RedisModuleZsetRange;
|
||||
|
||||
#define REDISMODULE_POSITIVE_INFINITE (1.0/0.0)
|
||||
#define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0)
|
||||
|
||||
#define REDISMODULE_ZSET_RANGE_INIT {0,0,REDISMODULE_NEGATIVE_INFINITE,REDISMODULE_POSITIVE_INFINITE,"-","+",1,1,0,-1}
|
||||
#define REDISMODULE_ZSET_RANGE_LEX 1
|
||||
#define REDISMODULE_ZSET_RANGE_SCORE 2
|
||||
#define REDISMODULE_ZSET_RANGE_POS 3
|
||||
|
||||
#define REDISMODULE_ZSET_RANGE_START_EX (1<<0)
|
||||
#define REDISMODULE_ZSET_RANGE_END_EX (1<<1)
|
||||
|
||||
/* ------------------------- End of common defines ------------------------ */
|
||||
|
||||
#ifndef REDISMODULE_CORE
|
||||
@ -98,8 +124,10 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, int le
|
||||
int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply);
|
||||
int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(RedisModuleString *str, long long *ll);
|
||||
int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(RedisModuleString *str, double *d);
|
||||
void REDISMODULE_API_FUNC(RedisModule_AutoMemory)(RedisModuleCtx *ctx);
|
||||
int REDISMODULE_API_FUNC(RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx);
|
||||
@ -115,6 +143,11 @@ int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score,
|
||||
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ZsetRem)(RedisModuleKey *key, RedisModuleString *ele, int *deleted);
|
||||
void REDISMODULE_API_FUNC(RedisModule_ZsetRangeStop)(RedisModuleKey *key);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInRange)(RedisModuleKey *key, RedisModuleZsetRange *zr);
|
||||
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ZsetRangeCurrentElement)(RedisModuleKey *key, double *score);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ZsetRangeNext)(RedisModuleKey *key);
|
||||
int REDISMODULE_API_FUNC(RedisModule_ZsetRangeEndReached)(RedisModuleKey *key);
|
||||
|
||||
/* This is included inline inside each Redis module. */
|
||||
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
|
||||
@ -131,6 +164,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||
REDISMODULE_GET_API(ReplyWithString);
|
||||
REDISMODULE_GET_API(ReplyWithNull);
|
||||
REDISMODULE_GET_API(ReplyWithCallReply);
|
||||
REDISMODULE_GET_API(ReplyWithDouble);
|
||||
REDISMODULE_GET_API(GetSelectedDb);
|
||||
REDISMODULE_GET_API(SelectDb);
|
||||
REDISMODULE_GET_API(OpenKey);
|
||||
@ -140,6 +174,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||
REDISMODULE_GET_API(ListPush);
|
||||
REDISMODULE_GET_API(ListPop);
|
||||
REDISMODULE_GET_API(StringToLongLong);
|
||||
REDISMODULE_GET_API(StringToDouble);
|
||||
REDISMODULE_GET_API(Call);
|
||||
REDISMODULE_GET_API(CallReplyProto);
|
||||
REDISMODULE_GET_API(FreeCallReply);
|
||||
@ -166,6 +201,11 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||
REDISMODULE_GET_API(ZsetIncrby);
|
||||
REDISMODULE_GET_API(ZsetScore);
|
||||
REDISMODULE_GET_API(ZsetRem);
|
||||
REDISMODULE_GET_API(ZsetRangeStop);
|
||||
REDISMODULE_GET_API(ZsetFirstInRange);
|
||||
REDISMODULE_GET_API(ZsetRangeCurrentElement);
|
||||
REDISMODULE_GET_API(ZsetRangeNext);
|
||||
REDISMODULE_GET_API(ZsetRangeEndReached);
|
||||
|
||||
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
|
||||
return REDISMODULE_OK;
|
||||
|
@ -1235,6 +1235,7 @@ int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg);
|
||||
int checkType(client *c, robj *o, int type);
|
||||
int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg);
|
||||
int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg);
|
||||
int getDoubleFromObject(robj *o, double *target);
|
||||
int getLongLongFromObject(robj *o, long long *target);
|
||||
int getLongDoubleFromObject(robj *o, long double *target);
|
||||
int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg);
|
||||
@ -1333,6 +1334,7 @@ zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range);
|
||||
double zzlGetScore(unsigned char *sptr);
|
||||
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
|
||||
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
|
||||
unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range);
|
||||
unsigned int zsetLength(robj *zobj);
|
||||
void zsetConvert(robj *zobj, int encoding);
|
||||
void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen);
|
||||
@ -1341,6 +1343,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
|
||||
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
|
||||
long zsetRank(robj *zobj, sds ele, int reverse);
|
||||
int zsetDel(robj *zobj, sds ele);
|
||||
sds ziplistGetObject(unsigned char *sptr);
|
||||
|
||||
/* Core functions */
|
||||
int freeMemoryIfNeeded(void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user