diff --git a/src/cluster.c b/src/cluster.c index 81eff944..046a36fc 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1212,6 +1212,15 @@ void clusterSetNodeAsMaster(clusterNode *n) { void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) { int j; clusterNode *curmaster, *newmaster = NULL; + /* The dirty slots list holds the slots whose ownership we lose while + * still having keys stored in them. This usually happens after a failover + * or after a manual cluster reconfiguration operated by the admin. + * + * If the update message does not demote this master to a slave (in that + * case we would resync with the master, replacing the whole key space), + * we need to delete all the keys in the slots we lost ownership of. */ + uint16_t dirty_slots[REDIS_CLUSTER_SLOTS]; + int dirty_slots_count = 0; /* Here we set curmaster to this node or the node this node * replicates to if it's a slave. In the for loop we are @@ -1241,25 +1250,14 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc if (server.cluster->slots[j] == NULL || server.cluster->slots[j]->configEpoch < senderConfigEpoch) { - /* Was this slot mine, and still contains keys? Something - * odd happened, put the slot in importing state so that - * redis-trib fix can detect the condition (and no further - * updates will be processed before the slot gets fixed). */ + /* Was this slot mine, and does it still contain keys? Record it + * in dirty_slots so its keys can be deleted below if needed. */ if (server.cluster->slots[j] == myself && countKeysInSlot(j) && sender != myself) { - redisLog(REDIS_WARNING, - "I received an update for slot %d. " - "%.40s claims it with config %llu, " - "I've it assigned to myself with config %llu. " - "I've still keys about this slot! " - "Putting the slot in IMPORTING state. 
" - "Please run the 'redis-trib fix' command.", - j, sender->name, - (unsigned long long) senderConfigEpoch, - (unsigned long long) myself->configEpoch); - server.cluster->importing_slots_from[j] = sender; + dirty_slots[dirty_slots_count] = j; + dirty_slots_count++; } if (server.cluster->slots[j] == curmaster) @@ -1288,6 +1286,16 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_FSYNC_CONFIG); + } else if (dirty_slots_count) { + /* If we are here, we received an update message which removed our + * ownership of certain slots we still hold keys for, but we are + * still serving some slots, so this master node was not demoted to + * a slave. + * + * In order to maintain a consistent state between keys and slots + * we need to remove all the keys from the slots we lost. */ + for (j = 0; j < dirty_slots_count; j++) + delKeysInSlot(dirty_slots[j]); } } diff --git a/src/db.c b/src/db.c index 72fc2be0..f849e524 100644 --- a/src/db.c +++ b/src/db.c @@ -1143,7 +1143,7 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun range.min = range.max = hashslot; range.minex = range.maxex = 0; - + n = zslFirstInRange(server.cluster->slots_to_keys, &range); while(n && n->score == hashslot && count--) { keys[j++] = n->obj; @@ -1152,6 +1152,28 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun return j; } +/* Delete from database 0 every key found in the specified hash slot. + * Returns the number of keys visited and passed to dbDelete(). */ +unsigned int delKeysInSlot(unsigned int hashslot) { + zskiplistNode *n; + zrangespec range; + int j = 0; + + range.min = range.max = hashslot; + range.minex = range.maxex = 0; + + n = zslFirstInRange(server.cluster->slots_to_keys, &range); + while(n && n->score == hashslot) { + robj *key = n->obj; + n = n->level[0].forward; /* Go to the next item before freeing it. 
*/ + incrRefCount(key); /* Protect the object while freeing it. */ + dbDelete(&server.db[0],key); + decrRefCount(key); + j++; + } + return j; +} + unsigned int countKeysInSlot(unsigned int hashslot) { zskiplist *zsl = server.cluster->slots_to_keys; zskiplistNode *zn; diff --git a/src/redis.h b/src/redis.h index 9c5507bb..e6b7ea93 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1271,6 +1271,7 @@ void signalModifiedKey(redisDb *db, robj *key); void signalFlushedDb(int dbid); unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count); unsigned int countKeysInSlot(unsigned int hashslot); +unsigned int delKeysInSlot(unsigned int hashslot); int verifyClusterConfigWithData(void); void scanGenericCommand(redisClient *c, robj *o, unsigned long cursor); int parseScanCursorOrReply(redisClient *c, robj *o, unsigned long *cursor);