From fb020ab090f6059bc8dbcfae3d333f9997fba5c3 Mon Sep 17 00:00:00 2001
From: artix <artix2@gmail.com>
Date: Fri, 15 Feb 2019 20:37:18 +0100
Subject: [PATCH] Cluster Manager: 'backup' command

---
 src/redis-cli.c | 127 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 117 insertions(+), 10 deletions(-)

diff --git a/src/redis-cli.c b/src/redis-cli.c
index d9797284..dc4f8e96 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -177,6 +177,7 @@ typedef struct clusterManagerCommand {
     int timeout;
     int pipeline;
     float threshold;
+    char *backup_dir;
 } clusterManagerCommand;
 
 static void createClusterManagerCommand(char *cmdname, int argc, char **argv);
@@ -2056,6 +2057,7 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv);
 static int clusterManagerCommandImport(int argc, char **argv);
 static int clusterManagerCommandCall(int argc, char **argv);
 static int clusterManagerCommandHelp(int argc, char **argv);
+static int clusterManagerCommandBackup(int argc, char **argv);
 
 typedef struct clusterManagerCommandDef {
     char *name;
@@ -2088,9 +2090,12 @@ clusterManagerCommandDef clusterManagerCommands[] = {
      "host:port milliseconds", NULL},
     {"import", clusterManagerCommandImport, 1, "host:port",
      "from <arg>,copy,replace"},
+    {"backup", clusterManagerCommandBackup, 2,  "host:port backup_directory",
+     NULL},
     {"help", clusterManagerCommandHelp, 0, NULL, NULL}
 };
 
+static void getRDB(clusterManagerNode *node);
 
 static void createClusterManagerCommand(char *cmdname, int argc, char **argv) {
     clusterManagerCommand *cmd = &config.cluster_manager_command;
@@ -2260,6 +2265,16 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
     return node;
 }
 
+static sds clusterManagerGetNodeRDBFilename(clusterManagerNode *node) {
+    assert(config.cluster_manager_command.backup_dir);
+    sds filename = sdsnew(config.cluster_manager_command.backup_dir);
+    if (filename[sdslen(filename)] - 1 != '/')
+        filename = sdscat(filename, "/");
+    filename = sdscatprintf(filename, "redis-node-%s-%d-%s.rdb", node->ip,
+                            node->port, node->name);
+    return filename;
+}
+
 /* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the
  * latest case, if the 'err' arg is not NULL, it gets allocated with a copy
  * of reply error (it's up to the caller function to free it), elsewhere
@@ -6053,6 +6068,84 @@ invalid_args:
     return 0;
 }
 
+static int clusterManagerCommandBackup(int argc, char **argv) {
+    UNUSED(argc);
+    int success = 1, port = 0;
+    char *ip = NULL;
+    if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
+    clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
+    if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
+    config.cluster_manager_command.backup_dir = argv[1];
+    /* TODO: check if backup_dir is a valid directory. */
+    sds json = sdsnew("[\n");
+    int first_node = 0;
+    listIter li;
+    listNode *ln;
+    listRewind(cluster_manager.nodes, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        if (!first_node) first_node = 1;
+        else json = sdscat(json, ",\n");
+        clusterManagerNode *node = ln->value;
+        sds replicate = sdsempty();
+        if (node->replicate)
+            replicate = sdscatprintf(replicate, "\"%s\"", node->replicate);
+        else
+            replicate = sdscat(replicate, "null");
+        sds slots = clusterManagerNodeSlotsString(node);
+        sds flags = clusterManagerNodeFlagString(node);
+        char *p = slots;
+        while ((p = strchr(p, '-')) != NULL)
+            *(p++) = ',';
+        json = sdscatprintf(json,
+            "  {\n"
+            "    \"name\": \"%s\",\n"
+            "    \"host\": \"%s\",\n"
+            "    \"port\": %d,\n"
+            "    \"replicate\": %s,\n"
+            "    \"slots\": [%s],\n"
+            "    \"flags\": \"%s\",\n"
+            "    \"current_epoch\": %llu\n"
+            "  }",
+            node->name,
+            node->ip,
+            node->port,
+            replicate,
+            slots,
+            flags,
+            node->current_epoch
+        );
+        sdsfree(replicate);
+        sdsfree(slots);
+        sdsfree(flags);
+        if (node->replicate)
+            continue;
+        clusterManagerLogInfo(">>> Node %s:%d -> Saving RDB...\n",
+                              node->ip, node->port);
+        getRDB(node);
+    }
+    json = sdscat(json, "\n]");
+    sds jsonpath = sdsnew(config.cluster_manager_command.backup_dir);
+    if (jsonpath[sdslen(jsonpath) - 1] != '/')
+        jsonpath = sdscat(jsonpath, "/");
+    jsonpath = sdscat(jsonpath, "nodes.json");
+    /*printf("%s\n", json);*/
+    FILE *out = fopen(jsonpath, "w+");
+    if (!out) {
+        clusterManagerLogErr("Could not save nodes to:\n%s\n", jsonpath);
+        success = 0;
+        goto cleanup;
+    }
+    fputs(json, out);
+    fclose(out);
+cleanup:
+    sdsfree(json);
+    sdsfree(jsonpath);
+    return success;
+invalid_args:
+    fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
+    return 0;
+}
+
 static int clusterManagerCommandHelp(int argc, char **argv) {
     UNUSED(argc);
     UNUSED(argv);
@@ -6445,9 +6538,17 @@ static void slaveMode(void) {
 
 /* This function implements --rdb, so it uses the replication protocol in order
  * to fetch the RDB file from a remote server. */
-static void getRDB(void) {
-    int s = context->fd;
-    int fd;
+static void getRDB(clusterManagerNode *node) {
+    int s, fd;
+    char *filename;
+    if (node != NULL) {
+        assert(node->context);
+        s = node->context->fd;
+        filename = clusterManagerGetNodeRDBFilename(node);
+    } else {
+        s = context->fd;
+        filename = config.rdb_filename;
+    }
     static char eofmark[RDB_EOF_MARK_SIZE];
     static char lastbytes[RDB_EOF_MARK_SIZE];
     static int usemark = 0;
@@ -6458,20 +6559,20 @@ static void getRDB(void) {
         payload = ULLONG_MAX;
         memset(lastbytes,0,RDB_EOF_MARK_SIZE);
         usemark = 1;
-        fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer until EOF marker to '%s'\n",
-            config.rdb_filename);
+        fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer "
+                "until EOF marker to '%s'\n", filename);
     } else {
         fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
-            payload, config.rdb_filename);
+            payload, filename);
     }
 
     /* Write to file. */
-    if (!strcmp(config.rdb_filename,"-")) {
+    if (!strcmp(filename,"-")) {
         fd = STDOUT_FILENO;
     } else {
-        fd = open(config.rdb_filename, O_CREAT|O_WRONLY, 0644);
+        fd = open(filename, O_CREAT|O_WRONLY, 0644);
         if (fd == -1) {
-            fprintf(stderr, "Error opening '%s': %s\n", config.rdb_filename,
+            fprintf(stderr, "Error opening '%s': %s\n", filename,
                 strerror(errno));
             exit(1);
         }
@@ -6517,6 +6618,11 @@ static void getRDB(void) {
     close(s); /* Close the file descriptor ASAP as fsync() may take time. */
     fsync(fd);
     close(fd);
+    fprintf(stderr,"Transfer finished with success.\n");
+    if (node) {
+        sdsfree(filename);
+        return;
+    }
     exit(0);
 }
 
@@ -7526,6 +7632,7 @@ int main(int argc, char **argv) {
     config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE;
     config.cluster_manager_command.threshold =
         CLUSTER_MANAGER_REBALANCE_THRESHOLD;
+    config.cluster_manager_command.backup_dir = NULL;
     pref.hints = 1;
 
     spectrum_palette = spectrum_palette_color;
@@ -7577,7 +7684,7 @@ int main(int argc, char **argv) {
     if (config.getrdb_mode) {
         if (cliConnect(0) == REDIS_ERR) exit(1);
         sendCapa();
-        getRDB();
+        getRDB(NULL);
     }
 
     /* Pipe mode */