From 439120c62076718e8f7e7e602c623febaec6f04a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 6 Sep 2017 13:11:47 +0200 Subject: [PATCH] Streams: implement stream object release. --- src/object.c | 5 +++++ src/rax.c | 18 +++++++++++++----- src/rax.h | 1 + src/server.h | 1 + src/t_stream.c | 5 +++++ 5 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/object.c b/src/object.c index 8eeb5c6c..b689edcf 100644 --- a/src/object.c +++ b/src/object.c @@ -310,6 +310,10 @@ void freeModuleObject(robj *o) { zfree(mv); } +void freeStreamObject(robj *o) { + freeStream(o->ptr); +} + void incrRefCount(robj *o) { if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++; } @@ -323,6 +327,7 @@ void decrRefCount(robj *o) { case OBJ_ZSET: freeZsetObject(o); break; case OBJ_HASH: freeHashObject(o); break; case OBJ_MODULE: freeModuleObject(o); break; + case OBJ_STREAM: freeStreamObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o); diff --git a/src/rax.c b/src/rax.c index 3ead27ed..442e7bfe 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1093,28 +1093,36 @@ int raxRemove(rax *rax, unsigned char *s, size_t len, void **old) { /* This is the core of raxFree(): performs a depth-first scan of the * tree and releases all the nodes found. */ -void raxRecursiveFree(rax *rax, raxNode *n) { +void raxRecursiveFree(rax *rax, raxNode *n, void (*free_callback)(void*)) { debugnode("free traversing",n); int numchildren = n->iscompr ? 1 : n->size; raxNode **cp = raxNodeLastChildPtr(n); while(numchildren--) { raxNode *child; memcpy(&child,cp,sizeof(child)); - raxRecursiveFree(rax,child); + raxRecursiveFree(rax,child,free_callback); cp--; } debugnode("free depth-first",n); + if (free_callback && n->iskey && !n->isnull) + free_callback(raxGetData(n)); rax_free(n); rax->numnodes--; } -/* Free a whole radix tree. */ -void raxFree(rax *rax) { - raxRecursiveFree(rax,rax->head); +/* Free a whole radix tree, calling the specified callback in order to + * free the auxiliary data. */ +void raxFreeWithCallback(rax *rax, void (*free_callback)(void*)) { + raxRecursiveFree(rax,rax->head,free_callback); assert(rax->numnodes == 0); rax_free(rax); } +/* Free a whole radix tree. */ +void raxFree(rax *rax) { + raxFreeWithCallback(rax,NULL); +} + /* ------------------------------- Iterator --------------------------------- */ /* Initialize a Rax iterator. This call should be performed a single time diff --git a/src/rax.h b/src/rax.h index e22b6e69..b4e2fd91 100644 --- a/src/rax.h +++ b/src/rax.h @@ -148,6 +148,7 @@ int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old); int raxRemove(rax *rax, unsigned char *s, size_t len, void **old); void *raxFind(rax *rax, unsigned char *s, size_t len); void raxFree(rax *rax); +void raxFreeWithCallback(rax *rax, void (*free_callback)(void*)); void raxStart(raxIterator *it, rax *rt); int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len); int raxNext(raxIterator *it); diff --git a/src/server.h b/src/server.h index c934d7f6..8ea18341 100644 --- a/src/server.h +++ b/src/server.h @@ -1419,6 +1419,7 @@ void signalListAsReady(redisDb *db, robj *key); /* Stream data type. */ stream *streamNew(void); +void freeStream(stream *s); /* MULTI/EXEC/WATCH... */ void unwatchAllKeys(client *c); diff --git a/src/t_stream.c b/src/t_stream.c index 3474d478..52b0e105 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -51,6 +51,11 @@ stream *streamNew(void) { return s; } +/* Free a stream, including the listpacks stored inside the radix tree. */ +void freeStream(stream *s) { + raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the