Emit SELECT to slaves in a centralized way.

Before this commit every Redis slave had its own selected database ID
state. This was not actually useful as the emitted stream of commands
is identical for all the slaves.

Now the the currently selected database is a global state that is set to
-1 when a new slave is attached, in order to force the SELECT command to
be re-emitted for all the slaves.

This change is useful in order to implement replication partial
resynchronization in the future, as makes sure that the stream of
commands received by slaves, including SELECT commands, are exactly the
same for every slave connected, at any time.

In this way we could have a global offset that can identify a specific
piece of the master -> slaves stream of commands.
This commit is contained in:
antirez 2012-11-02 16:31:28 +01:00
parent 1a27d41156
commit 7465ac7ab1
3 changed files with 5 additions and 5 deletions

View File

@ -1305,6 +1305,7 @@ void initServer() {
server.clients_to_close = listCreate(); server.clients_to_close = listCreate();
server.slaves = listCreate(); server.slaves = listCreate();
server.monitors = listCreate(); server.monitors = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate(); server.unblocked_clients = listCreate();
server.ready_keys = listCreate(); server.ready_keys = listCreate();
@ -2287,7 +2288,6 @@ void monitorCommand(redisClient *c) {
if (c->flags & REDIS_SLAVE) return; if (c->flags & REDIS_SLAVE) return;
c->flags |= (REDIS_SLAVE|REDIS_MONITOR); c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
c->slaveseldb = 0;
listAddNodeTail(server.monitors,c); listAddNodeTail(server.monitors,c);
addReply(c,shared.ok); addReply(c,shared.ok);
} }

View File

@ -412,7 +412,6 @@ typedef struct redisClient {
time_t lastinteraction; /* time of the last interaction, used for timeout */ time_t lastinteraction; /* time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time; time_t obuf_soft_limit_reached_time;
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int slaveseldb; /* slave selected db, if this client is a slave */
int authenticated; /* when requirepass is non-NULL */ int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */ int replstate; /* replication state if this is a slave */
int repldbfd; /* replication DB file descriptor */ int repldbfd; /* replication DB file descriptor */
@ -663,6 +662,7 @@ struct redisServer {
list *clients; /* List of active clients */ list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */ list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */ list *slaves, *monitors; /* List of slaves and MONITORs */
int slaveseldb; /* Last SELECTed DB in replication output */
redisClient *current_client; /* Current client, only used on crash report */ redisClient *current_client; /* Current client, only used on crash report */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */

View File

@ -54,7 +54,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Feed slaves that are waiting for the initial SYNC (so these commands /* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes), * are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */ * or are already in sync with the master. */
if (slave->slaveseldb != dictid) { if (server.slaveseldb != dictid) {
robj *selectcmd; robj *selectcmd;
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
@ -66,11 +66,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
} }
addReply(slave,selectcmd); addReply(slave,selectcmd);
decrRefCount(selectcmd); decrRefCount(selectcmd);
slave->slaveseldb = dictid;
} }
addReplyMultiBulkLen(slave,argc); addReplyMultiBulkLen(slave,argc);
for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
} }
server.slaveseldb = dictid;
} }
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
@ -177,7 +177,7 @@ void syncCommand(redisClient *c) {
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1; c->repldbfd = -1;
c->flags |= REDIS_SLAVE; c->flags |= REDIS_SLAVE;
c->slaveseldb = 0; server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
listAddNodeTail(server.slaves,c); listAddNodeTail(server.slaves,c);
return; return;
} }