mirror of
https://github.com/fluencelabs/redis
synced 2025-03-17 16:10:50 +00:00
Cluster: slots update refactored + UPDATE msg processing.
Now there is a function that handles the update of the local slot configuration every time we have some new info about a node and its set of served slots and configEpoch. Moreoever the UPDATE packets are now processed when received (it was a work in progress in the previous commit).
This commit is contained in:
parent
dc43f66eac
commit
94a07d5901
168
src/cluster.c
168
src/cluster.c
@ -856,6 +856,84 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, int port) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Reconfigure the specified node 'n' as a master. This function is called when
|
||||
* a node that we believed to be a slave is now acting as master in order to
|
||||
* update the state of the node. */
|
||||
void clusterSetNodeAsMaster(clusterNode *n) {
|
||||
if (n->flags & REDIS_NODE_MASTER) return;
|
||||
|
||||
if (n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
|
||||
n->flags &= ~REDIS_NODE_SLAVE;
|
||||
n->flags |= REDIS_NODE_MASTER;
|
||||
n->slaveof = NULL;
|
||||
|
||||
/* Update config and state. */
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
||||
CLUSTER_TODO_UPDATE_STATE);
|
||||
}
|
||||
|
||||
/* This function is called when we receive a master configuration via a
|
||||
* PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
|
||||
* node, and the set of slots claimed under this configEpoch.
|
||||
*
|
||||
* What we do is to rebind the slots with newer configuration compared to our
|
||||
* local configuration, and if needed, we turn ourself into a replica of the
|
||||
* node (see the function comments for more info).
|
||||
*
|
||||
* The 'sender' is the node for which we received a configuration update.
|
||||
* Sometimes it is not actaully the "Sender" of the information, like in the case
|
||||
* we receive the info via an UPDATE packet. */
|
||||
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch,
|
||||
unsigned char *slots)
|
||||
{
|
||||
int j;
|
||||
clusterNode *curmaster, *newmaster = NULL;
|
||||
|
||||
/* Here we set curmaster to this node or the node this node
|
||||
* replicates to if it's a slave. In the for loop we are
|
||||
* interested to check if slots are taken away from curmaster. */
|
||||
if (server.cluster->myself->flags & REDIS_NODE_MASTER)
|
||||
curmaster = server.cluster->myself;
|
||||
else
|
||||
curmaster = server.cluster->myself->slaveof;
|
||||
|
||||
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
|
||||
if (bitmapTestBit(slots,j)) {
|
||||
/* We rebind the slot to the new node claiming it if:
|
||||
* 1) The slot was unassigned.
|
||||
* 2) The new node claims it with a greater configEpoch. */
|
||||
if (server.cluster->slots[j] == sender) continue;
|
||||
if (server.cluster->slots[j] == NULL ||
|
||||
server.cluster->slots[j]->configEpoch <
|
||||
senderConfigEpoch)
|
||||
{
|
||||
if (server.cluster->slots[j] == curmaster)
|
||||
newmaster = sender;
|
||||
clusterDelSlot(j);
|
||||
clusterAddSlot(sender,j);
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
||||
CLUSTER_TODO_UPDATE_STATE|
|
||||
CLUSTER_TODO_FSYNC_CONFIG);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* If at least one slot was reassigned from a node to another node
|
||||
* with a greater configEpoch, it is possible that:
|
||||
* 1) We are a master left without slots. This means that we were
|
||||
* failed over and we should turn into a replica of the new
|
||||
* master.
|
||||
* 2) We are a slave and our master is left without slots. We need
|
||||
* to replicate to the new slots owner. */
|
||||
if (newmaster && curmaster->numslots == 0) {
|
||||
redisLog(REDIS_WARNING,"Configuration change detected. Reconfiguring myself as a replica of %.40s", sender->name);
|
||||
clusterSetMaster(sender);
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
||||
CLUSTER_TODO_UPDATE_STATE|
|
||||
CLUSTER_TODO_FSYNC_CONFIG);
|
||||
}
|
||||
}
|
||||
|
||||
/* When this function is called, there is a packet to process starting
|
||||
* at node->rcvbuf. Releasing the buffer is up to the caller, so this
|
||||
* function should just handle the higher level stuff of processing the
|
||||
@ -905,6 +983,11 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
|
||||
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
|
||||
|
||||
if (totlen != explen) return 1;
|
||||
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
|
||||
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
|
||||
|
||||
explen += sizeof(clusterMsgDataUpdate);
|
||||
if (totlen != explen) return 1;
|
||||
}
|
||||
|
||||
@ -1036,18 +1119,7 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
sizeof(hdr->slaveof)))
|
||||
{
|
||||
/* Node is a master. */
|
||||
if (sender->flags & REDIS_NODE_SLAVE) {
|
||||
/* Reconfigure node as master. */
|
||||
if (sender->slaveof)
|
||||
clusterNodeRemoveSlave(sender->slaveof,sender);
|
||||
sender->flags &= ~REDIS_NODE_SLAVE;
|
||||
sender->flags |= REDIS_NODE_MASTER;
|
||||
sender->slaveof = NULL;
|
||||
|
||||
/* Update config and state. */
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
||||
CLUSTER_TODO_UPDATE_STATE);
|
||||
}
|
||||
clusterSetNodeAsMaster(sender);
|
||||
} else {
|
||||
/* Node is a slave. */
|
||||
clusterNode *master = clusterLookupNode(hdr->slaveof);
|
||||
@ -1099,60 +1171,11 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
}
|
||||
}
|
||||
|
||||
/* 1) We check if the sender (master) is claiming slots that we belive to
|
||||
* serve as a master, or replicate as a slave, but with a configEpoch
|
||||
* that is newer: in that case we rebind the slots to the claiming node
|
||||
* in our routing table.
|
||||
*
|
||||
* Moreover if we are left with 0 slots to serve, we reconfigure as
|
||||
* a replica of the sender. */
|
||||
/* 1) If the sender of the message is a master, and we detected that the
|
||||
* set of slots it claims changed, scan the slots to see if we need
|
||||
* to update our configuration. */
|
||||
if (sender && sender->flags & REDIS_NODE_MASTER && dirty_slots) {
|
||||
int j;
|
||||
clusterNode *curmaster, *newmaster = NULL;
|
||||
|
||||
/* Here we set curmaster to this node or the node this node
|
||||
* replicates to if it's a slave. In the for loop we are
|
||||
* interested to check if slots are taken away from curmaster. */
|
||||
if (server.cluster->myself->flags & REDIS_NODE_MASTER)
|
||||
curmaster = server.cluster->myself;
|
||||
else
|
||||
curmaster = server.cluster->myself->slaveof;
|
||||
|
||||
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
|
||||
if (bitmapTestBit(hdr->myslots,j)) {
|
||||
/* We rebind the slot to the new node claiming it if:
|
||||
* 1) The slot was unassigned.
|
||||
* 2) The new node claims it with a greater configEpoch. */
|
||||
if (server.cluster->slots[j] == sender) continue;
|
||||
if (server.cluster->slots[j] == NULL ||
|
||||
server.cluster->slots[j]->configEpoch <
|
||||
senderConfigEpoch)
|
||||
{
|
||||
if (server.cluster->slots[j] == curmaster)
|
||||
newmaster = sender;
|
||||
clusterDelSlot(j);
|
||||
clusterAddSlot(sender,j);
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
||||
CLUSTER_TODO_UPDATE_STATE|
|
||||
CLUSTER_TODO_FSYNC_CONFIG);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* If at least one slot was reassigned from a node to another node
|
||||
* with a greater configEpoch, it is possible that:
|
||||
* 1) We are a master is left without slots. This means that we were
|
||||
* failed over and we should turn into a replica of the new
|
||||
* master.
|
||||
* 2) We are a slave and our master is left without slots. We need
|
||||
* to replicate to the new slots owner. */
|
||||
if (newmaster && curmaster->numslots == 0) {
|
||||
redisLog(REDIS_WARNING,"Configuration change detected. Reconfiguring myself as a replica of %.40s", sender->name);
|
||||
clusterSetMaster(sender);
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
||||
CLUSTER_TODO_UPDATE_STATE|
|
||||
CLUSTER_TODO_FSYNC_CONFIG);
|
||||
}
|
||||
clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
|
||||
}
|
||||
|
||||
/* 2) We also check for the reverse condition, that is, the sender claims
|
||||
@ -1248,6 +1271,21 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
* we check ASAP. */
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
|
||||
}
|
||||
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
|
||||
clusterNode *n; /* The node the update is about. */
|
||||
uint64_t reportedConfigEpoch = ntohu64(hdr->data.update.nodecfg.configEpoch);
|
||||
|
||||
if (!sender) return 1; /* We don't know the sender. */
|
||||
n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
|
||||
if (!n) return 1; /* We don't know the reported node. */
|
||||
if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
|
||||
|
||||
/* If in our current config the node is a slave, set it as a master. */
|
||||
if (n->flags & REDIS_NODE_SLAVE) clusterSetNodeAsMaster(n);
|
||||
|
||||
/* Check the bitmap of served slots and udpate our config accordingly. */
|
||||
clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
|
||||
hdr->data.update.nodecfg.slots);
|
||||
} else {
|
||||
redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user