Merge branch 'clusterfixes' into unstable

This commit is contained in:
antirez 2015-01-21 19:30:22 +01:00
commit af8d1b4bda
5 changed files with 59 additions and 19 deletions

View File

@ -783,8 +783,11 @@ int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
for (j = 0; j < master->numslaves; j++) { for (j = 0; j < master->numslaves; j++) {
if (master->slaves[j] == slave) { if (master->slaves[j] == slave) {
memmove(master->slaves+j,master->slaves+(j+1), if ((j+1) < master->numslaves) {
(master->numslaves-1)-j); int remaining_slaves = (master->numslaves - j) - 1;
memmove(master->slaves+j,master->slaves+(j+1),
(sizeof(*master->slaves) * remaining_slaves));
}
master->numslaves--; master->numslaves--;
return REDIS_OK; return REDIS_OK;
} }
@ -819,15 +822,30 @@ int clusterCountNonFailingSlaves(clusterNode *n) {
return okslaves; return okslaves;
} }
/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) { void freeClusterNode(clusterNode *n) {
sds nodename; sds nodename;
int j;
/* If the node is a master with associated slaves, we have to set
* all the slaves->slaveof fields to NULL (unknown). */
if (nodeIsMaster(n)) {
for (j = 0; j < n->numslaves; j++)
n->slaves[j]->slaveof = NULL;
}
/* Remove this node from the list of slaves of its master. */
if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
/* Unlink from the set of nodes. */
nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN); nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
redisAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK); redisAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
sdsfree(nodename); sdsfree(nodename);
if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
/* Release link and associated data structures. */
if (n->link) freeClusterLink(n->link); if (n->link) freeClusterLink(n->link);
listRelease(n->fail_reports); listRelease(n->fail_reports);
zfree(n->slaves);
zfree(n); zfree(n);
} }
@ -840,11 +858,16 @@ int clusterAddNode(clusterNode *node) {
return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR; return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
} }
/* Remove a node from the cluster: /* Remove a node from the cluster. The function performs the high level
* 1) Mark all the nodes handled by it as unassigned. * cleanup, calling freeClusterNode() for the low level cleanup.
* 2) Remove all the failure reports sent by this node. * Here we do the following:
* 3) Free the node, that will in turn remove it from the hash table *
* and from the list of slaves of its master, if it is a slave node. * 1) Mark all the slots handled by it as unassigned.
* 2) Remove all the failure reports sent by this node and referenced by
* other nodes.
* 3) Free the node with freeClusterNode() that will in turn remove it
* from the hash table and from the list of slaves of its master, if
* it is a slave node.
*/ */
void clusterDelNode(clusterNode *delnode) { void clusterDelNode(clusterNode *delnode) {
int j; int j;
@ -871,11 +894,7 @@ void clusterDelNode(clusterNode *delnode) {
} }
dictReleaseIterator(di); dictReleaseIterator(di);
/* 3) Remove this node from its master's slaves if needed. */ /* 3) Free the node, unlinking it from the cluster. */
if (nodeIsSlave(delnode) && delnode->slaveof)
clusterNodeRemoveSlave(delnode->slaveof,delnode);
/* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode); freeClusterNode(delnode);
} }
@ -1234,7 +1253,7 @@ void nodeIp2String(char *buf, clusterLink *link) {
* The function returns 0 if the node address is still the same, * The function returns 0 if the node address is still the same,
* otherwise 1 is returned. */ * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, int port) { int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, int port) {
char ip[REDIS_IP_STR_LEN]; char ip[REDIS_IP_STR_LEN] = {0};
/* We don't proceed if the link is the same as the sender link, as this /* We don't proceed if the link is the same as the sender link, as this
* function is designed to see if the node link is consistent with the * function is designed to see if the node link is consistent with the
@ -1611,7 +1630,7 @@ int clusterProcessPacket(clusterLink *link) {
} }
/* Free this node as we already have it. This will /* Free this node as we already have it. This will
* cause the link to be freed as well. */ * cause the link to be freed as well. */
freeClusterNode(link->node); clusterDelNode(link->node);
return 0; return 0;
} }
@ -2784,6 +2803,7 @@ void clusterHandleSlaveMigration(int max_slaves) {
} }
} }
} }
dictReleaseIterator(di);
/* Step 4: perform the migration if there is a target, and if I'm the /* Step 4: perform the migration if there is a target, and if I'm the
* candidate. */ * candidate. */
@ -2905,7 +2925,7 @@ void clusterCron(void) {
/* A Node in HANDSHAKE state has a limited lifespan equal to the /* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */ * configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
freeClusterNode(node); clusterDelNode(node);
continue; continue;
} }

View File

@ -21,6 +21,7 @@ proc main {} {
if {[catch main e]} { if {[catch main e]} {
puts $::errorInfo puts $::errorInfo
if {$::pause_on_error} pause_on_error
cleanup cleanup
exit 1 exit 1
} }

View File

@ -28,8 +28,10 @@ test "Cluster nodes are reachable" {
test "Cluster nodes hard reset" { test "Cluster nodes hard reset" {
foreach_redis_id id { foreach_redis_id id {
catch {R $id flushall} ; # May fail for readonly slaves. catch {R $id flushall} ; # May fail for readonly slaves.
R $id MULTI
R $id cluster reset hard R $id cluster reset hard
R $id cluster set-config-epoch [expr {$id+1}] R $id cluster set-config-epoch [expr {$id+1}]
R $id EXEC
R $id config set cluster-node-timeout 3000 R $id config set cluster-node-timeout 3000
R $id config set cluster-slave-validity-factor 10 R $id config set cluster-slave-validity-factor 10
R $id config rewrite R $id config rewrite

View File

@ -68,7 +68,7 @@ proc spawn_instance {type base_port count {conf {}}} {
} }
if {$::valgrind} { if {$::valgrind} {
set pid [exec valgrind --suppressions=../../../src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full ../../../src/${prgname} $cfgfile &] set pid [exec valgrind --track-origins=yes --suppressions=../../../src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full ../../../src/${prgname} $cfgfile &]
} else { } else {
set pid [exec ../../../src/${prgname} $cfgfile &] set pid [exec ../../../src/${prgname} $cfgfile &]
} }
@ -105,6 +105,7 @@ proc cleanup {} {
proc abort_sentinel_test msg { proc abort_sentinel_test msg {
puts "WARNING: Aborting the test." puts "WARNING: Aborting the test."
puts ">>>>>>>> $msg" puts ">>>>>>>> $msg"
if {$::pause_on_error} pause_on_error
cleanup cleanup
exit 1 exit 1
} }
@ -369,15 +370,31 @@ proc get_instance_id_by_port {type port} {
# The instance can be restarted with restart-instance. # The instance can be restarted with restart-instance.
proc kill_instance {type id} { proc kill_instance {type id} {
set pid [get_instance_attrib $type $id pid] set pid [get_instance_attrib $type $id pid]
set port [get_instance_attrib $type $id port]
if {$pid == -1} { if {$pid == -1} {
error "You tried to kill $type $id twice." error "You tried to kill $type $id twice."
} }
exec kill -9 $pid exec kill -9 $pid
set_instance_attrib $type $id pid -1 set_instance_attrib $type $id pid -1
set_instance_attrib $type $id link you_tried_to_talk_with_killed_instance set_instance_attrib $type $id link you_tried_to_talk_with_killed_instance
# Remove the PID from the list of pids to kill at exit. # Remove the PID from the list of pids to kill at exit.
set ::pids [lsearch -all -inline -not -exact $::pids $pid] set ::pids [lsearch -all -inline -not -exact $::pids $pid]
# Wait for the port it was using to be available again, so that's not
# an issue to start a new server ASAP with the same port.
set retry 10
while {[incr retry -1]} {
set port_is_free [catch {set s [socket 127.0.0.1 $port]}]
if {$port_is_free} break
catch {close $s}
after 1000
}
if {$retry == 0} {
error "Port $port does not return available after killing instance."
}
} }
# Return true if the instance of the specified type/id is killed. # Return true if the instance of the specified type/id is killed.
@ -401,7 +418,7 @@ proc restart_instance {type id} {
} }
if {$::valgrind} { if {$::valgrind} {
set pid [exec valgrind --suppressions=../../../src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full ../../../src/${prgname} $cfgfile &] set pid [exec valgrind --track-origins=yes --suppressions=../../../src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full ../../../src/${prgname} $cfgfile &]
} else { } else {
set pid [exec ../../../src/${prgname} $cfgfile &] set pid [exec ../../../src/${prgname} $cfgfile &]
} }

View File

@ -207,7 +207,7 @@ proc start_server {options {code undefined}} {
set stderr [format "%s/%s" [dict get $config "dir"] "stderr"] set stderr [format "%s/%s" [dict get $config "dir"] "stderr"]
if {$::valgrind} { if {$::valgrind} {
set pid [exec valgrind --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &] set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &]
} else { } else {
set pid [exec src/redis-server $config_file > $stdout 2> $stderr &] set pid [exec src/redis-server $config_file > $stdout 2> $stderr &]
} }