Added check for open slots (clusterManagerCheckCluster)

This commit is contained in:
artix 2018-01-31 17:57:16 +01:00
parent 486c7af7b8
commit 8c7ad80f9f

View File

@ -74,6 +74,13 @@
(reconnectingRedisCommand(n->context, __VA_ARGS__)) (reconnectingRedisCommand(n->context, __VA_ARGS__))
#define CLUSTER_MANAGER_NODE_INFO(n) (CLUSTER_MANAGER_COMMAND(n, "INFO")) #define CLUSTER_MANAGER_NODE_INFO(n) (CLUSTER_MANAGER_COMMAND(n, "INFO"))
#define CLUSTER_MANAGER_ERROR(err) do { \
if (cluster_manager.errors == NULL) \
cluster_manager.errors = listCreate(); \
listAddNodeTail(cluster_manager.errors, err); \
fprintf(stderr, "%s\n", (char *) err); \
} while(0)
#define CLUSTER_MANAGER_RESET_SLOTS(n) do { \ #define CLUSTER_MANAGER_RESET_SLOTS(n) do { \
memset(n->slots, 0, sizeof(n->slots)); \ memset(n->slots, 0, sizeof(n->slots)); \
n->slots_count = 0; \ n->slots_count = 0; \
@ -137,7 +144,14 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
int *spectrum_palette; int *spectrum_palette;
int spectrum_palette_size; int spectrum_palette_size;
/* Cluster Manager command info */ /* Dict Helpers */
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(void *privdata, const void *key1,
const void *key2);
static void dictSdsDestructor(void *privdata, void *val);
/* Cluster Manager Command Info */
typedef struct clusterManagerCommand { typedef struct clusterManagerCommand {
char *name; char *name;
int argc; int argc;
@ -196,6 +210,7 @@ static struct config {
static struct clusterManager { static struct clusterManager {
list *nodes; list *nodes;
list *errors;
} cluster_manager; } cluster_manager;
typedef struct clusterManagerNode { typedef struct clusterManagerNode {
@ -212,6 +227,10 @@ typedef struct clusterManagerNode {
uint8_t slots[CLUSTER_MANAGER_SLOTS]; uint8_t slots[CLUSTER_MANAGER_SLOTS];
int slots_count; int slots_count;
list *friends; list *friends;
sds *migrating;
sds *importing;
int migrating_count;
int importing_count;
} clusterManagerNode; } clusterManagerNode;
typedef struct clusterManagerNodeArray { typedef struct clusterManagerNodeArray {
@ -221,6 +240,15 @@ typedef struct clusterManagerNodeArray {
int count; int count;
} clusterManagerNodeArray; } clusterManagerNodeArray;
static dictType clusterManagerDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
dictSdsDestructor /* val destructor */
};
static clusterManagerNode *clusterManagerNewNode(char *ip, int port); static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err); static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
@ -1810,13 +1838,22 @@ static void freeClusterManagerNode(clusterManagerNode *node) {
if (node->replicate != NULL) sdsfree(node->replicate); if (node->replicate != NULL) sdsfree(node->replicate);
if ((node->flags & CLUSTER_MANAGER_FLAG_FRIEND) && node->ip) if ((node->flags & CLUSTER_MANAGER_FLAG_FRIEND) && node->ip)
sdsfree(node->ip); sdsfree(node->ip);
int i;
if (node->migrating != NULL) {
for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
zfree(node->migrating);
}
if (node->importing != NULL) {
for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
zfree(node->importing);
}
zfree(node); zfree(node);
} }
static void freeClusterManager(void) { static void freeClusterManager(void) {
listIter li;
listNode *ln;
if (cluster_manager.nodes != NULL) { if (cluster_manager.nodes != NULL) {
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes,&li); listRewind(cluster_manager.nodes,&li);
while ((ln = listNext(&li)) != NULL) { while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value; clusterManagerNode *n = ln->value;
@ -1825,9 +1862,18 @@ static void freeClusterManager(void) {
listRelease(cluster_manager.nodes); listRelease(cluster_manager.nodes);
cluster_manager.nodes = NULL; cluster_manager.nodes = NULL;
} }
if (cluster_manager.errors != NULL) {
listRewind(cluster_manager.errors,&li);
while ((ln = listNext(&li)) != NULL) {
sds err = ln->value;
sdsfree(err);
}
listRelease(cluster_manager.errors);
cluster_manager.errors = NULL;
}
} }
static clusterManagerNode *clusterManagerNewNode(char * ip, int port) { static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
clusterManagerNode *node = zmalloc(sizeof(*node)); clusterManagerNode *node = zmalloc(sizeof(*node));
node->context = NULL; node->context = NULL;
node->name = NULL; node->name = NULL;
@ -1840,6 +1886,10 @@ static clusterManagerNode *clusterManagerNewNode(char * ip, int port) {
node->replicate = NULL; node->replicate = NULL;
node->dirty = 0; node->dirty = 0;
node->friends = NULL; node->friends = NULL;
node->migrating = NULL;
node->importing = NULL;
node->migrating_count = 0;
node->importing_count = 0;
CLUSTER_MANAGER_RESET_SLOTS(node); CLUSTER_MANAGER_RESET_SLOTS(node);
return node; return node;
} }
@ -1902,17 +1952,9 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
int node_len = cluster_manager.nodes->len; int node_len = cluster_manager.nodes->len;
*offending = zcalloc(node_len * sizeof(clusterManagerNode*)); *offending = zcalloc(node_len * sizeof(clusterManagerNode*));
clusterManagerNode **offending_p = *offending; clusterManagerNode **offending_p = *offending;
dictType dtype = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
dictSdsDestructor /* val destructor */
};
for (i = 0; i < ip_len; i++) { for (i = 0; i < ip_len; i++) {
clusterManagerNodeArray *node_array = &(ipnodes[i]); clusterManagerNodeArray *node_array = &(ipnodes[i]);
dict *related = dictCreate(&dtype, NULL); dict *related = dictCreate(&clusterManagerDictType, NULL);
char *ip = NULL; char *ip = NULL;
for (j = 0; j < node_array->len; j++) { for (j = 0; j < node_array->len; j++) {
clusterManagerNode *node = node_array->nodes[j]; clusterManagerNode *node = node_array->nodes[j];
@ -2291,7 +2333,32 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
if (remaining) line = p + 1; if (remaining) line = p + 1;
else line = p; else line = p;
if (slotsdef[0] == '[') { if (slotsdef[0] == '[') {
//TODO: migrating/importing slotsdef++;
if ((p = strstr(slotsdef, "->-"))) { // Migrating
*p = '\0';
p += 3;
sds slot = sdsnew(slotsdef);
sds dst = sdsnew(p);
node->migrating_count += 2;
node->migrating = zrealloc(node->migrating,
(node->migrating_count * sizeof(sds)));
node->migrating[node->migrating_count - 2] =
slot;
node->migrating[node->migrating_count - 1] =
dst;
} else if ((p = strstr(slotsdef, "-<-"))) {//Importing
*p = '\0';
p += 3;
sds slot = sdsnew(slotsdef);
sds src = sdsnew(p);
node->importing_count += 2;
node->importing = zrealloc(node->importing,
(node->importing_count * sizeof(sds)));
node->importing[node->importing_count - 2] =
slot;
node->importing[node->importing_count - 1] =
src;
}
} else if ((p = strchr(slotsdef, '-')) != NULL) { } else if ((p = strchr(slotsdef, '-')) != NULL) {
int start, stop; int start, stop;
*p = '\0'; *p = '\0';
@ -2529,11 +2596,68 @@ static void clusterManagerCheckCluster(int quiet) {
printf(">>> Performing Cluster Check (using node %s:%d)\n", printf(">>> Performing Cluster Check (using node %s:%d)\n",
node->ip, node->port); node->ip, node->port);
if (!quiet) clusterManagerShowNodes(); if (!quiet) clusterManagerShowNodes();
if (!clusterManagerIsConfigConsistent()) if (!clusterManagerIsConfigConsistent()) {
printf("[ERR] Nodes don't agree about configuration!\n"); //TODO: in redis-trib this error is added to @errors array sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
else CLUSTER_MANAGER_ERROR(err);
printf("[OK] All nodes agree about slots configuration.\n"); } else printf("[OK] All nodes agree about slots configuration.\n");
//TODO:check_open_slots // Check open slots
listIter li;
listRewind(cluster_manager.nodes, &li);
int i;
dict *open_slots = NULL;
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->migrating != NULL) {
if (open_slots == NULL)
open_slots = dictCreate(&clusterManagerDictType, NULL);
sds errstr = sdsempty();
errstr = sdscatprintf(errstr,
"[WARNING] Node %s:%d has slots in "
"migrating state ",
n->ip,
n->port);
for (i = 0; i < n->migrating_count; i += 2) {
sds slot = n->migrating[i];
dictAdd(open_slots, slot, n->migrating[i + 1]);
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
errstr = sdscat(errstr, ".");
CLUSTER_MANAGER_ERROR(errstr);
}
if (n->importing != NULL) {
if (open_slots == NULL)
open_slots = dictCreate(&clusterManagerDictType, NULL);
sds errstr = sdsempty();
errstr = sdscatprintf(errstr,
"[WARNING] Node %s:%d has slots in "
"importing state ",
n->ip,
n->port);
for (i = 0; i < n->importing_count; i += 2) {
sds slot = n->importing[i];
dictAdd(open_slots, slot, n->importing[i + 1]);
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
errstr = sdscat(errstr, ".");
CLUSTER_MANAGER_ERROR(errstr);
}
}
if (open_slots != NULL) {
dictIterator *iter = dictGetIterator(open_slots);
dictEntry *entry;
sds errstr = sdsnew("[WARNING] The following slots are open: ");
i = 0;
while ((entry = dictNext(iter)) != NULL) {
sds slot = (sds) dictGetKey(entry);
char *fmt = (i++ > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
fprintf(stderr, "%s.\n", (char *) errstr);
sdsfree(errstr);
dictRelease(open_slots);
}
//TODO:check_slots_coverage //TODO:check_slots_coverage
} }