From d512a09c201a9aa2e1c233b30a9732249e9e2cce Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Mar 2013 16:35:16 +0100 Subject: [PATCH] Cluster: a bit more serious node role change handling. --- src/cluster.c | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 78a8b863..7424f2d9 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -48,6 +48,7 @@ clusterNode *clusterLookupNode(char *name); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); +int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); int bitmapTestBit(unsigned char *bitmap, int pos); @@ -882,14 +883,31 @@ int clusterProcessPacket(clusterLink *link) { if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME, sizeof(hdr->slaveof))) { - if (sender->slaveof) + /* Node is a master. */ + if (sender->flags & REDIS_NODE_SLAVE && + sender->slaveof != NULL) + { + /* If the node changed role and is now a master, remove + * it from the list of slaves of its old master. */ clusterNodeRemoveSlave(sender->slaveof,sender); + update_state = 1; + update_config = 1; + } sender->flags &= ~REDIS_NODE_SLAVE; sender->flags |= REDIS_NODE_MASTER; sender->slaveof = NULL; } else { + /* Node is a slave. */ clusterNode *master = clusterLookupNode(hdr->slaveof); + if (sender->flags & REDIS_NODE_MASTER) { + /* If the node changed role and is now a slave, clear all + * its slots as them are no longer served. */ + clusterDelNodeSlots(sender); + update_state = 1; + update_config = 1; + } + sender->flags &= ~REDIS_NODE_MASTER; sender->flags |= REDIS_NODE_SLAVE; if (sender->numslaves) clusterNodeResetSlaves(sender); @@ -1629,6 +1647,18 @@ int clusterDelSlot(int slot) { return REDIS_OK; } +/* Delete all the slots associated with the specified node. + * The number of deleted slots is returned. */ +int clusterDelNodeSlots(clusterNode *node) { + int deleted = 0, j; + + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { + if (clusterNodeGetSlotBit(node,j)) clusterDelSlot(j); + deleted++; + } + return deleted; +} + /* ----------------------------------------------------------------------------- * Cluster state evaluation function * -------------------------------------------------------------------------- */