PSYNC2: Save replication ID/offset on RDB file.

This means that stopping a slave and restarting it will still make it
able to PSYNC with the master. Moreover the master itself will retain
its ID/offset, in case it gets turned into a slave, or if a slave will
try to PSYNC with it with an exactly updated offset (otherwise there is
no backlog).

This change was possible thanks to PSYNC v2 that makes saving the current
replication state much simpler.
This commit is contained in:
antirez 2016-11-10 12:35:29 +01:00
parent 4e5e366ed2
commit 28c96d73b2
4 changed files with 29 additions and 3 deletions

View File

@ -855,6 +855,8 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
} }
} }
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1) return -1;
return 1; return 1;
} }
@ -1513,6 +1515,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
(char*)auxval->ptr); (char*)auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) { } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
if (rsi) rsi->repl_stream_db = atoi(auxval->ptr); if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-id")) {
if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
rsi->repl_id_is_set = 1;
}
} else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
} else { } else {
/* We ignore fields we don't understand, as by AUX field /* We ignore fields we don't understand, as by AUX field
* contract. */ * contract. */

View File

@ -39,7 +39,6 @@
void replicationDiscardCachedMaster(void); void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd); void replicationResurrectCachedMaster(int newfd);
void replicationCacheMasterUsingMyself(void);
void replicationSendAck(void); void replicationSendAck(void);
void putSlaveOnline(client *slave); void putSlaveOnline(client *slave);
int cancelReplicationHandshake(void); int cancelReplicationHandshake(void);

View File

@ -3423,9 +3423,20 @@ void loadDataFromDisk(void) {
if (loadAppendOnlyFile(server.aof_filename) == C_OK) if (loadAppendOnlyFile(server.aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else { } else {
if (rdbLoad(server.rdb_filename,NULL) == C_OK) { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000); (float)(ustime()-start)/1000000);
/* Restore the replication ID / offset from the RDB file. */
if (rsi.repl_id_is_set && rsi.repl_offset != -1) {
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If we are a slave, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
if (server.masterhost) replicationCacheMasterUsingMyself();
}
} else if (errno != ENOENT) { } else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1); exit(1);

View File

@ -813,10 +813,16 @@ struct redisMemOverhead {
* select the correct DB and are able to accept the stream coming from the * select the correct DB and are able to accept the stream coming from the
* top-level master. */ * top-level master. */
typedef struct rdbSaveInfo { typedef struct rdbSaveInfo {
/* Used saving and loading. */
int repl_stream_db; /* DB to select in server.master client. */ int repl_stream_db; /* DB to select in server.master client. */
/* Used only loading. */
int repl_id_is_set; /* True if repl_id field is set. */
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
long long repl_offset; /* Replication offset. */
} rdbSaveInfo; } rdbSaveInfo;
#define RDB_SAVE_INFO_INIT {-1} #define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1}
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Global server state * Global server state
@ -1441,6 +1447,7 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void); void changeReplicationId(void);
void clearReplicationId2(void); void clearReplicationId2(void);
void chopReplicationBacklog(void); void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
/* Generic persistence functions */ /* Generic persistence functions */
void startLoading(FILE *fp); void startLoading(FILE *fp);