From 1f548359cba410f8423d8aba101c43bd9280e489 Mon Sep 17 00:00:00 2001
From: artix <artix2@gmail.com>
Date: Tue, 10 Apr 2018 16:25:25 +0200
Subject: [PATCH] Cluster Manager: import command

---
 src/Makefile    |   2 +-
 src/redis-cli.c | 216 +++++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 195 insertions(+), 23 deletions(-)

diff --git a/src/Makefile b/src/Makefile
index 14112aa1..269a7093 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -146,7 +146,7 @@ REDIS_SERVER_NAME=redis-server
 REDIS_SENTINEL_NAME=redis-sentinel
 REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o
 REDIS_CLI_NAME=redis-cli
-REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o
+REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o
 REDIS_BENCHMARK_NAME=redis-benchmark
 REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o zmalloc.o redis-benchmark.o
 REDIS_CHECK_RDB_NAME=redis-check-rdb
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 8d5732c2..08a356eb 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -74,7 +74,7 @@
 #define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2
 
 #define CLUSTER_MANAGER_INVALID_HOST_ARG \
-    "Invalid arguments: you need to pass either a valid " \
+    "[ERR] Invalid arguments: you need to pass either a valid " \
     "address (ie. 120.0.0.1:7000) or space separated IP " \
     "and port (ie. 120.0.0.1 7000)\n"
 #define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL)
@@ -115,7 +115,9 @@
 #define CLUSTER_MANAGER_CMD_FLAG_AUTOWEIGHTS    1 << 3
 #define CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER    1 << 4
 #define CLUSTER_MANAGER_CMD_FLAG_SIMULATE       1 << 5
-#define CLUSTER_MANAGER_CMD_FLAG_COLOR          1 << 7
+#define CLUSTER_MANAGER_CMD_FLAG_REPLACE        1 << 6
+#define CLUSTER_MANAGER_CMD_FLAG_COPY           1 << 7
+#define CLUSTER_MANAGER_CMD_FLAG_COLOR          1 << 8
 
 #define CLUSTER_MANAGER_OPT_GETFRIENDS  1 << 0
 #define CLUSTER_MANAGER_OPT_COLD        1 << 1
@@ -237,6 +239,8 @@ static long getLongInfoField(char *info, char *field);
  * Utility functions
  *--------------------------------------------------------------------------- */
 
+uint16_t crc16(const char *buf, int len);
+
 static long long ustime(void) {
     struct timeval tv;
     long long ust;
@@ -1325,6 +1329,12 @@ static int parseOptions(int argc, char **argv) {
         } else if (!strcmp(argv[i],"--cluster-simulate")) {
             config.cluster_manager_command.flags |= 
                 CLUSTER_MANAGER_CMD_FLAG_SIMULATE;  
+        } else if (!strcmp(argv[i],"--cluster-replace")) {
+            config.cluster_manager_command.flags |= 
+                CLUSTER_MANAGER_CMD_FLAG_REPLACE;  
+        } else if (!strcmp(argv[i],"--cluster-copy")) {
+            config.cluster_manager_command.flags |= 
+                CLUSTER_MANAGER_CMD_FLAG_COPY;  
         } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
             config.cluster_manager_command.flags |= 
                 CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;  
@@ -1870,6 +1880,7 @@ static int clusterManagerCommandCheck(int argc, char **argv);
 static int clusterManagerCommandFix(int argc, char **argv);
 static int clusterManagerCommandReshard(int argc, char **argv);
 static int clusterManagerCommandRebalance(int argc, char **argv);
+static int clusterManagerCommandImport(int argc, char **argv);
 static int clusterManagerCommandCall(int argc, char **argv);
 static int clusterManagerCommandHelp(int argc, char **argv);
 
@@ -1892,6 +1903,8 @@ clusterManagerCommandDef clusterManagerCommands[] = {
     {"rebalance", clusterManagerCommandRebalance, -1, "host:port", 
      "weight <node1=w1...nodeN=wN>,use-empty-masters,"
      "timeout <arg>,simulate,pipeline <arg>,threshold <arg>"}, 
+    {"import", clusterManagerCommandImport, 1, "host:port", 
+     "from <arg>,copy,replace"},
     {"call", clusterManagerCommandCall, -2, 
         "host:port command arg arg .. arg", NULL},
     {"help", clusterManagerCommandHelp, 0, NULL, NULL}
@@ -2383,6 +2396,37 @@ static sds clusterManagerNodeSlotsString(clusterManagerNode *node) {
     return slots;
 }
 
+/* -----------------------------------------------------------------------------
+ * Key space handling
+ * -------------------------------------------------------------------------- */
+
+/* We have 16384 hash slots. The hash slot of a given key is obtained
+ * as the least significant 14 bits of the crc16 of the key.
+ *
+ * However if the key contains the {...} pattern, only the part between
+ * { and } is hashed. This may be useful in the future to force certain
+ * keys to be in the same node (assuming no resharding is in progress). */
+static unsigned int keyHashSlot(char *key, int keylen) {
+    int s, e; /* start-end indexes of { and } */
+
+    for (s = 0; s < keylen; s++)
+        if (key[s] == '{') break;
+
+    /* No '{' ? Hash the whole key. This is the base case. */
+    if (s == keylen) return crc16(key,keylen) & 0x3FFF;
+
+    /* '{' found? Check if we have the corresponding '}'. */
+    for (e = s+1; e < keylen; e++)
+        if (key[e] == '}') break;
+
+    /* No '}' or nothing between {} ? Hash the whole key. */
+    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
+
+    /* If we are here there is both a { and a } on its right. Hash
+     * what is in the middle between { and }. */
+    return crc16(key+s+1,e-s-1) & 0x3FFF;
+}
+
 static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) {
     sds info = sdsempty();
     sds spaces = sdsempty();
@@ -3533,8 +3577,8 @@ static int clusterManagerFixOpenSlot(int slot) {
         }
 
         // Use ADDSLOTS to assign the slot.
-        printf("*** Configuring %s:%d as the slot owner\n", owner->ip, 
-               owner->port);
+        clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n", 
+                              owner->ip, owner->port);
         redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
                                                     "SETSLOT %d %s", 
                                                     slot, "STABLE");
@@ -4527,7 +4571,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
         if (over_threshold) threshold_reached = 1;
     }
     if (!threshold_reached) {
-        clusterManagerLogErr("*** No rebalancing needed! "
+        clusterManagerLogWarn("*** No rebalancing needed! "
                              "All nodes are within the %.2f%% threshold.\n",
                              config.cluster_manager_command.threshold);
         result = 0;
@@ -4586,7 +4630,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
             listRelease(lsrc);
             int table_len = (int) listLength(table);
             if (!table || table_len != numslots) {
-                clusterManagerLogErr("*** Assertio failed: Reshard table "
+                clusterManagerLogErr("*** Assertion failed: Reshard table "
                                      "!= number of slots");
                 result = 0;
                 goto end_move;
@@ -4629,23 +4673,148 @@ invalid_args:
     return 0;
 }
 
-static int clusterManagerCommandCall(int argc, char **argv) {
-    int port = 0;
-    char *ip = NULL;
-    char *addr = argv[0];
-    char *c = strrchr(addr, '@');
-    int i;
-    if (c != NULL) *c = '\0';
-    c = strrchr(addr, ':');
-    if (c != NULL) {
-        *c = '\0';
-        ip = addr;
-        port = atoi(++c);
-    } else {
-        fprintf(stderr, 
-                "Invalid arguments: first agrumnt must be host:port.\n");
-        return 0;
+static int clusterManagerCommandImport(int argc, char **argv) {
+    int success = 1;
+    int port = 0, src_port = 0;
+    char *ip = NULL, *src_ip = NULL;
+    char *invalid_args_msg = NULL;
+    if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) {
+        invalid_args_msg = CLUSTER_MANAGER_INVALID_HOST_ARG;
+        goto invalid_args;
     }
+    if (config.cluster_manager_command.from == NULL) {
+        invalid_args_msg = "[ERR] Option '--cluster-from' is required for "
+                           "subcommand 'import'.\n";
+        goto invalid_args;
+    }
+    char *src_host[] = {config.cluster_manager_command.from};
+    if (!getClusterHostFromCmdArgs(1, src_host, &src_ip, &src_port)) {
+        invalid_args_msg = "[ERR] Invalid --cluster-from host. You need to "
+                           "pass a valid address (ie. 120.0.0.1:7000).\n";
+        goto invalid_args;
+    }
+    clusterManagerLogInfo(">>> Importing data from %s:%d to cluster %s:%d\n", 
+                          src_ip, src_port, ip, port);
+
+    clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
+    if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
+    char *reply_err = NULL;
+    redisReply *src_reply = NULL;
+    // Connect to the source node.
+    redisContext *src_ctx = redisConnect(src_ip, src_port);
+    if (src_ctx->err) {
+        success = 0;
+        fprintf(stderr,"Could not connect to Redis at %s:%d: %s.\n", src_ip, 
+                src_port, src_ctx->errstr);
+        goto cleanup;
+    }
+    src_reply = reconnectingRedisCommand(src_ctx, "INFO");
+    if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) {
+        if (src_reply && src_reply->str) reply_err = src_reply->str;
+        success = 0;
+        goto cleanup;
+    }
+    if (getLongInfoField(src_reply->str, "cluster_enabled")) {
+        clusterManagerLogErr("[ERR] The source node should not be a "
+                             "cluster node.\n"); 
+        success = 0;
+        goto cleanup;
+    }
+    freeReplyObject(src_reply);
+    src_reply = reconnectingRedisCommand(src_ctx, "DBSIZE");
+    if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) {
+        if (src_reply && src_reply->str) reply_err = src_reply->str;
+        success = 0;
+        goto cleanup;
+    }
+    int size = src_reply->integer, i;
+    clusterManagerLogWarn("*** Importing %d keys from DB 0\n", size);
+
+    // Build a slot -> node map
+    clusterManagerNode  *slots_map[CLUSTER_MANAGER_SLOTS];
+    memset(slots_map, 0, sizeof(slots_map) / sizeof(clusterManagerNode *));
+    listIter li;
+    listNode *ln;
+    for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
+        listRewind(cluster_manager.nodes, &li);
+        while ((ln = listNext(&li)) != NULL) {
+            clusterManagerNode *n = ln->value;
+            if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+            if (n->slots_count == 0) continue;
+            if (n->slots[i]) {
+                slots_map[i] = n;
+                break;
+            }
+        } 
+    }
+
+    char cmdfmt[50] = "MIGRATE %s %d %s %d %d";
+    if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COPY)
+        strcat(cmdfmt, " %s");
+    if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_REPLACE)
+        strcat(cmdfmt, " %s");
+
+    /* Use SCAN to iterate over the keys, migrating to the
+     * right node as needed. */
+    int cursor = -999, timeout = config.cluster_manager_command.timeout;
+    while (cursor != 0) {
+        if (cursor < 0) cursor = 0;
+        freeReplyObject(src_reply);
+        src_reply = reconnectingRedisCommand(src_ctx, "SCAN %d COUNT %d", 
+                                             cursor, 1000);
+        if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) {
+            if (src_reply && src_reply->str) reply_err = src_reply->str;
+            success = 0;
+            goto cleanup;
+        }
+        assert(src_reply->type == REDIS_REPLY_ARRAY);
+        assert(src_reply->elements >= 2);
+        assert(src_reply->element[1]->type == REDIS_REPLY_ARRAY);
+        if (src_reply->element[0]->type == REDIS_REPLY_STRING) 
+            cursor = atoi(src_reply->element[0]->str);
+        else if (src_reply->element[0]->type == REDIS_REPLY_INTEGER)
+            cursor = src_reply->element[0]->integer;
+        int keycount = src_reply->element[1]->elements;
+        for (i = 0; i < keycount; i++) {
+            redisReply *kr = src_reply->element[1]->element[i];
+            assert(kr->type == REDIS_REPLY_STRING);
+            char *key = kr->str;
+            uint16_t slot = keyHashSlot(key, kr->len);
+            clusterManagerNode *target = slots_map[slot];
+            printf("Migrating %s to %s:%d: ", key, target->ip, target->port);
+            redisReply *r = reconnectingRedisCommand(src_ctx, cmdfmt, 
+                                                     target->ip, target->port, 
+                                                     key, 0, timeout, 
+                                                     "COPY", "REPLACE");
+            if (!r || r->type == REDIS_REPLY_ERROR) {
+                if (r && r->str) {
+                    clusterManagerLogErr("Source %s:%d replied with "
+                                         "error:\n%s\n", src_ip, src_port, 
+                                         r->str);
+                }
+                success = 0;
+            }
+            freeReplyObject(r);
+            if (!success) goto cleanup;
+            clusterManagerLogOk("OK\n");
+        }
+    }
+cleanup:
+    if (reply_err) 
+        clusterManagerLogErr("Source %s:%d replied with error:\n%s\n", 
+                             src_ip, src_port, reply_err);
+    if (src_ctx) redisFree(src_ctx);
+    if (src_reply) freeReplyObject(src_reply);
+    return success;
+invalid_args:
+    fprintf(stderr, "%s", invalid_args_msg);
+    return 0;
+}
+
+static int clusterManagerCommandCall(int argc, char **argv) {
+    int port = 0, i;
+    char *ip = NULL;
+    if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
     clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
     if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
     argc--;
@@ -4677,6 +4846,9 @@ static int clusterManagerCommandCall(int argc, char **argv) {
     }
     zfree(argvlen);
     return 1;
+invalid_args:
+    fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
+    return 0;
 }
 
 static int clusterManagerCommandHelp(int argc, char **argv) {