diff --git a/src/blocked.c b/src/blocked.c index ae2500aa..8acfb818 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -155,3 +155,27 @@ void replyToBlockedClientTimedOut(redisClient *c) { } } +/* Mass-unblock clients because something changed in the instance that makes + * blocking no longer safe. For example clients blocked in list operations + * in an instance which turns from master to slave is unsafe, so this function + * is called when a master turns into a slave. + * + * The semantics is to send an -UNBLOCKED error to the client, disconnecting + * it at the same time. */ +void disconnectAllBlockedClients(void) { + listNode *ln; + listIter li; + + listRewind(server.clients,&li); + while((ln = listNext(&li))) { + redisClient *c = listNodeValue(ln); + + if (c->flags & REDIS_BLOCKED) { + addReplySds(c,sdsnew( + "-UNBLOCKED force unblock from blocking operation, " + "instance state changed (master -> slave?)\r\n")); + unblockClient(c); + c->flags |= REDIS_CLOSE_AFTER_REPLY; + } + } +} diff --git a/src/redis.h b/src/redis.h index 34d8b0a4..53f3967d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1395,6 +1395,7 @@ void blockClient(redisClient *c, int btype); void unblockClient(redisClient *c); void replyToBlockedClientTimedOut(redisClient *c); int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit); +void disconnectAllBlockedClients(void); /* Git SHA1 */ char *redisGitSHA1(void); diff --git a/src/replication.c b/src/replication.c index 697acbef..afea75b6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1444,6 +1444,7 @@ void replicationSetMaster(char *ip, int port) { server.masterhost = sdsnew(ip); server.masterport = port; if (server.master) freeClient(server.master); + disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ disconnectSlaves(); /* Force our slaves to resync with us as well. */ replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */