mirror of
https://github.com/fluencelabs/redis
synced 2025-04-24 10:02:14 +00:00
Merge pull request #5708 from artix75/cluster_manager_fix_cmd
Cluster Manager: compare key values after BUSYKEY error (migration).
This commit is contained in:
commit
0d166674f9
133
src/redis-cli.c
133
src/redis-cli.c
@ -2934,6 +2934,68 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Get the hash for the values of the specified keys in *keys_reply for the
|
||||||
|
* specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command
|
||||||
|
* on both nodes. Every key with same name on both nodes but having different
|
||||||
|
* values will be added to the *diffs list. Return 0 in case of reply
|
||||||
|
* error. */
|
||||||
|
static int clusterManagerCompareKeysValues(clusterManagerNode *n1,
|
||||||
|
clusterManagerNode *n2,
|
||||||
|
redisReply *keys_reply,
|
||||||
|
list *diffs)
|
||||||
|
{
|
||||||
|
size_t i, argc = keys_reply->elements + 2;
|
||||||
|
static const char *hash_zero = "0000000000000000000000000000000000000000";
|
||||||
|
char **argv = zcalloc(argc * sizeof(char *));
|
||||||
|
size_t *argv_len = zcalloc(argc * sizeof(size_t));
|
||||||
|
argv[0] = "DEBUG";
|
||||||
|
argv_len[0] = 5;
|
||||||
|
argv[1] = "DIGEST-VALUE";
|
||||||
|
argv_len[1] = 12;
|
||||||
|
for (i = 0; i < keys_reply->elements; i++) {
|
||||||
|
redisReply *entry = keys_reply->element[i];
|
||||||
|
int idx = i + 2;
|
||||||
|
argv[idx] = entry->str;
|
||||||
|
argv_len[idx] = entry->len;
|
||||||
|
}
|
||||||
|
int success = 0;
|
||||||
|
void *_reply1 = NULL, *_reply2 = NULL;
|
||||||
|
redisReply *r1 = NULL, *r2 = NULL;
|
||||||
|
redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len);
|
||||||
|
success = (redisGetReply(n1->context, &_reply1) == REDIS_OK);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
r1 = (redisReply *) _reply1;
|
||||||
|
redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len);
|
||||||
|
success = (redisGetReply(n2->context, &_reply2) == REDIS_OK);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
r2 = (redisReply *) _reply2;
|
||||||
|
success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR);
|
||||||
|
if (r1->type == REDIS_REPLY_ERROR) {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str);
|
||||||
|
success = 0;
|
||||||
|
}
|
||||||
|
if (r2->type == REDIS_REPLY_ERROR) {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str);
|
||||||
|
success = 0;
|
||||||
|
}
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
assert(keys_reply->elements == r1->elements &&
|
||||||
|
keys_reply->elements == r2->elements);
|
||||||
|
for (i = 0; i < keys_reply->elements; i++) {
|
||||||
|
char *key = keys_reply->element[i]->str;
|
||||||
|
char *hash1 = r1->element[i]->str;
|
||||||
|
char *hash2 = r2->element[i]->str;
|
||||||
|
/* Ignore keys that don't exist in both nodes. */
|
||||||
|
if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0)
|
||||||
|
continue;
|
||||||
|
if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key);
|
||||||
|
}
|
||||||
|
cleanup:
|
||||||
|
if (r1) freeReplyObject(r1);
|
||||||
|
if (r2) freeReplyObject(r2);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
/* Migrate keys taken from reply->elements. It returns the reply from the
|
/* Migrate keys taken from reply->elements. It returns the reply from the
|
||||||
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
||||||
* is not NULL, a dot will be printed for every migrated key. */
|
* is not NULL, a dot will be printed for every migrated key. */
|
||||||
@ -3014,8 +3076,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
char **err)
|
char **err)
|
||||||
{
|
{
|
||||||
int success = 1;
|
int success = 1;
|
||||||
int retry = (config.cluster_manager_command.flags &
|
int do_fix = config.cluster_manager_command.flags &
|
||||||
(CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
|
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||||
|
int do_replace = config.cluster_manager_command.flags &
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_REPLACE;
|
||||||
while (1) {
|
while (1) {
|
||||||
char *dots = NULL;
|
char *dots = NULL;
|
||||||
redisReply *reply = NULL, *migrate_reply = NULL;
|
redisReply *reply = NULL, *migrate_reply = NULL;
|
||||||
@ -3049,6 +3113,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
||||||
int not_served = 0;
|
int not_served = 0;
|
||||||
if (!is_busy) {
|
if (!is_busy) {
|
||||||
|
/* Check if the slot is unassigned (not served) in the
|
||||||
|
* source node's configuration. */
|
||||||
char *get_owner_err = NULL;
|
char *get_owner_err = NULL;
|
||||||
clusterManagerNode *served_by =
|
clusterManagerNode *served_by =
|
||||||
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
|
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
|
||||||
@ -3061,20 +3127,69 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (retry && (is_busy || not_served)) {
|
/* Try to handle errors. */
|
||||||
/* If the key already exists, try to migrate keys
|
if (is_busy || not_served) {
|
||||||
* adding REPLACE option.
|
/* If the key's slot is not served, try to assign slot
|
||||||
* If the key's slot is not served, try to assign slot
|
|
||||||
* to the target node. */
|
* to the target node. */
|
||||||
if (not_served) {
|
if (do_fix && not_served) {
|
||||||
clusterManagerLogWarn("*** Slot was not served, setting "
|
clusterManagerLogWarn("*** Slot was not served, setting "
|
||||||
"owner to node %s:%d.\n",
|
"owner to node %s:%d.\n",
|
||||||
target->ip, target->port);
|
target->ip, target->port);
|
||||||
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
||||||
}
|
}
|
||||||
|
/* If the key already exists in the target node (BUSYKEY),
|
||||||
|
* check whether its value is the same in both nodes.
|
||||||
|
* In case of equal values, retry migration with the
|
||||||
|
* REPLACE option.
|
||||||
|
* In case of different values:
|
||||||
|
* - If the migration is requested by the fix command, stop
|
||||||
|
* and warn the user.
|
||||||
|
* - In other cases (ie. reshard), proceed only if the user
|
||||||
|
* launched the command with the --cluster-replace option.*/
|
||||||
if (is_busy) {
|
if (is_busy) {
|
||||||
clusterManagerLogWarn("*** Target key exists. "
|
clusterManagerLogWarn("\n*** Target key exists, "
|
||||||
"Replacing it for FIX.\n");
|
"checking values...\n");
|
||||||
|
list *diffs = listCreate();
|
||||||
|
success = clusterManagerCompareKeysValues(source,
|
||||||
|
target, reply, diffs);
|
||||||
|
if (!success && (do_fix || !do_replace)) {
|
||||||
|
listRelease(diffs);
|
||||||
|
clusterManagerLogErr("*** Value check failed!\n");
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
if (listLength(diffs) > 0 && (do_fix || !do_replace)) {
|
||||||
|
success = 0;
|
||||||
|
clusterManagerLogErr(
|
||||||
|
"*** Found %d key(s) in both source node and "
|
||||||
|
"target node having different values.\n"
|
||||||
|
" Source node: %s:%d\n"
|
||||||
|
" Target node: %s:%d\n"
|
||||||
|
" Keys(s):\n",
|
||||||
|
listLength(diffs),
|
||||||
|
source->ip, source->port,
|
||||||
|
target->ip, target->port);
|
||||||
|
listIter dli;
|
||||||
|
listNode *dln;
|
||||||
|
listRewind(diffs, &dli);
|
||||||
|
while((dln = listNext(&dli)) != NULL) {
|
||||||
|
char *k = dln->value;
|
||||||
|
clusterManagerLogErr(" - %s\n", k);
|
||||||
|
}
|
||||||
|
clusterManagerLogErr("Please fix the above key(s) "
|
||||||
|
"manually ");
|
||||||
|
if (do_fix)
|
||||||
|
clusterManagerLogErr("and try again!\n");
|
||||||
|
else {
|
||||||
|
clusterManagerLogErr("or relaunch the command "
|
||||||
|
"with --cluster-replace "
|
||||||
|
"option to force key "
|
||||||
|
"overriding.\n");
|
||||||
|
}
|
||||||
|
listRelease(diffs);
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
listRelease(diffs);
|
||||||
|
clusterManagerLogWarn("*** Replacing target keys...\n");
|
||||||
}
|
}
|
||||||
freeReplyObject(migrate_reply);
|
freeReplyObject(migrate_reply);
|
||||||
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user