Binding multiple IPs done properly with multiple sockets.

This commit is contained in:
antirez 2013-07-05 11:47:20 +02:00
parent 2160effc78
commit 98eecb70eb
5 changed files with 79 additions and 31 deletions

View File

@ -962,8 +962,7 @@ int rewriteAppendOnlyFileBackground(void) {
char tmpfile[256]; char tmpfile[256];
/* Child */ /* Child */
if (server.ipfd > 0) close(server.ipfd); closeListeningSockets(0);
if (server.sofd > 0) close(server.sofd);
redisSetProcTitle("redis-aof-rewrite"); redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {

View File

@ -224,7 +224,7 @@ void clusterSaveConfigOrDie(void) {
} }
void clusterInit(void) { void clusterInit(void) {
int saveconf = 0; int saveconf = 0, j;
server.cluster = zmalloc(sizeof(clusterState)); server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL; server.cluster->myself = NULL;
@ -251,14 +251,24 @@ void clusterInit(void) {
} }
if (saveconf) clusterSaveConfigOrDie(); if (saveconf) clusterSaveConfigOrDie();
/* We need a listening TCP port for our cluster messaging needs */ /* We need a listening TCP port for our cluster messaging needs */
server.cfd = anetTcpServer(server.neterr, server.cfd_count = 0;
server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr); if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
if (server.cfd == -1) { for (j = 0; j < server.bindaddr_count || j == 0; j++) {
redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr); server.cfd[j] = anetTcpServer(
exit(1); server.neterr, server.port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr[j]);
if (server.cfd[j] == -1) {
redisLog(REDIS_WARNING,
"Opening cluster listening TCP socket %s:%d: %s",
server.bindaddr[j] ? server.bindaddr[j] : "*",
server.port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
exit(1);
}
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event.");
server.cfd_count++;
} }
if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event.");
server.cluster->slots_to_keys = zslCreate(); server.cluster->slots_to_keys = zslCreate();
} }

View File

@ -729,8 +729,7 @@ int rdbSaveBackground(char *filename) {
int retval; int retval;
/* Child */ /* Child */
if (server.ipfd > 0) close(server.ipfd); closeListeningSockets(0);
if (server.sofd > 0) close(server.sofd);
redisSetProcTitle("redis-rdb-bgsave"); redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename); retval = rdbSave(filename);
if (retval == REDIS_OK) { if (retval == REDIS_OK) {

View File

@ -1224,7 +1224,7 @@ void initServerConfig() {
server.bindaddr_count = 0; server.bindaddr_count = 0;
server.unixsocket = NULL; server.unixsocket = NULL;
server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM; server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM;
server.ipfd = -1; server.ipfd_count = 0;
server.sofd = -1; server.sofd = -1;
server.dbnum = REDIS_DEFAULT_DBNUM; server.dbnum = REDIS_DEFAULT_DBNUM;
server.verbosity = REDIS_DEFAULT_VERBOSITY; server.verbosity = REDIS_DEFAULT_VERBOSITY;
@ -1425,14 +1425,25 @@ void initServer() {
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum); server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening sockets. */
if (server.port != 0) { if (server.port != 0) {
server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr,server.bindaddr_count); /* Force binding of 0.0.0.0 if no bind address is specified, always
if (server.ipfd == ANET_ERR) { * entering the loop if j == 0. */
redisLog(REDIS_WARNING, "Opening port %d: %s", if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
server.port, server.neterr); for (j = 0; j < server.bindaddr_count || j == 0; j++) {
exit(1); server.ipfd[server.ipfd_count] = anetTcpServer(server.neterr,server.port,server.bindaddr[j]);
if (server.ipfd[server.ipfd_count] == ANET_ERR) {
redisLog(REDIS_WARNING,
"Creating Server TCP listening socket %s:%d: %s",
server.bindaddr[j] ? server.bindaddr[j] : "*",
server.port, server.neterr);
exit(1);
}
server.ipfd_count++;
} }
} }
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) { if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */ unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm); server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
@ -1441,10 +1452,14 @@ void initServer() {
exit(1); exit(1);
} }
} }
if (server.ipfd < 0 && server.sofd < 0) {
/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1); exit(1);
} }
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL);
@ -1487,15 +1502,28 @@ void initServer() {
server.unixtime = time(NULL); server.unixtime = time(NULL);
server.lastbgsave_status = REDIS_OK; server.lastbgsave_status = REDIS_OK;
server.repl_good_slaves_count = 0; server.repl_good_slaves_count = 0;
/* Create the serverCron() time event, that's our main way to process
* background operations. */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event."); redisPanic("Can't create the serverCron time event.");
exit(1); exit(1);
} }
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); /* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
/* Open the AOF file if needed. */
if (server.aof_state == REDIS_AOF_ON) { if (server.aof_state == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename, server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644); O_WRONLY|O_APPEND|O_CREAT,0644);
@ -1930,6 +1958,21 @@ int processCommand(redisClient *c) {
/*================================== Shutdown =============================== */ /*================================== Shutdown =============================== */
/* Close listening sockets. Also unlink the unix domain socket if
* unlink_unix_socket is non-zero. */
void closeListeningSockets(int unlink_unix_socket) {
int j;
for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
if (server.sofd != -1) close(server.sofd);
if (server.cluster_enabled)
for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
if (unlink_unix_socket && server.unixsocket) {
redisLog(REDIS_NOTICE,"Removing the unix socket file.");
unlink(server.unixsocket); /* don't care if this fails */
}
}
int prepareForShutdown(int flags) { int prepareForShutdown(int flags) {
int save = flags & REDIS_SHUTDOWN_SAVE; int save = flags & REDIS_SHUTDOWN_SAVE;
int nosave = flags & REDIS_SHUTDOWN_NOSAVE; int nosave = flags & REDIS_SHUTDOWN_NOSAVE;
@ -1973,13 +2016,7 @@ int prepareForShutdown(int flags) {
unlink(server.pidfile); unlink(server.pidfile);
} }
/* Close the listening sockets. Apparently this allows faster restarts. */ /* Close the listening sockets. Apparently this allows faster restarts. */
if (server.ipfd != -1) close(server.ipfd); closeListeningSockets(1);
if (server.sofd != -1) close(server.sofd);
if (server.unixsocket) {
redisLog(REDIS_NOTICE,"Removing the unix socket file.");
unlink(server.unixsocket); /* don't care if this fails */
}
redisLog(REDIS_WARNING,"Redis is now ready to exit, bye bye..."); redisLog(REDIS_WARNING,"Redis is now ready to exit, bye bye...");
return REDIS_OK; return REDIS_OK;
} }
@ -2936,7 +2973,7 @@ int main(int argc, char **argv) {
exit(1); exit(1);
} }
} }
if (server.ipfd > 0) if (server.ipfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
if (server.sofd > 0) if (server.sofd > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);

View File

@ -735,9 +735,11 @@ struct redisServer {
int bindaddr_count; /* Number of addresses in server.bindaddr[] */ int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *unixsocket; /* UNIX socket path */ char *unixsocket; /* UNIX socket path */
mode_t unixsocketperm; /* UNIX socket permission */ mode_t unixsocketperm; /* UNIX socket permission */
int ipfd; /* TCP socket file descriptor */ int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */
int ipfd_count; /* Used slots in ipfd[] */
int sofd; /* Unix socket file descriptor */ int sofd; /* Unix socket file descriptor */
int cfd; /* Cluster bus listening socket */ int cfd[REDIS_BINDADDR_MAX];/* Cluster bus listening socket */
int cfd_count; /* Used slots in cfd[] */
list *clients; /* List of active clients */ list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */ list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */ list *slaves, *monitors; /* List of slaves and MONITORs */
@ -1246,6 +1248,7 @@ void oom(const char *msg);
void populateCommandTable(void); void populateCommandTable(void);
void resetCommandTableStats(void); void resetCommandTableStats(void);
void adjustOpenFilesLimit(void); void adjustOpenFilesLimit(void);
void closeListeningSockets(int unlink_unix_socket);
/* Set data type */ /* Set data type */
robj *setTypeCreate(robj *value); robj *setTypeCreate(robj *value);