mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 09:00:51 +00:00
Merge pull request #5691 from artix75/cluster_manager_fix_cmd
Cluster Manager: avoid using reply error messages to check slot status.
This commit is contained in:
commit
81008bf99e
130
src/redis-cli.c
130
src/redis-cli.c
@ -1933,7 +1933,8 @@ static dictType clusterManagerDictType = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
typedef int clusterManagerCommandProc(int argc, char **argv);
|
typedef int clusterManagerCommandProc(int argc, char **argv);
|
||||||
typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx);
|
typedef int (*clusterManagerOnReplyError)(redisReply *reply,
|
||||||
|
clusterManagerNode *n, int bulk_idx);
|
||||||
|
|
||||||
/* Cluster Manager helper functions */
|
/* Cluster Manager helper functions */
|
||||||
|
|
||||||
@ -2196,7 +2197,7 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Execute MULTI command on a cluster node. */
|
/* Call MULTI command on a cluster node. */
|
||||||
static int clusterManagerStartTransaction(clusterManagerNode *node) {
|
static int clusterManagerStartTransaction(clusterManagerNode *node) {
|
||||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
|
||||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||||
@ -2204,7 +2205,7 @@ static int clusterManagerStartTransaction(clusterManagerNode *node) {
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Execute EXEC command on a cluster node. */
|
/* Call EXEC command on a cluster node. */
|
||||||
static int clusterManagerExecTransaction(clusterManagerNode *node,
|
static int clusterManagerExecTransaction(clusterManagerNode *node,
|
||||||
clusterManagerOnReplyError onerror)
|
clusterManagerOnReplyError onerror)
|
||||||
{
|
{
|
||||||
@ -2220,7 +2221,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node,
|
|||||||
redisReply *r = reply->element[i];
|
redisReply *r = reply->element[i];
|
||||||
char *err = NULL;
|
char *err = NULL;
|
||||||
success = clusterManagerCheckRedisReply(node, r, &err);
|
success = clusterManagerCheckRedisReply(node, r, &err);
|
||||||
if (!success && onerror) success = onerror(r, i);
|
if (!success && onerror) success = onerror(r, node, i);
|
||||||
if (err) {
|
if (err) {
|
||||||
if (!success)
|
if (!success)
|
||||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
||||||
@ -2768,6 +2769,55 @@ cleanup:
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Get the node the slot is assigned to from the point of view of node *n.
|
||||||
|
* If the slot is unassigned or if the reply is an error, return NULL.
|
||||||
|
* Use the **err argument in order to check whether the slot is unassigned
|
||||||
|
* or the reply resulted in an error. */
|
||||||
|
static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n,
|
||||||
|
int slot, char **err)
|
||||||
|
{
|
||||||
|
assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS);
|
||||||
|
clusterManagerNode *owner = NULL;
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS");
|
||||||
|
if (clusterManagerCheckRedisReply(n, reply, err)) {
|
||||||
|
assert(reply->type == REDIS_REPLY_ARRAY);
|
||||||
|
size_t i;
|
||||||
|
for (i = 0; i < reply->elements; i++) {
|
||||||
|
redisReply *r = reply->element[i];
|
||||||
|
assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3);
|
||||||
|
int from, to;
|
||||||
|
from = r->element[0]->integer;
|
||||||
|
to = r->element[1]->integer;
|
||||||
|
if (slot < from || slot > to) continue;
|
||||||
|
redisReply *nr = r->element[2];
|
||||||
|
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2);
|
||||||
|
char *name = NULL;
|
||||||
|
if (nr->elements >= 3)
|
||||||
|
name = nr->element[2]->str;
|
||||||
|
if (name != NULL)
|
||||||
|
owner = clusterManagerNodeByName(name);
|
||||||
|
else {
|
||||||
|
char *ip = nr->element[0]->str;
|
||||||
|
assert(ip != NULL);
|
||||||
|
int port = (int) nr->element[1]->integer;
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *nd = ln->value;
|
||||||
|
if (strcmp(nd->ip, ip) == 0 && port == nd->port) {
|
||||||
|
owner = nd;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (owner) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
return owner;
|
||||||
|
}
|
||||||
|
|
||||||
/* Set slot status to "importing" or "migrating" */
|
/* Set slot status to "importing" or "migrating" */
|
||||||
static int clusterManagerSetSlot(clusterManagerNode *node1,
|
static int clusterManagerSetSlot(clusterManagerNode *node1,
|
||||||
clusterManagerNode *node2,
|
clusterManagerNode *node2,
|
||||||
@ -2808,8 +2858,19 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
|
|||||||
char *err = NULL;
|
char *err = NULL;
|
||||||
int success = clusterManagerCheckRedisReply(node, reply, &err);
|
int success = clusterManagerCheckRedisReply(node, reply, &err);
|
||||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||||
ignore_unassigned_err &&
|
ignore_unassigned_err)
|
||||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
{
|
||||||
|
char *get_owner_err = NULL;
|
||||||
|
clusterManagerNode *assigned_to =
|
||||||
|
clusterManagerGetSlotOwner(node, slot, &get_owner_err);
|
||||||
|
if (!assigned_to) {
|
||||||
|
if (get_owner_err == NULL) success = 1;
|
||||||
|
else {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err);
|
||||||
|
zfree(get_owner_err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!success && err != NULL) {
|
if (!success && err != NULL) {
|
||||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
||||||
zfree(err);
|
zfree(err);
|
||||||
@ -2845,12 +2906,16 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) {
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) {
|
/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore
|
||||||
if (bulk_idx == 0 && reply) {
|
* errors except for ADDSLOTS errors.
|
||||||
if (reply->type == REDIS_REPLY_ERROR)
|
* Return 1 if the error should be ignored. */
|
||||||
return strstr(reply->str, "already unassigned") != NULL;
|
static int clusterManagerOnSetOwnerErr(redisReply *reply,
|
||||||
}
|
clusterManagerNode *n, int bulk_idx)
|
||||||
return 0;
|
{
|
||||||
|
UNUSED(reply);
|
||||||
|
UNUSED(n);
|
||||||
|
/* Only raise an error when ADDSLOTS fails (bulk_idx == 1). */
|
||||||
|
return (bulk_idx != 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
|
static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
|
||||||
@ -2865,8 +2930,7 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
|
|||||||
clusterManagerAddSlot(owner, slot);
|
clusterManagerAddSlot(owner, slot);
|
||||||
if (do_clear) clusterManagerClearSlotStatus(owner, slot);
|
if (do_clear) clusterManagerClearSlotStatus(owner, slot);
|
||||||
clusterManagerBumpEpoch(owner);
|
clusterManagerBumpEpoch(owner);
|
||||||
success = clusterManagerExecTransaction(owner,
|
success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr);
|
||||||
clusterManagerIgnoreUnassignedErr);
|
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2950,7 +3014,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
char **err)
|
char **err)
|
||||||
{
|
{
|
||||||
int success = 1;
|
int success = 1;
|
||||||
int replace_existing_keys = (config.cluster_manager_command.flags &
|
int retry = (config.cluster_manager_command.flags &
|
||||||
(CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
|
(CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
|
||||||
while (1) {
|
while (1) {
|
||||||
char *dots = NULL;
|
char *dots = NULL;
|
||||||
@ -2983,16 +3047,35 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
if (migrate_reply == NULL) goto next;
|
if (migrate_reply == NULL) goto next;
|
||||||
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
||||||
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
||||||
int not_served = strstr(migrate_reply->str, "slot not served") != NULL;
|
int not_served = 0;
|
||||||
if (replace_existing_keys && (is_busy || not_served)) {
|
if (!is_busy) {
|
||||||
|
char *get_owner_err = NULL;
|
||||||
|
clusterManagerNode *served_by =
|
||||||
|
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
|
||||||
|
if (!served_by) {
|
||||||
|
if (get_owner_err == NULL) not_served = 1;
|
||||||
|
else {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
|
||||||
|
get_owner_err);
|
||||||
|
zfree(get_owner_err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (retry && (is_busy || not_served)) {
|
||||||
/* If the key already exists, try to migrate keys
|
/* If the key already exists, try to migrate keys
|
||||||
* adding REPLACE option.
|
* adding REPLACE option.
|
||||||
* If the key's slot is not served, try to assign slot
|
* If the key's slot is not served, try to assign slot
|
||||||
* to the target node. */
|
* to the target node. */
|
||||||
if (not_served)
|
if (not_served) {
|
||||||
|
clusterManagerLogWarn("*** Slot was not served, setting "
|
||||||
|
"owner to node %s:%d.\n",
|
||||||
|
target->ip, target->port);
|
||||||
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
||||||
|
}
|
||||||
|
if (is_busy) {
|
||||||
clusterManagerLogWarn("*** Target key exists. "
|
clusterManagerLogWarn("*** Target key exists. "
|
||||||
"Replacing it for FIX.\n");
|
"Replacing it for FIX.\n");
|
||||||
|
}
|
||||||
freeReplyObject(migrate_reply);
|
freeReplyObject(migrate_reply);
|
||||||
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||||
target,
|
target,
|
||||||
@ -4252,7 +4335,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||||||
n->port);
|
n->port);
|
||||||
for (i = 0; i < n->migrating_count; i += 2) {
|
for (i = 0; i < n->migrating_count; i += 2) {
|
||||||
sds slot = n->migrating[i];
|
sds slot = n->migrating[i];
|
||||||
dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1]));
|
dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1]));
|
||||||
char *fmt = (i > 0 ? ",%S" : "%S");
|
char *fmt = (i > 0 ? ",%S" : "%S");
|
||||||
errstr = sdscatfmt(errstr, fmt, slot);
|
errstr = sdscatfmt(errstr, fmt, slot);
|
||||||
}
|
}
|
||||||
@ -4270,7 +4353,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||||||
n->port);
|
n->port);
|
||||||
for (i = 0; i < n->importing_count; i += 2) {
|
for (i = 0; i < n->importing_count; i += 2) {
|
||||||
sds slot = n->importing[i];
|
sds slot = n->importing[i];
|
||||||
dictAdd(open_slots, slot, sdsdup(n->importing[i + 1]));
|
dictReplace(open_slots, slot, sdsdup(n->importing[i + 1]));
|
||||||
char *fmt = (i > 0 ? ",%S" : "%S");
|
char *fmt = (i > 0 ? ",%S" : "%S");
|
||||||
errstr = sdscatfmt(errstr, fmt, slot);
|
errstr = sdscatfmt(errstr, fmt, slot);
|
||||||
}
|
}
|
||||||
@ -4333,7 +4416,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||||||
/* Check whether there are multiple owners, even when slots are
|
/* Check whether there are multiple owners, even when slots are
|
||||||
* fully covered and there are no open slots. */
|
* fully covered and there are no open slots. */
|
||||||
clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
|
clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
|
||||||
int slot = 0;
|
int slot = 0, slots_with_multiple_owners = 0;
|
||||||
for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
|
for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
@ -4359,6 +4442,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||||||
clusterManagerNode *n = ln->value;
|
clusterManagerNode *n = ln->value;
|
||||||
clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
|
clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
|
||||||
}
|
}
|
||||||
|
slots_with_multiple_owners++;
|
||||||
if (do_fix) {
|
if (do_fix) {
|
||||||
result = clusterManagerFixMultipleSlotOwners(slot, owners);
|
result = clusterManagerFixMultipleSlotOwners(slot, owners);
|
||||||
if (!result) {
|
if (!result) {
|
||||||
@ -4366,11 +4450,13 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||||||
"for slot %d\n", slot);
|
"for slot %d\n", slot);
|
||||||
listRelease(owners);
|
listRelease(owners);
|
||||||
break;
|
break;
|
||||||
}
|
} else slots_with_multiple_owners--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
listRelease(owners);
|
listRelease(owners);
|
||||||
}
|
}
|
||||||
|
if (slots_with_multiple_owners == 0)
|
||||||
|
clusterManagerLogOk("[OK] No multiple owners found.\n");
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user