From 74dcd14d1333bc312703de5ba143f41d6973815d Mon Sep 17 00:00:00 2001
From: artix <artix2@gmail.com>
Date: Wed, 31 Jan 2018 17:57:16 +0100
Subject: [PATCH] Added check for open slots (clusterManagerCheckCluster)

---
 src/redis-cli.c | 162 ++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 143 insertions(+), 19 deletions(-)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index ef917cca..456751f5 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -74,6 +74,13 @@
         (reconnectingRedisCommand(n->context, __VA_ARGS__))
 #define CLUSTER_MANAGER_NODE_INFO(n) (CLUSTER_MANAGER_COMMAND(n, "INFO"))
 
+#define CLUSTER_MANAGER_ERROR(err) do {             \
+    if (cluster_manager.errors == NULL)             \
+        cluster_manager.errors = listCreate();      \
+    listAddNodeTail(cluster_manager.errors, err);   \
+    fprintf(stderr, "%s\n", (char *) err);          \
+} while(0)
+
 #define CLUSTER_MANAGER_RESET_SLOTS(n) do { \
     memset(n->slots, 0, sizeof(n->slots));  \
     n->slots_count = 0;                     \
@@ -137,7 +144,14 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
 int *spectrum_palette;
 int spectrum_palette_size;
 
-/* Cluster Manager command info */
+/* Dict Helpers */
+
+static uint64_t dictSdsHash(const void *key);
+static int dictSdsKeyCompare(void *privdata, const void *key1, 
+    const void *key2);
+static void dictSdsDestructor(void *privdata, void *val);
+
+/* Cluster Manager Command Info */
 typedef struct clusterManagerCommand {
     char *name;
     int argc;
@@ -196,6 +210,7 @@ static struct config {
 
 static struct clusterManager {
     list *nodes; 
+    list *errors;
 } cluster_manager;
 
 typedef struct clusterManagerNode {
@@ -212,6 +227,10 @@ typedef struct clusterManagerNode {
     uint8_t slots[CLUSTER_MANAGER_SLOTS];
     int slots_count;
     list *friends;
+    sds *migrating;
+    sds *importing;
+    int migrating_count;
+    int importing_count;
 } clusterManagerNode;
 
 typedef struct clusterManagerNodeArray {
@@ -221,6 +240,15 @@ typedef struct clusterManagerNodeArray {
     int count;
 } clusterManagerNodeArray;
 
+static dictType clusterManagerDictType = {
+    dictSdsHash,               /* hash function */
+    NULL,                      /* key dup */
+    NULL,                      /* val dup */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* key destructor */
+    dictSdsDestructor          /* val destructor */
+};
+
 static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
 static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
 static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
@@ -1810,13 +1838,22 @@ static void freeClusterManagerNode(clusterManagerNode *node) {
     if (node->replicate != NULL) sdsfree(node->replicate);
     if ((node->flags & CLUSTER_MANAGER_FLAG_FRIEND) && node->ip)
         sdsfree(node->ip);
+    int i;
+    if (node->migrating != NULL) {
+        for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
+        zfree(node->migrating);
+    }
+    if (node->importing != NULL) {
+        for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
+        zfree(node->importing);
+    }
     zfree(node);
 }
 
 static void freeClusterManager(void) {
+    listIter li; 
+    listNode *ln;
     if (cluster_manager.nodes != NULL) {
-        listIter li; 
-        listNode *ln;
         listRewind(cluster_manager.nodes,&li);
         while ((ln = listNext(&li)) != NULL) {
             clusterManagerNode *n = ln->value; 
@@ -1825,9 +1862,18 @@ static void freeClusterManager(void) {
         listRelease(cluster_manager.nodes);
         cluster_manager.nodes = NULL;
     }
+    if (cluster_manager.errors != NULL) {
+        listRewind(cluster_manager.errors,&li);
+        while ((ln = listNext(&li)) != NULL) {
+            sds err = ln->value; 
+            sdsfree(err);
+        }
+        listRelease(cluster_manager.errors);
+        cluster_manager.errors = NULL;
+    }
 }
 
-static clusterManagerNode *clusterManagerNewNode(char * ip, int port) {
+static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
     clusterManagerNode *node = zmalloc(sizeof(*node));
     node->context = NULL;
     node->name = NULL;
@@ -1840,6 +1886,10 @@ static clusterManagerNode *clusterManagerNewNode(char * ip, int port) {
     node->replicate = NULL;
     node->dirty = 0;
     node->friends = NULL;
+    node->migrating = NULL;
+    node->importing = NULL;
+    node->migrating_count = 0;
+    node->importing_count = 0;
     CLUSTER_MANAGER_RESET_SLOTS(node);
     return node;
 }
@@ -1902,17 +1952,9 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
     int node_len = cluster_manager.nodes->len;
     *offending = zcalloc(node_len * sizeof(clusterManagerNode*));
     clusterManagerNode **offending_p = *offending;
-    dictType dtype = {
-        dictSdsHash,               /* hash function */
-        NULL,                      /* key dup */
-        NULL,                      /* val dup */
-        dictSdsKeyCompare,         /* key compare */
-        NULL,                      /* key destructor */
-        dictSdsDestructor          /* val destructor */
-    };
     for (i = 0; i < ip_len; i++) {
         clusterManagerNodeArray *node_array = &(ipnodes[i]);
-        dict *related = dictCreate(&dtype, NULL);
+        dict *related = dictCreate(&clusterManagerDictType, NULL);
         char *ip = NULL;
         for (j = 0; j < node_array->len; j++) {
             clusterManagerNode *node = node_array->nodes[j];
@@ -2291,7 +2333,32 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
                     if (remaining) line = p + 1;
                     else line = p;
                     if (slotsdef[0] == '[') {
-                        //TODO: migrating/importing
+                        slotsdef++;
+                        if ((p = strstr(slotsdef, "->-"))) { // Migrating
+                            *p = '\0';
+                            p += 3;
+                            sds slot = sdsnew(slotsdef);
+                            sds dst = sdsnew(p);
+                            node->migrating_count += 2;
+                            node->migrating = zrealloc(node->migrating, 
+                                (node->migrating_count * sizeof(sds)));
+                            node->migrating[node->migrating_count - 2] = 
+                                slot;
+                            node->migrating[node->migrating_count - 1] = 
+                                dst;
+                        }  else if ((p = strstr(slotsdef, "-<-"))) {//Importing
+                            *p = '\0';
+                            p += 3;
+                            sds slot = sdsnew(slotsdef);
+                            sds src = sdsnew(p);
+                            node->importing_count += 2;
+                            node->importing = zrealloc(node->importing, 
+                                (node->importing_count * sizeof(sds)));
+                            node->importing[node->importing_count - 2] = 
+                                slot;
+                            node->importing[node->importing_count - 1] = 
+                                src;
+                        } 
                     } else if ((p = strchr(slotsdef, '-')) != NULL) {
                         int start, stop;
                         *p = '\0';
@@ -2529,11 +2596,68 @@ static void clusterManagerCheckCluster(int quiet) {
     printf(">>> Performing Cluster Check (using node %s:%d)\n", 
             node->ip, node->port);
     if (!quiet) clusterManagerShowNodes();
-    if (!clusterManagerIsConfigConsistent())
-        printf("[ERR] Nodes don't agree about configuration!\n"); //TODO: in redis-trib this error is added to @errors array
-    else
-        printf("[OK] All nodes agree about slots configuration.\n");
-    //TODO:check_open_slots
+    if (!clusterManagerIsConfigConsistent()) {
+        sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
+        CLUSTER_MANAGER_ERROR(err);
+    } else printf("[OK] All nodes agree about slots configuration.\n");
+    // Check open slots
+    listIter li;
+    listRewind(cluster_manager.nodes, &li);
+    int i;
+    dict *open_slots = NULL;
+    while ((ln = listNext(&li)) != NULL) {
+        clusterManagerNode *n = ln->value; 
+        if (n->migrating != NULL) {
+            if (open_slots == NULL) 
+                open_slots = dictCreate(&clusterManagerDictType, NULL);
+            sds errstr = sdsempty();
+            errstr = sdscatprintf(errstr, 
+                                "[WARNING] Node %s:%d has slots in "
+                                "migrating state ",
+                                n->ip,
+                                n->port);
+            for (i = 0; i < n->migrating_count; i += 2) {
+                sds slot = n->migrating[i];
+                dictAdd(open_slots, slot, n->migrating[i + 1]);
+                char *fmt = (i > 0 ? ",%S" : "%S");
+                errstr = sdscatfmt(errstr, fmt, slot);
+            } 
+            errstr = sdscat(errstr, ".");
+            CLUSTER_MANAGER_ERROR(errstr);
+        }
+        if (n->importing != NULL) {
+            if (open_slots == NULL) 
+                open_slots = dictCreate(&clusterManagerDictType, NULL);
+            sds errstr = sdsempty();
+            errstr = sdscatprintf(errstr, 
+                                "[WARNING] Node %s:%d has slots in "
+                                "importing state ",
+                                n->ip,
+                                n->port);
+            for (i = 0; i < n->importing_count; i += 2) {
+                sds slot = n->importing[i];
+                dictAdd(open_slots, slot, n->importing[i + 1]);
+                char *fmt = (i > 0 ? ",%S" : "%S");
+                errstr = sdscatfmt(errstr, fmt, slot);
+            } 
+            errstr = sdscat(errstr, ".");
+            CLUSTER_MANAGER_ERROR(errstr);
+        }
+    }
+    if (open_slots != NULL) {
+        dictIterator *iter = dictGetIterator(open_slots);
+        dictEntry *entry;
+        sds errstr = sdsnew("[WARNING] The following slots are open: ");
+        i = 0;
+        while ((entry = dictNext(iter)) != NULL) {
+            sds slot = (sds) dictGetKey(entry);
+            char *fmt = (i++ > 0 ? ",%S" : "%S");
+            errstr = sdscatfmt(errstr, fmt, slot);
+        }
+        fprintf(stderr, "%s.\n", (char *) errstr);
+        sdsfree(errstr);
+        dictRelease(open_slots);
+    }
     //TODO:check_slots_coverage
 }