Cluster: redis-trib rebalance initial implementation.

antirez 2015-12-15 12:54:40 +01:00
parent 3782902bec
commit cba1c29580


@@ -25,6 +25,10 @@ require 'rubygems'
 require 'redis'
 
 ClusterHashSlots = 16384
+MigrateDefaultTimeout = 60000
+RebalanceDefaultThreshold = 2
+
+$verbose = false
 
 def xputs(s)
     case s[0..2]
@@ -86,7 +90,7 @@ class ClusterNode
 
     def connect(o={})
         return if @r
-        print "Connecting to node #{self}: "
+        print "Connecting to node #{self}: " if $verbose
         STDOUT.flush
         begin
             @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60)
@@ -96,7 +100,7 @@ class ClusterNode
             exit 1 if o[:abort]
             @r = nil
         end
-        xputs "OK"
+        xputs "OK" if $verbose
     end
 
     def assert_cluster
@@ -288,7 +292,7 @@ class RedisTrib
         @nodes = []
         @fix = false
         @errors = []
-        @timeout = 60000
+        @timeout = MigrateDefaultTimeout
     end
 
     def check_arity(req_args, num_args)
@@ -303,6 +307,10 @@ class RedisTrib
         @nodes << node
     end
 
+    def reset_nodes
+        @nodes = []
+    end
+
     def cluster_error(msg)
         @errors << msg
         xputs msg
@@ -326,9 +334,9 @@ class RedisTrib
         sorted[0]
     end
 
-    def check_cluster
+    def check_cluster(opt={})
         xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
-        show_nodes
+        show_nodes if !opt[:quiet]
         check_config_consistency
         check_open_slots
         check_slots_coverage
@@ -512,7 +520,7 @@ class RedisTrib
         # Case 1: The slot is in migrating state in one slot, and in
         # importing state in 1 slot. That's trivial to address.
         if migrating.length == 1 && importing.length == 1
-            move_slot(migrating[0],importing[0],slot,:verbose=>true,:fix=>true)
+            move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true)
         # Case 2: There are multiple nodes that claim the slot as importing,
         # they probably got keys about the slot after a restart so opened
         # the slot. In this case we just move all the keys to the owner
@@ -521,7 +529,7 @@ class RedisTrib
             xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}"
             importing.each {|node|
                 next if node == owner
-                move_slot(node,owner,slot,:verbose=>true,:fix=>true,:cold=>true)
+                move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true)
                 xputs ">>> Setting #{slot} as STABLE in #{node}"
                 node.r.cluster("setslot",slot,"stable")
             }
@@ -819,13 +827,19 @@ class RedisTrib
     # Options:
     # :verbose -- Print a dot for every moved key.
     # :fix -- We are moving in the context of a fix. Use REPLACE.
-    # :cold -- Move keys without opening / reconfiguring the nodes.
+    # :cold -- Move keys without opening slots / reconfiguring the nodes.
+    # :update -- Update nodes.info[:slots] for source/target nodes.
+    # :quiet -- Don't print info messages.
     def move_slot(source,target,slot,o={})
         # We start marking the slot as importing in the destination node,
         # and the slot as migrating in the target host. Note that the order of
         # the operations is important, as otherwise a client may be redirected
         # to the target node that does not yet know it is importing this slot.
-        print "Moving slot #{slot} from #{source} to #{target}: "; STDOUT.flush
+        if !o[:quiet]
+            print "Moving slot #{slot} from #{source} to #{target}: "
+            STDOUT.flush
+        end
+
         if !o[:cold]
             target.r.cluster("setslot",slot,"importing",source.info[:name])
             source.r.cluster("setslot",slot,"migrating",target.info[:name])
@@ -846,20 +860,26 @@ class RedisTrib
                     exit 1
                 end
             end
-            print "."*keys.length if o[:verbose]
+            print "."*keys.length if o[:dots]
             STDOUT.flush
         end
-        puts
+        puts if !o[:quiet]
         # Set the new node as the owner of the slot in all the known nodes.
         if !o[:cold]
             @nodes.each{|n|
                 n.r.cluster("setslot",slot,"node",target.info[:name])
             }
         end
+
+        # Update the node logical config
+        if o[:update] then
+            source.info[:slots].delete(slot)
+            target.info[:slots][slot] = true
+        end
     end
 
-    # redis-trib subcommands implementations
+    # redis-trib subcommands implementations.
     def check_cluster_cmd(argv,opt)
         load_cluster_info_from_node(argv[0])
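
For context only (the calls below are illustrative, not part of the commit): the per-key dot printing that used to hang off :verbose now lives under :dots, :quiet suppresses the per-slot info line, and :update keeps the in-memory info[:slots] maps in sync. A caller holding two ClusterNode objects, here named src and dst as placeholders (1365 is an arbitrary slot), would combine them roughly like this:

    # Silent bulk move that also updates the local slot map, as rebalance does.
    move_slot(src, dst, 1365, :quiet=>true, :dots=>false, :update=>true)

    # Interactive move printing one dot per migrated key, as reshard and the
    # fix paths do.
    move_slot(src, dst, 1365, :dots=>true)
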
@@ -871,6 +891,106 @@ class RedisTrib
         show_cluster_info
     end
 
+    def rebalance_cluster_cmd(argv,opt)
+        load_cluster_info_from_node(argv[0])
+
+        # Options parsing
+        threshold = opt['threshold'].to_i
+        autoweights = opt['auto-weights']
+        weights = {}
+        opt['weight'].each{|w|
+            fields = w.split("=")
+            node = get_node_by_name(fields[0])
+            if !node || !node.has_flag?("master")
+                puts "*** No such master node #{fields[0]}"
+                exit 1
+            end
+            weights[fields[0]] = fields[1].to_f
+        } if opt['weight']
+        useempty = opt['use-empty-masters']
+
+        # Assign a weight to each node, and compute the total cluster weight.
+        total_weight = 0
+        nodes_involved = 0
+        @nodes.each{|n|
+            if n.has_flag?("master")
+                next if !useempty && n.slots.length == 0
+                n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1
+                total_weight += n.info[:w]
+                nodes_involved += 1
+            end
+        }
+
+        # Check cluster, only proceed if it looks sane.
+        check_cluster(:quiet => true)
+        if @errors.length != 0
+            puts "*** Please fix your cluster problems before rebalancing"
+            exit 1
+        end
+
+        # Calculate the slots balance for each node. It's the number of
+        # slots the node should lose (if positive) or gain (if negative)
+        # in order to be balanced.
+        @nodes.each{|n|
+            if n.has_flag?("master")
+                next if !n.info[:w]
+                expected = ((ClusterHashSlots.to_f / total_weight) *
+                            n.info[:w]).to_i
+                n.info[:balance] = n.slots.length - expected
+                puts "#{n} balance is #{n.info[:balance]} slots" if $verbose
+            end
+        }
+
+        # Sort nodes by their slots balance.
+        sn = @nodes.select{|n|
+            n.has_flag?("master")
+        }.sort{|a,b|
+            a.info[:balance] <=> b.info[:balance]
+        }
+
+        xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}"
+
+        # Now we have at the start of the 'sn' array nodes that should get
+        # slots, at the end nodes that must give slots.
+        # We take two indexes, one at the start, and one at the end,
+        # incrementing or decrementing the indexes accordingly til we
+        # find nodes that need to get/provide slots.
+        dst_idx = 0
+        src_idx = sn.length - 1
+        while dst_idx < src_idx
+            dst = sn[dst_idx]
+            src = sn[src_idx]
+            numslots = [dst.info[:balance],src.info[:balance]].map{|n|
+                n.abs
+            }.min
+            if numslots > 0
+                puts "Moving #{numslots} slots from #{src} to #{dst}"
+
+                # Actually move the slots.
+                reshard_table = compute_reshard_table([src],numslots)
+                if reshard_table.length != numslots
+                    xputs "*** Assertion failed: Reshard table != number of slots"
+                    exit 1
+                end
+                reshard_table.each{|e|
+                    move_slot(e[:source],dst,e[:slot],
+                        :quiet=>true,:dots=>false,:update=>true)
+                    print "#"
+                    STDOUT.flush
+                }
+                puts
+            end
+
+            # Update nodes balance.
+            dst.info[:balance] += numslots
+            src.info[:balance] -= numslots
+            dst_idx += 1 if dst.info[:balance] == 0
+            src_idx -= 1 if src.info[:balance] == 0
+        end
+    end
+
     def fix_cluster_cmd(argv,opt)
         @fix = true
         @timeout = opt['timeout'].to_i if opt['timeout']
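
To see the arithmetic in isolation, here is a small self-contained Ruby sketch of the balance computation and two-pointer pairing used above (not part of the commit; the node names, slot counts and weights are invented):

    ClusterHashSlots = 16384

    # name => [current_slots, weight]
    nodes = {
        "A" => [8192, 1.0],
        "B" => [8192, 1.0],
        "C" => [0, 1.0]     # e.g. a freshly added empty master (--use-empty-masters)
    }

    total_weight = nodes.values.map{|slots,w| w}.reduce(:+)

    # balance > 0: the node must give slots away; balance < 0: it must receive them.
    balance = nodes.map{|name,(slots,w)|
        expected = ((ClusterHashSlots.to_f / total_weight) * w).to_i
        [name, slots - expected]
    }.sort_by{|name,b| b}

    p balance   # => [["C", -5461], ["A", 2731], ["B", 2731]]

    # Two-pointer pairing: receivers at the front of the array, givers at the back.
    dst_idx, src_idx = 0, balance.length - 1
    while dst_idx < src_idx
        dst, src = balance[dst_idx], balance[src_idx]
        numslots = [dst[1].abs, src[1].abs].min
        puts "move #{numslots} slots: #{src[0]} -> #{dst[0]}" if numslots > 0
        dst[1] += numslots
        src[1] -= numslots
        dst_idx += 1 if dst[1] == 0
        src_idx -= 1 if src[1] == 0
    end
    # Prints:
    #   move 2731 slots: B -> C
    #   move 2730 slots: A -> C
    # A keeps a one-slot surplus caused by the integer flooring of 'expected'.

Sorting by balance puts the neediest receiver and the biggest giver at opposite ends of the array, so each iteration retires at least one node from the search and small flooring residues are simply left in place.
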
@@ -992,7 +1112,7 @@ class RedisTrib
             exit(1) if (yesno != "yes")
         end
         reshard_table.each{|e|
-            move_slot(e[:source],target,e[:slot],:verbose=>true)
+            move_slot(e[:source],target,e[:slot],:dots=>true)
         }
     end
@@ -1238,17 +1358,32 @@ class RedisTrib
         if ARGV[idx][0..1] == "--"
             option = ARGV[idx][2..-1]
             idx += 1
+
+            # --verbose is a global option
+            if option == "verbose"
+                $verbose = true
+                next
+            end
+
             if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil
                 puts "Unknown option '#{option}' for command '#{cmd}'"
                 exit 1
             end
-            if ALLOWED_OPTIONS[cmd][option]
+            if ALLOWED_OPTIONS[cmd][option] != false
                 value = ARGV[idx]
                 idx += 1
             else
                 value = true
             end
-            options[option] = value
+
+            # If the option is set to [], it's a multiple arguments
+            # option. We just queue every new value into an array.
+            if ALLOWED_OPTIONS[cmd][option] == []
+                options[option] = [] if !options[option]
+                options[option] << value
+            else
+                options[option] = value
+            end
         else
             # Remaining arguments are not options.
             break
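
An option whose default in ALLOWED_OPTIONS is the empty array [] is now treated as repeatable: each occurrence still consumes a value, but the value is pushed onto an array instead of overwriting the previous one. As an illustration (the node IDs are placeholders):

    ./redis-trib.rb rebalance --weight <node-id-A>=2 --weight <node-id-B>=1 127.0.0.1:7000

would leave opt['weight'] == ["<node-id-A>=2", "<node-id-B>=1"], which rebalance_cluster_cmd then splits on "=" into its weights map. Options with any other default keep the old single-value behaviour.
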
@@ -1363,6 +1498,7 @@ COMMANDS={
     "info" => ["info_cluster_cmd", 2, "host:port"],
     "fix" => ["fix_cluster_cmd", 2, "host:port"],
     "reshard" => ["reshard_cluster_cmd", 2, "host:port"],
+    "rebalance" => ["rebalance_cluster_cmd", -2, "host:port"],
     "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
     "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"],
     "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
@@ -1375,8 +1511,9 @@ ALLOWED_OPTIONS={
     "create" => {"replicas" => true},
     "add-node" => {"slave" => false, "master-id" => true},
     "import" => {"from" => :required, "copy" => false, "replace" => false},
-    "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => 15000},
-    "fix" => {"timeout" => 15000},
+    "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => MigrateDefaultTimeout},
+    "rebalance" => {"weight" => [], "auto-weights" => false, "threshold" => RebalanceDefaultThreshold, "use-empty-masters" => false, "timeout" => MigrateDefaultTimeout},
+    "fix" => {"timeout" => MigrateDefaultTimeout},
 }
 
 def show_help
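
Taken together with the COMMANDS entry, this registers the new subcommand and its flags. A typical invocation would look something like this (the address and threshold value are illustrative):

    ./redis-trib.rb rebalance --use-empty-masters --threshold 5 127.0.0.1:7000

with --weight, --auto-weights and --timeout also accepted. The -2 arity follows redis-trib's convention of "at least this many arguments" rather than an exact count, and, as far as this diff shows, --threshold and --auto-weights are parsed by rebalance_cluster_cmd but not yet acted on in this initial implementation.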