mirror of
https://github.com/fluencelabs/redis
synced 2025-04-08 18:38:04 +00:00
Initial support for --replicas in redis-trib.
This commit is contained in:
parent
9fba193a59
commit
6cf230ea91
@ -61,6 +61,7 @@ class ClusterNode
|
|||||||
@info[:slots] = {}
|
@info[:slots] = {}
|
||||||
@info[:migrating] = {}
|
@info[:migrating] = {}
|
||||||
@info[:importing] = {}
|
@info[:importing] = {}
|
||||||
|
@info[:replicate] = false
|
||||||
@dirty = false # True if we need to flush slots info into node.
|
@dirty = false # True if we need to flush slots info into node.
|
||||||
@friends = []
|
@friends = []
|
||||||
end
|
end
|
||||||
@ -172,8 +173,24 @@ class ClusterNode
|
|||||||
@dirty = true
|
@dirty = true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def set_as_replica(node_id)
|
||||||
|
@info[:replicate] = node_id
|
||||||
|
@dirty = true
|
||||||
|
end
|
||||||
|
|
||||||
def flush_node_config
|
def flush_node_config
|
||||||
return if !@dirty
|
return if !@dirty
|
||||||
|
if @info[:replicate]
|
||||||
|
begin
|
||||||
|
@r.cluster("replicate",@info[:replicate])
|
||||||
|
rescue
|
||||||
|
# If the cluster did not already joined it is possible that
|
||||||
|
# the slave does not know the master node yet. So on errors
|
||||||
|
# we return ASAP leaving the dirty flag set, to flush the
|
||||||
|
# config later.
|
||||||
|
return
|
||||||
|
end
|
||||||
|
else
|
||||||
new = []
|
new = []
|
||||||
@info[:slots].each{|s,val|
|
@info[:slots].each{|s,val|
|
||||||
if val == :new
|
if val == :new
|
||||||
@ -182,6 +199,7 @@ class ClusterNode
|
|||||||
end
|
end
|
||||||
}
|
}
|
||||||
@r.cluster("addslots",*new)
|
@r.cluster("addslots",*new)
|
||||||
|
end
|
||||||
@dirty = false
|
@dirty = false
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -218,10 +236,15 @@ class ClusterNode
|
|||||||
}.join(",")
|
}.join(",")
|
||||||
|
|
||||||
role = self.has_flag?("master") ? "M" : "S"
|
role = self.has_flag?("master") ? "M" : "S"
|
||||||
|
|
||||||
|
if self.info[:replicate] and @dirty
|
||||||
|
"S: #{self.info[:name]} #{self.to_s}"
|
||||||
|
else
|
||||||
"#{role}: #{self.info[:name]} #{self.to_s}\n"+
|
"#{role}: #{self.info[:name]} #{self.to_s}\n"+
|
||||||
" slots:#{slots} (#{self.slots.length} slots) "+
|
" slots:#{slots} (#{self.slots.length} slots) "+
|
||||||
"#{(self.info[:flags]-["myself"]).join(",")}"
|
"#{(self.info[:flags]-["myself"]).join(",")}"
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# Return a single string representing nodes and associated slots.
|
# Return a single string representing nodes and associated slots.
|
||||||
# TODO: remove slaves from config when slaves will be handled
|
# TODO: remove slaves from config when slaves will be handled
|
||||||
@ -460,15 +483,68 @@ class RedisTrib
|
|||||||
end
|
end
|
||||||
|
|
||||||
def alloc_slots
|
def alloc_slots
|
||||||
slots_per_node = ClusterHashSlots/@nodes.length
|
nodes_count = @nodes.length
|
||||||
i = 0
|
masters_count = @nodes.length / (@replicas+1)
|
||||||
|
slots_per_node = ClusterHashSlots / masters_count
|
||||||
|
masters = []
|
||||||
|
slaves = []
|
||||||
|
|
||||||
|
# The first step is to split instances by IP. This is useful as
|
||||||
|
# we'll try to allocate master nodes in different physical machines
|
||||||
|
# (as much as possible) and to allocate slaves of a given master in
|
||||||
|
# different physical machines as well.
|
||||||
|
#
|
||||||
|
# This code assumes just that if the IP is different, than it is more
|
||||||
|
# likely that the instance is running in a different physical host
|
||||||
|
# or at least a different virtual machine.
|
||||||
|
ips = {}
|
||||||
@nodes.each{|n|
|
@nodes.each{|n|
|
||||||
|
ips[n.info[:host]] = [] if !ips[n.info[:host]]
|
||||||
|
ips[n.info[:host]] << n
|
||||||
|
}
|
||||||
|
|
||||||
|
# Select master instances
|
||||||
|
puts "Using #{masters_count} masters:"
|
||||||
|
while masters.length < masters_count
|
||||||
|
ips.each{|ip,nodes_list|
|
||||||
|
next if nodes_list.length == 0
|
||||||
|
masters << nodes_list.shift
|
||||||
|
puts masters[-1]
|
||||||
|
nodes_count -= 1
|
||||||
|
break if masters.length == masters_count
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
# Alloc slots on masters
|
||||||
|
i = 0
|
||||||
|
masters.each{|n|
|
||||||
first = i*slots_per_node
|
first = i*slots_per_node
|
||||||
last = first+slots_per_node-1
|
last = first+slots_per_node-1
|
||||||
last = ClusterHashSlots-1 if i == @nodes.length-1
|
last = ClusterHashSlots-1 if i == @nodes.length-1
|
||||||
n.add_slots first..last
|
n.add_slots first..last
|
||||||
i += 1
|
i += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Select N replicas for every master.
|
||||||
|
# We try to split the replicas among all the IPs with spare nodes
|
||||||
|
# trying to avoid the host where the master is running, if possible.
|
||||||
|
masters.each{|m|
|
||||||
|
i = 0
|
||||||
|
while i < @replicas
|
||||||
|
ips.each{|ip,nodes_list|
|
||||||
|
next if nodes_list.length == 0
|
||||||
|
# Skip instances with the same IP as the master if we
|
||||||
|
# have some more IPs available.
|
||||||
|
next if ip == m.info[:host] && nodes_count > nodes_list.length
|
||||||
|
slave = nodes_list.shift
|
||||||
|
slave.set_as_replica(m.info[:name])
|
||||||
|
nodes_count -= 1
|
||||||
|
i += 1
|
||||||
|
puts "#{m} replica ##{i} is #{slave}"
|
||||||
|
break if masters.length == masters_count
|
||||||
|
}
|
||||||
|
end
|
||||||
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
def flush_nodes_config
|
def flush_nodes_config
|
||||||
@ -667,7 +743,24 @@ class RedisTrib
|
|||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# This is an helper function for create_cluster_cmd that verifies if
|
||||||
|
# the number of nodes and the specified replicas have a valid configuration
|
||||||
|
# where there are at least three master nodes and enough replicas per node.
|
||||||
|
def check_create_parameters
|
||||||
|
masters = @nodes.length/(@replicas+1)
|
||||||
|
if masters < 3
|
||||||
|
puts "*** ERROR: Invalid configuration for cluster creation."
|
||||||
|
puts "*** Redis Cluster requires at least 3 master nodes."
|
||||||
|
puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node."
|
||||||
|
puts "*** At least #{3*(@replicas+1)} nodes are required."
|
||||||
|
exit 1
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def create_cluster_cmd(argv,opt)
|
def create_cluster_cmd(argv,opt)
|
||||||
|
opt = {'replicas' => 0}.merge(opt)
|
||||||
|
@replicas = opt['replicas'].to_i
|
||||||
|
|
||||||
xputs ">>> Creating cluster"
|
xputs ">>> Creating cluster"
|
||||||
argv[0..-1].each{|n|
|
argv[0..-1].each{|n|
|
||||||
node = ClusterNode.new(n)
|
node = ClusterNode.new(n)
|
||||||
@ -677,6 +770,7 @@ class RedisTrib
|
|||||||
node.assert_empty
|
node.assert_empty
|
||||||
add_node(node)
|
add_node(node)
|
||||||
}
|
}
|
||||||
|
check_create_parameters
|
||||||
xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
|
xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
|
||||||
alloc_slots
|
alloc_slots
|
||||||
show_nodes
|
show_nodes
|
||||||
@ -690,6 +784,7 @@ class RedisTrib
|
|||||||
# they are still empty with unassigned slots.
|
# they are still empty with unassigned slots.
|
||||||
sleep 1
|
sleep 1
|
||||||
wait_cluster_join
|
wait_cluster_join
|
||||||
|
flush_nodes_config # Useful for the replicas
|
||||||
check_cluster
|
check_cluster
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -758,11 +853,11 @@ COMMANDS={
|
|||||||
}
|
}
|
||||||
|
|
||||||
ALLOWED_OPTIONS={
|
ALLOWED_OPTIONS={
|
||||||
"create" => {"slaves" => false}
|
"create" => {"replicas" => true}
|
||||||
}
|
}
|
||||||
|
|
||||||
def show_help
|
def show_help
|
||||||
puts "Usage: redis-trib <command> <arguments ...>"
|
puts "Usage: redis-trib <command> <options> <arguments ...>"
|
||||||
puts
|
puts
|
||||||
COMMANDS.each{|k,v|
|
COMMANDS.each{|k,v|
|
||||||
puts " #{k.ljust(10)} #{v[2]}"
|
puts " #{k.ljust(10)} #{v[2]}"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user