Cluster Manager: 'backup' command

This commit is contained in:
artix 2019-02-15 20:37:18 +01:00
parent b19e8b9a2c
commit fb020ab090

View File

@ -177,6 +177,7 @@ typedef struct clusterManagerCommand {
int timeout;
int pipeline;
float threshold;
char *backup_dir;
} clusterManagerCommand;
static void createClusterManagerCommand(char *cmdname, int argc, char **argv);
@ -2056,6 +2057,7 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv);
static int clusterManagerCommandImport(int argc, char **argv);
static int clusterManagerCommandCall(int argc, char **argv);
static int clusterManagerCommandHelp(int argc, char **argv);
static int clusterManagerCommandBackup(int argc, char **argv);
typedef struct clusterManagerCommandDef {
char *name;
@ -2088,9 +2090,12 @@ clusterManagerCommandDef clusterManagerCommands[] = {
"host:port milliseconds", NULL},
{"import", clusterManagerCommandImport, 1, "host:port",
"from <arg>,copy,replace"},
{"backup", clusterManagerCommandBackup, 2, "host:port backup_directory",
NULL},
{"help", clusterManagerCommandHelp, 0, NULL, NULL}
};
static void getRDB(clusterManagerNode *node);
static void createClusterManagerCommand(char *cmdname, int argc, char **argv) {
clusterManagerCommand *cmd = &config.cluster_manager_command;
@ -2260,6 +2265,16 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
return node;
}
static sds clusterManagerGetNodeRDBFilename(clusterManagerNode *node) {
assert(config.cluster_manager_command.backup_dir);
sds filename = sdsnew(config.cluster_manager_command.backup_dir);
if (filename[sdslen(filename)] - 1 != '/')
filename = sdscat(filename, "/");
filename = sdscatprintf(filename, "redis-node-%s-%d-%s.rdb", node->ip,
node->port, node->name);
return filename;
}
/* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the
* latest case, if the 'err' arg is not NULL, it gets allocated with a copy
* of reply error (it's up to the caller function to free it), elsewhere
@ -6053,6 +6068,84 @@ invalid_args:
return 0;
}
static int clusterManagerCommandBackup(int argc, char **argv) {
UNUSED(argc);
int success = 1, port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
config.cluster_manager_command.backup_dir = argv[1];
/* TODO: check if backup_dir is a valid directory. */
sds json = sdsnew("[\n");
int first_node = 0;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
if (!first_node) first_node = 1;
else json = sdscat(json, ",\n");
clusterManagerNode *node = ln->value;
sds replicate = sdsempty();
if (node->replicate)
replicate = sdscatprintf(replicate, "\"%s\"", node->replicate);
else
replicate = sdscat(replicate, "null");
sds slots = clusterManagerNodeSlotsString(node);
sds flags = clusterManagerNodeFlagString(node);
char *p = slots;
while ((p = strchr(p, '-')) != NULL)
*(p++) = ',';
json = sdscatprintf(json,
" {\n"
" \"name\": \"%s\",\n"
" \"host\": \"%s\",\n"
" \"port\": %d,\n"
" \"replicate\": %s,\n"
" \"slots\": [%s],\n"
" \"flags\": \"%s\",\n"
" \"current_epoch\": %llu\n"
" }",
node->name,
node->ip,
node->port,
replicate,
slots,
flags,
node->current_epoch
);
sdsfree(replicate);
sdsfree(slots);
sdsfree(flags);
if (node->replicate)
continue;
clusterManagerLogInfo(">>> Node %s:%d -> Saving RDB...\n",
node->ip, node->port);
getRDB(node);
}
json = sdscat(json, "\n]");
sds jsonpath = sdsnew(config.cluster_manager_command.backup_dir);
if (jsonpath[sdslen(jsonpath) - 1] != '/')
jsonpath = sdscat(jsonpath, "/");
jsonpath = sdscat(jsonpath, "nodes.json");
/*printf("%s\n", json);*/
FILE *out = fopen(jsonpath, "w+");
if (!out) {
clusterManagerLogErr("Could not save nodes to:\n%s\n", jsonpath);
success = 0;
goto cleanup;
}
fputs(json, out);
fclose(out);
cleanup:
sdsfree(json);
sdsfree(jsonpath);
return success;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
static int clusterManagerCommandHelp(int argc, char **argv) {
UNUSED(argc);
UNUSED(argv);
@ -6445,9 +6538,17 @@ static void slaveMode(void) {
/* This function implements --rdb, so it uses the replication protocol in order
* to fetch the RDB file from a remote server. */
static void getRDB(void) {
int s = context->fd;
int fd;
static void getRDB(clusterManagerNode *node) {
int s, fd;
char *filename;
if (node != NULL) {
assert(node->context);
s = node->context->fd;
filename = clusterManagerGetNodeRDBFilename(node);
} else {
s = context->fd;
filename = config.rdb_filename;
}
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
@ -6458,20 +6559,20 @@ static void getRDB(void) {
payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer until EOF marker to '%s'\n",
config.rdb_filename);
fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer "
"until EOF marker to '%s'\n", filename);
} else {
fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
payload, config.rdb_filename);
payload, filename);
}
/* Write to file. */
if (!strcmp(config.rdb_filename,"-")) {
if (!strcmp(filename,"-")) {
fd = STDOUT_FILENO;
} else {
fd = open(config.rdb_filename, O_CREAT|O_WRONLY, 0644);
fd = open(filename, O_CREAT|O_WRONLY, 0644);
if (fd == -1) {
fprintf(stderr, "Error opening '%s': %s\n", config.rdb_filename,
fprintf(stderr, "Error opening '%s': %s\n", filename,
strerror(errno));
exit(1);
}
@ -6517,6 +6618,11 @@ static void getRDB(void) {
close(s); /* Close the file descriptor ASAP as fsync() may take time. */
fsync(fd);
close(fd);
fprintf(stderr,"Transfer finished with success.\n");
if (node) {
sdsfree(filename);
return;
}
exit(0);
}
@ -7526,6 +7632,7 @@ int main(int argc, char **argv) {
config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE;
config.cluster_manager_command.threshold =
CLUSTER_MANAGER_REBALANCE_THRESHOLD;
config.cluster_manager_command.backup_dir = NULL;
pref.hints = 1;
spectrum_palette = spectrum_palette_color;
@ -7577,7 +7684,7 @@ int main(int argc, char **argv) {
if (config.getrdb_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
getRDB();
getRDB(NULL);
}
/* Pipe mode */