diff --git a/redis.conf b/redis.conf index 8d69f929..8ad5cc2e 100644 --- a/redis.conf +++ b/redis.conf @@ -20,7 +20,7 @@ daemonize no # default. You can specify a custom pid file location here. pidfile /var/run/redis.pid -# Accept connections on the specified port, default is 6379 +# Accept connections on the specified port, default is 6379. port 6379 # If you want you can bind a single interface, if the bind option is not @@ -28,6 +28,12 @@ port 6379 # # bind 127.0.0.1 +# Specify the path for the unix socket that will be used to listen for +# incoming connections. There is no default, so Redis will not listen +# on a unix socket when not specified. +# +# unixsocket /tmp/redis.sock + # Close the connection after a client is idle for N seconds (0 to disable) timeout 300 diff --git a/src/anet.c b/src/anet.c index 4fe811a1..e7763e4c 100644 --- a/src/anet.c +++ b/src/anet.c @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -123,20 +124,31 @@ int anetResolve(char *err, char *host, char *ipbuf) return ANET_OK; } +static int anetCreateSocket(char *err, int domain) { + int s, on = 1; + if ((s = socket(domain, SOCK_STREAM, 0)) == -1) { + anetSetError(err, "creating socket: %s\n", strerror(errno)); + return ANET_ERR; + } + + /* Make sure connection-intensive things like the redis benckmark + * will be able to close/open sockets a zillion of times */ + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { + anetSetError(err, "setsockopt SO_REUSEADDR: %s\n", strerror(errno)); + return ANET_ERR; + } + return s; +} + #define ANET_CONNECT_NONE 0 #define ANET_CONNECT_NONBLOCK 1 static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) { - int s, on = 1; + int s; struct sockaddr_in sa; - if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - anetSetError(err, "creating socket: %s\n", strerror(errno)); + if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR) return ANET_ERR; - } - /* Make sure connection-intensive things like the redis benckmark - * will be able to close/open sockets a zillion of times */ - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); sa.sin_family = AF_INET; sa.sin_port = htons(port); @@ -177,6 +189,42 @@ int anetTcpNonBlockConnect(char *err, char *addr, int port) return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK); } +int anetUnixGenericConnect(char *err, char *path, int flags) +{ + int s; + struct sockaddr_un sa; + + if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR) + return ANET_ERR; + + sa.sun_family = AF_LOCAL; + strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1); + if (flags & ANET_CONNECT_NONBLOCK) { + if (anetNonBlock(err,s) != ANET_OK) + return ANET_ERR; + } + if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) { + if (errno == EINPROGRESS && + flags & ANET_CONNECT_NONBLOCK) + return s; + + anetSetError(err, "connect: %s\n", strerror(errno)); + close(s); + return ANET_ERR; + } + return s; +} + +int anetUnixConnect(char *err, char *path) +{ + return anetUnixGenericConnect(err,path,ANET_CONNECT_NONE); +} + +int anetUnixNonBlockConnect(char *err, char *path) +{ + return anetUnixGenericConnect(err,path,ANET_CONNECT_NONBLOCK); +} + /* Like read(2) but make sure 'count' is read before to return * (unless error or EOF condition is encountered) */ int anetRead(int fd, char *buf, int count) @@ -207,32 +255,8 @@ int anetWrite(int fd, char *buf, int count) return totlen; } -int anetTcpServer(char *err, int port, char *bindaddr) -{ - int s, on = 1; - struct sockaddr_in sa; - - if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - anetSetError(err, "socket: %s\n", strerror(errno)); - return ANET_ERR; - } - if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { - anetSetError(err, "setsockopt SO_REUSEADDR: %s\n", strerror(errno)); - close(s); - return ANET_ERR; - } - memset(&sa,0,sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = htonl(INADDR_ANY); - if (bindaddr) { - if (inet_aton(bindaddr, &sa.sin_addr) == 0) { - anetSetError(err, "Invalid bind address\n"); - close(s); - return ANET_ERR; - } - } - if (bind(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) { +static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) { + if (bind(s,sa,len) == -1) { anetSetError(err, "bind: %s\n", strerror(errno)); close(s); return ANET_ERR; @@ -242,18 +266,51 @@ int anetTcpServer(char *err, int port, char *bindaddr) close(s); return ANET_ERR; } + return ANET_OK; +} + +int anetTcpServer(char *err, int port, char *bindaddr) +{ + int s; + struct sockaddr_in sa; + + if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR) + return ANET_ERR; + + memset(&sa,0,sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + sa.sin_addr.s_addr = htonl(INADDR_ANY); + if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) { + anetSetError(err, "Invalid bind address\n"); + close(s); + return ANET_ERR; + } + if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) + return ANET_ERR; return s; } -int anetAccept(char *err, int serversock, char *ip, int *port) +int anetUnixServer(char *err, char *path) { - int fd; - struct sockaddr_in sa; - unsigned int saLen; + int s; + struct sockaddr_un sa; + if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR) + return ANET_ERR; + + memset(&sa,0,sizeof(sa)); + sa.sun_family = AF_LOCAL; + strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1); + if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) + return ANET_ERR; + return s; +} + +static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) { + int fd; while(1) { - saLen = sizeof(sa); - fd = accept(serversock, (struct sockaddr*)&sa, &saLen); + fd = accept(s,sa,len); if (fd == -1) { if (errno == EINTR) continue; @@ -264,7 +321,27 @@ int anetAccept(char *err, int serversock, char *ip, int *port) } break; } + return fd; +} + +int anetTcpAccept(char *err, int s, char *ip, int *port) { + int fd; + struct sockaddr_in sa; + socklen_t salen = sizeof(sa); + if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR) + return ANET_ERR; + if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); if (port) *port = ntohs(sa.sin_port); return fd; } + +int anetUnixAccept(char *err, int s) { + int fd; + struct sockaddr_un sa; + socklen_t salen = sizeof(sa); + if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR) + return ANET_ERR; + + return fd; +} diff --git a/src/anet.h b/src/anet.h index ce0f4778..bef0adcf 100644 --- a/src/anet.h +++ b/src/anet.h @@ -37,10 +37,14 @@ int anetTcpConnect(char *err, char *addr, int port); int anetTcpNonBlockConnect(char *err, char *addr, int port); +int anetUnixConnect(char *err, char *path); +int anetUnixNonBlockConnect(char *err, char *path); int anetRead(int fd, char *buf, int count); int anetResolve(char *err, char *host, char *ipbuf); int anetTcpServer(char *err, int port, char *bindaddr); -int anetAccept(char *err, int serversock, char *ip, int *port); +int anetUnixServer(char *err, char *path); +int anetTcpAccept(char *err, int serversock, char *ip, int *port); +int anetUnixAccept(char *err, int serversock); int anetWrite(int fd, char *buf, int count); int anetNonBlock(char *err, int fd); int anetTcpNoDelay(char *err, int fd); diff --git a/src/aof.c b/src/aof.c index 4dbce394..2396ba2c 100644 --- a/src/aof.c +++ b/src/aof.c @@ -549,7 +549,8 @@ int rewriteAppendOnlyFileBackground(void) { char tmpfile[256]; if (server.vm_enabled) vmReopenSwapFile(); - close(server.fd); + if (server.ipfd > 0) close(server.ipfd); + if (server.sofd > 0) close(server.sofd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { _exit(0); diff --git a/src/config.c b/src/config.c index db58a236..bbe9d402 100644 --- a/src/config.c +++ b/src/config.c @@ -71,6 +71,8 @@ void loadServerConfig(char *filename) { } } else if (!strcasecmp(argv[0],"bind") && argc == 2) { server.bindaddr = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"unixsocket") && argc == 2) { + server.unixsocket = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"save") && argc == 3) { int seconds = atoi(argv[1]); int changes = atoi(argv[2]); diff --git a/src/networking.c b/src/networking.c index 6181799a..d2eb2543 100644 --- a/src/networking.c +++ b/src/networking.c @@ -339,23 +339,11 @@ void addReplyBulkCString(redisClient *c, char *s) { } } -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd; - char cip[128]; +static void acceptCommonHandler(int fd) { redisClient *c; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); - - cfd = anetAccept(server.neterr, fd, cip, &cport); - if (cfd == AE_ERR) { - redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); - return; - } - redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); - if ((c = createClient(cfd)) == NULL) { + if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); - close(cfd); /* May be already closed, just ingore errors */ + close(fd); /* May be already closed, just ingore errors */ return; } /* If maxclient directive is set and this is one client more... close the @@ -375,6 +363,38 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { server.stat_numconnections++; } +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd; + char cip[128]; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetTcpAccept(server.neterr, fd, cip, &cport); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); + acceptCommonHandler(cfd); +} + +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cfd; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetUnixAccept(server.neterr, fd); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket); + acceptCommonHandler(cfd); +} + + static void freeClientArgv(redisClient *c) { int j; for (j = 0; j < c->argc; j++) diff --git a/src/rdb.c b/src/rdb.c index a401a5b9..589b536a 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -461,7 +461,8 @@ int rdbSaveBackground(char *filename) { if ((childpid = fork()) == 0) { /* Child */ if (server.vm_enabled) vmReopenSwapFile(); - close(server.fd); + if (server.ipfd > 0) close(server.ipfd); + if (server.sofd > 0) close(server.sofd); if (rdbSave(filename) == REDIS_OK) { _exit(0); } else { diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index c5ababf2..dcc13286 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -71,6 +71,7 @@ static struct config { aeEventLoop *el; char *hostip; int hostport; + char *hostsocket; int keepalive; long long start; long long totlatency; @@ -359,7 +360,11 @@ static client createClient(void) { client c = zmalloc(sizeof(struct _client)); char err[ANET_ERR_LEN]; - c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport); + if (config.hostsocket == NULL) + c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport); + else + c->fd = anetUnixNonBlockConnect(err,config.hostsocket); + if (c->fd == ANET_ERR) { zfree(c); fprintf(stderr,"Connect: %s\n",err); @@ -455,6 +460,9 @@ void parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"-p") && !lastarg) { config.hostport = atoi(argv[i+1]); i++; + } else if (!strcmp(argv[i],"-s") && !lastarg) { + config.hostsocket = argv[i+1]; + i++; } else if (!strcmp(argv[i],"-d") && !lastarg) { config.datasize = atoi(argv[i+1]); i++; @@ -478,7 +486,8 @@ void parseOptions(int argc, char **argv) { printf("Wrong option '%s' or option argument missing\n\n",argv[i]); printf("Usage: redis-benchmark [-h ] [-p ] [-c ] [-n [-k ]\n\n"); printf(" -h Server hostname (default 127.0.0.1)\n"); - printf(" -p Server port (default 6379)\n"); + printf(" -p Server port (default 6379)\n"); + printf(" -s Server socket (overrides host and port)\n"); printf(" -c Number of parallel connections (default 50)\n"); printf(" -n Total number of requests (default 10000)\n"); printf(" -d Data size of SET/GET value in bytes (default 2)\n"); @@ -537,6 +546,7 @@ int main(int argc, char **argv) { config.hostip = "127.0.0.1"; config.hostport = 6379; + config.hostsocket = NULL; parseOptions(argc,argv); diff --git a/src/redis-cli.c b/src/redis-cli.c index bc405c91..2aad25b3 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -51,6 +51,7 @@ static struct config { char *hostip; int hostport; + char *hostsocket; long repeat; int dbnum; int interactive; @@ -119,9 +120,17 @@ static int cliConnect(int force) { if (fd == ANET_ERR || force) { if (force) close(fd); - fd = anetTcpConnect(err,config.hostip,config.hostport); + if (config.hostsocket == NULL) { + fd = anetTcpConnect(err,config.hostip,config.hostport); + } else { + fd = anetUnixConnect(err,config.hostsocket); + } if (fd == ANET_ERR) { - fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err); + fprintf(stderr,"Could not connect to Redis at "); + if (config.hostsocket == NULL) + fprintf(stderr,"%s:%d: %s",config.hostip,config.hostport,err); + else + fprintf(stderr,"%s: %s",config.hostsocket,err); return -1; } anetTcpNoDelay(NULL,fd); @@ -362,6 +371,9 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"-p") && !lastarg) { config.hostport = atoi(argv[i+1]); i++; + } else if (!strcmp(argv[i],"-s") && !lastarg) { + config.hostsocket = argv[i+1]; + i++; } else if (!strcmp(argv[i],"-r") && !lastarg) { config.repeat = strtoll(argv[i+1],NULL,10); i++; @@ -410,7 +422,7 @@ static sds readArgFromStdin(void) { } static void usage() { - fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n"); + fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n"); fprintf(stderr, "usage: echo \"argN\" | redis-cli -x [options] cmd arg1 arg2 ... arg(N-1)\n\n"); fprintf(stderr, "example: cat /etc/passwd | redis-cli -x set my_passwd\n"); fprintf(stderr, "example: redis-cli get my_passwd\n"); @@ -497,6 +509,7 @@ int main(int argc, char **argv) { config.hostip = "127.0.0.1"; config.hostport = 6379; + config.hostsocket = NULL; config.repeat = 1; config.dbnum = 0; config.interactive = 0; diff --git a/src/redis.c b/src/redis.c index 2cbfc689..f65901c7 100644 --- a/src/redis.c +++ b/src/redis.c @@ -714,13 +714,16 @@ void createSharedObjects(void) { } void initServerConfig() { - server.dbnum = REDIS_DEFAULT_DBNUM; server.port = REDIS_SERVERPORT; + server.bindaddr = NULL; + server.unixsocket = NULL; + server.ipfd = -1; + server.sofd = -1; + server.dbnum = REDIS_DEFAULT_DBNUM; server.verbosity = REDIS_VERBOSE; server.maxidletime = REDIS_MAXIDLETIME; server.saveparams = NULL; server.logfile = NULL; /* NULL = log on standard output */ - server.bindaddr = NULL; server.glueoutputbuf = 1; server.daemonize = 0; server.appendonly = 0; @@ -795,9 +798,21 @@ void initServer() { createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); - server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); - if (server.fd == -1) { - redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); + server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr); + if (server.ipfd == ANET_ERR) { + redisLog(REDIS_WARNING, "Opening port: %s", server.neterr); + exit(1); + } + if (server.unixsocket != NULL) { + unlink(server.unixsocket); /* don't care if this fails */ + server.sofd = anetUnixServer(server.neterr,server.unixsocket); + if (server.sofd == ANET_ERR) { + redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); + exit(1); + } + } + if (server.ipfd < 0 && server.sofd < 0) { + redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } for (j = 0; j < server.dbnum; j++) { @@ -828,8 +843,10 @@ void initServer() { server.stat_keyspace_hits = 0; server.unixtime = time(NULL); aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); - if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, - acceptHandler, NULL) == AE_ERR) oom("creating file event"); + if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, + acceptTcpHandler,NULL) == AE_ERR) oom("creating file event"); + if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, + acceptUnixHandler,NULL) == AE_ERR) oom("creating file event"); if (server.appendonly) { server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); @@ -1453,7 +1470,10 @@ int main(int argc, char **argv) { if (rdbLoad(server.dbfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",time(NULL)-start); } - redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.ipfd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.sofd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el); diff --git a/src/redis.h b/src/redis.h index cf21447d..6fa3e49f 100644 --- a/src/redis.h +++ b/src/redis.h @@ -352,7 +352,10 @@ struct sharedObjectsStruct { struct redisServer { pthread_t mainthread; int port; - int fd; + char *bindaddr; + char *unixsocket; + int ipfd; + int sofd; redisDb *db; long long dirty; /* changes to DB from the last save */ long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ @@ -391,7 +394,6 @@ struct redisServer { struct saveparam *saveparams; int saveparamslen; char *logfile; - char *bindaddr; char *dbfilename; char *appendfilename; char *requirepass; @@ -617,7 +619,8 @@ void *addDeferredMultiBulkLength(redisClient *c); void setDeferredMultiBulkLength(redisClient *c, void *node, long length); void addReplySds(redisClient *c, sds s); void processInputBuffer(redisClient *c); -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); void addReplyBulk(redisClient *c, robj *obj); void addReplyBulkCString(redisClient *c, char *s);