mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 09:00:51 +00:00
redis-trib: All output wrapped by a specific function.
This is needed in order to colorize it as next step. We use conventions in output messages such as >>> This is an action *** This is a warning [ERR] This is an error [OK] That's fine And so forth, so that a color will be associated checking the first three chars.
This commit is contained in:
parent
be7bdd376e
commit
c195289e5e
@ -27,8 +27,7 @@ require 'redis'
|
||||
ClusterHashSlots = 16384
|
||||
|
||||
def xputs(s)
|
||||
printf s
|
||||
STDOUT.flush
|
||||
puts s
|
||||
end
|
||||
|
||||
class ClusterNode
|
||||
@ -67,23 +66,23 @@ class ClusterNode
|
||||
|
||||
def connect(o={})
|
||||
return if @r
|
||||
xputs "Connecting to node #{self}: "
|
||||
print "Connecting to node #{self}: "
|
||||
STDOUT.flush
|
||||
begin
|
||||
@r = Redis.new(:host => @info[:host], :port => @info[:port])
|
||||
@r.ping
|
||||
rescue
|
||||
puts "ERROR"
|
||||
puts "Sorry, can't connect to node #{self}"
|
||||
xputs "[ERR] Sorry, can't connect to node #{self}"
|
||||
exit 1 if o[:abort]
|
||||
@r = nil
|
||||
end
|
||||
puts "OK"
|
||||
xputs "OK"
|
||||
end
|
||||
|
||||
def assert_cluster
|
||||
info = @r.info
|
||||
if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
|
||||
puts "Error: Node #{self} is not configured as a cluster node."
|
||||
xputs "[ERR] Node #{self} is not configured as a cluster node."
|
||||
exit 1
|
||||
end
|
||||
end
|
||||
@ -91,7 +90,7 @@ class ClusterNode
|
||||
def assert_empty
|
||||
if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
|
||||
(@r.info['db0'])
|
||||
puts "Error: Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
|
||||
xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
|
||||
exit 1
|
||||
end
|
||||
end
|
||||
@ -241,7 +240,7 @@ class RedisTrib
|
||||
def check_arity(req_args, num_args)
|
||||
if ((req_args > 0 and num_args != req_args) ||
|
||||
(req_args < 0 and num_args < req_args.abs))
|
||||
puts "Wrong number of arguments for specified sub command"
|
||||
xputs "[ERR] Wrong number of arguments for specified sub command"
|
||||
exit 1
|
||||
end
|
||||
end
|
||||
@ -252,7 +251,7 @@ class RedisTrib
|
||||
|
||||
def cluster_error(msg)
|
||||
@errors << msg
|
||||
puts msg
|
||||
xputs msg
|
||||
end
|
||||
|
||||
def get_node_by_name(name)
|
||||
@ -263,7 +262,7 @@ class RedisTrib
|
||||
end
|
||||
|
||||
def check_cluster
|
||||
puts ">>> Performing Cluster Check (using node #{@nodes[0]})"
|
||||
xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
|
||||
show_nodes
|
||||
check_config_consistency
|
||||
check_open_slots
|
||||
@ -281,10 +280,10 @@ class RedisTrib
|
||||
end
|
||||
|
||||
def check_slots_coverage
|
||||
puts ">>> Check slots coverage..."
|
||||
xputs ">>> Check slots coverage..."
|
||||
slots = covered_slots
|
||||
if slots.length == ClusterHashSlots
|
||||
puts "[OK] All #{ClusterHashSlots} slots covered."
|
||||
xputs "[OK] All #{ClusterHashSlots} slots covered."
|
||||
else
|
||||
cluster_error \
|
||||
"[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
|
||||
@ -293,7 +292,7 @@ class RedisTrib
|
||||
end
|
||||
|
||||
def check_open_slots
|
||||
puts ">>> Check for open slots..."
|
||||
xputs ">>> Check for open slots..."
|
||||
open_slots = []
|
||||
@nodes.each{|n|
|
||||
if n.info[:migrating].size > 0
|
||||
@ -308,7 +307,7 @@ class RedisTrib
|
||||
}
|
||||
open_slots.uniq!
|
||||
if open_slots.length > 0
|
||||
puts "[WARNING] The following slots are open: #{open_slots.join(",")}"
|
||||
xputs "[WARNING] The following slots are open: #{open_slots.join(",")}"
|
||||
end
|
||||
if @fix
|
||||
open_slots.each{|slot| fix_open_slot slot}
|
||||
@ -325,8 +324,8 @@ class RedisTrib
|
||||
|
||||
def fix_slots_coverage
|
||||
not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys
|
||||
puts "\nFixing slots coverage..."
|
||||
puts "List of not covered slots: " + not_covered.join(",")
|
||||
xputs ">>> Fixing slots coverage..."
|
||||
xputs "List of not covered slots: " + not_covered.join(",")
|
||||
|
||||
# For every slot, take action depending on the actual condition:
|
||||
# 1) No node has keys for this slot.
|
||||
@ -336,7 +335,7 @@ class RedisTrib
|
||||
not_covered.each{|slot|
|
||||
nodes = nodes_with_keys_in_slot(slot)
|
||||
slots[slot] = nodes
|
||||
puts "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join}"
|
||||
xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join}"
|
||||
}
|
||||
|
||||
none = slots.select {|k,v| v.length == 0}
|
||||
@ -345,34 +344,34 @@ class RedisTrib
|
||||
|
||||
# Handle case "1": keys in no node.
|
||||
if none.length > 0
|
||||
puts "The folowing uncovered slots have no keys across the cluster:"
|
||||
puts none.keys.join(",")
|
||||
xputs "The folowing uncovered slots have no keys across the cluster:"
|
||||
xputs none.keys.join(",")
|
||||
yes_or_die "Fix these slots by covering with a random node?"
|
||||
none.each{|slot,nodes|
|
||||
node = @nodes.sample
|
||||
puts "Covering slot #{slot} with #{node}"
|
||||
xputs ">>> Covering slot #{slot} with #{node}"
|
||||
node.r.cluster("addslots",slot)
|
||||
}
|
||||
end
|
||||
|
||||
# Handle case "2": keys only in one node.
|
||||
if single.length > 0
|
||||
puts "The folowing uncovered slots have keys in just one node:"
|
||||
xputs "The folowing uncovered slots have keys in just one node:"
|
||||
puts single.keys.join(",")
|
||||
yes_or_die "Fix these slots by covering with those nodes?"
|
||||
single.each{|slot,nodes|
|
||||
puts "Covering slot #{slot} with #{nodes[0]}"
|
||||
xputs ">>> Covering slot #{slot} with #{nodes[0]}"
|
||||
nodes[0].r.cluster("addslots",slot)
|
||||
}
|
||||
end
|
||||
|
||||
# Handle case "3": keys in multiple nodes.
|
||||
if multi.length > 0
|
||||
puts "The folowing uncovered slots have keys in multiple nodes:"
|
||||
puts multi.keys.join(",")
|
||||
xputs "The folowing uncovered slots have keys in multiple nodes:"
|
||||
xputs multi.keys.join(",")
|
||||
yes_or_die "Fix these slots by moving keys into a single node?"
|
||||
multi.each{|slot,nodes|
|
||||
puts "Covering slot #{slot} moving keys to #{nodes[0]}"
|
||||
xputs ">>> Covering slot #{slot} moving keys to #{nodes[0]}"
|
||||
# TODO
|
||||
# 1) Set all nodes as "MIGRATING" for this slot, so that we
|
||||
# can access keys in the hash slot using ASKING.
|
||||
@ -397,10 +396,10 @@ class RedisTrib
|
||||
elsif n.info[:importing][slot]
|
||||
importing << n
|
||||
elsif n.r.cluster("countkeysinslot",slot) > 0
|
||||
puts "Found keys about slot #{slot} in node #{n}!"
|
||||
xputs "*** Found keys about slot #{slot} in node #{n}!"
|
||||
end
|
||||
}
|
||||
puts "Fixing open slot 0:"
|
||||
puts ">>> Fixing open slot #{slot}"
|
||||
puts "Set as migrating in: #{migrating.join(",")}"
|
||||
puts "Set as importing in: #{importing.join(",")}"
|
||||
|
||||
@ -409,7 +408,7 @@ class RedisTrib
|
||||
if migrating.length == 1 && importing.length == 1
|
||||
move_slot(migrating[0],importing[0],slot,:verbose=>true)
|
||||
else
|
||||
puts "Sorry, Redis-trib can't fix this slot yet (work in progress)"
|
||||
xputs "[ERR] Sorry, Redis-trib can't fix this slot yet (work in progress)"
|
||||
end
|
||||
end
|
||||
|
||||
@ -420,9 +419,9 @@ class RedisTrib
|
||||
signatures << n.get_config_signature
|
||||
}
|
||||
if signatures.uniq.length != 1
|
||||
puts "[ERR] Nodes don't agree about configuration!"
|
||||
cluster_error "[ERR] Nodes don't agree about configuration!"
|
||||
else
|
||||
puts "[OK] All nodes agree about slots configuration."
|
||||
xputs "[OK] All nodes agree about slots configuration."
|
||||
end
|
||||
end
|
||||
|
||||
@ -446,7 +445,7 @@ class RedisTrib
|
||||
|
||||
def show_nodes
|
||||
@nodes.each{|n|
|
||||
puts n.info_string
|
||||
xputs n.info_string
|
||||
}
|
||||
end
|
||||
|
||||
@ -467,7 +466,7 @@ class RedisTrib
|
||||
print "#{msg} (type 'yes' to accept): "
|
||||
STDOUT.flush
|
||||
if !(STDIN.gets.chomp.downcase == "yes")
|
||||
puts "Aborting..."
|
||||
xputs "*** Aborting..."
|
||||
exit 1
|
||||
end
|
||||
end
|
||||
@ -571,7 +570,7 @@ class RedisTrib
|
||||
load_cluster_info_from_node(ARGV[1])
|
||||
check_cluster
|
||||
if @errors.length != 0
|
||||
puts "\n--- Please fix your cluster problems before resharding ---"
|
||||
puts "*** Please fix your cluster problems before resharding"
|
||||
exit 1
|
||||
end
|
||||
numslots = 0
|
||||
@ -584,14 +583,14 @@ class RedisTrib
|
||||
print "What is the receiving node ID? "
|
||||
target = get_node_by_name(STDIN.gets.chop)
|
||||
if !target || target.has_flag?("slave")
|
||||
puts "The specified node is not known or not a master, please retry."
|
||||
xputs "*** The specified node is not known or not a master, please retry."
|
||||
target = nil
|
||||
end
|
||||
end
|
||||
sources = []
|
||||
puts "Please enter all the source node IDs."
|
||||
puts " Type 'all' to use all the nodes as source nodes for the hash slots."
|
||||
puts " Type 'done' once you entered all the source nodes IDs."
|
||||
xputs "Please enter all the source node IDs."
|
||||
xputs " Type 'all' to use all the nodes as source nodes for the hash slots."
|
||||
xputs " Type 'done' once you entered all the source nodes IDs."
|
||||
while true
|
||||
print "Source node ##{sources.length+1}:"
|
||||
line = STDIN.gets.chop
|
||||
@ -611,9 +610,9 @@ class RedisTrib
|
||||
}
|
||||
break
|
||||
elsif !src || src.has_flag?("slave")
|
||||
puts "The specified node is not known or is not a master, please retry."
|
||||
xputs "*** The specified node is not known or is not a master, please retry."
|
||||
elsif src.info[:name] == target.info[:name]
|
||||
puts "It is not possible to use the target node as source node."
|
||||
xputs "*** It is not possible to use the target node as source node."
|
||||
else
|
||||
sources << src
|
||||
end
|
||||
@ -635,7 +634,7 @@ class RedisTrib
|
||||
end
|
||||
|
||||
def create_cluster_cmd
|
||||
puts "Creating cluster"
|
||||
xputs ">>> Creating cluster"
|
||||
ARGV[1..-1].each{|n|
|
||||
node = ClusterNode.new(n)
|
||||
node.connect(:abort => true)
|
||||
@ -644,19 +643,19 @@ class RedisTrib
|
||||
node.assert_empty
|
||||
add_node(node)
|
||||
}
|
||||
puts "Performing hash slots allocation on #{@nodes.length} nodes..."
|
||||
xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
|
||||
alloc_slots
|
||||
show_nodes
|
||||
yes_or_die "Can I set the above configuration?"
|
||||
flush_nodes_config
|
||||
puts "** Nodes configuration updated"
|
||||
puts "** Sending CLUSTER MEET messages to join the cluster"
|
||||
xputs ">>> Nodes configuration updated"
|
||||
xputs ">>> Sending CLUSTER MEET messages to join the cluster"
|
||||
join_cluster
|
||||
check_cluster
|
||||
end
|
||||
|
||||
def addnode_cluster_cmd
|
||||
puts "Adding node #{ARGV[1]} to cluster #{ARGV[2]}"
|
||||
xputs ">>> Adding node #{ARGV[1]} to cluster #{ARGV[2]}"
|
||||
|
||||
# Check the existing cluster
|
||||
load_cluster_info_from_node(ARGV[2])
|
||||
@ -671,7 +670,7 @@ class RedisTrib
|
||||
first = @nodes.first.info
|
||||
|
||||
# Send CLUSTER MEET command to the new node
|
||||
puts "Send CLUSTER MEET to node #{new} to make it join the cluster."
|
||||
xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster."
|
||||
new.r.cluster("meet",first[:host],first[:port])
|
||||
end
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user