diff --git a/src/cluster.c b/src/cluster.c index a5f682ba..4ff8fb4d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -74,27 +74,13 @@ void clusterCloseAllSlots(void); void clusterSetNodeAsMaster(clusterNode *n); void clusterDelNode(clusterNode *delnode); sds representRedisNodeFlags(sds ci, uint16_t flags); +uint64_t clusterGetMaxEpoch(void); +int clusterBumpConfigEpochWithoutConsensus(void); /* ----------------------------------------------------------------------------- * Initialization * -------------------------------------------------------------------------- */ -/* Return the greatest configEpoch found in the cluster. */ -uint64_t clusterGetMaxEpoch(void) { - uint64_t max = 0; - dictIterator *di; - dictEntry *de; - - di = dictGetSafeIterator(server.cluster->nodes); - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - if (node->configEpoch > max) max = node->configEpoch; - } - dictReleaseIterator(di); - if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch; - return max; -} - /* Load the cluster config from 'filename'. * * If the file does not exist or is zero-length (this may happen because @@ -927,6 +913,137 @@ void clusterRenameNode(clusterNode *node, char *newname) { clusterAddNode(node); } +/* ----------------------------------------------------------------------------- + * CLUSTER config epoch handling + * -------------------------------------------------------------------------- */ + +/* Return the greatest configEpoch found in the cluster. 
*/ +uint64_t clusterGetMaxEpoch(void) { + uint64_t max = 0; + dictIterator *di; + dictEntry *de; + + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node->configEpoch > max) max = node->configEpoch; + } + dictReleaseIterator(di); + if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch; + return max; +} + +/* If this node's epoch is zero or is not already the greatest across the + * cluster (from the POV of the local configuration), this function will: + * + * 1) Generate a new config epoch, incrementing the current epoch. + * 2) Assign the new epoch to this node, WITHOUT any consensus. + * 3) Persist the configuration on disk before sending packets with the + * new configuration. + * + * If the new config epoch is generated and assigned, REDIS_OK is returned, + * otherwise REDIS_ERR is returned (since the node already has the greatest + * configuration around) and no operation is performed. + * + * Important note: this function violates the principle that config epochs + * should be generated with consensus and should be unique across the cluster. + * However Redis Cluster uses these auto-generated new config epochs in two + * cases: + * + * 1) When slots are closed after importing. Otherwise resharding would be + * too expensive. + * 2) When CLUSTER FAILOVER is called with options that force a slave to + * failover its master even if there is no master majority able to + * create a new configuration epoch. + * + * Redis Cluster does not explode using this function, even in the case of + * a collision between this node and another node, generating the same + * configuration epoch unilaterally, because the config epoch conflict + * resolution algorithm will eventually move colliding nodes to different + * config epochs. However using this function may violate the "last failover + * wins" rule, so it should only be used with care. 
*/ +int clusterBumpConfigEpochWithoutConsensus(void) { + uint64_t maxEpoch = clusterGetMaxEpoch(); + + if (myself->configEpoch == 0 || + myself->configEpoch != maxEpoch) + { + server.cluster->currentEpoch++; + myself->configEpoch = server.cluster->currentEpoch; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_FSYNC_CONFIG); + redisLog(REDIS_WARNING, + "New configEpoch set to %llu", + (unsigned long long) myself->configEpoch); + return REDIS_OK; + } else { + return REDIS_ERR; + } +} + +/* This function is called when this node is a master, and we receive from + * another master a configuration epoch that is equal to our configuration + * epoch. + * + * BACKGROUND + * + * It is not possible that different slaves get the same config + * epoch during a failover election, because the slaves need to get voted + * by a majority. However when we perform a manual resharding of the cluster + * the node will assign a configuration epoch to itself without asking + * for agreement. Usually resharding happens when the cluster is working well + * and is supervised by the sysadmin, however it is possible for a failover + * to happen exactly while the node we are resharding a slot to assigns itself + * a new configuration epoch, but before it is able to propagate it. + * + * So technically it is possible in this condition that two nodes end up with + * the same configuration epoch. + * + * Another possibility is that there are bugs in the implementation causing + * this to happen. + * + * Moreover when a new cluster is created, all the nodes start with the same + * configEpoch. This collision resolution code allows nodes to automatically + * end up with a different configEpoch at startup. + * + * In all cases, we want a mechanism that resolves this issue automatically + * as a safeguard. 
The same configuration epoch for masters serving different + * sets of slots is not harmful, but it is if the nodes end up serving the same + * slots for some reason (manual errors or software bugs) without a proper + * failover procedure. + * + * In general we want a system that eventually always ends with different + * masters having different configuration epochs whatever happened, since + * nothing is worse than a split-brain condition in a distributed system. + * + * BEHAVIOR + * + * When this function gets called, what happens is that if this node + * has the lexicographically smaller Node ID compared to the other node + * with the conflicting epoch (the 'sender' node), it will assign itself + * the greatest configuration epoch currently detected among nodes plus 1. + * + * This means that even if there are multiple nodes colliding, the node + * with the greatest Node ID never moves forward, so eventually all the nodes + * end up with a different configuration epoch. + */ +void clusterHandleConfigEpochCollision(clusterNode *sender) { + /* Prerequisites: nodes have the same configEpoch and are both masters. */ + if (sender->configEpoch != myself->configEpoch || + !nodeIsMaster(sender) || !nodeIsMaster(myself)) return; + /* Don't act if the colliding node has a smaller Node ID. */ + if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return; + /* Get the next ID available to the best of this node's knowledge. */ + server.cluster->currentEpoch++; + myself->configEpoch = server.cluster->currentEpoch; + clusterSaveConfigOrDie(1); + redisLog(REDIS_VERBOSE, + "WARNING: configEpoch collision with node %.40s." 
+ " configEpoch set to %llu", + sender->name, + (unsigned long long) myself->configEpoch); +} + /* ----------------------------------------------------------------------------- * CLUSTER nodes blacklist * @@ -1399,69 +1516,6 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc } } -/* This function is called when this node is a master, and we receive from - * another master a configuration epoch that is equal to our configuration - * epoch. - * - * BACKGROUND - * - * It is not possible that different slaves get the same config - * epoch during a failover election, because the slaves need to get voted - * by a majority. However when we perform a manual resharding of the cluster - * the node will assign a configuration epoch to itself without to ask - * for agreement. Usually resharding happens when the cluster is working well - * and is supervised by the sysadmin, however it is possible for a failover - * to happen exactly while the node we are resharding a slot to assigns itself - * a new configuration epoch, but before it is able to propagate it. - * - * So technically it is possible in this condition that two nodes end with - * the same configuration epoch. - * - * Another possibility is that there are bugs in the implementation causing - * this to happen. - * - * Moreover when a new cluster is created, all the nodes start with the same - * configEpoch. This collision resolution code allows nodes to automatically - * end with a different configEpoch at startup automatically. - * - * In all the cases, we want a mechanism that resolves this issue automatically - * as a safeguard. The same configuration epoch for masters serving different - * set of slots is not harmful, but it is if the nodes end serving the same - * slots for some reason (manual errors or software bugs) without a proper - * failover procedure. 
- * - * In general we want a system that eventually always ends with different - * masters having different configuration epochs whatever happened, since - * nothign is worse than a split-brain condition in a distributed system. - * - * BEHAVIOR - * - * When this function gets called, what happens is that if this node - * has the lexicographically smaller Node ID compared to the other node - * with the conflicting epoch (the 'sender' node), it will assign itself - * the greatest configuration epoch currently detected among nodes plus 1. - * - * This means that even if there are multiple nodes colliding, the node - * with the greatest Node ID never moves forward, so eventually all the nodes - * end with a different configuration epoch. - */ -void clusterHandleConfigEpochCollision(clusterNode *sender) { - /* Prerequisites: nodes have the same configEpoch and are both masters. */ - if (sender->configEpoch != myself->configEpoch || - !nodeIsMaster(sender) || !nodeIsMaster(myself)) return; - /* Don't act if the colliding node has a smaller Node ID. */ - if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return; - /* Get the next ID available at the best of this node knowledge. */ - server.cluster->currentEpoch++; - myself->configEpoch = server.cluster->currentEpoch; - clusterSaveConfigOrDie(1); - redisLog(REDIS_VERBOSE, - "WARNING: configEpoch collision with node %.40s." - " configEpoch set to %llu", - sender->name, - (unsigned long long) myself->configEpoch); -} - /* When this function is called, there is a packet to process starting * at node->rcvbuf. Releasing the buffer is up to the caller, so this * function should just handle the higher level stuff of processing the @@ -3547,30 +3601,28 @@ sds representRedisNodeFlags(sds ci, uint16_t flags) { /* Generate a csv-alike representation of the specified cluster node. * See clusterGenNodesDescription() top comment for more information. 
* - * The function appends the node representation to the SDS string 'ci' and - * returns it (that may point to a different string as usually with the - * SDS-style API). */ -sds clusterGenNodeDescription(sds ci, clusterNode *node) { + * The function returns the string representation as an SDS string. */ +sds clusterGenNodeDescription(clusterNode *node) { int j, start; + sds ci; /* Node coordinates */ - ci = sdscatlen(ci,node->name,40); - ci = sdscatfmt(ci," %s:%i ",node->ip,node->port); + ci = sdscatprintf(sdsempty(),"%.40s %s:%d ", + node->name, + node->ip, + node->port); /* Flags */ ci = representRedisNodeFlags(ci, node->flags); /* Slave of... or just "-" */ - if (node->slaveof) { - ci = sdscatlen(ci," ",1); - ci = sdscatlen(ci,node->slaveof->name,40); - ci = sdscatlen(ci," ",1); - } else { + if (node->slaveof) + ci = sdscatprintf(ci," %.40s ",node->slaveof->name); + else ci = sdscatlen(ci," - ",3); - } /* Latency from the POV of this node, link status */ - ci = sdscatfmt(ci,"%I %I %U %s", + ci = sdscatprintf(ci,"%lld %lld %llu %s", (long long) node->ping_sent, (long long) node->pong_received, (unsigned long long) node->configEpoch, @@ -3582,19 +3634,6 @@ sds clusterGenNodeDescription(sds ci, clusterNode *node) { for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { int bit; - /* It is common for a node to have pretty contiguous slots, so - * optimize this loop by skipping whole 32bit words if they have - * no set bits. We stop to the penultimate word because last word - * has special handling when start != -1 (later in the loop). */ - if ((j&31)==0 && j < REDIS_CLUSTER_SLOTS-32) { - uint32_t *slotword = ((uint32_t*)node->slots)+(j/32); - if ((start == -1 && *slotword == 0) || - (start != -1 && *slotword == UINT32_MAX)) { - j += 31; /* The for loop will increment j one more time. 
*/ - continue; - } - } - if ((bit = clusterNodeGetSlotBit(node,j)) != 0) { if (start == -1) start = j; } @@ -3640,19 +3679,18 @@ sds clusterGenNodeDescription(sds ci, clusterNode *node) { * of the CLUSTER NODES function, and as format for the cluster * configuration file (nodes.conf) for a given node. */ sds clusterGenNodesDescription(int filter) { - sds ci = sdsempty(); + sds ci = sdsempty(), ni; dictIterator *di; dictEntry *de; - /* Make room to avoid multiple resizes of the buffer. */ - ci = sdsMakeRoomFor(ci,256*dictSize(server.cluster->nodes)); - di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); if (node->flags & filter) continue; - ci = clusterGenNodeDescription(ci,node); + ni = clusterGenNodeDescription(node); + ci = sdscatsds(ci,ni); + sdsfree(ni); ci = sdscatlen(ci,"\n",1); } dictReleaseIterator(di); @@ -3918,17 +3956,9 @@ void clusterCommand(redisClient *c) { * failover happens at the same time we close the slot, the * configEpoch collision resolution will fix it assigning * a different epoch to each node. 
*/ - uint64_t maxEpoch = clusterGetMaxEpoch(); - - if (myself->configEpoch == 0 || - myself->configEpoch != maxEpoch) - { - server.cluster->currentEpoch++; - myself->configEpoch = server.cluster->currentEpoch; - clusterDoBeforeSleep(CLUSTER_TODO_FSYNC_CONFIG); + if (clusterBumpConfigEpochWithoutConsensus() == REDIS_OK) { redisLog(REDIS_WARNING, - "configEpoch set to %llu after importing slot %d", - (unsigned long long) myself->configEpoch, slot); + "configEpoch updated after importing slot %d", slot); } server.cluster->importing_slots_from[slot] = NULL; } @@ -3989,7 +4019,10 @@ void clusterCommand(redisClient *c) { server.cluster->stats_bus_messages_sent, server.cluster->stats_bus_messages_received ); - addReplyBulkSds(c, info); + addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", + (unsigned long)sdslen(info))); + addReplySds(c,info); + addReply(c,shared.crlf); } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) { int retval = clusterSaveConfig(1); @@ -4109,7 +4142,7 @@ void clusterCommand(redisClient *c) { addReplyMultiBulkLen(c,n->numslaves); for (j = 0; j < n->numslaves; j++) { - sds ni = clusterGenNodeDescription(sdsempty(),n->slaves[j]); + sds ni = clusterGenNodeDescription(n->slaves[j]); addReplyBulkCString(c,ni); sdsfree(ni); } @@ -4526,7 +4559,7 @@ try_again: /* Check if the key is here. If not we reply with success as there is * nothing to migrate (for instance the key expired in the meantime), but * we include such information in the reply string. */ - if ((o = lookupKeyWrite(c->db,c->argv[3])) == NULL) { + if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { addReplySds(c,sdsnew("+NOKEY\r\n")); return; } @@ -4579,7 +4612,7 @@ try_again: { sds buf = cmd.io.buffer.ptr; size_t pos = 0, towrite; - ssize_t nwritten = 0; + int nwritten = 0; while ((towrite = sdslen(buf)-pos) > 0) { towrite = (towrite > (64*1024) ? (64*1024) : towrite);