From a7c5be18a81c120b4bdeb139072f27c899fe1a4d Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Tue, 4 Aug 2015 09:20:55 +0200
Subject: [PATCH] Lazyfree: Sorted sets convereted to plain SDS. (several
 commits squashed)

---
 src/aof.c     |   8 +-
 src/cluster.c |   5 +-
 src/db.c      |  24 +--
 src/debug.c   |   9 +-
 src/geo.c     |  20 +--
 src/rdb.c     |  27 ++-
 src/server.c  |  10 +-
 src/server.h  |  17 +-
 src/sort.c    |  10 +-
 src/t_zset.c  | 442 +++++++++++++++++++++++++++-----------------------
 10 files changed, 306 insertions(+), 266 deletions(-)

diff --git a/src/aof.c b/src/aof.c
index 7a80a935..3ab6c85b 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -826,7 +826,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
         dictEntry *de;
 
         while((de = dictNext(di)) != NULL) {
-            robj *eleobj = dictGetKey(de);
+            sds ele = dictGetKey(de);
             if (count == 0) {
                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                     AOF_REWRITE_ITEMS_PER_CMD : items;
@@ -835,7 +835,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
                 if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
                 if (rioWriteBulkObject(r,key) == 0) return 0;
             }
-            if (rioWriteBulkObject(r,eleobj) == 0) return 0;
+            if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
             items--;
         }
@@ -892,7 +892,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
         dictEntry *de;
 
         while((de = dictNext(di)) != NULL) {
-            robj *eleobj = dictGetKey(de);
+            sds ele = dictGetKey(de);
             double *score = dictGetVal(de);
 
             if (count == 0) {
@@ -904,7 +904,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
                 if (rioWriteBulkObject(r,key) == 0) return 0;
             }
             if (rioWriteBulkDouble(r,*score) == 0) return 0;
-            if (rioWriteBulkObject(r,eleobj) == 0) return 0;
+            if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
             items--;
         }
diff --git a/src/cluster.c b/src/cluster.c
index 4c87a456..6fdc2217 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4087,7 +4087,10 @@ void clusterCommand(client *c) {
         keys = zmalloc(sizeof(robj*)*maxkeys);
         numkeys = getKeysInSlot(slot, keys, maxkeys);
         addReplyMultiBulkLen(c,numkeys);
-        for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
+        for (j = 0; j < numkeys; j++) {
+            addReplyBulk(c,keys[j]);
+            decrRefCount(keys[j]);
+        }
         zfree(keys);
     } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
         /* CLUSTER FORGET <NODE ID> */
diff --git a/src/db.c b/src/db.c
index 92a59082..aa7be75e 100644
--- a/src/db.c
+++ b/src/db.c
@@ -423,8 +423,8 @@ void scanCallback(void *privdata, const dictEntry *de) {
         val = dictGetVal(de);
         incrRefCount(val);
     } else if (o->type == OBJ_ZSET) {
-        key = dictGetKey(de);
-        incrRefCount(key);
+        sds keysds = dictGetKey(de);
+        key = createStringObject(keysds,sdslen(keysds));
         val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
     } else {
         serverPanic("Type not handled in SCAN callback.");
@@ -1181,14 +1181,13 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
 void slotToKeyAdd(robj *key) {
     unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
 
-    zslInsert(server.cluster->slots_to_keys,hashslot,key);
-    incrRefCount(key);
+    sds sdskey = sdsdup(key->ptr);
+    zslInsert(server.cluster->slots_to_keys,hashslot,sdskey);
 }
 
 void slotToKeyDel(robj *key) {
     unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
-
-    zslDelete(server.cluster->slots_to_keys,hashslot,key);
+    zslDelete(server.cluster->slots_to_keys,hashslot,key->ptr,NULL);
 }
 
 void slotToKeyFlush(void) {
@@ -1196,6 +1195,9 @@ void slotToKeyFlush(void) {
     server.cluster->slots_to_keys = zslCreate();
 }
 
+/* Pupulate the specified array of objects with keys in the specified slot.
+ * New objects are returned to represent keys, it's up to the caller to
+ * decrement the reference count to release the keys names. */
 unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
     zskiplistNode *n;
     zrangespec range;
@@ -1206,7 +1208,7 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun
 
     n = zslFirstInRange(server.cluster->slots_to_keys, &range);
     while(n && n->score == hashslot && count--) {
-        keys[j++] = n->obj;
+        keys[j++] = createStringObject(n->ele,sdslen(n->ele));
         n = n->level[0].forward;
     }
     return j;
@@ -1224,9 +1226,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
 
     n = zslFirstInRange(server.cluster->slots_to_keys, &range);
     while(n && n->score == hashslot) {
-        robj *key = n->obj;
+        sds sdskey = n->ele;
+        robj *key = createStringObject(sdskey,sdslen(sdskey));
         n = n->level[0].forward; /* Go to the next item before freeing it. */
-        incrRefCount(key); /* Protect the object while freeing it. */
         dbDelete(&server.db[0],key);
         decrRefCount(key);
         j++;
@@ -1248,7 +1250,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
 
     /* Use rank of first element, if any, to determine preliminary count */
     if (zn != NULL) {
-        rank = zslGetRank(zsl, zn->score, zn->obj);
+        rank = zslGetRank(zsl, zn->score, zn->ele);
         count = (zsl->length - (rank - 1));
 
         /* Find last element in range */
@@ -1256,7 +1258,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
 
         /* Use rank of last element, if any, to determine the actual count */
         if (zn != NULL) {
-            rank = zslGetRank(zsl, zn->score, zn->obj);
+            rank = zslGetRank(zsl, zn->score, zn->ele);
             count -= (zsl->length - rank);
         }
     }
diff --git a/src/debug.c b/src/debug.c
index 61c1c3c8..fb13f798 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -163,12 +163,9 @@ void computeDatasetDigest(unsigned char *final) {
                 listTypeReleaseIterator(li);
             } else if (o->type == OBJ_SET) {
                 setTypeIterator *si = setTypeInitIterator(o);
-                robj *ele;
                 sds sdsele;
                 while((sdsele = setTypeNextObject(si)) != NULL) {
-                    ele = createObject(OBJ_STRING,sdsele);
-                    xorObjectDigest(digest,ele);
-                    decrRefCount(ele);
+                    xorDigest(digest,sdsele,sdslen(sdsele));
                 }
                 setTypeReleaseIterator(si);
             } else if (o->type == OBJ_ZSET) {
@@ -210,12 +207,12 @@ void computeDatasetDigest(unsigned char *final) {
                     dictEntry *de;
 
                     while((de = dictNext(di)) != NULL) {
-                        robj *eleobj = dictGetKey(de);
+                        sds sdsele = dictGetKey(de);
                         double *score = dictGetVal(de);
 
                         snprintf(buf,sizeof(buf),"%.17g",*score);
                         memset(eledigest,0,20);
-                        mixObjectDigest(eledigest,eleobj);
+                        mixDigest(eledigest,sdsele,sdslen(sdsele));
                         mixDigest(eledigest,buf,strlen(buf));
                         xorDigest(digest,eledigest,20);
                     }
diff --git a/src/geo.c b/src/geo.c
index ff507fe7..d0381ce5 100644
--- a/src/geo.c
+++ b/src/geo.c
@@ -112,7 +112,7 @@ int extractLongLatOrReply(client *c, robj **argv,
 int longLatFromMember(robj *zobj, robj *member, double *xy) {
     double score = 0;
 
-    if (zsetScore(zobj, member, &score) == C_ERR) return C_ERR;
+    if (zsetScore(zobj, member->ptr, &score) == C_ERR) return C_ERR;
     if (!decodeGeohash(score, xy)) return C_ERR;
     return C_OK;
 }
@@ -261,16 +261,14 @@ int geoGetPointsInRange(robj *zobj, double min, double max, double lon, double l
         }
 
         while (ln) {
-            robj *o = ln->obj;
+            sds ele = ln->ele;
             /* Abort when the node is no longer in range. */
             if (!zslValueLteMax(ln->score, &range))
                 break;
 
-            member = (o->encoding == OBJ_ENCODING_INT) ?
-                        sdsfromlonglong((long)o->ptr) :
-                        sdsdup(o->ptr);
-            if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,member)
-                == C_ERR) sdsfree(member);
+            ele = sdsdup(ele);
+            if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,ele)
+                == C_ERR) sdsfree(ele);
             ln = ln->level[0].forward;
         }
     }
@@ -606,7 +604,7 @@ void geohashCommand(client *c) {
     addReplyMultiBulkLen(c,c->argc-2);
     for (j = 2; j < c->argc; j++) {
         double score;
-        if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
+        if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
             addReply(c,shared.nullbulk);
         } else {
             /* The internal format we use for geocoding is a bit different
@@ -660,7 +658,7 @@ void geoposCommand(client *c) {
     addReplyMultiBulkLen(c,c->argc-2);
     for (j = 2; j < c->argc; j++) {
         double score;
-        if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
+        if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
             addReply(c,shared.nullmultibulk);
         } else {
             /* Decode... */
@@ -700,8 +698,8 @@ void geodistCommand(client *c) {
 
     /* Get the scores. We need both otherwise NULL is returned. */
     double score1, score2, xyxy[4];
-    if (zsetScore(zobj, c->argv[2], &score1) == C_ERR ||
-        zsetScore(zobj, c->argv[3], &score2) == C_ERR)
+    if (zsetScore(zobj, c->argv[2]->ptr, &score1) == C_ERR ||
+        zsetScore(zobj, c->argv[3]->ptr, &score2) == C_ERR)
     {
         addReply(c,shared.nullbulk);
         return;
diff --git a/src/rdb.c b/src/rdb.c
index 3d98a767..61ac354b 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -622,10 +622,11 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
             nwritten += n;
 
             while((de = dictNext(di)) != NULL) {
-                robj *eleobj = dictGetKey(de);
+                sds ele = dictGetKey(de);
                 double *score = dictGetVal(de);
 
-                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
+                if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
+                    == -1) return -1;
                 nwritten += n;
                 if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
                 nwritten += n;
@@ -992,7 +993,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
                 return NULL;
 
             if (o->encoding == OBJ_ENCODING_INTSET) {
-                /* Fetch integer value from element */
+                /* Fetch integer value from element. */
                 if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {
                     o->ptr = intsetAdd(o->ptr,llval,NULL);
                 } else {
@@ -1002,7 +1003,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
             }
 
             /* This will also be called when the set was just converted
-             * to a regular hash table encoded set */
+             * to a regular hash table encoded set. */
             if (o->encoding == OBJ_ENCODING_HT) {
                 dictAdd((dict*)o->ptr,sdsele,NULL);
             } else {
@@ -1010,7 +1011,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
             }
         }
     } else if (rdbtype == RDB_TYPE_ZSET) {
-        /* Read list/set value */
+        /* Read list/set value. */
         size_t zsetlen;
         size_t maxelelen = 0;
         zset *zs;
@@ -1019,23 +1020,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
         o = createZsetObject();
         zs = o->ptr;
 
-        /* Load every single element of the list/set */
+        /* Load every single element of the sorted set. */
         while(zsetlen--) {
-            robj *ele;
+            sds sdsele;
             double score;
             zskiplistNode *znode;
 
-            if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
-            ele = tryObjectEncoding(ele);
+            if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL)
+                return NULL;
             if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
 
             /* Don't care about integer-encoded strings. */
-            if (sdsEncodedObject(ele) && sdslen(ele->ptr) > maxelelen)
-                maxelelen = sdslen(ele->ptr);
+            if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);
 
-            znode = zslInsert(zs->zsl,score,ele);
-            dictAdd(zs->dict,ele,&znode->score);
-            incrRefCount(ele); /* added to skiplist */
+            znode = zslInsert(zs->zsl,score,sdsele);
+            dictAdd(zs->dict,sdsele,&znode->score);
         }
 
         /* Convert *after* loading, since sorted sets are not stored ordered. */
diff --git a/src/server.c b/src/server.c
index bd45e7d2..bbec0296 100644
--- a/src/server.c
+++ b/src/server.c
@@ -554,11 +554,11 @@ dictType setDictType = {
 
 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
 dictType zsetDictType = {
-    dictEncObjHash,            /* hash function */
+    dictSdsHash,               /* hash function */
     NULL,                      /* key dup */
     NULL,                      /* val dup */
-    dictEncObjKeyCompare,      /* key compare */
-    dictObjectDestructor,      /* key destructor */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* Note: SDS string shared & freed by skiplist */
     NULL                       /* val destructor */
 };
 
@@ -1428,8 +1428,8 @@ void createSharedObjects(void) {
      * actually used for their value but as a special object meaning
      * respectively the minimum possible string and the maximum possible
      * string in string comparisons for the ZRANGEBYLEX command. */
-    shared.minstring = createStringObject("minstring",9);
-    shared.maxstring = createStringObject("maxstring",9);
+    shared.minstring = sdsnew("minstring");
+    shared.maxstring = sdsnew("maxstring");
 }
 
 void initServerConfig(void) {
diff --git a/src/server.h b/src/server.h
index 50899f8f..ff5d6432 100644
--- a/src/server.h
+++ b/src/server.h
@@ -612,16 +612,17 @@ struct sharedObjectsStruct {
     *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
     *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
     *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
-    *lpush, *emptyscan, *minstring, *maxstring,
+    *lpush, *emptyscan,
     *select[PROTO_SHARED_SELECT_CMDS],
     *integers[OBJ_SHARED_INTEGERS],
     *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
     *bulkhdr[OBJ_SHARED_BULKHDR_LEN];  /* "$<value>\r\n" */
+    sds minstring, maxstring;
 };
 
 /* ZSETs use a specialized version of Skiplists */
 typedef struct zskiplistNode {
-    robj *obj;
+    sds ele;
     double score;
     struct zskiplistNode *backward;
     struct zskiplistLevel {
@@ -1261,15 +1262,15 @@ typedef struct {
 
 /* Struct to hold an inclusive/exclusive range spec by lexicographic comparison. */
 typedef struct {
-    robj *min, *max;  /* May be set to shared.(minstring|maxstring) */
+    sds min, max;     /* May be set to shared.(minstring|maxstring) */
     int minex, maxex; /* are min or max exclusive? */
 } zlexrangespec;
 
 zskiplist *zslCreate(void);
 void zslFree(zskiplist *zsl);
-zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
-unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
-int zslDelete(zskiplist *zsl, double score, robj *obj);
+zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele);
+unsigned char *zzlInsert(unsigned char *zl, sds ele, double score);
+int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node);
 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range);
 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range);
 double zzlGetScore(unsigned char *sptr);
@@ -1277,8 +1278,8 @@ void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
 unsigned int zsetLength(robj *zobj);
 void zsetConvert(robj *zobj, int encoding);
-int zsetScore(robj *zobj, robj *member, double *score);
-unsigned long zslGetRank(zskiplist *zsl, double score, robj *o);
+int zsetScore(robj *zobj, sds member, double *score);
+unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
 
 /* Core functions */
 int freeMemoryIfNeeded(void);
diff --git a/src/sort.c b/src/sort.c
index af185dbd..536302bd 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -399,7 +399,7 @@ void sortCommand(client *c) {
         zset *zs = sortval->ptr;
         zskiplist *zsl = zs->zsl;
         zskiplistNode *ln;
-        robj *ele;
+        sds sdsele;
         int rangelen = vectorlen;
 
         /* Check if starting point is trivial, before doing log(N) lookup. */
@@ -417,8 +417,8 @@ void sortCommand(client *c) {
 
         while(rangelen--) {
             serverAssertWithInfo(c,sortval,ln != NULL);
-            ele = ln->obj;
-            vector[j].obj = ele;
+            sdsele = ln->ele;
+            vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
             vector[j].u.score = 0;
             vector[j].u.cmpobj = NULL;
             j++;
@@ -431,9 +431,11 @@ void sortCommand(client *c) {
         dict *set = ((zset*)sortval->ptr)->dict;
         dictIterator *di;
         dictEntry *setele;
+        sds sdsele;
         di = dictGetIterator(set);
         while((setele = dictNext(di)) != NULL) {
-            vector[j].obj = dictGetKey(setele);
+            sdsele =  dictGetKey(setele);
+            vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
             vector[j].u.score = 0;
             vector[j].u.cmpobj = NULL;
             j++;
diff --git a/src/t_zset.c b/src/t_zset.c
index 772b06ef..84bea5aa 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -38,9 +38,16 @@
  *
  * The elements are added to a hash table mapping Redis objects to scores.
  * At the same time the elements are added to a skip list mapping scores
- * to Redis objects (so objects are sorted by scores in this "view"). */
-
-/* This skiplist implementation is almost a C translation of the original
+ * to Redis objects (so objects are sorted by scores in this "view").
+ *
+ * Note that the SDS string representing the element is the same in both
+ * the hash table and skiplist in order to save memory. What we do in order
+ * to manage the shared SDS string more easily is to free the SDS string
+ * only in zslFreeNode(). The dictionary has no value free method set.
+ * So we should always remove an element from the dictionary, and later from
+ * the skiplist.
+ *
+ * This skiplist implementation is almost a C translation of the original
  * algorithm described by William Pugh in "Skip Lists: A Probabilistic
  * Alternative to Balanced Trees", modified in three ways:
  * a) this implementation allows for repeated scores.
@@ -52,16 +59,24 @@
 #include "server.h"
 #include <math.h>
 
-static int zslLexValueGteMin(robj *value, zlexrangespec *spec);
-static int zslLexValueLteMax(robj *value, zlexrangespec *spec);
+/*-----------------------------------------------------------------------------
+ * Skiplist implementation of the low level API
+ *----------------------------------------------------------------------------*/
 
-zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
-    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
+static int zslLexValueGteMin(sds value, zlexrangespec *spec);
+static int zslLexValueLteMax(sds value, zlexrangespec *spec);
+
+/* Create a skiplist node with the specified number of levels.
+ * The SDS string 'ele' is referenced by the node after the call. */
+zskiplistNode *zslCreateNode(int level, double score, sds ele) {
+    zskiplistNode *zn =
+        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
     zn->score = score;
-    zn->obj = obj;
+    zn->ele = ele;
     return zn;
 }
 
+/* Create a new skiplist. */
 zskiplist *zslCreate(void) {
     int j;
     zskiplist *zsl;
@@ -79,11 +94,15 @@ zskiplist *zslCreate(void) {
     return zsl;
 }
 
+/* Free the specified skiplist node. The referenced SDS string representation
+ * of the element is freed too, unless node->ele is set to NULL before calling
+ * this function. */
 void zslFreeNode(zskiplistNode *node) {
-    decrRefCount(node->obj);
+    sdsfree(node->ele);
     zfree(node);
 }
 
+/* Free a whole skiplist. */
 void zslFree(zskiplist *zsl) {
     zskiplistNode *node = zsl->header->level[0].forward, *next;
 
@@ -107,7 +126,10 @@ int zslRandomLevel(void) {
     return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
 }
 
-zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
+/* Insert a new node in the skiplist. Assumes the element does not already
+ * exist (up to the caller to enforce that). The skiplist takes ownership
+ * of the passed SDS string 'ele'. */
+zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     unsigned int rank[ZSKIPLIST_MAXLEVEL];
     int i, level;
@@ -118,18 +140,19 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
         /* store rank that is crossed to reach the insert position */
         rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
         while (x->level[i].forward &&
-            (x->level[i].forward->score < score ||
-                (x->level[i].forward->score == score &&
-                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
+                (x->level[i].forward->score < score ||
+                    (x->level[i].forward->score == score &&
+                    sdscmp(x->level[i].forward->ele,ele) < 0)))
+        {
             rank[i] += x->level[i].span;
             x = x->level[i].forward;
         }
         update[i] = x;
     }
-    /* we assume the key is not already inside, since we allow duplicated
-     * scores, and the re-insertion of score and redis object should never
-     * happen since the caller of zslInsert() should test in the hash table
-     * if the element is already inside or not. */
+    /* we assume the element is not already inside, since we allow duplicated
+     * scores, reinserting the same element should never happen since the
+     * caller of zslInsert() should test in the hash table if the element is
+     * already inside or not. */
     level = zslRandomLevel();
     if (level > zsl->level) {
         for (i = zsl->level; i < level; i++) {
@@ -139,7 +162,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
         }
         zsl->level = level;
     }
-    x = zslCreateNode(level,score,obj);
+    x = zslCreateNode(level,score,ele);
     for (i = 0; i < level; i++) {
         x->level[i].forward = update[i]->level[i].forward;
         update[i]->level[i].forward = x;
@@ -184,26 +207,38 @@ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
     zsl->length--;
 }
 
-/* Delete an element with matching score/object from the skiplist. */
-int zslDelete(zskiplist *zsl, double score, robj *obj) {
+/* Delete an element with matching score/element from the skiplist.
+ * The function returns 1 if the node was found and deleted, otherwise
+ * 0 is returned.
+ *
+ * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
+ * it is not freed (but just unlinked) and *node is set to the node pointer,
+ * so that it is possible for the caller to reuse the node (including the
+ * referenced SDS string at node->ele). */
+int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     int i;
 
     x = zsl->header;
     for (i = zsl->level-1; i >= 0; i--) {
         while (x->level[i].forward &&
-            (x->level[i].forward->score < score ||
-                (x->level[i].forward->score == score &&
-                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
+                (x->level[i].forward->score < score ||
+                    (x->level[i].forward->score == score &&
+                     sdscmp(x->level[i].forward->ele,ele) < 0)))
+        {
             x = x->level[i].forward;
+        }
         update[i] = x;
     }
     /* We may have multiple elements with the same score, what we need
      * is to find the element with both the right score and object. */
     x = x->level[0].forward;
-    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
+    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
         zslDeleteNode(zsl, x, update);
-        zslFreeNode(x);
+        if (!node)
+            zslFreeNode(x);
+        else
+            *node = x;
         return 1;
     }
     return 0; /* not found */
@@ -312,8 +347,8 @@ unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dic
     {
         zskiplistNode *next = x->level[0].forward;
         zslDeleteNode(zsl,x,update);
-        dictDelete(dict,x->obj);
-        zslFreeNode(x);
+        dictDelete(dict,x->ele);
+        zslFreeNode(x); /* Here is where x->ele is actually released. */
         removed++;
         x = next;
     }
@@ -329,7 +364,7 @@ unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *di
     x = zsl->header;
     for (i = zsl->level-1; i >= 0; i--) {
         while (x->level[i].forward &&
-            !zslLexValueGteMin(x->level[i].forward->obj,range))
+            !zslLexValueGteMin(x->level[i].forward->ele,range))
                 x = x->level[i].forward;
         update[i] = x;
     }
@@ -338,11 +373,11 @@ unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *di
     x = x->level[0].forward;
 
     /* Delete nodes while in range. */
-    while (x && zslLexValueLteMax(x->obj,range)) {
+    while (x && zslLexValueLteMax(x->ele,range)) {
         zskiplistNode *next = x->level[0].forward;
         zslDeleteNode(zsl,x,update);
-        dictDelete(dict,x->obj);
-        zslFreeNode(x);
+        dictDelete(dict,x->ele);
+        zslFreeNode(x); /* Here is where x->ele is actually released. */
         removed++;
         x = next;
     }
@@ -370,7 +405,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned
     while (x && traversed <= end) {
         zskiplistNode *next = x->level[0].forward;
         zslDeleteNode(zsl,x,update);
-        dictDelete(dict,x->obj);
+        dictDelete(dict,x->ele);
         zslFreeNode(x);
         removed++;
         traversed++;
@@ -383,7 +418,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned
  * Returns 0 when the element cannot be found, rank otherwise.
  * Note that the rank is 1-based due to the span of zsl->header to the
  * first element. */
-unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
+unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
     zskiplistNode *x;
     unsigned long rank = 0;
     int i;
@@ -393,13 +428,13 @@ unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
         while (x->level[i].forward &&
             (x->level[i].forward->score < score ||
                 (x->level[i].forward->score == score &&
-                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
+                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
             rank += x->level[i].span;
             x = x->level[i].forward;
         }
 
         /* x might be equal to zsl->header, so test if obj is non-NULL */
-        if (x->obj && equalStringObjects(x->obj,o)) {
+        if (x->ele && sdscmp(x->ele,ele) == 0) {
             return rank;
         }
     }
@@ -478,7 +513,7 @@ static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
   *
   * If the string is not a valid range C_ERR is returned, and the value
   * of *dest and *ex is undefined. */
-int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
+int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
     char *c = item->ptr;
 
     switch(c[0]) {
@@ -486,27 +521,34 @@ int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
         if (c[1] != '\0') return C_ERR;
         *ex = 0;
         *dest = shared.maxstring;
-        incrRefCount(shared.maxstring);
         return C_OK;
     case '-':
         if (c[1] != '\0') return C_ERR;
         *ex = 0;
         *dest = shared.minstring;
-        incrRefCount(shared.minstring);
         return C_OK;
     case '(':
         *ex = 1;
-        *dest = createStringObject(c+1,sdslen(c)-1);
+        *dest = sdsnewlen(c+1,sdslen(c)-1);
         return C_OK;
     case '[':
         *ex = 0;
-        *dest = createStringObject(c+1,sdslen(c)-1);
+        *dest = sdsnewlen(c+1,sdslen(c)-1);
         return C_OK;
     default:
         return C_ERR;
     }
 }
 
+/* Free a lex range structure, must be called only after zelParseLexRange()
+ * populated the structure with success (C_OK returned). */
+void zslFreeLexRange(zlexrangespec *spec) {
+    if (spec->min != shared.minstring &&
+        spec->min != shared.maxstring) sdsfree(spec->min);
+    if (spec->max != shared.minstring &&
+        spec->max != shared.maxstring) sdsfree(spec->max);
+}
+
 /* Populate the rangespec according to the objects min and max.
  *
  * Return C_OK on success. On error C_ERR is returned.
@@ -521,42 +563,33 @@ static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
     spec->min = spec->max = NULL;
     if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
         zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
-        if (spec->min) decrRefCount(spec->min);
-        if (spec->max) decrRefCount(spec->max);
+        zslFreeLexRange(spec);
         return C_ERR;
     } else {
         return C_OK;
     }
 }
 
-/* Free a lex range structure, must be called only after zelParseLexRange()
- * populated the structure with success (C_OK returned). */
-void zslFreeLexRange(zlexrangespec *spec) {
-    decrRefCount(spec->min);
-    decrRefCount(spec->max);
-}
-
-/* This is just a wrapper to compareStringObjects() that is able to
+/* This is just a wrapper to sdscmp() that is able to
  * handle shared.minstring and shared.maxstring as the equivalent of
  * -inf and +inf for strings */
-int compareStringObjectsForLexRange(robj *a, robj *b) {
-    if (a == b) return 0; /* This makes sure that we handle inf,inf and
-                             -inf,-inf ASAP. One special case less. */
+int sdscmplex(sds a, sds b) {
+    if (a == b) return 0;
     if (a == shared.minstring || b == shared.maxstring) return -1;
     if (a == shared.maxstring || b == shared.minstring) return 1;
-    return compareStringObjects(a,b);
+    return sdscmp(a,b);
 }
 
-static int zslLexValueGteMin(robj *value, zlexrangespec *spec) {
+static int zslLexValueGteMin(sds value, zlexrangespec *spec) {
     return spec->minex ?
-        (compareStringObjectsForLexRange(value,spec->min) > 0) :
-        (compareStringObjectsForLexRange(value,spec->min) >= 0);
+        (sdscmplex(value,spec->min) > 0) :
+        (sdscmplex(value,spec->min) >= 0);
 }
 
-static int zslLexValueLteMax(robj *value, zlexrangespec *spec) {
+static int zslLexValueLteMax(sds value, zlexrangespec *spec) {
     return spec->maxex ?
-        (compareStringObjectsForLexRange(value,spec->max) < 0) :
-        (compareStringObjectsForLexRange(value,spec->max) <= 0);
+        (sdscmplex(value,spec->max) < 0) :
+        (sdscmplex(value,spec->max) <= 0);
 }
 
 /* Returns if there is a part of the zset is in the lex range. */
@@ -564,15 +597,15 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
     zskiplistNode *x;
 
     /* Test for ranges that will always be empty. */
-    if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
-            (compareStringObjects(range->min,range->max) == 0 &&
+    if (sdscmplex(range->min,range->max) > 1 ||
+            (sdscmp(range->min,range->max) == 0 &&
             (range->minex || range->maxex)))
         return 0;
     x = zsl->tail;
-    if (x == NULL || !zslLexValueGteMin(x->obj,range))
+    if (x == NULL || !zslLexValueGteMin(x->ele,range))
         return 0;
     x = zsl->header->level[0].forward;
-    if (x == NULL || !zslLexValueLteMax(x->obj,range))
+    if (x == NULL || !zslLexValueLteMax(x->ele,range))
         return 0;
     return 1;
 }
@@ -590,7 +623,7 @@ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
     for (i = zsl->level-1; i >= 0; i--) {
         /* Go forward while *OUT* of range. */
         while (x->level[i].forward &&
-            !zslLexValueGteMin(x->level[i].forward->obj,range))
+            !zslLexValueGteMin(x->level[i].forward->ele,range))
                 x = x->level[i].forward;
     }
 
@@ -599,7 +632,7 @@ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
     serverAssert(x != NULL);
 
     /* Check if score <= max. */
-    if (!zslLexValueLteMax(x->obj,range)) return NULL;
+    if (!zslLexValueLteMax(x->ele,range)) return NULL;
     return x;
 }
 
@@ -616,7 +649,7 @@ zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
     for (i = zsl->level-1; i >= 0; i--) {
         /* Go forward while *IN* range. */
         while (x->level[i].forward &&
-            zslLexValueLteMax(x->level[i].forward->obj,range))
+            zslLexValueLteMax(x->level[i].forward->ele,range))
                 x = x->level[i].forward;
     }
 
@@ -624,7 +657,7 @@ zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
     serverAssert(x != NULL);
 
     /* Check if score >= min. */
-    if (!zslLexValueGteMin(x->obj,range)) return NULL;
+    if (!zslLexValueGteMin(x->ele,range)) return NULL;
     return x;
 }
 
@@ -653,10 +686,8 @@ double zzlGetScore(unsigned char *sptr) {
     return score;
 }
 
-/* Return a ziplist element as a Redis string object.
- * This simple abstraction can be used to simplifies some code at the
- * cost of some performance. */
-robj *ziplistGetObject(unsigned char *sptr) {
+/* Return a ziplist element as an SDS string. */
+sds ziplistGetObject(unsigned char *sptr) {
     unsigned char *vstr;
     unsigned int vlen;
     long long vlong;
@@ -665,9 +696,9 @@ robj *ziplistGetObject(unsigned char *sptr) {
     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
 
     if (vstr) {
-        return createStringObject((char*)vstr,vlen);
+        return sdsnewlen((char*)vstr,vlen);
     } else {
-        return createStringObjectFromLongLong(vlong);
+        return sdsfromlonglong(vlong);
     }
 }
 
@@ -822,16 +853,16 @@ unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
 }
 
 static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
-    robj *value = ziplistGetObject(p);
+    sds value = ziplistGetObject(p);
     int res = zslLexValueGteMin(value,spec);
-    decrRefCount(value);
+    sdsfree(value);
     return res;
 }
 
 static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
-    robj *value = ziplistGetObject(p);
+    sds value = ziplistGetObject(p);
     int res = zslLexValueLteMax(value,spec);
-    decrRefCount(value);
+    sdsfree(value);
     return res;
 }
 
@@ -841,8 +872,8 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
     unsigned char *p;
 
     /* Test for ranges that will always be empty. */
-    if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
-            (compareStringObjects(range->min,range->max) == 0 &&
+    if (sdscmplex(range->min,range->max) > 1 ||
+            (sdscmp(range->min,range->max) == 0 &&
             (range->minex || range->maxex)))
         return 0;
 
@@ -912,26 +943,22 @@ unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
     return NULL;
 }
 
-unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
+unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
 
-    ele = getDecodedObject(ele);
     while (eptr != NULL) {
         sptr = ziplistNext(zl,eptr);
-        serverAssertWithInfo(NULL,ele,sptr != NULL);
+        serverAssert(sptr != NULL);
 
-        if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
+        if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
             /* Matching element, pull out score. */
             if (score != NULL) *score = zzlGetScore(sptr);
-            decrRefCount(ele);
             return eptr;
         }
 
         /* Move to next element. */
         eptr = ziplistNext(zl,sptr);
     }
-
-    decrRefCount(ele);
     return NULL;
 }
 
@@ -946,41 +973,38 @@ unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
     return zl;
 }
 
-unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
+unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
     unsigned char *sptr;
     char scorebuf[128];
     int scorelen;
     size_t offset;
 
-    serverAssertWithInfo(NULL,ele,sdsEncodedObject(ele));
     scorelen = d2string(scorebuf,sizeof(scorebuf),score);
     if (eptr == NULL) {
-        zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
+        zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
         zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
     } else {
         /* Keep offset relative to zl, as it might be re-allocated. */
         offset = eptr-zl;
-        zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
+        zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
         eptr = zl+offset;
 
         /* Insert score after the element. */
-        serverAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
+        serverAssert((sptr = ziplistNext(zl,eptr)) != NULL);
         zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
     }
-
     return zl;
 }
 
 /* Insert (element,score) pair in ziplist. This function assumes the element is
  * not yet present in the list. */
-unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
+unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
     double s;
 
-    ele = getDecodedObject(ele);
     while (eptr != NULL) {
         sptr = ziplistNext(zl,eptr);
-        serverAssertWithInfo(NULL,ele,sptr != NULL);
+        serverAssert(sptr != NULL);
         s = zzlGetScore(sptr);
 
         if (s > score) {
@@ -991,7 +1015,7 @@ unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
             break;
         } else if (s == score) {
             /* Ensure lexicographical ordering for elements. */
-            if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
+            if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
                 zl = zzlInsertAt(zl,eptr,ele,score);
                 break;
             }
@@ -1004,8 +1028,6 @@ unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
     /* Push on tail of list when it was not yet inserted. */
     if (eptr == NULL)
         zl = zzlInsertAt(zl,NULL,ele,score);
-
-    decrRefCount(ele);
     return zl;
 }
 
@@ -1093,7 +1115,7 @@ unsigned int zsetLength(robj *zobj) {
 void zsetConvert(robj *zobj, int encoding) {
     zset *zs;
     zskiplistNode *node, *next;
-    robj *ele;
+    sds ele;
     double score;
 
     if (zobj->encoding == encoding) return;
@@ -1120,14 +1142,12 @@ void zsetConvert(robj *zobj, int encoding) {
             score = zzlGetScore(sptr);
             serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
             if (vstr == NULL)
-                ele = createStringObjectFromLongLong(vlong);
+                ele = sdsfromlonglong(vlong);
             else
-                ele = createStringObject((char*)vstr,vlen);
+                ele = sdsnewlen((char*)vstr,vlen);
 
-            /* Has incremented refcount since it was just created. */
             node = zslInsert(zs->zsl,score,ele);
-            serverAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
-            incrRefCount(ele); /* Added to dictionary. */
+            serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
             zzlNext(zl,&eptr,&sptr);
         }
 
@@ -1149,10 +1169,7 @@ void zsetConvert(robj *zobj, int encoding) {
         zfree(zs->zsl);
 
         while (node) {
-            ele = getDecodedObject(node->obj);
-            zl = zzlInsertAt(zl,NULL,ele,node->score);
-            decrRefCount(ele);
-
+            zl = zzlInsertAt(zl,NULL,node->ele,node->score);
             next = node->level[0].forward;
             zslFreeNode(node);
             node = next;
@@ -1170,7 +1187,7 @@ void zsetConvert(robj *zobj, int encoding) {
  * storing it into *score. If the element does not exist C_ERR is returned
  * otherwise C_OK is returned and *score is correctly populated.
  * If 'zobj' or 'member' is NULL, C_ERR is returned. */
-int zsetScore(robj *zobj, robj *member, double *score) {
+int zsetScore(robj *zobj, sds member, double *score) {
     if (!zobj || !member) return C_ERR;
 
     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
@@ -1199,9 +1216,8 @@ int zsetScore(robj *zobj, robj *member, double *score) {
 void zaddGenericCommand(client *c, int flags) {
     static char *nanerr = "resulting score is not a number (NaN)";
     robj *key = c->argv[1];
-    robj *ele;
     robj *zobj;
-    robj *curobj;
+    sds ele;
     double score = 0, *scores = NULL, curscore = 0.0;
     int j, elements;
     int scoreidx = 0;
@@ -1285,11 +1301,10 @@ void zaddGenericCommand(client *c, int flags) {
     for (j = 0; j < elements; j++) {
         score = scores[j];
 
+        ele = c->argv[scoreidx+1+j*2]->ptr;
         if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
             unsigned char *eptr;
 
-            /* Prefer non-encoded element when dealing with ziplists. */
-            ele = c->argv[scoreidx+1+j*2];
             if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
                 if (nx) continue;
                 if (incr) {
@@ -1314,7 +1329,7 @@ void zaddGenericCommand(client *c, int flags) {
                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
                     zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
-                if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+                if (sdslen(ele) > server.zset_max_ziplist_value)
                     zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
                 server.dirty++;
                 added++;
@@ -1325,12 +1340,9 @@ void zaddGenericCommand(client *c, int flags) {
             zskiplistNode *znode;
             dictEntry *de;
 
-            ele = c->argv[scoreidx+1+j*2] =
-                tryObjectEncoding(c->argv[scoreidx+1+j*2]);
             de = dictFind(zs->dict,ele);
             if (de != NULL) {
                 if (nx) continue;
-                curobj = dictGetKey(de);
                 curscore = *(double*)dictGetVal(de);
 
                 if (incr) {
@@ -1343,23 +1355,27 @@ void zaddGenericCommand(client *c, int flags) {
                     }
                 }
 
-                /* Remove and re-insert when score changed. We can safely
-                 * delete the key object from the skiplist, since the
-                 * dictionary still has a reference to it. */
+                /* Remove and re-insert when score changes. */
                 if (score != curscore) {
-                    serverAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
-                    znode = zslInsert(zs->zsl,score,curobj);
-                    incrRefCount(curobj); /* Re-inserted in skiplist. */
+                    zskiplistNode *node;
+                    serverAssert(zslDelete(zs->zsl,curscore,ele,&node));
+                    znode = zslInsert(zs->zsl,score,node->ele);
+                    /* We reused the node->ele SDS string, free the node now
+                     * since zslInsert created a new one. */
+                    node->ele = NULL;
+                    zslFreeNode(node);
+                    /* Note that we did not removed the original element from
+                     * the hash table representing the sorted set, so we just
+                     * update the score. */
                     dictGetVal(de) = &znode->score; /* Update score ptr. */
                     server.dirty++;
                     updated++;
                 }
                 processed++;
             } else if (!xx) {
+                ele = sdsdup(ele);
                 znode = zslInsert(zs->zsl,score,ele);
-                incrRefCount(ele); /* Inserted in skiplist. */
-                serverAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
-                incrRefCount(ele); /* Added to dictionary. */
+                serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
                 server.dirty++;
                 added++;
                 processed++;
@@ -1408,7 +1424,7 @@ void zremCommand(client *c) {
         unsigned char *eptr;
 
         for (j = 2; j < c->argc; j++) {
-            if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
+            if ((eptr = zzlFind(zobj->ptr,c->argv[j]->ptr,NULL)) != NULL) {
                 deleted++;
                 zobj->ptr = zzlDelete(zobj->ptr,eptr);
                 if (zzlLength(zobj->ptr) == 0) {
@@ -1424,16 +1440,26 @@ void zremCommand(client *c) {
         double score;
 
         for (j = 2; j < c->argc; j++) {
-            de = dictFind(zs->dict,c->argv[j]);
+            de = dictFind(zs->dict,c->argv[j]->ptr);
             if (de != NULL) {
                 deleted++;
 
-                /* Delete from the skiplist */
+                /* Get the score in order to delete from the skiplist later. */
                 score = *(double*)dictGetVal(de);
-                serverAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
 
-                /* Delete from the hash table */
-                dictDelete(zs->dict,c->argv[j]);
+                /* Delete from the hash table and later from the skiplist.
+                 * Note that the order is important: deleting from the skiplist
+                 * actually releases the SDS string representing the element,
+                 * which is shared between the skiplist and the hash table, so
+                 * we need to delete from the skiplist as the final step. */
+                int retval1 = dictDelete(zs->dict,c->argv[j]->ptr);
+
+                /* Delete from skiplist. */
+                int retval2 = zslDelete(zs->zsl,score,c->argv[j]->ptr,NULL);
+
+                serverAssertWithInfo(c,c->argv[j],
+                    retval1 == DICT_OK && retval2);
+
                 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
                 if (dictSize(zs->dict) == 0) {
                     dbDelete(c->db,key);
@@ -1613,7 +1639,7 @@ typedef struct {
  * we already checked that "ell" holds a long long, or tried to convert another
  * representation into a long long value. When this was successful,
  * OPVAL_VALID_LL is set as well. */
-#define OPVAL_DIRTY_ROBJ 1
+#define OPVAL_DIRTY_SDS 1
 #define OPVAL_DIRTY_LL 2
 #define OPVAL_VALID_LL 4
 
@@ -1621,7 +1647,7 @@ typedef struct {
 typedef struct {
     int flags;
     unsigned char _buf[32]; /* Private buffer. */
-    robj *ele;
+    sds ele;
     unsigned char *estr;
     unsigned int elen;
     long long ell;
@@ -1728,8 +1754,8 @@ int zuiNext(zsetopsrc *op, zsetopval *val) {
     if (op->subject == NULL)
         return 0;
 
-    if (val->flags & OPVAL_DIRTY_ROBJ)
-        decrRefCount(val->ele);
+    if (val->flags & OPVAL_DIRTY_SDS)
+        sdsfree(val->ele);
 
     memset(val,0,sizeof(zsetopval));
 
@@ -1770,7 +1796,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) {
         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
             if (it->sl.node == NULL)
                 return 0;
-            val->ele = it->sl.node->obj;
+            val->ele = it->sl.node->ele;
             val->score = it->sl.node->score;
 
             /* Move to next element. */
@@ -1789,15 +1815,8 @@ int zuiLongLongFromValue(zsetopval *val) {
         val->flags |= OPVAL_DIRTY_LL;
 
         if (val->ele != NULL) {
-            if (val->ele->encoding == OBJ_ENCODING_INT) {
-                val->ell = (long)val->ele->ptr;
+            if (string2ll(val->ele,sdslen(val->ele),&val->ell))
                 val->flags |= OPVAL_VALID_LL;
-            } else if (sdsEncodedObject(val->ele)) {
-                if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
-                    val->flags |= OPVAL_VALID_LL;
-            } else {
-                serverPanic("Unsupported element encoding");
-            }
         } else if (val->estr != NULL) {
             if (string2ll((char*)val->estr,val->elen,&val->ell))
                 val->flags |= OPVAL_VALID_LL;
@@ -1809,30 +1828,41 @@ int zuiLongLongFromValue(zsetopval *val) {
     return val->flags & OPVAL_VALID_LL;
 }
 
-robj *zuiObjectFromValue(zsetopval *val) {
+sds zuiSdsFromValue(zsetopval *val) {
     if (val->ele == NULL) {
         if (val->estr != NULL) {
-            val->ele = createStringObject((char*)val->estr,val->elen);
+            val->ele = sdsnewlen((char*)val->estr,val->elen);
         } else {
-            val->ele = createStringObjectFromLongLong(val->ell);
+            val->ele = sdsfromlonglong(val->ell);
         }
-        val->flags |= OPVAL_DIRTY_ROBJ;
+        val->flags |= OPVAL_DIRTY_SDS;
     }
     return val->ele;
 }
 
+/* This is different from zuiSdsFromValue since returns a new SDS string
+ * which is up to the caller to free. */
+sds zuiNewSdsFromValue(zsetopval *val) {
+    if (val->flags & OPVAL_DIRTY_SDS) {
+        /* We have already one to return! */
+        sds ele = val->ele;
+        val->flags &= ~OPVAL_DIRTY_SDS;
+        val->ele = NULL;
+        return ele;
+    } else if (val->ele) {
+        return sdsdup(val->ele);
+    } else if (val->estr) {
+        return sdsnewlen((char*)val->estr,val->elen);
+    } else {
+        return sdsfromlonglong(val->ell);
+    }
+}
+
 int zuiBufferFromValue(zsetopval *val) {
     if (val->estr == NULL) {
         if (val->ele != NULL) {
-            if (val->ele->encoding == OBJ_ENCODING_INT) {
-                val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
-                val->estr = val->_buf;
-            } else if (sdsEncodedObject(val->ele)) {
-                val->elen = sdslen(val->ele->ptr);
-                val->estr = val->ele->ptr;
-            } else {
-                serverPanic("Unsupported element encoding");
-            }
+            val->elen = sdslen(val->ele);
+            val->estr = (unsigned char*)val->ele;
         } else {
             val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
             val->estr = val->_buf;
@@ -1859,7 +1889,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
             }
         } else if (op->encoding == OBJ_ENCODING_HT) {
             dict *ht = op->subject->ptr;
-            zuiObjectFromValue(val);
+            zuiSdsFromValue(val);
             if (dictFind(ht,val->ele) != NULL) {
                 *score = 1.0;
                 return 1;
@@ -1870,7 +1900,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
             serverPanic("Unknown set encoding");
         }
     } else if (op->type == OBJ_ZSET) {
-        zuiObjectFromValue(val);
+        zuiSdsFromValue(val);
 
         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
             if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
@@ -1922,13 +1952,25 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat
     }
 }
 
+unsigned int dictSdsHash(const void *key);
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
+
+dictType setAccumulatorDictType = {
+    dictSdsHash,               /* hash function */
+    NULL,                      /* key dup */
+    NULL,                      /* val dup */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* key destructor */
+    NULL                       /* val destructor */
+};
+
 void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
     int i, j;
     long setnum;
     int aggregate = REDIS_AGGR_SUM;
     zsetopsrc *src;
     zsetopval zval;
-    robj *tmp;
+    sds tmp;
     unsigned int maxelelen = 0;
     robj *dstobj;
     zset *dstzset;
@@ -1978,7 +2020,9 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
         int remaining = c->argc - j;
 
         while (remaining) {
-            if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
+            if (remaining >= (setnum + 1) &&
+                !strcasecmp(c->argv[j]->ptr,"weights"))
+            {
                 j++; remaining--;
                 for (i = 0; i < setnum; i++, j++, remaining--) {
                     if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
@@ -1988,7 +2032,9 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
                         return;
                     }
                 }
-            } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
+            } else if (remaining >= 2 &&
+                       !strcasecmp(c->argv[j]->ptr,"aggregate"))
+            {
                 j++; remaining--;
                 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
                     aggregate = REDIS_AGGR_SUM;
@@ -2046,22 +2092,16 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
 
                 /* Only continue when present in every input. */
                 if (j == setnum) {
-                    tmp = zuiObjectFromValue(&zval);
+                    tmp = zuiNewSdsFromValue(&zval);
                     znode = zslInsert(dstzset->zsl,score,tmp);
-                    incrRefCount(tmp); /* added to skiplist */
                     dictAdd(dstzset->dict,tmp,&znode->score);
-                    incrRefCount(tmp); /* added to dictionary */
-
-                    if (sdsEncodedObject(tmp)) {
-                        if (sdslen(tmp->ptr) > maxelelen)
-                            maxelelen = sdslen(tmp->ptr);
-                    }
+                    if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                 }
             }
             zuiClearIterator(&src[0]);
         }
     } else if (op == SET_OP_UNION) {
-        dict *accumulator = dictCreate(&setDictType,NULL);
+        dict *accumulator = dictCreate(&setAccumulatorDictType,NULL);
         dictIterator *di;
         dictEntry *de;
         double score;
@@ -2084,20 +2124,16 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
                 if (isnan(score)) score = 0;
 
                 /* Search for this element in the accumulating dictionary. */
-                de = dictFind(accumulator,zuiObjectFromValue(&zval));
+                de = dictFind(accumulator,zuiSdsFromValue(&zval));
                 /* If we don't have it, we need to create a new entry. */
                 if (de == NULL) {
-                    tmp = zuiObjectFromValue(&zval);
+                    tmp = zuiNewSdsFromValue(&zval);
                     /* Remember the longest single element encountered,
                      * to understand if it's possible to convert to ziplist
                      * at the end. */
-                    if (sdsEncodedObject(tmp)) {
-                        if (sdslen(tmp->ptr) > maxelelen)
-                            maxelelen = sdslen(tmp->ptr);
-                    }
+                     if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                     /* Add the element with its initial score. */
                     de = dictAddRaw(accumulator,tmp);
-                    incrRefCount(tmp);
                     dictSetDoubleVal(de,score);
                 } else {
                     /* Update the score with the score of the new instance
@@ -2121,16 +2157,12 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
         dictExpand(dstzset->dict,dictSize(accumulator));
 
         while((de = dictNext(di)) != NULL) {
-            robj *ele = dictGetKey(de);
+            sds ele = dictGetKey(de);
             score = dictGetDoubleVal(de);
             znode = zslInsert(dstzset->zsl,score,ele);
-            incrRefCount(ele); /* added to skiplist */
             dictAdd(dstzset->dict,ele,&znode->score);
-            incrRefCount(ele); /* added to dictionary */
         }
         dictReleaseIterator(di);
-
-        /* We can free the accumulator dictionary now. */
         dictRelease(accumulator);
     } else {
         serverPanic("Unknown operator");
@@ -2247,7 +2279,7 @@ void zrangeGenericCommand(client *c, int reverse) {
         zset *zs = zobj->ptr;
         zskiplist *zsl = zs->zsl;
         zskiplistNode *ln;
-        robj *ele;
+        sds ele;
 
         /* Check if starting point is trivial, before doing log(N) lookup. */
         if (reverse) {
@@ -2262,8 +2294,8 @@ void zrangeGenericCommand(client *c, int reverse) {
 
         while(rangelen--) {
             serverAssertWithInfo(c,zobj,ln != NULL);
-            ele = ln->obj;
-            addReplyBulk(c,ele);
+            ele = ln->ele;
+            addReplyBulkCBuffer(c,ele,sdslen(ele));
             if (withscores)
                 addReplyDouble(c,ln->score);
             ln = reverse ? ln->backward : ln->level[0].forward;
@@ -2317,8 +2349,13 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
                 pos++; remaining--;
                 withscores = 1;
             } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
-                if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
-                    (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return;
+                if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
+                        != C_OK) ||
+                    (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
+                        != C_OK))
+                {
+                    return;
+                }
                 pos += 3; remaining -= 3;
             } else {
                 addReply(c,shared.syntaxerr);
@@ -2444,7 +2481,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
             }
 
             rangelen++;
-            addReplyBulk(c,ln->obj);
+            addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
 
             if (withscores) {
                 addReplyDouble(c,ln->score);
@@ -2534,7 +2571,7 @@ void zcountCommand(client *c) {
 
         /* Use rank of first element, if any, to determine preliminary count */
         if (zn != NULL) {
-            rank = zslGetRank(zsl, zn->score, zn->obj);
+            rank = zslGetRank(zsl, zn->score, zn->ele);
             count = (zsl->length - (rank - 1));
 
             /* Find last element in range */
@@ -2542,7 +2579,7 @@ void zcountCommand(client *c) {
 
             /* Use rank of last element, if any, to determine the actual count */
             if (zn != NULL) {
-                rank = zslGetRank(zsl, zn->score, zn->obj);
+                rank = zslGetRank(zsl, zn->score, zn->ele);
                 count -= (zsl->length - rank);
             }
         }
@@ -2612,7 +2649,7 @@ void zlexcountCommand(client *c) {
 
         /* Use rank of first element, if any, to determine preliminary count */
         if (zn != NULL) {
-            rank = zslGetRank(zsl, zn->score, zn->obj);
+            rank = zslGetRank(zsl, zn->score, zn->ele);
             count = (zsl->length - (rank - 1));
 
             /* Find last element in range */
@@ -2620,7 +2657,7 @@ void zlexcountCommand(client *c) {
 
             /* Use rank of last element, if any, to determine the actual count */
             if (zn != NULL) {
-                rank = zslGetRank(zsl, zn->score, zn->obj);
+                rank = zslGetRank(zsl, zn->score, zn->ele);
                 count -= (zsl->length - rank);
             }
         }
@@ -2786,13 +2823,13 @@ void genericZrangebylexCommand(client *c, int reverse) {
         while (ln && limit--) {
             /* Abort when the node is no longer in range. */
             if (reverse) {
-                if (!zslLexValueGteMin(ln->obj,&range)) break;
+                if (!zslLexValueGteMin(ln->ele,&range)) break;
             } else {
-                if (!zslLexValueLteMax(ln->obj,&range)) break;
+                if (!zslLexValueLteMax(ln->ele,&range)) break;
             }
 
             rangelen++;
-            addReplyBulk(c,ln->obj);
+            addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
 
             /* Move to next node */
             if (reverse) {
@@ -2835,7 +2872,7 @@ void zscoreCommand(client *c) {
     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
         checkType(c,zobj,OBJ_ZSET)) return;
 
-    if (zsetScore(zobj,c->argv[2],&score) == C_ERR) {
+    if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) {
         addReply(c,shared.nullbulk);
     } else {
         addReplyDouble(c,score);
@@ -2887,11 +2924,12 @@ void zrankGenericCommand(client *c, int reverse) {
         double score;
 
         ele = c->argv[2];
-        de = dictFind(zs->dict,ele);
+        de = dictFind(zs->dict,ele->ptr);
         if (de != NULL) {
             score = *(double*)dictGetVal(de);
-            rank = zslGetRank(zsl,score,ele);
-            serverAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
+            rank = zslGetRank(zsl,score,ele->ptr);
+            /* Existing elements always have a rank. */
+            serverAssertWithInfo(c,ele,rank);
             if (reverse)
                 addReplyLongLong(c,llen-rank);
             else