From d56f4b4122aa51f97f5284e6e449943dd46ad659 Mon Sep 17 00:00:00 2001
From: Oran Agra <oran@redislabs.com>
Date: Mon, 2 Apr 2018 18:36:17 +0300
Subject: [PATCH 1/2] Add redis-cli support for diskless replication (CAPA EOF)

When repl-diskless-sync is set to yes and redis-cli sends SYNC,
redis-cli needs to understand the EOF marker protocol
in order to be able to skip or download the RDB file.
---
 src/redis-cli.c | 112 +++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 102 insertions(+), 10 deletions(-)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index d80973e7..6fffa902 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -1793,9 +1793,31 @@ static void latencyDistMode(void) {
  * Slave mode
  *--------------------------------------------------------------------------- */
 
+#define RDB_EOF_MARK_SIZE 40 /* bytes of the marker that follows "EOF:" in the SYNC reply */
+
+/* Send "REPLCONF <arg1> <arg2>" to the master. Exits on I/O error; an error
+ * reply is only reported, since old servers may not support the option. */
+void sendReplconf(const char* arg1, const char* arg2) {
+    /* Use stderr, like the progress messages of slaveMode() and getRDB(). */
+    fprintf(stderr, "sending REPLCONF %s %s\n", arg1, arg2);
+    redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);
+    if(reply == NULL) {
+        fprintf(stderr, "\nI/O error\n");
+        exit(1);
+    } else if(reply->type == REDIS_REPLY_ERROR) {
+        fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
+    }
+    freeReplyObject(reply);
+}
+
+/* Announce the "eof" capability (REPLCONF capa eof), so that the master may
+ * delimit the SYNC payload with an EOF marker instead of a length prefix. */
+void sendCapa(void) { sendReplconf("capa", "eof"); }
+
 /* Sends SYNC and reads the number of bytes in the payload. Used both by
- * slaveMode() and getRDB(). */
-unsigned long long sendSync(int fd) {
+ * slaveMode() and getRDB().
+ * returns 0 in case an EOF marker is used. */
+unsigned long long sendSync(int fd, char *out_eof) {
     /* To start we need to send the SYNC command and return the payload.
      * The hiredis client lib does not understand this part of the protocol
      * and we don't want to mess with its buffers, so everything is performed
@@ -1825,17 +1847,33 @@ unsigned long long sendSync(int fd) {
         printf("SYNC with master failed: %s\n", buf);
         exit(1);
     }
+    if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) {
+        memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE);
+        return 0;
+    }
     return strtoull(buf+1,NULL,10);
 }
 
 static void slaveMode(void) {
     int fd = context->fd;
-    unsigned long long payload = sendSync(fd);
+    static char eofmark[RDB_EOF_MARK_SIZE];
+    static char lastbytes[RDB_EOF_MARK_SIZE];
+    static int usemark = 0;
+    unsigned long long payload = sendSync(fd, eofmark);
     char buf[1024];
     int original_output = config.output;
 
-    fprintf(stderr,"SYNC with master, discarding %llu "
-                   "bytes of bulk transfer...\n", payload);
+    if (payload == 0) {
+        payload = ULLONG_MAX;
+        memset(lastbytes,0,RDB_EOF_MARK_SIZE);
+        usemark = 1;
+        fprintf(stderr,"SYNC with master, discarding "
+                       "bytes of bulk transfer until EOF marker...\n");
+    } else {
+        fprintf(stderr,"SYNC with master, discarding %llu "
+                       "bytes of bulk transfer...\n", payload);
+    }
+
 
     /* Discard the payload. */
     while(payload) {
@@ -1847,8 +1885,29 @@ static void slaveMode(void) {
             exit(1);
         }
         payload -= nread;
+
+        if (usemark) {
+            /* Update the last bytes array, and check if it matches our delimiter.*/
+            if (nread >= RDB_EOF_MARK_SIZE) {
+                memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
+            } else {
+                int rem = RDB_EOF_MARK_SIZE-nread;
+                memmove(lastbytes,lastbytes+nread,rem);
+                memcpy(lastbytes+rem,buf,nread);
+            }
+            if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
+                break;
+        }
     }
-    fprintf(stderr,"SYNC done. Logging commands from master.\n");
+
+    if (usemark) {
+        unsigned long long offset = ULLONG_MAX - payload;
+        fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset);
+        /* put the slave online */
+        sleep(1);
+        sendReplconf("ACK", "0");
+    } else
+        fprintf(stderr,"SYNC done. Logging commands from master.\n");
 
     /* Now we can use hiredis to read the incoming protocol. */
     config.output = OUTPUT_CSV;
@@ -1865,11 +1924,22 @@ static void slaveMode(void) {
 static void getRDB(void) {
     int s = context->fd;
     int fd;
-    unsigned long long payload = sendSync(s);
+    static char eofmark[RDB_EOF_MARK_SIZE];
+    static char lastbytes[RDB_EOF_MARK_SIZE];
+    static int usemark = 0;
+    unsigned long long payload = sendSync(s, eofmark);
     char buf[4096];
 
-    fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
-        payload, config.rdb_filename);
+    if (payload == 0) {
+        payload = ULLONG_MAX;
+        memset(lastbytes,0,RDB_EOF_MARK_SIZE);
+        usemark = 1;
+        fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer until EOF marker to '%s'\n",
+            config.rdb_filename);
+    } else {
+        fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
+            payload, config.rdb_filename);
+    }
 
     /* Write to file. */
     if (!strcmp(config.rdb_filename,"-")) {
@@ -1898,11 +1968,31 @@ static void getRDB(void) {
             exit(1);
         }
         payload -= nread;
+
+        if (usemark) {
+            /* Update the last bytes array, and check if it matches our delimiter.*/
+            if (nread >= RDB_EOF_MARK_SIZE) {
+                memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
+            } else {
+                int rem = RDB_EOF_MARK_SIZE-nread;
+                memmove(lastbytes,lastbytes+nread,rem);
+                memcpy(lastbytes+rem,buf,nread);
+            }
+            if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
+                break;
+        }
+    }
+    if (usemark) {
+        payload = ULLONG_MAX - payload - RDB_EOF_MARK_SIZE;
+        if (ftruncate(fd, payload) == -1)
+            fprintf(stderr,"ftruncate failed: %s.\n", strerror(errno));
+        fprintf(stderr,"Transfer finished with success after %llu bytes\n", payload);
+    } else {
+        fprintf(stderr,"Transfer finished with success.\n");
     }
     close(s); /* Close the file descriptor ASAP as fsync() may take time. */
     fsync(fd);
     close(fd);
-    fprintf(stderr,"Transfer finished with success.\n");
     exit(0);
 }
 
@@ -2893,12 +2983,14 @@ int main(int argc, char **argv) {
     /* Slave mode */
     if (config.slave_mode) {
         if (cliConnect(0) == REDIS_ERR) exit(1);
+        sendCapa();
         slaveMode();
     }
 
     /* Get RDB mode. */
     if (config.getrdb_mode) {
         if (cliConnect(0) == REDIS_ERR) exit(1);
+        sendCapa();
         getRDB();
     }
 

From b6de51206e8fd79269d05f0fed3f396683d75446 Mon Sep 17 00:00:00 2001
From: Oran Agra <oran@redislabs.com>
Date: Thu, 21 Feb 2019 12:06:18 +0200
Subject: [PATCH 2/2] redis-cli add support for --memkeys, fix --bigkeys for
 module types

* bigkeys used to fail on databases with module type keys
* new code adds more types when it discovers them, but has no way to know the element count of module types yet
* bigkeys was missing XLEN command for streams
* adding --memkeys and --memkeys-samples to make use of the MEMORY USAGE command

see #5167, #5175
---
 src/redis-cli.c | 213 ++++++++++++++++++++++++++++++------------------
 1 file changed, 132 insertions(+), 81 deletions(-)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index 93290e5e..884b23e6 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -210,6 +210,8 @@ static struct config {
     char *pattern;
     char *rdb_filename;
     int bigkeys;
+    int memkeys;
+    unsigned memkeys_samples;
     int hotkeys;
     int stdinarg; /* get last arg from stdin. (-x option) */
     char *auth;
@@ -1336,6 +1338,12 @@ static int parseOptions(int argc, char **argv) {
             config.pipe_timeout = atoi(argv[++i]);
         } else if (!strcmp(argv[i],"--bigkeys")) {
             config.bigkeys = 1;
+        } else if (!strcmp(argv[i],"--memkeys")) {
+            config.memkeys = 1;
+            config.memkeys_samples = 0; /* use redis default */
+        } else if (!strcmp(argv[i],"--memkeys-samples") && !lastarg) {
+            config.memkeys = 1; /* !lastarg: the option requires a value */
+            config.memkeys_samples = atoi(argv[++i]);
         } else if (!strcmp(argv[i],"--hotkeys")) {
             config.hotkeys = 1;
         } else if (!strcmp(argv[i],"--eval") && !lastarg) {
@@ -1534,7 +1542,10 @@ static void usage(void) {
 "  --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n"
 "                     no reply is received within <n> seconds.\n"
 "                     Default timeout: %d. Use 0 to wait forever.\n"
-"  --bigkeys          Sample Redis keys looking for big keys.\n"
+"  --bigkeys          Sample Redis keys looking for keys with many elements (complexity).\n"
+"  --memkeys          Sample Redis keys looking for keys consuming a lot of memory.\n"
+"  --memkeys-samples <n> Sample Redis keys looking for keys consuming a lot of memory.\n"
+"                     And define number of key elements to sample\n"
 "  --hotkeys          Sample Redis keys looking for hot keys.\n"
 "                     only works when maxmemory-policy is *lfu.\n"
 "  --scan             List all keys using the SCAN command.\n"
@@ -6418,15 +6429,6 @@ static void pipeMode(void) {
  * Find big keys
  *--------------------------------------------------------------------------- */
 
-#define TYPE_STRING 0
-#define TYPE_LIST   1
-#define TYPE_SET    2
-#define TYPE_HASH   3
-#define TYPE_ZSET   4
-#define TYPE_STREAM 5
-#define TYPE_NONE   6
-#define TYPE_COUNT  7
-
 static redisReply *sendScan(unsigned long long *it) {
     redisReply *reply = redisCommand(context, "SCAN %llu", *it);
 
@@ -6473,28 +6475,51 @@ static int getDbSize(void) {
     return size;
 }
 
-static int toIntType(char *key, char *type) {
-    if(!strcmp(type, "string")) {
-        return TYPE_STRING;
-    } else if(!strcmp(type, "list")) {
-        return TYPE_LIST;
-    } else if(!strcmp(type, "set")) {
-        return TYPE_SET;
-    } else if(!strcmp(type, "hash")) {
-        return TYPE_HASH;
-    } else if(!strcmp(type, "zset")) {
-        return TYPE_ZSET;
-    } else if(!strcmp(type, "stream")) {
-        return TYPE_STREAM;
-    } else if(!strcmp(type, "none")) {
-        return TYPE_NONE;
-    } else {
-        fprintf(stderr, "Unknown type '%s' for key '%s'\n", type, key);
-        exit(1);
-    }
+typedef struct { /* Per-type statistics gathered by findBigKeys() */
+    char *name;     /* type name; typeinfo_add() stores an sds copy, also the dict key */
+    char *sizecmd;  /* command sent to get a key's size, NULL when none is known */
+    char *sizeunit; /* unit printed next to sizes obtained with sizecmd */
+    unsigned long long biggest;   /* largest size seen so far */
+    unsigned long long count;     /* number of sampled keys of this type */
+    unsigned long long totalsize; /* sum of the sizes of those keys */
+    sds biggest_key; /* name of the key holding 'biggest', NULL until one is found */
+} typeinfo;
+
+typeinfo type_string = { .name = "string", .sizecmd = "STRLEN", .sizeunit = "bytes" };
+typeinfo type_list   = { .name = "list",   .sizecmd = "LLEN",   .sizeunit = "items" };
+typeinfo type_set    = { .name = "set",    .sizecmd = "SCARD",  .sizeunit = "members" };
+typeinfo type_hash   = { .name = "hash",   .sizecmd = "HLEN",   .sizeunit = "fields" };
+typeinfo type_zset   = { .name = "zset",   .sizecmd = "ZCARD",  .sizeunit = "members" };
+typeinfo type_stream = { .name = "stream", .sizecmd = "XLEN",   .sizeunit = "entries" };
+typeinfo type_other  = { .name = "other",  .sizecmd = NULL,     .sizeunit = "?" }; /* no size command */
+
+static typeinfo* typeinfo_add(dict *types, char* name, typeinfo* type_template) {
+    typeinfo *info = zmalloc(sizeof(typeinfo)); /* new entry, registered in 'types' below */
+    *info = *type_template;           /* start as a copy of the template */
+    info->name = sdsnew(name);        /* private sds copy of the type name... */
+    dictAdd(types, info->name, info); /* ...which is also used as the dict key */
+    return info;
 }
 
-static void getKeyTypes(redisReply *keys, int *types) {
+/* Dict value destructor: frees a typeinfo and the sds strings it owns. */
+void type_free(void* priv_data, void* val) {
+    typeinfo *info = val;
+    UNUSED(priv_data);
+    sdsfree(info->biggest_key); /* may be NULL: sdsfree() is a no-op then */
+    sdsfree(info->name);        /* also the dict key, hence no key destructor */
+    zfree(info);
+}
+
+static dictType typeinfoDictType = { /* maps a type name (sds) to its typeinfo */
+    dictSdsHash,               /* hash function */
+    NULL,                      /* key dup */
+    NULL,                      /* val dup */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* key destructor (the key is the value's 'name', freed by type_free) */
+    type_free                  /* val destructor */
+};
+
+static void getKeyTypes(dict *types_dict, redisReply *keys, typeinfo **types) {
     redisReply *reply;
     unsigned int i;
 
@@ -6520,32 +6545,47 @@ static void getKeyTypes(redisReply *keys, int *types) {
             exit(1);
         }
 
-        types[i] = toIntType(keys->element[i]->str, reply->str);
+        sds typereply = sdsnew(reply->str);
+        dictEntry *de = dictFind(types_dict, typereply);
+        sdsfree(typereply);
+        typeinfo *type = NULL;
+        if (de)
+            type = dictGetVal(de);
+        else if (strcmp(reply->str, "none")) /* create new types for modules, (but not for deleted keys) */
+            type = typeinfo_add(types_dict, reply->str, &type_other);
+        types[i] = type;
         freeReplyObject(reply);
     }
 }
 
-static void getKeySizes(redisReply *keys, int *types,
-                        unsigned long long *sizes)
+static void getKeySizes(redisReply *keys, typeinfo **types,
+                        unsigned long long *sizes, int memkeys,
+                        unsigned memkeys_samples)
 {
     redisReply *reply;
-    char *sizecmds[] = {"STRLEN","LLEN","SCARD","HLEN","ZCARD"};
     unsigned int i;
 
     /* Pipeline size commands */
     for(i=0;i<keys->elements;i++) {
-        /* Skip keys that were deleted */
-        if(types[i]==TYPE_NONE)
+        /* Skip keys that disappeared between SCAN and TYPE (or unknown types when not in memkeys mode) */
+        if(!types[i] || (!types[i]->sizecmd && !memkeys))
             continue;
 
-        redisAppendCommand(context, "%s %s", sizecmds[types[i]],
-            keys->element[i]->str);
+        if (!memkeys)
+            redisAppendCommand(context, "%s %s",
+                types[i]->sizecmd, keys->element[i]->str);
+        else if (memkeys_samples==0)
+            redisAppendCommand(context, "%s %s %s",
+                "MEMORY", "USAGE", keys->element[i]->str);
+        else
+            redisAppendCommand(context, "%s %s %s SAMPLES %u",
+                "MEMORY", "USAGE", keys->element[i]->str, memkeys_samples);
     }
 
     /* Retrieve sizes */
     for(i=0;i<keys->elements;i++) {
-        /* Skip keys that disappeared between SCAN and TYPE */
-        if(types[i] == TYPE_NONE) {
+        /* Skip keys that disappeared between SCAN and TYPE (or unknown types when not in memkeys mode) */
+        if(!types[i] || (!types[i]->sizecmd && !memkeys)) {
             sizes[i] = 0;
             continue;
         }
@@ -6560,7 +6600,8 @@ static void getKeySizes(redisReply *keys, int *types,
              * added as a different type between TYPE and SIZE */
             fprintf(stderr,
                 "Warning:  %s on '%s' failed (may have changed type)\n",
-                 sizecmds[types[i]], keys->element[i]->str);
+                !memkeys? types[i]->sizecmd: "MEMORY USAGE",
+                keys->element[i]->str);
             sizes[i] = 0;
         } else {
             sizes[i] = reply->integer;
@@ -6570,17 +6611,23 @@ static void getKeySizes(redisReply *keys, int *types,
     }
 }
 
-static void findBigKeys(void) {
-    unsigned long long biggest[TYPE_COUNT] = {0}, counts[TYPE_COUNT] = {0}, totalsize[TYPE_COUNT] = {0};
+static void findBigKeys(int memkeys, unsigned memkeys_samples) {
     unsigned long long sampled = 0, total_keys, totlen=0, *sizes=NULL, it=0;
-    sds maxkeys[TYPE_COUNT] = {0};
-    char *typename[] = {"string","list","set","hash","zset","stream","none"};
-    char *typeunit[] = {"bytes","items","members","fields","members","entries",""};
     redisReply *reply, *keys;
     unsigned int arrsize=0, i;
-    int type, *types=NULL;
+    dictIterator *di;
+    dictEntry *de;
+    typeinfo **types = NULL;
     double pct;
 
+    dict *types_dict = dictCreate(&typeinfoDictType, NULL);
+    typeinfo_add(types_dict, "string", &type_string);
+    typeinfo_add(types_dict, "list", &type_list);
+    typeinfo_add(types_dict, "set", &type_set);
+    typeinfo_add(types_dict, "hash", &type_hash);
+    typeinfo_add(types_dict, "zset", &type_zset);
+    typeinfo_add(types_dict, "stream", &type_stream);
+
     /* Total keys pre scanning */
     total_keys = getDbSize();
 
@@ -6589,15 +6636,6 @@ static void findBigKeys(void) {
     printf("# average sizes per key type.  You can use -i 0.1 to sleep 0.1 sec\n");
     printf("# per 100 SCAN commands (not usually needed).\n\n");
 
-    /* New up sds strings to keep track of overall biggest per type */
-    for(i=0;i<TYPE_NONE; i++) {
-        maxkeys[i] = sdsempty();
-        if(!maxkeys[i]) {
-            fprintf(stderr, "Failed to allocate memory for largest key names!\n");
-            exit(1);
-        }
-    }
-
     /* SCAN loop */
     do {
         /* Calculate approximate percentage completion */
@@ -6609,7 +6647,7 @@ static void findBigKeys(void) {
 
         /* Reallocate our type and size array if we need to */
         if(keys->elements > arrsize) {
-            types = zrealloc(types, sizeof(int)*keys->elements);
+            types = zrealloc(types, sizeof(typeinfo*)*keys->elements);
             sizes = zrealloc(sizes, sizeof(unsigned long long)*keys->elements);
 
             if(!types || !sizes) {
@@ -6621,34 +6659,38 @@ static void findBigKeys(void) {
         }
 
         /* Retrieve types and then sizes */
-        getKeyTypes(keys, types);
-        getKeySizes(keys, types, sizes);
+        getKeyTypes(types_dict, keys, types);
+        getKeySizes(keys, types, sizes, memkeys, memkeys_samples);
 
         /* Now update our stats */
         for(i=0;i<keys->elements;i++) {
-            if((type = types[i]) == TYPE_NONE)
+            typeinfo *type = types[i];
+            /* Skip keys that disappeared between SCAN and TYPE */
+            if(!type)
                 continue;
 
-            totalsize[type] += sizes[i];
-            counts[type]++;
+            type->totalsize += sizes[i];
+            type->count++;
             totlen += keys->element[i]->len;
             sampled++;
 
-            if(biggest[type]<sizes[i]) {
+            if(type->biggest<sizes[i]) {
                 printf(
                    "[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n",
-                   pct, typename[type], keys->element[i]->str, sizes[i],
-                   typeunit[type]);
+                   pct, type->name, keys->element[i]->str, sizes[i],
+                   !memkeys? type->sizeunit: "bytes");
 
                 /* Keep track of biggest key name for this type */
-                maxkeys[type] = sdscpy(maxkeys[type], keys->element[i]->str);
-                if(!maxkeys[type]) {
+                if (type->biggest_key)
+                    sdsfree(type->biggest_key);
+                type->biggest_key = sdsnew(keys->element[i]->str);
+                if(!type->biggest_key) {
                     fprintf(stderr, "Failed to allocate memory for key!\n");
                     exit(1);
                 }
 
                 /* Keep track of the biggest size for this type */
-                biggest[type] = sizes[i];
+                type->biggest = sizes[i];
             }
 
             /* Update overall progress */
@@ -6676,26 +6718,29 @@ static void findBigKeys(void) {
        totlen, totlen ? (double)totlen/sampled : 0);
 
     /* Output the biggest keys we found, for types we did find */
-    for(i=0;i<TYPE_NONE;i++) {
-        if(sdslen(maxkeys[i])>0) {
-            printf("Biggest %6s found '%s' has %llu %s\n", typename[i], maxkeys[i],
-               biggest[i], typeunit[i]);
+    di = dictGetIterator(types_dict);
+    while ((de = dictNext(di))) {
+        typeinfo *type = dictGetVal(de);
+        if(type->biggest_key) {
+            printf("Biggest %6s found '%s' has %llu %s\n", type->name, type->biggest_key,
+               type->biggest, !memkeys? type->sizeunit: "bytes");
         }
     }
+    dictReleaseIterator(di);
 
     printf("\n");
 
-    for(i=0;i<TYPE_NONE;i++) {
+    di = dictGetIterator(types_dict);
+    while ((de = dictNext(di))) {
+        typeinfo *type = dictGetVal(de);
         printf("%llu %ss with %llu %s (%05.2f%% of keys, avg size %.2f)\n",
-           counts[i], typename[i], totalsize[i], typeunit[i],
-           sampled ? 100 * (double)counts[i]/sampled : 0,
-           counts[i] ? (double)totalsize[i]/counts[i] : 0);
+           type->count, type->name, type->totalsize, !memkeys? type->sizeunit: "bytes",
+           sampled ? 100 * (double)type->count/sampled : 0,
+           type->count ? (double)type->totalsize/type->count : 0);
     }
+    dictReleaseIterator(di);
 
-    /* Free sds strings containing max keys */
-    for(i=0;i<TYPE_NONE;i++) {
-        sdsfree(maxkeys[i]);
-    }
+    dictRelease(types_dict);
 
     /* Success! */
     exit(0);
@@ -7287,7 +7332,13 @@ int main(int argc, char **argv) {
     /* Find big keys */
     if (config.bigkeys) {
         if (cliConnect(0) == REDIS_ERR) exit(1);
-        findBigKeys();
+        findBigKeys(0, 0);
+    }
+
+    /* Find large keys */
+    if (config.memkeys) {
+        if (cliConnect(0) == REDIS_ERR) exit(1);
+        findBigKeys(1, config.memkeys_samples);
     }
 
     /* Find hot keys */