Cluster: refactoring around configEpoch handling.

This commit moves the process of generating a new config epoch without
consensus out of the clusterCommand() implementation, in order to make
it reusable for other reasons (current target is to have a CLUSTER
FAILOVER option forcing the failover when no master majority is
reachable).

Moreover the commit moves other functions which are similarly related to
config epochs in a new logical section of the cluster.c file, just for
clarity.
This commit is contained in:
antirez 2015-03-20 16:42:49 +01:00
parent 25c0f5ac63
commit 4f2555aa17

View File

@ -74,27 +74,13 @@ void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representRedisNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
/* -----------------------------------------------------------------------------
* Initialization
* -------------------------------------------------------------------------- */
/* Return the greatest configEpoch found in the cluster. */
uint64_t clusterGetMaxEpoch(void) {
uint64_t max = 0;
dictIterator *di;
dictEntry *de;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->configEpoch > max) max = node->configEpoch;
}
dictReleaseIterator(di);
if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
return max;
}
/* Load the cluster config from 'filename'.
*
* If the file does not exist or is zero-length (this may happen because
@ -927,6 +913,137 @@ void clusterRenameNode(clusterNode *node, char *newname) {
clusterAddNode(node);
}
/* -----------------------------------------------------------------------------
* CLUSTER config epoch handling
* -------------------------------------------------------------------------- */
/* Return the greatest configEpoch found in the cluster. */
uint64_t clusterGetMaxEpoch(void) {
uint64_t max = 0;
dictIterator *di;
dictEntry *de;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->configEpoch > max) max = node->configEpoch;
}
dictReleaseIterator(di);
if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
return max;
}
/* If this node epoch is zero or is not already the greatest across the
* cluster (from the POV of the local configuration), this function will:
*
* 1) Generate a new config epoch increment the current epoch.
* 2) Assign the new epoch to this node, WITHOUT any consensus.
* 3) Persist the configuration on disk before sending packets with the
* new configuration.
*
* If the new config epoch is generated and assigend, REDIS_OK is returned,
* otherwise REDIS_ERR is returned (since the node has already the greatest
* configuration around) and no operation is performed.
*
* Important note: this function violates the principle that config epochs
* should be generated with consensus and should be unique across the cluster.
* However Redis Cluster uses this auto-generated new config epochs in two
* cases:
*
* 1) When slots are closed after importing. Otherwise resharding would be
* too exansive.
* 2) When CLUSTER FAILOVER is called with options that force a slave to
* failover its master even if there is not master majority able to
* create a new configuration epoch.
*
* Redis Cluster does not explode using this function, even in the case of
* a collision between this node and another node, generating the same
* configuration epoch unilaterally, because the config epoch conflict
* resolution algorithm will eventually move colliding nodes to different
* config epochs. However usign this function may violate the "last failover
* wins" rule, so should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
uint64_t maxEpoch = clusterGetMaxEpoch();
if (myself->configEpoch == 0 ||
myself->configEpoch != maxEpoch)
{
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
redisLog(REDIS_WARNING,
"New configEpoch set to %llu",
(unsigned long long) myself->configEpoch);
return REDIS_OK;
} else {
return REDIS_ERR;
}
}
/* This function is called when this node is a master, and we receive from
* another master a configuration epoch that is equal to our configuration
* epoch.
*
* BACKGROUND
*
* It is not possible that different slaves get the same config
* epoch during a failover election, because the slaves need to get voted
* by a majority. However when we perform a manual resharding of the cluster
* the node will assign a configuration epoch to itself without to ask
* for agreement. Usually resharding happens when the cluster is working well
* and is supervised by the sysadmin, however it is possible for a failover
* to happen exactly while the node we are resharding a slot to assigns itself
* a new configuration epoch, but before it is able to propagate it.
*
* So technically it is possible in this condition that two nodes end with
* the same configuration epoch.
*
* Another possibility is that there are bugs in the implementation causing
* this to happen.
*
* Moreover when a new cluster is created, all the nodes start with the same
* configEpoch. This collision resolution code allows nodes to automatically
* end with a different configEpoch at startup automatically.
*
* In all the cases, we want a mechanism that resolves this issue automatically
* as a safeguard. The same configuration epoch for masters serving different
* set of slots is not harmful, but it is if the nodes end serving the same
* slots for some reason (manual errors or software bugs) without a proper
* failover procedure.
*
* In general we want a system that eventually always ends with different
* masters having different configuration epochs whatever happened, since
* nothign is worse than a split-brain condition in a distributed system.
*
* BEHAVIOR
*
* When this function gets called, what happens is that if this node
* has the lexicographically smaller Node ID compared to the other node
* with the conflicting epoch (the 'sender' node), it will assign itself
* the greatest configuration epoch currently detected among nodes plus 1.
*
* This means that even if there are multiple nodes colliding, the node
* with the greatest Node ID never moves forward, so eventually all the nodes
* end with a different configuration epoch.
*/
void clusterHandleConfigEpochCollision(clusterNode *sender) {
/* Prerequisites: nodes have the same configEpoch and are both masters. */
if (sender->configEpoch != myself->configEpoch ||
!nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
/* Don't act if the colliding node has a smaller Node ID. */
if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return;
/* Get the next ID available at the best of this node knowledge. */
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterSaveConfigOrDie(1);
redisLog(REDIS_VERBOSE,
"WARNING: configEpoch collision with node %.40s."
" configEpoch set to %llu",
sender->name,
(unsigned long long) myself->configEpoch);
}
/* -----------------------------------------------------------------------------
* CLUSTER nodes blacklist
*
@ -1399,69 +1516,6 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
}
}
/* This function is called when this node is a master, and we receive from
* another master a configuration epoch that is equal to our configuration
* epoch.
*
* BACKGROUND
*
* It is not possible that different slaves get the same config
* epoch during a failover election, because the slaves need to get voted
* by a majority. However when we perform a manual resharding of the cluster
* the node will assign a configuration epoch to itself without to ask
* for agreement. Usually resharding happens when the cluster is working well
* and is supervised by the sysadmin, however it is possible for a failover
* to happen exactly while the node we are resharding a slot to assigns itself
* a new configuration epoch, but before it is able to propagate it.
*
* So technically it is possible in this condition that two nodes end with
* the same configuration epoch.
*
* Another possibility is that there are bugs in the implementation causing
* this to happen.
*
* Moreover when a new cluster is created, all the nodes start with the same
* configEpoch. This collision resolution code allows nodes to automatically
* end with a different configEpoch at startup automatically.
*
* In all the cases, we want a mechanism that resolves this issue automatically
* as a safeguard. The same configuration epoch for masters serving different
* set of slots is not harmful, but it is if the nodes end serving the same
* slots for some reason (manual errors or software bugs) without a proper
* failover procedure.
*
* In general we want a system that eventually always ends with different
* masters having different configuration epochs whatever happened, since
* nothign is worse than a split-brain condition in a distributed system.
*
* BEHAVIOR
*
* When this function gets called, what happens is that if this node
* has the lexicographically smaller Node ID compared to the other node
* with the conflicting epoch (the 'sender' node), it will assign itself
* the greatest configuration epoch currently detected among nodes plus 1.
*
* This means that even if there are multiple nodes colliding, the node
* with the greatest Node ID never moves forward, so eventually all the nodes
* end with a different configuration epoch.
*/
void clusterHandleConfigEpochCollision(clusterNode *sender) {
/* Prerequisites: nodes have the same configEpoch and are both masters. */
if (sender->configEpoch != myself->configEpoch ||
!nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
/* Don't act if the colliding node has a smaller Node ID. */
if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return;
/* Get the next ID available at the best of this node knowledge. */
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterSaveConfigOrDie(1);
redisLog(REDIS_VERBOSE,
"WARNING: configEpoch collision with node %.40s."
" configEpoch set to %llu",
sender->name,
(unsigned long long) myself->configEpoch);
}
/* When this function is called, there is a packet to process starting
* at node->rcvbuf. Releasing the buffer is up to the caller, so this
* function should just handle the higher level stuff of processing the
@ -3547,30 +3601,28 @@ sds representRedisNodeFlags(sds ci, uint16_t flags) {
/* Generate a csv-alike representation of the specified cluster node.
* See clusterGenNodesDescription() top comment for more information.
*
* The function appends the node representation to the SDS string 'ci' and
* returns it (that may point to a different string as usually with the
* SDS-style API). */
sds clusterGenNodeDescription(sds ci, clusterNode *node) {
* The function returns the string representation as an SDS string. */
sds clusterGenNodeDescription(clusterNode *node) {
int j, start;
sds ci;
/* Node coordinates */
ci = sdscatlen(ci,node->name,40);
ci = sdscatfmt(ci," %s:%i ",node->ip,node->port);
ci = sdscatprintf(sdsempty(),"%.40s %s:%d ",
node->name,
node->ip,
node->port);
/* Flags */
ci = representRedisNodeFlags(ci, node->flags);
/* Slave of... or just "-" */
if (node->slaveof) {
ci = sdscatlen(ci," ",1);
ci = sdscatlen(ci,node->slaveof->name,40);
ci = sdscatlen(ci," ",1);
} else {
if (node->slaveof)
ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
else
ci = sdscatlen(ci," - ",3);
}
/* Latency from the POV of this node, link status */
ci = sdscatfmt(ci,"%I %I %U %s",
ci = sdscatprintf(ci,"%lld %lld %llu %s",
(long long) node->ping_sent,
(long long) node->pong_received,
(unsigned long long) node->configEpoch,
@ -3582,19 +3634,6 @@ sds clusterGenNodeDescription(sds ci, clusterNode *node) {
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
int bit;
/* It is common for a node to have pretty contiguous slots, so
* optimize this loop by skipping whole 32bit words if they have
* no set bits. We stop to the penultimate word because last word
* has special handling when start != -1 (later in the loop). */
if ((j&31)==0 && j < REDIS_CLUSTER_SLOTS-32) {
uint32_t *slotword = ((uint32_t*)node->slots)+(j/32);
if ((start == -1 && *slotword == 0) ||
(start != -1 && *slotword == UINT32_MAX)) {
j += 31; /* The for loop will increment j one more time. */
continue;
}
}
if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
if (start == -1) start = j;
}
@ -3640,19 +3679,18 @@ sds clusterGenNodeDescription(sds ci, clusterNode *node) {
* of the CLUSTER NODES function, and as format for the cluster
* configuration file (nodes.conf) for a given node. */
sds clusterGenNodesDescription(int filter) {
sds ci = sdsempty();
sds ci = sdsempty(), ni;
dictIterator *di;
dictEntry *de;
/* Make room to avoid multiple resizes of the buffer. */
ci = sdsMakeRoomFor(ci,256*dictSize(server.cluster->nodes));
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->flags & filter) continue;
ci = clusterGenNodeDescription(ci,node);
ni = clusterGenNodeDescription(node);
ci = sdscatsds(ci,ni);
sdsfree(ni);
ci = sdscatlen(ci,"\n",1);
}
dictReleaseIterator(di);
@ -3918,17 +3956,9 @@ void clusterCommand(redisClient *c) {
* failover happens at the same time we close the slot, the
* configEpoch collision resolution will fix it assigning
* a different epoch to each node. */
uint64_t maxEpoch = clusterGetMaxEpoch();
if (myself->configEpoch == 0 ||
myself->configEpoch != maxEpoch)
{
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_FSYNC_CONFIG);
if (clusterBumpConfigEpochWithoutConsensus() == REDIS_OK) {
redisLog(REDIS_WARNING,
"configEpoch set to %llu after importing slot %d",
(unsigned long long) myself->configEpoch, slot);
"configEpoch updated after importing slot %d", slot);
}
server.cluster->importing_slots_from[slot] = NULL;
}
@ -3989,7 +4019,10 @@ void clusterCommand(redisClient *c) {
server.cluster->stats_bus_messages_sent,
server.cluster->stats_bus_messages_received
);
addReplyBulkSds(c, info);
addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
(unsigned long)sdslen(info)));
addReplySds(c,info);
addReply(c,shared.crlf);
} else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
int retval = clusterSaveConfig(1);
@ -4109,7 +4142,7 @@ void clusterCommand(redisClient *c) {
addReplyMultiBulkLen(c,n->numslaves);
for (j = 0; j < n->numslaves; j++) {
sds ni = clusterGenNodeDescription(sdsempty(),n->slaves[j]);
sds ni = clusterGenNodeDescription(n->slaves[j]);
addReplyBulkCString(c,ni);
sdsfree(ni);
}
@ -4526,7 +4559,7 @@ try_again:
/* Check if the key is here. If not we reply with success as there is
* nothing to migrate (for instance the key expired in the meantime), but
* we include such information in the reply string. */
if ((o = lookupKeyWrite(c->db,c->argv[3])) == NULL) {
if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
@ -4579,7 +4612,7 @@ try_again:
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
ssize_t nwritten = 0;
int nwritten = 0;
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);