Cluster: redis-trib reshard / rebalance --pipeline support.

This commit is contained in:
antirez 2015-12-18 12:27:14 +01:00
parent 77f849b52f
commit 628af70214

View File

@ -26,6 +26,7 @@ require 'redis'
ClusterHashSlots = 16384
MigrateDefaultTimeout = 60000
MigrateDefaultPipeline = 10
RebalanceDefaultThreshold = 2
$verbose = false
@ -36,6 +37,8 @@ def xputs(s)
color="29;1"
when "[ER"
color="31;1"
when "[WA"
color="31;1"
when "[OK"
color="32"
when "[FA","***"
@ -847,6 +850,8 @@ class RedisTrib
# :update -- Update nodes.info[:slots] for source/target nodes.
# :quiet -- Don't print info messages.
def move_slot(source,target,slot,o={})
o = {:pipeline => MigrateDefaultPipeline}.merge(o)
# We start marking the slot as importing in the destination node,
# and the slot as migrating in the target host. Note that the order of
# the operations is important, as otherwise a client may be redirected
@ -862,7 +867,7 @@ class RedisTrib
end
# Migrate all the keys from source to target using the MIGRATE command
while true
keys = source.r.cluster("getkeysinslot",slot,10)
keys = source.r.cluster("getkeysinslot",slot,o[:pipeline])
break if keys.length == 0
begin
source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:keys,*keys])
@ -908,6 +913,10 @@ class RedisTrib
end
def rebalance_cluster_cmd(argv,opt)
opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)
# Load nodes info before parsing options, otherwise we can't
# handle --weight.
load_cluster_info_from_node(argv[0])
# Options parsing
@ -995,7 +1004,10 @@ class RedisTrib
else
reshard_table.each{|e|
move_slot(e[:source],dst,e[:slot],
:quiet=>true,:dots=>false,:update=>true)
:quiet=>true,
:dots=>false,
:update=>true,
:pipeline=>opt['pipeline'])
print "#"
STDOUT.flush
}
@ -1020,6 +1032,8 @@ class RedisTrib
end
def reshard_cluster_cmd(argv,opt)
opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)
load_cluster_info_from_node(argv[0])
check_cluster
if @errors.length != 0
@ -1132,7 +1146,9 @@ class RedisTrib
exit(1) if (yesno != "yes")
end
reshard_table.each{|e|
move_slot(e[:source],target,e[:slot],:dots=>true)
move_slot(e[:source],target,e[:slot],
:dots=>true,
:pipeline=>opt['pipeline'])
}
end
@ -1531,8 +1547,8 @@ ALLOWED_OPTIONS={
"create" => {"replicas" => true},
"add-node" => {"slave" => false, "master-id" => true},
"import" => {"from" => :required, "copy" => false, "replace" => false},
"reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => MigrateDefaultTimeout},
"rebalance" => {"weight" => [], "auto-weights" => false, "threshold" => RebalanceDefaultThreshold, "use-empty-masters" => false, "timeout" => MigrateDefaultTimeout, "simulate" => false},
"reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true},
"rebalance" => {"weight" => [], "auto-weights" => false, "threshold" => RebalanceDefaultThreshold, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true},
"fix" => {"timeout" => MigrateDefaultTimeout},
}